消息中间件之Kafka(二)

1.Kafka线上常见问题

1.1 为什么要对topic下数据进行分区存储?

  • 1.commit log文件会受到所在机器的文件系统大小的限制,分区之后可以将不同的分区放在不同的机器上,
    相当于对数据做了分布式存储,理论上一个topic可以处理任意数量的数据
  • 2.提高并行度

1.2 如何在多个partition中保证顺序消费?

  • 方案一:首先将需要保证顺序的消息收集起来,然后交给一个consumer去进行处理,然后内部维护一个线程池,让其中某一个线程去顺序执行这些消息eg:用户下单流程,支付成功消息 -> 库存消息
  • 方案二:让多个消息构造一个特殊结构的顺序消息,当consumer收到时,在一个线程中依次进行消费

1.3 消息丢失

在这里插入图片描述
以上4个步骤都有可能会造成消息丢失
Producer

  • acks=0,表示producer不需要等待任何broker确认收到消息的回复,就可以发送下一条消息,性能最高,但是最容易丢消息,大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种
  • acks=1,表示至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入,就可以继续发送,下一条消息,这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失
  • ack=-1或者all,这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,
    这种策略会保证只要由一个备份存活就不会丢失数据,这是最强的数据保证,一般除非是金融级别,或跟钱打交道的场景才会使用这种配置,当然如果min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似

Consumer

  • 如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时consumer直接宕机了,未处理完的数据丢失了,下次也消费不到了

1.4 消费重复

Producer

  • 发送消息如果配置了重试机制,比如网络抖动事件过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息
    Consumer
    如果消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,
    下次重启又会拉取相同的一批数据重复处理,一般消费端都是要做消息幂等处理的

1.5 消息乱序

  • 如果发送端配置了重试机制,Kafka不会等之前那条消息完全成功了才去发送下一条消息,这样就可能出现
    发送了1,2,3条2消息,第一条超时了,后面两条发送成功,再重试发送第一条消息,这时消息在broker端的顺序就是2,3,1了
    所以,是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式取发消息,当然acks不能设置为0,这样也能保证消息从发送端到消费端全链路有序
    kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消息发到内存队列(可以多搞几个),一个内存队列开启一个线程顺序消费处理
  • 一个parition同一时刻在一个consumer group中只能有一个consumer实例在消费
    ,从而保证消费顺序。consumer group中的consumer数量不能比一个topic中的partion数量还要多,否则多出来的consumer消费不到消息。Kafka只在parition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费性如果有在总体上保证消费顺序的需求,那么我们可以通过将topic的partition数量设置为1,将consumer group中的consumer instance数量也设置为1,但是这样会影响性能,所以kafka的顺序消费很少用

1.6 消息积压

  • 1.线上有时因为发送方发送消息速度过快,或者消费放处理消息过慢,可能会导致broker挤压大量未消费消息,此种情况如果挤压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到地消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题地不同分区
  • 2.由于消息数据格式变动或者消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息.此种情况可以将这些消费不成功地消息转发到其他队列里去(类似私信队列),后面再慢慢分析死信队列里地消息处理问题

1.7 延时队列

  • 延时队列存储的对象是延时消息,所谓的"延时消息"是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费,延时队列的使用场景有很多。比如:
    1.在订单系统中,一个用户下单之后通常有30分钟的时候进行支付,如果30分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延时队列来处理这些订单了
    2.订单完成1小时后通知用户进行评价
  • 实现思路:发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s、topic_5s、topic_10s…2h)这个一般不能支持任意时间段的延时),然后通过定时器进行轮询这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处理的topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了

1.8 消息回溯

  • 如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序buf修复后,这时可能需要对之前已消费的消息重新消费,可以指定从多久之前的消息回溯消费,这种可以用consumer的offsetForTimes、seek等方法指定从某个offset偏移量的消息开始消费

1.9 分区数越多吞吐量越高吗

  • 可以用kafka压测工具自己测试分区数不同,各种情况下的吞吐量
    往test里发送一百万消息,每条设置1kb
    throughout用来进行限流控制,当设定的值小于0时不限流,当设定的值大于0时,当发送的吞吐量大于该值时就会被
    阻塞一段时间
    bin/kafka-producer-perf-test.sh --topic test --num-records 1000000 --record-size 1024 --throughout -1
    –producer-props bootstrap.servers=192.168.65.60:9092 acks=1

  • 网络上很多资料都说分区数越多吞吐量很高,但从压测结果来看,分区数到达某个值,吞吐量反而开始下降
    实际上很多事情都会有一个临界值,当超过这个临界值之后,很多原本符合既定逻辑的走向又会变得不同,
    一般情况下跟集群机器数量相当就差不多了,类似于Redis集群数量不能超过1000个,当超过1000个时,
    整体的网络心跳时间将会边长,还会导致心跳的数据报过大,比较容易产生网络分区
    当然吞吐量的数值和走势还会和磁盘、文件系统、IO调度策略等因素有关
    注意:如果分区数设置过大,比如设置10000,可能会设置不成功,后台可能会报错
    “java.io.IOException: Too many open files"异常中最关键的信息是"Too many open files”,
    这是一种常见的Linux系统错误,通常意味着文件描述符不足,它一般发生在创建线程、创建socket、
    打开文件这些场景下,在Linux系统的默认设置下,这个文件描述符的个数不是很多,通过ulimit -n命令可以查看一般默认是1024,可以将该值调大比如 ulimit -n 65535

1.10 消息传递保障

  • at most once(消费者最多收到一次消息, 0-1次) acks = 0可以实现
    at least once(消费者至少收到一次消息,1-多次) acks = all可以实现
    exactly once(消费者刚好收到一次消息): at least once加上消费者幂等可以实现,还可以用kafka生产者的幂等性
    来实现

  • kafka生产者的幂等性:
    因为发送端充值导致的消息重复发送问题,kafka的幂等性可以保证重复发送的消息指接收一次,只需要在生产者加上参数props.put(“enable.idempotence”,true)即可,默认是false不开启
    具体实现原理是,kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查,PID和Sequence Number,如果相同不会再接收

1.11 Kafka的事务

  • Kafka的事务不同于RocketMQ,RocketMQ是保障本地事务(比如数据库)与MQ消息发送的事务一致性,Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败)
    一般在Kafka的流式计算场景用得多一点,比如,kafka需要对于给topic里的消息做不同的流式计算处理处理完分别发到不同的topic里,这些topic分别被不同的下游系统消费(比如HBase、Redis、ES等)
    这种我们肯定希望系统发送到多个topic的数据保持事务一致性

1.12 Kafka高性能的原因

  • kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是
    追加在文件末尾,不会写入文件中的某个位置(随机写)保证了磁盘的顺序写,反观MySQL却是会经常
    需要产生随机IO
  • 数据传输的零拷贝
    在这里插入图片描述
    网上很多人说sendfile是直接从内核读取缓冲区拷贝到网卡接口里面,也有人说拷贝到socket缓冲区当中,我们通过mand systemcalls sendfile会发现,在Linux 内核2.6.33之前是拷贝到socket缓冲区当中,之后的版本是直接拷贝到了网卡接口
    在这里插入图片描述
  • 读写数据的批量batch处理以及压缩传输

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/336367.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

初识计算机图形学

初识计算机图形学 笔记来源:【老奇】阴差阳错 撼动世界的游戏引擎 1.MVP Transformation 详见本人博客: 1.Transformation 2.梳理从MVP变换到光栅化的过程 MVP变换将空间中3D物体投影到2D屏幕 2.Rasterization 详见本人博客: 1.Rasteri…

教您如何下载保存钉钉视频到电脑本地

教您如何下载保存钉钉视频到电脑和手机相册地瓜网络技术 大家好,我们这边是地瓜网络技术!30秒教会你下载钉钉视频!现在很多群管理员把视频设置为禁止下载,导致我们无法正常的下载群直播回放视频, 今天我们就教大家如何…

【核心复现】基于改进鲸鱼优化算法的微网系统能量优化管理matlab

目录 一、主要内容 1 冷热电联供型微网系统 2 长短期记忆网络(Long Short Term Memory, LSTM) 3 改进鲸鱼优化算法 二、部分代码 三、运行结果 四、下载链接 一、主要内容 该程序为《基于改进鲸鱼优化算法的微网系统能量优化管理》matlab代码,主要内容如下&…

易优demo网站测试结果

易优demo网站测试结果-06 1、信息收集 网站账号:admin 密码:Aa123456 2、存在的漏洞 2.1 后台弱口令漏洞 http://eyoucms-s347fqn.gxalabs.com/login.php?sAdmin/login网站账号:admin 密码:Aa123456 成功登陆 2.2 代码远程…

51单片机8*8点阵屏

8*8点阵屏 8*8点阵屏是一种LED显示屏,它由8行和8列的LED灯组成。每个LED灯的开闭状态都可以独立控制,从而可以显示出数字、字母、符号、图形等信息。 8*8点阵屏的原理是通过行列扫描的方式,控制LED灯的亮灭,从而显示出所需的图案或…

蓝桥杯-dfs(一)

📑前言 本文主要是【算法】——dfs使用的文章,如果有什么需要改进的地方还请大佬指出⛺️ 🎬作者简介:大家好,我是听风与他🥇 ☁️博客首页:CSDN主页听风与他 🌄每日一句&#xff1…

【Linux上创建一个LVM卷组,将多个物理卷添加到卷组中使用】

Linux上创建一个LVM卷组,将多个物理卷添加到卷组中使用 目录1.列出当前系统中所有的块设备信息,包括磁盘、分区、逻辑卷等2.对磁盘进行分区操作3.创建了一个名为 vg_data 的卷组4.将物理卷添加到已经存在的卷组5.在卷组中创建一个逻辑卷6.查看已创建的 L…

一个开源的文档管理系统Paperless-ngx私有化部署教程

目录 简介 2.安装 2.2解压 2.3启动 3.使用 3.1文档管理 3.2搜索 3.3文档预览 4.源码等地址 简介 Paperless-ngx:一个开源的文档管理系统,可以将你的物理文档转换成可搜索的在线档案,从而减少纸张的使用。 它内置了OCR功能&#xf…

Idea 开发环境不断切换git代码分支导致冲掉别人代码

问题分析 使用git reflog查看执行命令,以下是发生事故的切换和提交动作 46f72622e1 HEAD{41}: commit: feat: 【Sales - 6.3】小程序端不登录也可以录入客户线索 c5e7d9f6e1 HEAD{42}: fetch origin feature/20240102_Sales6.3_xingang:feature/20240102_Sales6.3…

OJ常用函数/机试常用STL模板

目录 机试涉及到的算法一、字符串二、vector二、map三、set四、queue五、并查集五、cmath六、读入数据6.1 示例16.2 示例26.3 示例36.4 示例46.5 示例56.6 示例66.7 示例76.8 示例86.9 示例96.10 示例106.11 示例11 七、输入输出八、排序九、数学相关十、大数的表示十一、IDE 机…

解决git错误:error: failed to push some refs to ‘git xxx xxxx‘

目录 第一章、问题分析1.1)报错提示1.2)报错分析 第二章、解决方式2.1)方式1:直接pull2.2)方式2:直接pull2.3)方式三 友情提醒: 先看文章目录,大致了解文章知识点结构,点…

Android Matrix绘制PaintDrawable设置BitmapShader,手指触点为圆心scale放大原图,Kotlin(二)

Android Matrix绘制PaintDrawable设置BitmapShader,手指触点为圆心scale放大原图,Kotlin(二) 在 Android Matrix绘制PaintDrawable设置BitmapShader,手指触点为圆心scale放大原图,Kotlin-CSDN博客 基础上&…

ZYNQ 调用AXI WR RD ip及其代码

首先调用ip 值得注意的是:zynq支持axi4.0 ,但是创建的ip是属于axi3.0,其区别主要是在数据位宽以及突发长度的区别。 下面附读写控制模块(稍作修改就可使用,数据位宽是64bit 突发长度是256): as…

C语言从入门到实战——编译和链接

编译和链接 前言一、 翻译环境和运行环境二、 翻译环境2.1 预处理(预编译)2.2 编译2.2.1 词法分析2.2.2 语法分析2.2.3 语义分析 2.3 汇编2.4 链接 三、 运行环境 前言 在C语言中,编译和链接是将源代码转换为可执行文件的两个主要步骤。 编…

【Linux】信号量基于环形队列的生产消费模型

信号量 信号量的本质是一个计数器,可以用来衡量临界资源中资源数量多少 信号量的PV操作 P操作:申请信号量称为P操作,P操作的本质就是让计数器减1。 V操作:释放信号量称为V操作,V操作的本质就是让计数器加1 POSIX信号量…

phpmyadmin 创建服务器

phpmyadmin默认的服务器是localhost 访问setup,创建新的服务器 添加服务器信息 点击应用,服务器创建成功 下载配置文件config.inc.php,放到WWW目录下 可再次访问setup,发现已配置过了 访问登录页面,发现可选…

x-cmd pkg | yt-dlp - 专注于 YouTube 的下载工具

目录 简介首次用户功能特点竞品和相关作品进一步探索 简介 yt-dlp 是一款强大的命令行下载工具,专注于下载 YouTube 视频和音频。它是 youtube-dl 的一个改进和拓展版本,提供了更多功能和修复了一些问题。 yt-dlp 具有灵活的支持,可下载 Yo…

用Jmeter进行性能测试

项目背景 我们的平台为全国某行业监控平台,经过3轮功能测试、接口测试后,98%的问题已经关闭,决定对省平台向全国平台上传数据的接口进行性能测试。01测试步骤1、编写性能测试方案 由于我是刚进入此项目组不久,只参与了其中3个模块…

HTTP 协议和 TCP/IP 协议之间有什么区别?

HTTP(超文本传输协议)和TCP/IP(传输控制协议/互联网协议)是两种在互联网通信中广泛使用的协议,它们之间的区别和联系对许多人来说可能还不是很清晰,今天我们就带大家来一起了解一下HTTP和TCP/IP协议这2者之…

用C语言实现简单的三子棋游戏

目录 1 -> 模块简介 2 -> test.c 3 -> game.c 4 -> game.h 1 -> 模块简介 test.c:测试游戏逻辑 game.c: 函数的实现 game.h:函数的声明 2 -> test.c #define _CRT_SECURE_NO_WARNINGS 1#include "game.h";void menu() {printf("****…