kafka(七)——消息偏移(消费者)

概念

消费者消费完消息后,向_consumer_offset主题发送消息,用来保存每个分区的偏移量。

在这里插入图片描述

流程说明

  1. consumer发送JoinGroup请求;
  2. coordinator选出一个consumer作为leader,并将topics发送给leader消费者;
  3. leader consumer负责制定消费方案;
  4. leader consumer将消费方案发送给coordinator;
  5. coordinator将消费方案发送给CG中的每个consumer;
  6. 每个consumer与coordinator保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该consumer被移除,触发再平衡,或者消费者处理消息过长(max.poll.interval.ms=300s),也会触发再平衡;

适用场景

消费者数量发生变化、消费者订阅主题发生变化或者分区数量发生变化时,会触发kafka的再平衡(Rebalance),再平衡后,消费者可能被分到新的分区,为保证高可用和伸缩性,消费者需要读取每个分区最后一次偏移量。

注意:再平衡期间,群组不可用,消费者无法读取消息。

再平衡(Rebalance)

再平衡(Rebalance),是Kafka中确保Consumer group下所有的consumer如何达成一致,分配订阅的topic的每个分区的机制。

触发场景

  • 消费者个数发生变化,有新的消费者或分组中的消费者停止消费;
  • 订阅的主题(topic)个数发生变化;
  • 订阅的主题分区发生变化(partition);

影响

  • 再平衡时,消费者组下的所有消费者都会协调在一起共同参与,Kafka使用分配策略尽可能达到最公平的分配;
  • 再平衡过程会对消费者组产生非常严重的影响,所有的消费者都将停止工作,直到再平衡执行完成;

分区分配策略

Range范围分配策略

参数配置
partition.assignment.strategy = org.apache.kafka.clients.consumer.RangeAssignor
算法

n = 分区数量 / 消费者数量

m = 分区数量 % 消费者数量

前m个消费者消费n+1个,剩余消费者消费n个

图解

n = 2 = 8/3

m = 2 = 8%3

前2个消费者消费(2+1)个,剩余消费者消费2个。

在这里插入图片描述

RoundRobin轮询策略

将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。

参数配置
partition.assignment.strategy = org.apache.kafka.clients.consumer.RoundRobinAssignor
图解

在这里插入图片描述

Stricky粘性分配策略

在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

参数配置
partition.assignment.strategy = org.apache.kafka.clients.consumer.StickyAssignor
图解
  • 故障前

在这里插入图片描述

  • 故障后

在这里插入图片描述

代码示例

// 设置消费者组再平衡回调
// 注册该函数会关闭 rdkafka 的自动分区赋值和再分配
class ConsumerRebalanceCb : public RdKafka::RebalanceCb 
{
public:
	// 消费者组再平衡回调
	void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
		std::vector<RdKafka::TopicPartition *> &partitions) 
	{
		if (RdKafka::ERR__ASSIGN_PARTITIONS == err)  // 分区分配成功
		{
			// 消费者订阅这些分区
			consumer->assign(partitions);
			// 获取消费者组本次订阅的分区数量,可以属于不同的topic
			m_partitionCount = (int)partitions.size();
		} 
		else   // 分区分配失败
		{
			// 消费者取消订阅所有的分区
			consumer->unassign();
			// 消费者订阅分区的数量为0
			m_partitionCount = 0;
		}
	}

private:
	int m_partitionCount;    // 消费者组本次订阅的分区数量
};


RdKafka::Conf* t_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(NULL == t_config)
{
    printf("create conf failed\n");
    return;
}

std::string errorStr = ""; 
RdKafka::RebalanceCb* rebalance_cb = new ConsumerRebalanceCb;
RdKafka::Conf::ConfResult errorCode = t_config->set("rebalance_cb", rebalance_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode) 
{
    printf("set conf(rebalance_cb) failed, err:%s\n", errorStr.c_str());
    delete t_config;
    return;
}

提交方式

自动提交

参数配置

# 默认自动提交,消费者close时也会自动提交
enable.auto.comnit=true

# 自动提交周期,默认5s
auto.commit.interval.ms=5000

代码示例

RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{
	// 消费消息
	ConsumeMsg_(msg);

    // 消息消费完后无需手动处理,kafka自动提交偏移
    delete msg;
}

存在的问题

如果在周期5s内发生再平衡,导致偏移量未提交,未提交的消息会被重复消费。

手动提交

参数配置

RdKafka::Conf* t_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(NULL == t_config)
{
    printf("create conf failed\n");
    return;
}

RdKafka::Conf* topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if (NULL == topicConfig) 
{
    printf("create topic conf failed\n");
    delete t_config;
    return;
}

std::string errorStr = ""; 
RdKafka::Conf::ConfResult errorCode = topicConfig->set("enable.auto.commit", " false", errorStr);
if(RdKafka::Conf::CONF_OK != errorCode)
{
    printf("set topic conf(enable.auto.commit) failed, err:%s\n", errorStr.c_str());
    delete topicConfig;
    delete t_config;
    return;
}

// 设置新到来消费者的消费起始位置,latest 消费最新的数据,earliest 从头开始消费
errorCode = topicConfig->set("auto.offset.commit", " earliest", errorStr);
if(RdKafka::Conf::CONF_OK != errorCode)
{
    printf("set topic conf(auto.offset.commit) failed, err:%s\n", errorStr.c_str());
    delete topicConfig;
    delete t_config;
    return;
}

// 默认 topic 配置,用于自动订阅 topics
errorCode = t_config->set("default_topic_conf", topicConfig, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode) 
{
    printf("set conf(default_topic_conf) failed, err:%s\n", errorStr.c_str());
    delete topicConfig;
    delete t_config;
    return;
}

同步提交

  • 消息消费完,手动调用commitSync;
  • 在同步提交未完成的情况下发生再平衡,消息会被重复消费;
  • commitSync会阻塞直到偏移提交成功;
RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{
    // 消费消息
    ConsumeMsg_(msg, NULL);

    // 开启手动提交
    m_consumer->commitSync(); 
    delete msg;
}

异步提交

  • 消息消费完,手动调用commitAsync;
  • commitAsync不会重试提交偏移量;
RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{
    // 消费消息
    ConsumeMsg_(msg, NULL);

    // 开启手动提交
    m_consumer->commitAsync(); 
    delete msg;
}

存在的问题

重复消费(同步提交)

在这里插入图片描述

  • auto.offset.commit参数设置为earliest;
  • 上次提交的偏移量为1;
  • 由于网络故障、超时等原因,2~7已消费完的情况下,8未提交成功,由于设置了参数auto.offset.commit=earliest,分区再平衡后会继续从2开始消费,会导致消息重复消费的问题;
消息丢失(异步提交)

在这里插入图片描述

  • auto.offset.commit参数设置为latest;
  • 上次提交的偏移量为1;
  • 本次消费的偏移量范围为27,消费者立马提交了偏移量8,由于网络故障、超时等原因,27未消费完,由于设置了参数auto.offset.commit=latest,再平衡后会继续从8开始消费,会导致消息重复丢失的问题;

解决方案

根据实际场景选择同步提交还是异步提交。如果对消息可靠性要求比较高,不允许数据丢失,建议选择同步提交+“auto.offset.commit=earliest”,性能略差。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/609616.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

4. Python的深拷贝、浅拷贝

文章目录 0、先说结论1、浅拷贝修改元素值2、深拷贝修改元素值学习链接 0、先说结论 无论深拷贝还是浅拷贝都会为新对象分配一块新的内存&#xff0c;因此新老对象id不相同。 对于浅拷贝&#xff0c;新老对象内部的可变and不可变元素id都是相同的(在没修改元素值之前)。 对于深…

springboot -多数据源管理方案

多数据源的配置有多种方式 方式一 、依赖dataSource的配置 1.建立多数据源配置 spring:# 数据源配置datasource:pdm:driver-class-name: oracle.jdbc.driver.OracleDriverjdbc-url: jdbc:oracle:thin:10.216.xxx.xxx:3000:orclusername: cfpdmpassword: capecapp:driver-cla…

移动安全测试框架-MobSF window环境配置

一. 介绍&#xff1a; MOBSF&#xff08;Mobile Security Framework&#xff09;是一个开源的移动安全渗透测试框架&#xff0c;用于评估移动应用程序的安全性。它提供了一组功能强大的工具和技术&#xff0c;帮助安全专业人员和开发人员发现和修复移动应用程序中的安全漏洞。 …

React 第二十六章 Hook useCallback

useCallback 是 React 提供的一个 Hook 函数&#xff0c;用于优化性能。它的作用是返回一个记忆化的函数&#xff0c;当依赖发生变化时&#xff0c;才会重新创建并返回新的函数。 在 React 中&#xff0c;当一个组件重新渲染时&#xff0c;所有的函数都会被重新创建。这可能会…

【npm】解决npm包突然消失MODULE_NOT_FOUND

今天折腾新特性时需要升级nodejs&#xff0c;安装新版后npm离奇消失了。C:\Users\**用户名\AppData\Roaming\npm\node_modules下只有cnpm&#xff0c;没有npm的目录。重装nodejs也不好使。 机智如我&#xff0c;试了下cnpm -v是正常的&#xff0c;而且能看到nodejs&#xff0c;…

CSP-j 2022csp-j完善程序易错题

易错题 答案23&#xff1a; 对 解析23&#xff1a; 函数 g 就是把函数 f 改成递推的形式 答案24&#xff1a; 对 解析23&#xff1a; 无。 答案25&#xff1a; C 解析25&#xff1a; m n ( m - 1 ) * ( 1 2 3 4 ... n ) O(mn^2) 答案26&#xff1a; C 解析26&#x…

跨境电商行业蓬勃发展,武汉星起航引领卖家孵化新潮流

近年来&#xff0c;我国跨境电商行业在政府的大力扶持下呈现出强劲的发展势头。随着国内制造业结构的加速调整与居民消费需求升级态势的持续凸显&#xff0c;跨境出口规模占比稳步提升&#xff0c;跨境进口规模同样不断扩大&#xff0c;行业市场规模持续增长。在这一背景下&…

vue3+ant design实现表格数据导出Excel

提示:实现表格数据导出Excel 文章目录 前言 一、安装ant design? 二、引用ant design 1.搭建框架 2.获取表格数据 三、封装导出表格的代码 四、导出 1.获取导出地址 2.在下载导出事件中添加导出代码 五、全部代码 前言 今天终于有时间来更新文章了,最近公司项目比较紧…

【ArcGIS Pro微课1000例】0058:玩转NetCDF多维数据集

一、NetCDF介绍 NetCDF(network Common Data Form)网络通用数据格式是由美国大学大气研究协会(University Corporation for Atmospheric Research,UCAR)的Unidata项目科学家针对科学数据的特点开发的,是一种面向数组型并适于网络共享的数据的描述和编码标准。NetCDF广泛应…

pgsql查看指定模式的存储过程

pgsql查看指定模式的存储过程 在 PostgreSQL 中&#xff0c;如果你想要查看指定模式的存储过程&#xff08;也称为函数&#xff09;&#xff0c;你可以使用 \df 或 \df 命令在 psql 命令行工具中&#xff0c;或者使用 SQL 查询来从 pg_catalog 系统模式中查询。 \df命令行查询…

吴恩达2022机器学习专项课程C2(高级学习算法)W1(神经网络):Lab01 神经元和层

目录 导入Tensorflow的库无激活函数 vs 有激活函数&#xff1f;1.无激活函数2.有激活函数 无激活函数的神经元-回归/线性模型1.创建训练集散点图2.创建层3.层输入4.获取层参数5.层参数的形状6.手动设置层的参数7.层计算vs线性回归模型计算 有激活函数sigmoid的神经元1.创建训练…

武汉星起航深耕亚马逊跨境电商,引领中国卖家开拓全球市场新篇章

在全球经济深度融合的当下&#xff0c;跨境电商已成为连接中国与世界市场的重要桥梁。作为跨境电商领域的佼佼者&#xff0c;武汉星起航电子商务有限公司凭借对亚马逊平台的深入了解和丰富经验&#xff0c;成功引领了中国卖家开拓全球市场的新篇章。 亚马逊&#xff0c;这家起…

计算机发展史故事【7】

二战建奇勋 布雷契莱庄园当然不信德寇的邪说&#xff0c;他们把大约200 名精干人员集中在“3号棚”&#xff0c;四班轮换&#xff0c;24 小时值守&#xff0c;专门对付德国的“斯芬克司之谜”。图林则带着副手、象棋冠军亚历山大&#xff0c;领导着“8 号棚”&#xff0c;进行…

安卓开发--新建工程,新建虚拟手机,按键事件响应

安卓开发--新建工程&#xff0c;新建虚拟手机&#xff0c;按键事件响应 1.前言2.运行一个工程2.1布局一个Button2.2 button一般点击事件2.2 button属性点击事件2.2 button推荐点击事件 本篇博客介绍安卓开发的入门工程&#xff0c;通过使用按钮Buton来了解一个工程的运作机制。…

【论文合集1】- 存内计算加速机器学习

本章节论文合集&#xff0c;存内计算已经成为继冯.诺伊曼传统架构后&#xff0c;对机器学习推理加速的有效解决方案&#xff0c;四篇论文从存内计算用于机器学习&#xff0c;模拟存内计算&#xff0c;对CNN/Transformer架构加速角度阐述存内计算。 【1】WWW: What, When, Where…

Web实时通信的学习之旅:WebSocket入门指南及示例演示

文章目录 WebSocket的特点1、工作原理2、特点3、WebSocket 协议介绍4、安全性 WebSocket的使用一、服务端1、创建实例&#xff1a;创建一个webScoket实例对象1.1、WebSocket.Server(options[&#xff0c;callback])方法中options对象所支持的参数1.2、同样也有一个加密的 wss:/…

2024第九届数维杯数学建模论文模板(内附LaTeX+Word)

一年一度的2024年第九届数维杯国赛报名进行中&#xff01;相信很多同学们已经摩拳擦掌蓄势待发了&#xff01; 经历三天比赛&#xff0c;最后提交的论文就是最终答卷&#xff0c;那么一篇数模论文&#xff0c;包括哪些内容呢&#xff1f; 一篇完整的数模论文&#xff0c;包括…

【初阶数据结构】单链表经典OJ题

目录标题 原题展现题目解析代码展现1.创建新节点2.拷贝random指针3.将新节点尾插 原题展现 该题是力扣上的第138题&#xff0c;题目链接如下&#xff1a;随机链表的复制。 题目解析 我们发现这个链表和一般的链表存在着一点点区别&#xff0c;那就是每个节点多了一个random指…

遥控挖掘机之ESP8266调试心得(1)

ESP8266调试心得 1. 前言2.遇到的问题2.1 ESP8266模块建立TCP连接时候报错2.2 指令异常问题 3. 更新ESP8266固件3. ESP8266的部分AT指令3. 连接步骤3.1 模块与电脑连接3.2.1 电脑上的设置3.2.2 ESP8266模块作为客户机&#xff08;TCP Cilent&#xff09;的设置步骤 3.2 模块与模…

Python深度学习基于Tensorflow(3)Tensorflow 构建模型

文章目录 数据导入和数据可视化数据集制作以及预处理模型结构低阶 API 构建模型中阶 API 构建模型高阶 API 构建模型保存和导入模型 这里以实际项目CIFAR-10为例&#xff0c;分别使用低阶&#xff0c;中阶&#xff0c;高阶 API 搭建模型。 这里以CIFAR-10为数据集&#xff0c;C…