【Kafka】Kafka的重复消费和消息丢失问题

文章目录

    • 前言
    • 一、重复消费
      • 1.1 重复消费出现的场景
        • 1.1.1 Consumer消费过程中,进程挂掉/异常退出
        • 1.1.2 消费者消费时间过长
      • 1.2 重复消费解决方案
        • 1.2.1 针对于消费端挂掉等原因造成的重复消费问题
        • 1.2.2 针对于Consumer消费时间过长带来的重复消费问题
    • 二、消息丢失
      • 2.1 生产端问题
      • 2.2 消费端问题
    • 三、参考

前言

在Kafka中,生产者(Producer)和消费者(Consumer)是通过发布订阅模式进行协作的,生产者将消息发送到Kafka集群,而消费者从Kafka集群中拉取消息进行消费,无论是生产者发送消息到Kafka集群还是消费者从Kafka集群中拉取消息进行消费,都是容易出现问题的,比较典型的就是消费端的重复消费问题、生产端和消费端产生的消息丢失问题。下面将对这两个问题出现的场景以及常见的解决方案进行讲解。

一、重复消费

1.1 重复消费出现的场景

重复消费出现的常见场景主要分为两种,一种是 Consumer在消费过程中,应用进程被强制kill掉或者发生异常退出(挂掉…),另一种则是Consumer消费的时间过长。

1.1.1 Consumer消费过程中,进程挂掉/异常退出

在Kafka消费端的使用中,位移(Offset)的提交有两种方式,自动提交和手动提交。自动提交情况下,当消费者拉取一批消息进行消费后,需要进行Offset的提交,在消费端提交Offset之前,Consumer挂掉了,当Consumer重启后再次拉取Offset,这时候拉取的依然是挂掉之前消费的Offset,因此造成重复消费的问题。在手动提交模式下,在提交代码调用之前,Consumer挂掉也会造成重复消费。

1.1.2 消费者消费时间过长

Kafka消费端的参数max.poll.interval.ms定义了两次poll的最大间隔,它的默认值是 5 分钟,表示 Consumer 如果在 5 分钟之内无法消费完 poll方法返回的消息,那么Consumer 会主动发起“离开组”的请求。

在离开消费组后,开始Rebalance,因此提交Offset失败。之后重新Rebalance,消费者再次分配Partition后,再次poll拉取消息依然从之前消费过的消息处开始消费,这样就造成重复消费。而且若不解决消费单次消费时间过长的问题,这部分消息可能会一直重复消费。

整体上来说,如果我们在消费中将消息数据处理入库,但是在执行Offset提交时,Kafka宕机或者网络原因等无法提交Offset,当我们重启服务或者Rebalance过程触发,Consumer将再次消费此消息数据。

1.2 重复消费解决方案

1.2.1 针对于消费端挂掉等原因造成的重复消费问题

这部分主要集中在消费端的编码层面,需要我们在设计代码时以幂等性的角度进行开发设计,保证同一数据无论进行多少次消费,所造成的结果都一样。处理方式可以在消息体中添加唯一标识(比如将消息生成md5保存到Mysql或者是Redis中,在处理消息前先检查下Mysql/Redis是否已经处理过该消息了),消费端进行确认此唯一标识是否已经消费过,如果消费过,则不进行之后处理。从而尽可能的避免了重复消费。
幂等角度大概两种实现:

  • 将唯一标识存入第三方介质(如Redis),要操作数据的时候先判断第三方介质(数据库或者缓存)有没有这个唯一标识。
  • 将版本号(offset)存入到数据里面,然后再要操作数据的时候用这个版本号做乐观锁,当版本号大于原先的才能操作。
1.2.2 针对于Consumer消费时间过长带来的重复消费问题
  • 提高单条消息的处理速度。例如对消息处理中比较耗时的操作可通过异步的方式进行处理、利用多线程处理等。
  • 其次,在缩短单条消息消费时常的同时,根据实际场景可将max.poll.interval.ms值设置大一点,避免不必要的rebalance,此外可适当减小max.poll.records的值,默认值是500,可根据实际消息速率适当调小。

二、消息丢失

在Kafka中,消息丢失在Kafka的生产端和消费端都会出现。在此之前我们先来了解一下生产者和消费者的原理。

2.1 生产端问题

生产者原理
Kafka生产者生产消息后,会将消息发送到Kafka集群的Leader中,然后Kafka集群的Leader收到消息后会返回ACK确认消息给生产者Producer。主要拆解为以下几个步骤。

  • Producer先从Kafka集群找到该Partition的Leader。
  • Producer将消息发送给Leader,Leader将该消息写入本地。
  • Follwer从Leader pull消息,写入本地Log后Leader发送ACK。
  • Leader 收到所有 ISR 中的 Replica 的 ACK 后,增加High Watermark,并向 Producer 发送 ACK。
    在这里插入图片描述
    因此,Kafka集群(其实是分区的Leader)最终会返回一个ACK来确认Producer推送消息的结果,这里Kafka提供了三种模式:
  • NoResponse RequiredAcks = 0:这个代表的就是不进行消息推送是否成功的确认。
  • WaitForLocal RequiredAcks = 1:当local(Leader)确认接收成功后,就可以返回了。
  • WaitForAll RequiredAcks = -1:当所有的Leader和Follower都接收成功时,才会返回。

因此这个配置的影响也分为下面三种情况:

  • 设置为0,Producer不进行消息发送的确认,Kafka集群(Broker)可能由于一些原因并没有收到对应消息,从而引起消息丢失。
  • 设置为1,Producer在确认到 Topic Leader 已经接收到消息后,完成发送,此时有可能 Follower 并没有接收到对应消息。此时如果 Leader 突然宕机,在经过选举之后,没有接到消息的 Follower 晋升为 Leader,从而引起消息丢失。
  • 设置为-1,可以很好的确认Kafka集群是否已经完成消息的接收和本地化存储,并且可以在Producer发送失败时进行重试。

生产端解决消息丢失方案:

  • 通过设置RequiredAcks模式来解决,选用WaitForAll(对应值为-1)可以保证数据推送成功,不过会影响延时。
  • 引入重试机制,设置重试次数和重试间隔。
  • 当然,最后就是使用Kafka的多副本机制保证Kafka集群本身的可靠性,确保当Leader挂掉之后能进行Follower选举晋升为新的Leader。

2.2 消费端问题

消费端的消息丢失问题
消费端的消息丢失主要是因为在消费过程中出现了异常,但是对应消息的 Offset 已经提交,那么消费异常的消息将会丢失。
前面介绍过,Offset的提交包括手动提交和自动提交,可通过kafka.consumer.enable-auto-commit进行配置。
手动提交可以灵活的确认是否将本次消费数据的Offset进行提交,可以很好的避免消息丢失的情况。
自动提交是引起消息丢失的主要诱因。因为消息的消费并不会影响到Offset的提交。
大部分的解决方案为了尽可能的保证数据的完整性,都是尽量去选用手动提交的方式,当数据处理完之后再进行提交。
当然,在golang中我们主要使用sarama包的Kafka,sarama自动提交的原理是先进行标记,再进行提交,如下代码所示:

type exampleConsumerGroupHandler struct{}

func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error   { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      fmt.Printf("Message topic:%q partition:%d offset:%d
", msg.Topic, msg.Partition, msg.Offset)
      // 标记消息已处理,sarama会自动提交
      // 处理数据(如真正持久化mysql...)
      sess.MarkMessage(msg, "")
   }
   return nil

因此,我们完全可以在标记之前进行数据的处理,例如插入Mysql等,当出现插入成功后程序崩溃,下一次最多重复消费一次(因为还没标记,Offset没有提交),而不会因为Offset超前,导致应用层消息丢失了。

手动提交模式下当然是很灵活的控制的,但确实已经没必要了:

consumerConfig := sarama.NewConfig()
consumerConfig.Version = sarama.V2_8_0_0
consumerConfig.Consumer.Return.Errors = false
consumerConfig.Consumer.Offsets.AutoCommit.Enable = false  // 禁用自动提交,改为手动
consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
 
func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s
", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
 
      // 插入mysql....
      // 手动提交模式下,也需要先进行标记
      sess.MarkMessage(msg, "")
      consumerCount++
      if consumerCount%3 == 0 {
         // 手动提交,不能频繁调用
         t1 := time.Now().Nanosecond()
         sess.Commit()
         t2 := time.Now().Nanosecond()
         fmt.Println("commit cost:", (t2-t1)/(1000*1000), "ms")
      }
   }
   return nil
}

三、参考

1、Go语言如何操纵Kafka保证无消息丢失
2、kafka消息重复消费和消息丢失
3、sarama Kafka客户端生产者与消费者梳理
4、Kafka丢数据、重复消费、顺序消费的问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/250808.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Qt5】QVersionNumber

2023年12月10日,周日上午 QVersionNumber 是 Qt 框架中用于表示版本号的类。它提供了一种方便的方式来处理和比较版本号,特别是在应用程序或库需要与特定版本的依赖项进行交互时。 以下是一个简单的示例,演示了如何使用 QVersionNumber&…

Spring-temp

IOC/DI实现步骤 1.配置元数据 2.实例化IOC 3.获取Bean 基于XML配置方式 管理组件 1.基于构造函数:有参、无参 2.基于静态工厂方法:有参、无参 依赖注入 1.构造函数 2.setter方法 Bean组件高级特性 1.作用域 2.生命周期 FactoryBean 基于注解 IOC Bean作…

Python如何匹配库的版本

目录 1. 匹配库的版本 2. Python中pip,库,编译环境的问题回答总结 2.1 虚拟环境 2.2 pip,安装库,版本 1. 匹配库的版本 (别的库的版本冲突同理) 在搭建pyansys环境的时候,安装grpcio-tools…

加权准确率WA,未加权平均召回率UAR和未加权UF1

加权准确率WA,未加权平均召回率UAR和未加权UF1 1.加权准确率WA,未加权平均召回率UAR和未加权UF12.参考链接 1.加权准确率WA,未加权平均召回率UAR和未加权UF1 from sklearn.metrics import classification_report from sklearn.metrics impor…

Python中的程序逻辑经典案例详解

我的博客 文章首发于公众号:小肖学数据分析 Python作为一种强大的编程语言,以其简洁明了的语法和强大的标准库,成为了理想的工具来构建这些解决方案。 本文将通过Python解析几个经典的编程问题。 经典案例 水仙花数 问题描述&#xff1a…

三勾商城新功能-电子面单发货

商家快递发货时可以选择在线下单,在线获取和打印电子面单。免去手写面单信息以及避免填写运单号填错,系统会自动填写对应发货商品的运单信息 快递100电子面单1、进入快递100,点击登录 2、登录成功后,点击“电子面单与云打印” 3、进入电子面单与云打印后…

Druid-spring-boot-starter源码阅读-其余组件自动装配

前面我们看完了整个DruidDataSource初始化流程,但是其实Druid除了最核心的数据源之外,还有其他需要自动配置的,细心的人可能看到了,就是利用Import注解导入的四个类。 DruidFilterConfiguration public class DruidFilterConfigu…

解决:TypeError: write() argument must be str, not tuple

解决:TypeError: write() argument must be str, not tuple 文章目录 解决:TypeError: write() argument must be str, not tuple背景报错问题报错翻译报错位置代码报错原因解决方法今天的分享就到此结束了 背景 在使用之前的代码时,报错&…

缓存的定义及重要知识点

文章目录 缓存的意义缓存的定义缓存原理缓存的基本思想缓存的优势缓存的代价 缓存的重要知识点 缓存的意义 在互联网高访问量的前提下,缓存的使用,是提升系统性能、改善用户体验的唯一解决之道。 缓存的定义 缓存最初的含义,是指用于加速 …

联邦边缘学习中的知识蒸馏综述

联邦边缘学习中的知识蒸馏综述 移动互联网的快速发展伴随着智能终端海量用户数据的产生。如何在保护数据隐私的前提下,利用它们训练出性能优异的机器学习模型,一直是业界关注的难点。为此,联邦学习应运而生,它允许在终端本地训练并协同边缘服务器进行模型聚合来实现分布式机器…

下午好~ 我的论文【yolo1~4】(第二期)

写在前面:本来是一期的,我看了太多内容了,于是分成三期发吧 TAT (捂脸) 文章目录 YOLO系列v1v2v3v4 YOLO系列 v1 You Only Look Once: Unified, Real-Time Object Detection 2015 ieee computer society 12.3 CCF-C…

网站提示“不安全”

当你在浏览网站时,有时可能会遇到浏览器提示网站不安全的情况。这通常是由于网站缺乏SSL证书所致。那么,从SSL证书的角度出发,我们应该如何解决这个问题呢? 首先,让我们简单了解一下SSL证书。SSL证书是一种用于保护网站…

论文阅读——Semantic-SAM

Semantic-SAM可以做什么: 整合了七个数据集: 一般的分割数据集,目标级别分割数据集:MSCOCO, Objects365, ADE20k 部分分割数据集:PASCAL Part, PACO, PartImagenet, and SA-1B The datasets are SA-1B, COCO panopt…

Grafana Loki 快速尝鲜

Grafana Loki 是一个支持水平扩展、高可用的聚合日志系统,跟其他的聚合日志系统不同,Loki只对日志的元数据-标签进行索引,日志数据会被压缩并存储在对象存储中,甚至可以存储在本地文件系统中,能够有效降低成本&#xf…

Python基础09-学生管理系统

零、文章目录 Python基础09-学生管理系统 1、学员管理系统功能概述 (1)最终效果图 (2)功能概述 需求:进入系统显示系统功能界面,功能如下: 【1】添加学员信息->add_student【2】删除学员…

Python基础01-环境搭建与输入输出

零、文章目录 Python基础01-环境搭建与输入输出 1、Python概述 (1)为什么要学习Python 技术趋势:Python自带明星属性,热度稳居编程语言界前三 简单易学:开发代码少,精确表达需求逻辑;33个关…

SDN之Python编程创建多数据中心网络

文章目录 1.拓扑结构2.具体步骤 1.拓扑结构 通过python编程创建一个包含2台核心交换机、4台汇聚交换机、8台边缘交换机和16台主机的网络拓扑,如图示: 2.具体步骤 首先ctrlaltT(或右键)打开终端,在/home/shy/minine…

关于学习计算机的心得与体会

也是隔了一周没有发文了,最近一直在准备期末考试,后来想了很久,学了这么久的计算机,这当中有些收获和失去想和各位正在和我一样在学习计算机的路上的老铁分享一下,希望可以作为你们碰到困难时的良药。先叠个甲&#xf…

Ansys Lumerical | 采用一维光栅的出瞳扩展器的优化

附件下载 联系工作人员获取附件 本文演示了一种仿真方法,并举例说明了使用一维光栅的出瞳扩张器(EPE)系统的优化示例。 在此工作流程中,我们使用 Lumerical 构建光栅模型,并使用 RCWA 求解器模拟其响应。完整的EPE系…

云演CTF Blog

1、啥也搞不了,扫目录。出来个console 2、有显示锁掉了 3、抓包,改返回包 改成true,放包 不好意思,不会了,哈哈哈哈哈哈哈哈哈 你会的话,请告诉我,大佬