Kafka 消息 0 丢失的最佳实践

文章目录

  • Kafka 消息 0 丢失的最佳实践
  • 生产者端的最佳实践
    • 使用带有回调的 producer.send(msg, callback) 方法
    • 设置 acks = all
    • 设置 retries 为一个较大的值
    • 启用幂等性与事务(Kafka 0.11+)
    • 正确关闭生产者与 flush() 方法
  • Broker 端的最佳实践
    • 设置 unclean.leader.election.enable = false
    • 设置 replication.factor >= 3
    • 设置 min.insync.replicas > 1
    • 确保 replication.factor > min.insync.replicas
    • 优化 Broker 存储与磁盘配置
  • 消费者端的最佳实践
    • 确保消息消费完成再提交
    • 处理 Rebalance 事件
    • 异常重试与死信队列(DLQ)
  • 业务维度的 0 丢失架构
    • 本地消息表 + 定时扫描
  • 监控与告警
  • 结论


Kafka 消息 0 丢失的最佳实践

在分布式系统中,消息队列(如 Kafka)是核心组件之一,用于解耦系统、异步通信和流量削峰。
然而,消息丢失是生产环境中必须解决的关键问题。尽管 Kafka 本身设计为高可靠、高吞吐的系统,但在实际使用中,仍需通过合理的配置和最佳实践来确保消息的 0 丢失。
本文将详细介绍 Kafka 消息 0 丢失的最佳实践,涵盖生产者Broker消费者三方面的配置与优化。


生产者端的最佳实践

使用带有回调的 producer.send(msg, callback) 方法

Kafka 的 producer.send(msg) 方法虽然可以发送消息,但它无法提供消息发送成功与否的反馈。为了确保消息发送的可靠性,必须使用带有回调的 producer.send(msg, callback) 方法。回调函数可以在消息发送成功或失败时通知开发者,从而在应用层执行适当的补救措施。

示例代码:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

def callback(record_metadata, exception):
    if exception:
        print(f"Message failed to send: {exception}")
    else:
        print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition} at offset {record_metadata.offset}")

producer.send('my-topic', b'Hello, Kafka!', callback=callback)

设置 acks = all

acks 参数用于控制 Kafka 消息发送的确认机制。当 acks=all 时,Kafka 会要求所有副本的 Broker 都成功接收到消息后才认为消息“已提交”。这是 Kafka 提供的最严格的确认机制,能够有效防止消息丢失。

配置方法:

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    acks='all'  # 设置为 all 以确保所有副本都成功接收消息
)

acks = 0 (No acknowledgment)

在这种模式下,生产者在发送消息后不会等待任何确认。即,消息发送后立即返回,生产者不会知道消息是否成功到达 Kafka 集群。这种模式的性能最好,因为它不需要等待 Kafka 进行任何确认,但它的可靠性较差。

优点:

  • 性能非常高,因为生产者发送完消息后就立即继续执行,不会等待任何确认。
  • 延迟最小,适用于对消息丢失容忍度较高的场景。

缺点:

  • 消息丢失的风险较高。如果消息在网络传输过程中丢失,生产者无法知道,因此无法做出补救。
  • 对于大多数生产环境不建议使用,因为会丢失数据。

适用场景:

  • 对消息丢失不敏感的场景,比如一些日志系统、缓存系统等。

acks = 1 (Leader acknowledgment)

在这种模式下,生产者会等待 Kafka 集群的 Leader 节点确认收到消息。Leader 节点收到消息后会立即向生产者发送确认,不需要等待副本节点的响应。如果 Leader 成功接收到消息,那么生产者会认为该消息已经成功发送。

优点:

  • 相对于 acks=0,可靠性更高,因为至少 Leader 节点会确认收到消息。
  • 仍然保持较好的性能,延迟比 acks=all 要低。

缺点:

  • 如果 Leader 收到消息后崩溃,但副本节点还未同步数据,消息可能会丢失。
  • 不能保证消息最终会被所有副本保存。

适用场景:

  • 对消息丢失容忍度较高,但仍希望比 acks=0 更加可靠的场景。

acks = all (All acknowledgments)

在这种模式下,生产者会等待 Kafka 集群中所有副本的确认。即,生产者只有在所有副本都确认收到消息后才会认为消息发送成功。这是 Kafka 中最严格的消息确认机制,确保消息不会丢失。

优点:

  • 提供最强的消息可靠性,因为只有当所有副本都接收到消息后,生产者才会收到成功确认。
  • 即使 Kafka 集群的某些节点发生故障,消息依然可以保证不会丢失。

缺点:

  • 性能较低,因为生产者需要等待所有副本的确认,增加了延迟。
  • 可能导致较高的网络负载和集群负担,尤其在集群副本数较多时。

适用场景:

  • 对消息可靠性要求极高的场景,比如金融交易系统、在线支付、订单处理等。

总结

  • acks=0:适合对数据丢失不敏感且要求极高性能的场景。
  • acks=1:适合对性能要求高,但也需要一定可靠性的场景。
  • acks=all:适合对可靠性要求极高,愿意牺牲一定性能来保证数据不丢失的场景。

设置 retries 为一个较大的值

在网络波动或 Broker 暂时不可用的情况下,消息发送可能会失败。通过设置 retries 参数,可以让 Kafka 在消息发送失败时自动重试,确保消息最终能够成功传输。

配置方法:

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    retries=10  # 设置重试次数,确保网络波动时消息不会丢失
)

启用幂等性与事务(Kafka 0.11+)

在 Kafka 0.11+ 版本中,可以启用幂等性(enable.idempotence=True)防止生产者重复发送消息(如因网络重试导致的重复),同时结合事务(Transactional API)确保端到端的 Exactly-Once 语义。

配置方法:

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    acks='all',
    enable_idempotence=True,
    transactional_id='my-transaction-id'
)
producer.init_transactions()
try:
    producer.begin_transaction()
    producer.send('my-topic', b'Transactional message')
    producer.commit_transaction()
except Exception as e:
    producer.abort_transaction()

正确关闭生产者与 flush() 方法

在生产者发送消息后,尤其是在批量发送或高吞吐场景下,务必在关闭生产者前调用 flush() 方法,确保所有缓冲区的消息都被发送。否则,未发送的消息可能在程序异常终止时丢失。

示例代码:

producer.send('my-topic', b'Final message')
producer.flush()  # 确保所有消息发送完成
producer.close()

Broker 端的最佳实践

设置 unclean.leader.election.enable = false

unclean.leader.election.enable 参数控制哪些 Broker 有资格竞选分区的 Leader。如果设置为 true,即使某个 Broker 落后原先的 Leader 很多,它仍然可以成为新的 Leader,这可能导致消息丢失。因此,建议将该参数设置为 false

配置方法:

unclean.leader.election.enable=false

设置 replication.factor >= 3

通过增加分区副本数量,可以有效避免单点故障导致的数据丢失。通常建议设置 replication.factor >= 3,即每个分区有至少三个副本。

配置方法:

replication.factor=3

设置 min.insync.replicas > 1

min.insync.replicas 参数控制消息至少需要写入到多少个副本才算“已提交”。将其设置为大于 1,能够确保消息在多个副本上持久化,提升系统的容错能力。

配置方法:

min.insync.replicas=2

确保 replication.factor > min.insync.replicas

为了确保 Kafka 集群在面对副本丢失时仍能提供高可用性,replication.factor 应该大于 min.insync.replicas。否则,在某些副本故障时,分区将无法正常工作,导致消息丢失。

推荐配置:

replication.factor=3
min.insync.replicas=2

优化 Broker 存储与磁盘配置

  • 文件系统选择:使用 XFS 或 ext4 等具备高效持久化能力的文件系统。
  • 磁盘配置:避免使用 NAS/SAN 等网络存储,优先本地磁盘,并确保写缓存策略正确(如内核参数 fsync 配置)。
  • 日志刷写策略:调整 log.flush.interval.messageslog.flush.interval.ms(默认不推荐修改,但在极端情况下可适当调整)。

消费者端的最佳实践

确保消息消费完成再提交

Kafka 的 Consumer 端提供了 enable.auto.commit 配置项来控制位移提交。将其设置为 false,并结合 commitSync()commitAsync() 方法进行手动提交,可以确保每个消息都被成功处理后才提交位移,防止消费失败时丢失消息。

配置方法:

consumer = KafkaConsumer('my-topic', enable_auto_commit=False)

# 手动提交位移
consumer.commitSync()

处理 Rebalance 事件

消费者需正确处理 Rebalance 事件,避免在分区重新分配时消息处理未完成导致偏移量提交错误。实现 ConsumerRebalanceListener 并在失去分区所有权前提交偏移量。

示例代码:

from kafka import ConsumerRebalanceListener

class RebalanceListener(ConsumerRebalanceListener):
    def on_partitions_revoked(self, revoked):
        consumer.commitSync()

    def on_partitions_assigned(self, assigned):
        pass

consumer = KafkaConsumer('my-topic', enable_auto_commit=False)
consumer.subscribe(topics=['my-topic'], listener=RebalanceListener())

异常重试与死信队列(DLQ)

在消费逻辑中捕获异常并实现重试机制,若多次重试失败则将消息转入死信队列,避免阻塞消费且保留异常数据。

示例代码:

for message in consumer:
    try:
        process_message(message)
        consumer.commitSync()
    except Exception as e:
        send_to_dlq(message)
        consumer.commitSync()  # 避免重复消费

业务维度的 0 丢失架构

本地消息表 + 定时扫描

在高可靠性要求的业务场景中,可以通过结合业务系统本地的消息表和定时扫描机制,进一步增强消息丢失的防范能力。
例如,业务系统可以在本地保存未成功消费的消息,在系统启动时或者定时进行消息的重新扫描和处理,从而避免消息丢失。


监控与告警

  • 生产者监控:跟踪 record-error-raterequest-latency 等指标。
  • Broker 监控:关注 UnderReplicatedPartitionsIsrShrinksPerSecOfflinePartitionsCount
  • 消费者监控:监控 Consumer Lag(滞后量),确保消费进度正常。
  • 告警规则:当 ISR 数量小于 min.insync.replicas 或副本不足时触发告警。

结论

通过结合 Kafka 的配置和应用层的最佳实践,我们可以最大程度上防止消息丢失。尤其是在高可靠性要求的场景中,务必遵循上述实践,保证 Kafka 消息系统的稳定性和可靠性。你可以根据实际业务的需求,对 Kafka 配置做进一步的优化。通过这些措施,Kafka 能够提供近乎零丢失的消息传输服务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/978889.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

linux中安装部署Jenkins,成功构建springboot项目详细教程

参考别人配置Jenkins的git地址为https,无法连上github拉取项目,所以本章节介绍通过配置SSH地址来连github拉取项目 目录: 1、springboot项目 1.1 创建名为springcloudproject的springboot项目工程 1.2 已将工程上传到github中,g…

idea + Docker + 阿里镜像服务打包部署

一、下载docker desktop软件 官网下载docker desktop,需要结合wsl使用 启动成功的画面(如果不是这个画面例如一直处理start或者是stop需要重新启动,不行就重启电脑) 打包成功的镜像在这里,如果频繁打包会导致磁盘空间被占满,需…

cpp中的继承

一、继承概念 在cpp中,封装、继承、多态是面向对象的三大特性。这里的继承就是允许已经存在的类(也就是基类)的基础上创建新类(派生类或者子类),从而实现代码的复用。 如上图所示,Person是基类&…

MySQL - Navicat自动备份MySQL数据

对于从事IT开发的工程师,数据备份我想大家并不陌生,这件工程太重要了!对于比较重要的数据,我们希望能定期备份,每天备份1次或多次,或者是每周备份1次或多次。 如果大家在平时使用Navicat操作数据库&#x…

el-input实现金额输入

需求&#xff1a;想要实现一个输入金额的el-input&#xff0c;限制只能输入数字和一个小数点。失焦数字转千分位&#xff0c;聚焦转为数字&#xff0c;超过最大值&#xff0c;红字提示 效果图 失焦 聚焦 报错效果 // 组件limitDialog <template><el-dialog:visible.s…

基于 C++ Qt 的 Fluent Design 组件库 QFluentWidgets

简介 QFluentWidgets 是一个基于 Qt 的 Fluent Designer 组件库&#xff0c;内置超过 150 个开箱即用的 Fluent Designer 组件&#xff0c;支持亮暗主题无缝切换和自定义主题色。 编译示例 以 Qt5 为例&#xff08;Qt6 也支持&#xff09;&#xff0c;将 libQFluentWidgets.d…

Metal学习笔记八:纹理

到目前为止&#xff0c;您已经学习了如何使用片段函数和着色器为模型添加颜色和细节。另一种选择是使用图像纹理&#xff0c;您将在本章中学习如何操作。更具体地说&#xff0c;您将了解&#xff1a; • UV 坐标&#xff1a;如何展开网格&#xff0c;以便可以对其应用纹理。 •…

JWT使用教程

目录 JWT (JSON Web Token)1. JWT简介(1) 什么是JWT(2) JWT有什么用(3) JWT认证方式 2. JWT的组成部分3. 签名的目的4. JWT与Token的区别5 JWT的优势6 JJWT签发与验证token(1) 引入依赖(2) 创建 Token(3) 解析Token(4) 设置过期时间(5) 自定义claims 7. JWT自定义工具类 JWT (J…

SpringBoot整合SpringSecurity、MyBatis-Plus综合实例:认证、授权

Spring Security 安全框架,系列文章: 《SpringSecurity创建一个简单的自定义表单的认证应用》 《SpringSecurity中的过滤器链与自定义过滤器》 《SpringSecurity实现自定义用户认证方案》 《SpringSecurity密码编码器:使用BCrypt算法加密、自定义密码编码器》 《SpringSecur…

CSDN年度评选揭晓,永洪科技AI技术与智能应用双星闪耀

近日&#xff0c;永洪科技在CSDN&#xff08;中国专业开发者社区&#xff09;的年度评选中&#xff0c;凭借在人工智能技术创新与vividime在行业应用中的卓越表现&#xff0c;一举斩获“人工智能企业”及“智能应用”双料大奖。这一荣誉不仅彰显了永洪科技在AI领域的领先地位&a…

深度学习之“雅可比矩阵与黑塞矩阵”

在深度学习中&#xff0c;导数在优化算法、梯度计算、反向传播等方面起着至关重要的作用。雅可比矩阵&#xff08;Jacobian Matrix&#xff09;和黑塞矩阵&#xff08;Hessian Matrix&#xff09;是多元微积分中的两个重要概念&#xff0c;理解它们的计算方法及应用对掌握深度学…

【MySQL】InnoDB中的Buffer Pool

目录 1、背景2、Buffer Pool【1】含义【2】组成【3】free链表【4】哈希查找缓存页【5】flush链表【6】LRU链表【7】刷新脏页到磁盘【8】Buffer Pool实例【9】chunk【10】Buffer Pool状态信息 3、总结 1、背景 mysql数据是存储在磁盘上的&#xff0c;但是从磁盘上读取数据的速度…

可狱可囚的爬虫系列课程 15:防盗链反爬虫的处理

一、防盗链了解 防盗链是一种技术手段&#xff0c;主要用于防止其他网站通过直接链接的方式使用本网站的资源&#xff08;如图片、文件等&#xff09;&#xff0c;从而节省带宽和服务器资源。当其他网站尝试直接链接到受保护的资源时&#xff0c;服务器会根据设置的规则判断请求…

Dashboard-frps

通过浏览器查看 frp的状态以及代理统计信息展示。 注&#xff1a;Dashboard 尚未针对大量的 proxy 数据展示做优化&#xff0c;如果出现 Dashboard 访问较慢的情况&#xff0c;请不要启用此功能。 需要在 frps.ini中指定 dashboard服务使用的端口&#xff0c;即可开启此功能&…

Element-Plus,使用 El-form中 的 scroll-to-error 没有效果问题记录

因业务需要表单组件中嵌套着表格列表&#xff0c;内容比较多&#xff1b; 所以需要表单校验不通过时&#xff0c;自动定位到不通过的节点&#xff1b; 但发现这个像是没有起到效果一样&#xff0c;后面就是排查的思路了&#xff1a; 容器高度问题&#xff1a;如果表单容器的高度…

实现 Leaflet 多类型点位标记与聚合功能的实战经验分享

在现代的地理信息系统&#xff08;GIS&#xff09;应用中&#xff0c;地图功能是不可或缺的一部分。无论是展示商业网点、旅游景点还是公共服务设施&#xff0c;地图都能以直观的方式呈现数据。然而&#xff0c;当数据量较大时&#xff0c;地图上可能会出现大量的标记点&#x…

次日留存率——mysql计算过程

次日留存率——mysql计算过程 问题&#xff1a;有一张表&#xff0c;有用户id、用户浏览时间a_time&#xff0c;计算每天的用户数、以及次日留存率、三日留存率 创建表user() CREATE TABLE user (id INT, a_time DATE );插入 10 条随机数据 INSERT INTO user (id, a_time) …

老旧android项目编译指南(持续更)

原因 编译了很多项目&#xff0c;找到了一些可观的解决办法 1. android studio里面的jdk版本切换 jdk版本切换在这里&#xff0c;一般安卓开发需要用到4个版本的jdk,jdk8, jdk11, jdk17, jdk21新版的android stuio是默认使用高版本的jdk,所以切换版本是很有必要的 2. 命令…

将VsCode变得顺手好用(1

目录 设置中文 配置调试功能 提效和增强相关插件 主题和图标相关插件 创建js文件 设置中文 打开【拓展】 输入【Chinese】 下载完成后重启Vs即可变为中文 配置调试功能 在随便一个位置新建一个文件夹&#xff0c;用于放置调试文件以及你未来写的代码&#xff0c;随便命名但…

unity学习59: 滑动条 和 滚动条 滚动区域

目录 1 滑动条 slider 1.1 创建slider 1.2 构成的子物体 1.2.1 找到 某个UI的 方法 1.3 构成的component&#xff0c;主体就是 slider 2 核心属性 2.1 value 2.2 direction 3 作用 3.1 由于是fill back 可以实现血条效果 3.2 可以取得 slider.value 数值 1 滑动条…