Kafka生产问题总结及性能优化实践

1、消息丢失情况

消息发送端:
(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种。
(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
(3)acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。当然如果min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似。

消息消费端:
如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer直接宕机了,未处理完的数据丢失了,下次也消费不到了。

2、消息重复消费

消息发送端:
发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息。

消息消费端:
如果消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理。
一般消费端都是要做消费幂等处理的。

3、消息乱序

如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了。所以,是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息从发送端到消费端全链路有序。
kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。

4、消息积压

1)线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区。
2)由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。
此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。

5、延时队列

延时队列存储的对象是延时消息。所谓的“延时消息”是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费,延时队列的使用场景有很多, 比如 :
1)在订单系统中, 一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延时队列来处理这些订单了。
2)订单完成1小时后通知用户进行评价。

实现思路:发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,…topic_2h,这个一般不能支持任意时间段的延时),然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处理的topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。

6、消息回溯

如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序bug修复后,这时可能需要对之前已消费的消息重新消费,可以指定从多久之前的消息回溯消费,这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移的消息开始消费,参见上节课的内容。

7、分区数越多吞吐量越高吗

可以用kafka压测工具自己测试分区数不同,各种情况下的吞吐量

# 往test里发送一百万消息,每条设置1KB
# throughput 用来进行限流控制,当设定的值小于 0 时不限流,当设定的值大于 0 时,当发送的吞吐量大于该值时就会被阻塞一段时间
bin/kafka‐producer‐perf‐test.sh ‐‐topic test ‐‐num‐records 1000000 ‐‐record‐size 1024 ‐‐throughput ‐1
‐‐producer‐props bootstrap.servers=192.168.65.60:9092 acks=1

在这里插入图片描述
网络上很多资料都说分区数越多吞吐量越高 , 但从压测结果来看,分区数到达某个值吞吐量反而开始下降,实际上很多事情都会有一个临界值,当超过这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。一般情况分区数跟集群机器数量相当就差不多了。
当然吞吐量的数值和走势还会和磁盘、文件系统、 I/O调度策略等因素相关。
注意:如果分区数设置过大,比如设置10000,可能会设置不成功,后台会报错"java.io.IOException : Too many open files"。
异常中最关键的信息是“ Too many open flies”,这是一种常见的 Linux 系统错误,通常意味着文件描述符不足,它一般发生在创建线程、创建 Socket、打开文件这些场景下 。 在 Linux系统的默认设置下,这个文件描述符的个数不是很多 ,通过 ulimit -n 命令可以查看:一般默认是1024,可以将该值增大,比如:ulimit -n 65535

8、消息传递保障

at most once(消费者最多收到一次消息,0-1次):acks = 0 可以实现。
at least once(消费者至少收到一次消息,1-多次):ack = all 可以实现。
exactly once(消费者刚好收到一次消息):at least once 加上消费者幂等性可以实现,还可以用kafka生产者的幂等性来实
现。

kafka生产者的幂等性::因为发送端重试导致的消息重复发送问题,kafka的幂等性可以保证重复发送的消息只接收一次,只需在生产者加上参数 props.put(“enable.idempotence”, true) 即可,默认是false不开启。
具体实现原理是,kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和
Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查PID和Sequence Number,如果相同不会再接收。

PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID 对用户完全是透明的。生产者如果重启则会生成新的PID。
Sequence Number:对于每个 PID,该 Producer 发送到每个 Partition 的数据都有对应的序列号,这些序列号是从0开始单调递增的。

9、kafka的事务

Kafka的事务不同于Rocketmq,Rocketmq是保障本地事务(比如数据库)与mq消息发送的事务一致性,Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败),一般在kafka的流式计算场景用得多一点,比如,kafka需要对一个topic里的消息做不同的流式计算处理,处理完分别发到不同的topic里,这些topic分别被不同的下游系统消费(比如hbase,redis,es等),这种我们肯定希望系统发送到多个topic的数据保持事务一致性。Kafka要实现类似Rocketmq的分布式事务需要额外开发功能。
kafka的事务处理可以参考官方文档:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my‐transactional‐id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
//初始化事务
producer.initTransactions();

try {
//开启事务
producer.beginTransaction();
for (int i = 0; i < 100; i++){
//发到不同的主题的不同分区
producer.send(new ProducerRecord<>("hdfs‐topic", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<>("es‐topic", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<>("redis‐topic", Integer.toString(i), Integer.toString(i)));
}
//提交事务
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
//回滚事务
producer.abortTransaction();
}
producer.close();

10、kafka高性能的原因

磁盘顺序读写:kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是追加在文件末尾,
不会写入文件中的某个位置(随机写)保证了磁盘顺序写。
数据传输的零拷贝。
读写数据的批量batch处理以及压缩传输。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/237408.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

HTML常用表单元素使用?

目录 一、常用表单元素使用的关键字二、常用表单元素使用的效果与作用&#xff08;1&#xff09;password : 保护用户的隐私(2) email: 输入邮件&#xff08;比如QQ邮件&#xff09;(3)、number : 输入框只能输入数字&#xff08;4&#xff09;、tel : 常用于输入电话号&#x…

一、微前端目标、前端架构的前生今世、微前端架构优势和劣势、软件设计原则与分层

1、目标 2、前端架构的前世今生 ① 初始&#xff1a;无架构&#xff0c;前端代码内嵌到后端应用中 ② 后端 MVC 架构&#xff1a;将视图层、数据层、控制层做分离 缺点&#xff1a;重度依赖开发环境&#xff0c;代码混淆严重&#xff08;在调试时&#xff0c;需要启动后端所有…

Flink之迟到的数据

迟到数据的处理 推迟水位线推进: WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))设置窗口延迟关闭&#xff1a;.allowedLateness(Time.seconds(3))使用侧流接收迟到的数据: .sideOutputLateData(lateData) public class Flink12_LateDataC…

前端框架(Front-end Framework)和库(Library)的区别

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

WPF实现更加灵活绑定复杂Command(使用Microsoft XAML Behaviors 库)

1、安装NuGet 2、在XAML的命名空间引入&#xff1a; xmlns:i"http://schemas.microsoft.com/xaml/behaviors" 3、使用&#xff1a; <Canvas Background"Aqua"><Rectangle Stroke"Red" Width"{Binding RectModel.RectangleWidth}…

Docker基础概念解析:镜像、容器、仓库

当谈到容器化技术时&#xff0c;Docker往往是第一个被提及的工具。Docker的基础概念涵盖了镜像、容器和仓库&#xff0c;它们是理解和使用Docker的关键要素。在这篇文章中&#xff0c;将深入探讨这些概念&#xff0c;并提供更丰富的示例代码&#xff0c;帮助大家更好地理解和应…

【AI绘图】 学习 prompt 画图,收集网站

文章目录 在线画图网站Prompt模型下载AI 工具箱 在线画图网站 【强推】搜图&#xff1f;也可以在线画图&#xff0c;质量很高&#xff01;&#xff1a;https://lexica.art/ Lexica 是一个搜索 AI 生成图片的网站&#xff0c;可以根据图片本身关联性或描述文本&#xff08;prom…

基于OpenCV+CNN+IOT+微信小程序智能果实采摘指导系统——深度学习算法应用(含python、JS工程源码)+数据集+模型(五)

目录 前言总体设计系统整体结构图系统流程图 运行环境Python环境TensorFlow 环境Jupyter Notebook环境Pycharm 环境微信开发者工具OneNET云平台 模块实现1. 数据预处理2. 创建模型并编译3. 模型训练及保存4. 上传结果5. 小程序开发1&#xff09;查询图片2&#xff09;查询识别结…

计算机网络——期末考试复习资料

什么是计算机网络 将地理位置不同的具有独立功能的多台计算机及其外部设备通过通信线路和通信设备连接起来&#xff1b;实现资源共享和数据传递的计算机的系统。 三种交换方式 报文交换&#xff1a;路由器转发报文&#xff1b; 电路交换&#xff1a;建立一对一电路 分组交换&a…

大数据驱动下的人口普查:新时代下的新变革

人口普查数据大屏&#xff0c;是指一种通过大屏幕显示人口普查数据的设备&#xff0c;可以将人口普查数据以可视化的形式呈现出来&#xff0c;为决策者提供直观、准确的人口数据。这种大屏幕的出现&#xff0c;让人口普查数据的利用变得更加高效、便捷。 如果您需要制作一张直观…

手写VUE后台管理系统10 - 封装Axios实现异常统一处理

目录 前后端交互约定安装创建Axios实例拦截器封装请求方法业务异常处理 axios 是一个易用、简洁且高效的http库 axios 中文文档&#xff1a;http://www.axios-js.com/zh-cn/docs/ 前后端交互约定 在本项目中&#xff0c;前后端交互统一使用 application/json;charsetUTF-8 的请…

appium安卓app自动化,遇到搜索框无搜索按钮元素时无法搜索的解决方案

如XX头条&#xff0c;搜索框后面有“搜索”按钮&#xff0c;这样实现搜索操作较为方便。 但有些app没有设置该搜索按钮&#xff0c;初学者就要花点时间去学习怎么实现该功能了&#xff0c;如下图。 这时候如果定位搜索框&#xff0c;再点击操作&#xff0c;再输入文本后&#x…

【QT入门】基础知识

一.认识Qt qt是一套应用程序开发库&#xff0c;与MFC不同是跨平台的开发类库&#xff0c;主要用来开发图形界面。完全面向对象容易扩展。 优点&#xff1a;1.封装性强&#xff0c;简单易学 2.跨平台 3.独立编译为本地代码 二.qt工程 1.常见的工程文件有这两种…

2024 年 SEO 现状

搜索引擎优化&#xff08;SEO&#xff09;一直以来都是网络知名度和成功的基石。随着我们踏上 2024 年的征程&#xff0c;SEO领域正在经历重大变革&#xff0c;有些变革已经开始&#xff0c;这对企业、创作者和营销人员来说既是挑战也是机遇。 语音搜索 语音搜索曾是一个未来…

HeartBeat监控Mysql状态

目录 一、概述 二、 安装部署 三、配置 四、启动服务 五、查看数据 一、概述 使用heartbeat可以实现在kibana界面对 Mysql 服务存活状态进行观察&#xff0c;如有必要&#xff0c;也可在服务宕机后立即向相关人员发送邮件通知 二、 安装部署 参照章节&#xff1a;监控组件…

【小白专用】MySQL查询数据库所有表名及表结构其注释

一、先了解下INFORMATION_SCHEMA 1、在MySQL中&#xff0c;把INFORMATION_SCHEMA看作是一个数据库&#xff0c;确切说是信息数据库。其中保存着关于MySQL服务器所维护的所有其他数据库的信息。如数据库名&#xff0c;数据库的表&#xff0c;表栏的数据类型与访问权 限等。在INF…

HarmonyOS编译开源native库(OpenSSL实例)

前言 近期项目要开始做鸿蒙版本&#xff0c;有一部分依赖native的代码也需要迁移&#xff0c;某个native模块依赖openssl&#xff0c;需要在鸿蒙下重新编译openssl才行。一开始找了很多相关文档都没有得到方法&#xff0c;无奈只能自己凭经验慢慢试&#xff0c;最后还是成功了…

.net 安装Postgresql驱动程序ngpsql

.net 安装Postgresql驱动程序ngpsql 最近搞一个物联网项目&#xff0c;需要采集fanuc数控机床的数据&#xff0c;厂家提供的API只支持windows&#xff0c;所以就决定C#开发&#xff0c;数据库用postgresql&#xff0c; 安装数据库驱动一波三折。 作为一个讨厌微软的老程序猿&…

Postman高级应用——变量、流程控制、调试、公共函数、外部数据文件

Postman 提供了四种类型的变量 环境变量&#xff08;Environment Variable&#xff09; 不同的环境&#xff0c;使用不同的环境变量&#xff0c;例如&#xff1a;测试过程中经常会用到 测试环境&#xff0c;外网环境等 全局变量&#xff08;Global Variable&#xff09; 所有的…

使用PyTorch II的新特性加快LLM推理速度

Pytorch团队提出了一种纯粹通过PyTorch新特性在的自下而上的优化LLM方法&#xff0c;包括: Torch.compile: PyTorch模型的编译器 GPU量化:通过降低精度操作来加速模型 推测解码:使用一个小的“草稿”模型来加速llm来预测一个大的“目标”模型的输出 张量并行:通过在多个设备…