kafka(五)——消费者流程分析(c++)

概念

​ 消费者组(Consumer Group):由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者

整体流程

在这里插入图片描述

流程说明:

  • 消费者组包括多个消费者,每个消费者只能消费分区中的一部分数据;
  • 当一个消费者组中的消费者读取一个分区中的数据时,其他消费者就不能再读取该分区中的数据;
  • 一个消费者组可以有多个消费者,每个消费者只能消费分配给该消费者组的某些主题的某些分区;
  • 同一个分区只会被一个消费者组中的一个消费者消费,不同消费者组之间可以重复消费
  • 当消费者组中的某个消费者宕机后,Kafka会将该消费者所消费的分区重新分配给其他消费者,从而实现消费者的高可用性;
  • 消费者组中的消费者可以动态加入和退出,Kafka会自动重新分配分区;
  • 在同一个消费者组内,消费者之间可以进行负载均衡,以此来提高消息的吞吐量和消费的效率;
  • 消费者组可以通过消费者组ID(groupid)来标识,一个消费者组ID可以同时消费多个主题;

配置参数说明

参数名称描述
bootstrap.servers向Kafka集群建立初始连接用到的host/port列表。
key.deserializer和value.deserializer指定接收消息的key和value的反序列化类型。一定要写全类名。
group.id标记消费者所属的消费者组。
enable.auto.commit默认值为true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。
auto.offset.reset当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions__consumer_offsets的分区数,默认是50个分区。
heartbeat.interval.msKafka消费者和coordinator之间的心跳时间,默认3s。 该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。
session.timeout.msKafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes默认1个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
fetch.max.bytes默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。
max.poll.records一次poll拉取数据返回消息的最大条数,默认是500条。

分区策略

  • Range
# 特点
确保每个消费者消费的分区数量是均衡的。
注意:Rangle范围分配策略是针对每个Topic的。
# 配置
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RangeAssignor。
# 算法公式
n = 分区数量 / 消费者数量
m = 分区数量 % 消费者数量
前m个消费者消费n+1个,剩余消费者消费n个。

在这里插入图片描述

  • RoundRobin
# 特点
将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。
# 配置
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RoundRobinAssignor。

在这里插入图片描述

  • Sticky
# 特点
在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是Kafka从0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
  • CooperativeSticky

可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range+ CooperativeSticky。

回调函数说明

事件回调

  • 设置回调
// 设置事件回调
m_event_cb = new ConsumerEventCb;
errorCode = m_config->set("event_cb", m_event_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode) 
{
    printf("Conf set(event_cb) failed, errorStr:%s\n", errorStr.c_str());
    return;
}
  • 回调处理
// 设置事件回调
class ConsumerEventCb : public RdKafka::EventCb 
{
public:
	void event_cb(RdKafka::Event &event) 
	{
		switch (event.type()) 
		{
		case RdKafka::Event::EVENT_ERROR:
			break;
		case RdKafka::Event::EVENT_STATS:
			break;
		case RdKafka::Event::EVENT_LOG:
			break;
		case RdKafka::Event::EVENT_THROTTLE:
			break;
		default:
			break;
		}
	}
};

消费者组再平衡回调

  • 设置回调
// 设置消费者组再平衡回调
m_rebalance_cb = new ConsumerRebalanceCb;
errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode) 
{
    ELogError(("%s|Conf set(rebalance_cb) failed, errorStr:%s", GET_CODE_INFO(), errorStr.c_str()));
    break;
}
  • 回调处理
// 设置消费者组再平衡回调
// 注册该函数会关闭 rdkafka 的自动分区赋值和再分配
class ConsumerRebalanceCb : public RdKafka::RebalanceCb 
{
private:
	// 打印当前获取的分区
	static void printTopicPartition(const std::vector<RdKafka::TopicPartition *>&partitions) 
	{
		for (unsigned int i = 0; i < partitions.size(); i++) 
		{
			printf("count:%d, topic:%s,partition:%d\n",
				i, 
				partitions[i]->topic().c_str(),
				partitions[i]->partition());
		}
	}

public:
	// 消费者组再平衡回调
	void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
		std::vector<RdKafka::TopicPartition *> &partitions) 
	{
		printf("RebalanceCb: %s", RdKafka::err2str(err).c_str());
		printTopicPartition(partitions);

		// 分区分配成功
		if (RdKafka::ERR__ASSIGN_PARTITIONS == err) 
		{
			// 消费者订阅这些分区
			consumer->assign(partitions);
			// 获取消费者组本次订阅的分区数量,可以属于不同的topic
			m_partitionCount = (int)partitions.size();
		} 
		else   // 分区分配失败
		{
			// 消费者取消订阅所有的分区
			consumer->unassign();
			// 消费者订阅分区的数量为0
			m_partitionCount = 0;
		}
	}

private:
	int m_partitionCount;    // 消费者组本次订阅的分区数量
};

流程(c++)

  • 配置消费者客户端;
  • 订阅主题和分区;
  • 拉取消息;
  • 处理消息;
  • 提交消费位移;

配置消费者客户端

int CKafkaConsumer::Create()
{
	std::string errorStr;
	RdKafka::Conf::ConfResult errorCode;

	do 
	{
		// 1、创建配置对象
		// 1.1、构造 consumer conf 对象
		m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
		if(nullptr == m_config)
		{
            printf("Create RdKafka Conf failed.\n");
			break;
		}

		// 必要参数1:指定 broker 地址列表
		errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(bootstrap.servers) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 必要参数2:设置消费者组 id
		errorCode = m_config->set("group.id", m_groupID, errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(group.id) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 设置事件回调
		m_event_cb = new ConsumerEventCb;
		errorCode = m_config->set("event_cb", m_event_cb, errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(event_cb) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 设置消费者组再平衡回调
		m_rebalance_cb = new ConsumerRebalanceCb;
		errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(rebalance_cb) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 当消费者到达分区结尾,发送 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件
		errorCode = m_config->set("enable.partition.eof", "false", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(enable.partition.eof) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 每次最大拉取的数据大小
		errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(max.partition.fetch.bytes) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 设置分区分配策略:range、roundrobin、cooperative-sticky
		errorCode = m_config->set("partition.assignment.strategy", "range", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(partition.assignment.strategy) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 心跳探活超时时间---1s
		errorCode = m_config->set("session.timeout.ms", "6000", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(session.timeout.ms) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 心跳保活间隔
		errorCode = m_config->set("heartbeat.interval.ms", "2000", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(heartbeat.interval.ms) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 1.2、创建 topic conf 对象
		m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
		if (nullptr == m_topicConfig) 
		{
            printf("Create RdKafka Topic Conf failed.\n");
			break;
		}

		// 必要参数3:设置新到来消费者的消费起始位置,latest 消费最新的数据,earliest 从头开始消费
		errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Topic Conf set(auto.offset.reset) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 默认 topic 配置,用于自动订阅 topics
		errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(default_topic_conf) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 2、创建 Consumer 对象
		m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
		if (nullptr == m_consumer) 
		{
            printf("Create KafkaConsumer failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

        printf("Created consumer success, consumerName:%s.\n",
                  m_consumer->name().c_str());
		return 0;
	} while (0);

	Destroy();
	return -1;
}

订阅主题和分区

std::vector<std::string> topicsVec;
topicsVec.push_back("zd_test_topic_one");
topicsVec.push_back("zd_test_topic_two");

RdKafka::ErrorCode errorCode = m_consumer->subscribe(topicsVec);
if (RdKafka::ERR_NO_ERROR != errorCode) 
{
    printf("Subscribe failed, errorStr:%s\n", RdKafka::err2str(errorCode).c_str());
    return;
}

拉取消息

// 可放到线程中处理

while (m_running) 
{
    RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
    if(NULL != msg)
    {
        // 消费消息
        ConsumeMsg_(msg, NULL);

        m_consumer->commitAsync(); 
        delete msg;
    }
}

处理消息

void KafkaConsumer::ConsumeMsg_(RdKafka::Message *msg, void *opaque)
{
	switch (msg->err()) 
	{
	case RdKafka::ERR__TIMED_OUT: // 超时
		break;
	case RdKafka::ERR_NO_ERROR:   // 有消息进来
		printf("Message in, topic:%s, partition:[%d], key:%s, payload:%s\n",
			msg->topic_name().c_str(), 
			msg->partition(), 
			msg->key()->c_str(), 
			(char *)msg->payload());
            
        // 消息处理
		break;
	default:
		break;
	}
}

提交消费位移

m_consumer->commitAsync(); 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/532453.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

猫头虎博主深度探索:Amazon Q——2023 re:Invent 大会的 AI 革新之星

摘要 大家好&#xff0c;我是猫头虎博主&#xff01;今天&#xff0c;我要带大家深入了解2023年 re:Invent 大会上发布的一款革命性产品——Amazon Q。让我们一起探索这个引领未来工作方式的新型工具吧&#xff01; 引言 在2023年的 re:Invent 大会上&#xff0c;亚马逊云科…

RAG 修炼手册|一文讲透 RAG 背后的技术

在之前的文章中《RAG 修炼手册&#xff5c;RAG敲响丧钟&#xff1f;大模型长上下文是否意味着向量检索不再重要》&#xff0c;我们已经介绍过 RAG 对于解决大模型幻觉问题的不可或缺性&#xff0c;也回顾了如何借助向量数据库提升 RAG 实战效果。 今天我们继续剖析 RAG&#xf…

docker-compose 之 OpenGauss

使用 docker 启动高斯数据库的示范脚本如下&#xff1a; docker-compose.yml version: 3.7 services:opengauss:image: enmotech/opengauss:5.1.0container_name: opengaussnetwork_mode: "host"privileged: truevolumes:- ./opengauss:/var/lib/opengaussenvironm…

前端mock数据——使用mockjs进行mock数据

前端mock数据——使用mockjs进行mock数据 一、安装二、mockjs的具体使用 一、安装 首选需要有nodejs环境安装mockjs&#xff1a;npm install mockjs 若出现像上图这样的错&#xff0c;则只需npm install mockjs --legacy-peer-deps即可 src下新建mock文件夹&#xff1a; mo…

微服务-网关

在微服务架构中&#xff0c;每个服务都是一个可以独立开发和运行的组件&#xff0c;而一个完整的微服务架构由一系列独立运行的微服务组成。其中每个服务都只会完成特定领域的功能&#xff0c;比如订单服务提供与订单业务场景有关的功能、商品服务提供商品展示功能等。各个微服…

SpringBoot文件上传--头像上传

目录 1.在配置文件中写好物理路径和访问路径 2.写配置文件 3.页面上传 4.控制层 5.效果 1.在配置文件中写好物理路径和访问路径 &#xff08;自定义&#xff09;file:uploadPath: D:/upload/img/ 物理路径path: /file/** 访问路径 2.写配置文件 package com.example…

BCLinux8U6系统基线加固致无法su的问题分析

本文对BCLinux8U6系统进行基线加固致无法su的问题分析。 一、问题现象 对BCLinux8U6系统进行基线加固&#xff0c;su切换用户失败&#xff0c;报错信息如下&#xff1a; [ABCDlocalhost ~]$ su - 密码&#xff1a; su: 模块未知 二、问题分析 1、错误排查 出错前&#xf…

探索进程控制第一弹(进程终止、进程等待)

文章目录 进程创建初识fork函数fork函数返回值fork常规用法fork调用失败的原因 写时拷贝进程终止进程终止是在做什么&#xff1f;进程终止的情况代码跑完&#xff0c;结果正确/不正确代码异常终止 如何终止 进程等待概述进程等待方法wait方法waitpid 进程创建 初识fork函数 在…

自定义校验(这里是Validation)

1.自定义注解State package com.itheima.anno;import com.itheima.Validator.StateValidator; import jakarta.validation.Constraint; import jakarta.validation.Payload;import java.lang.annotation.*;Documented//元注解 Target(ElementType.FIELD) Retention(RetentionP…

力扣HOT100 - 239. 滑动窗口最大值

解题思路&#xff1a; class Solution {public int[] maxSlidingWindow(int[] nums, int k) {if(nums.length 0 || k 0) return new int[0];Deque<Integer> deque new LinkedList<>();int[] res new int[nums.length - k 1];// 未形成窗口for(int i 0; i <…

瑞山集团携手数环通iPaaS,实现ERP与CRM无缝对接

01 客户背景 瑞山集团位于粤港澳大湾区核心商圈----佛山市&#xff0c;是一家致力于各种新型材料添加剂应用推广&#xff0c;集科研、销售和服务于一身的新材料企业&#xff0c;产品覆盖了塑料黏胶剂、水性涂料、3D打印材料、电子材料、纳米材料等各种行业。 公司旗下拥有几间不…

pdffactory pro 8注册码序列号下载 附教程

PdfFactory Pro可以说是一款行业专业且技术领先的的PDF虚拟打印机软件。其不仅占用系统内存小巧&#xff0c;功能强大&#xff0c;可支持用户无需使用Acrobat来创建Adobe PDF即可以进行PDF组件的创建和打印。同时&#xff0c;现在全新的PdfFactory Pro 8也正式上线来袭&#xf…

雄安建博会:中矿雄安新区的总部开工建设

中矿落位雄安&#xff1a;助力国家战略与新区发展 雄安新区&#xff0c;作为中国未来发展的重要战略支点&#xff0c;正迎来一系列央企总部的疏解与建设。最近&#xff0c;中国矿产资源集团有限公司&#xff08;简称“中矿”&#xff09;在雄安新区的总部项目正式开工建设&…

在win10上虚拟一个LoongOS系统(类似虚拟机)作为开发环境

文章目录 1.安装1.1.下载这三个东西1.2.安装好qemu。1.3.创建一个启动脚本startup_mate.bat&#xff0c;然后把三部分东西放到一起1.4.然后双击startup.bat就可以启动了。 2.文件的传输2.1.使能虚拟机系统的ssh2.2.连接ssh 3.Qt相关安装Qt安装opencv 1.安装 注意&#xff0c;一…

Web前端—属性描述符

属性描述符 假设有一个对象obj var obj {a:1 }观察这个对象&#xff0c;我们如何来描述属性a&#xff1a; 值为1可以重写可以遍历 我们可以通过Object.getOwnPropertyDescriptor得到它的属性描述符 var desc Object.getOwnPropertyDescriptor(obj, a); console.log(desc);我…

uniapp 2.0可视化开发工具:提升跨平台应用开发效率的新篇章

摘要 随着移动互联网的迅猛发展&#xff0c;跨平台应用开发成为前端开发者关注的热点。uniapp作为一款优秀的跨平台应用框架&#xff0c;其2.0版本的发布为开发者带来了更多的便利和可能性。其中&#xff0c;可视化开发工具的出现更是为前端开发带来了革命性的变革&#xff0c…

Windows下docker-compose部署DolphinScheduler

参照&#xff1a;快速上手 - Docker部署(Docker) - 《Apache DolphinScheduler v3.1.0 使用手册》 - 书栈网 BookStack 下载源文件 地址&#xff1a;https://dolphinscheduler.apache.org/zh-cn/download/3.2.1 解压到指定目录&#xff0c;进入apache-dolphinscheduler-xxx-…

【Qt 学习笔记】Qt信号和槽的其他说明及Lambda表达式

博客主页&#xff1a;Duck Bro 博客主页系列专栏&#xff1a;Qt 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ Qt信号和槽的其他说明及Lambda表达式 文章编号&#xff1a;Qt 学习笔记…

ctfshow--web入门--文件上传--web168--web170

web168 法一免杀脚本 还是检查&#xff0c;准备上传图片马 我写的是<?php eval($_POST[a]);?> 上传之后没反应 那么查一下&#xff0c;原来是发现对eval,system还有$_POST和$_GET进行过滤,$_REQUEST还可以用 那么再写一个马&#xff08;免杀脚本&#xff09; <?…

ht1622不显示无反应问题解决

如果你正在写ht1622 驱动时&#xff0c;怎么看程序都没问题&#xff0c;抓取波形&#xff0c;示波器分析波形&#xff0c;如果都没有问题&#xff0c;那么很大可能是硬件问题&#xff0c;检测看看 ht1622 RD是不是接地了。 RD 低会进入读取模式&#xff0c;所以不用RD 请将RD悬…