kafka(四)——生产者流程分析(c++)

前言

  • kafka生产者负责将数据发布到kafka集群的主题;
  • kafka生产者消息发送方式有两种:
    • 同步发送
    • 异步+回调发送

流程

在这里插入图片描述

流程说明:

  • Kafka Producer整体可看作是一个异步处理操作;
  • 消息发送过程中涉及两个线程:main线程和sender线程;
  • main线程负责将消息发送至一个双端队列,sender线程负责从双端队列取消息并发送至kafka broker;

消息可靠性

producer的acks参数表示生产者生产消息时,写入到副本的严格程度。决定了生产者的性能与可靠性

  • 0:生产者发送过来的数据,不等待broker确认,直接发送下一条数据,性能最高,但可能存在丢数据;

在这里插入图片描述

  • 1:生产者发送过来的数据,等待Leader副本确认后发送下一条数据,性能中等;
    在这里插入图片描述

  • -1(all):生产者发送过来的数据,等待所有副本将数据同步后发送下一条数据,性能最慢,安全性最高;

在这里插入图片描述

消息有序性

消息保序策略:按key分区,可以实现局部有序,但这又可能会导致数据倾斜,可根据实际情况选择。

示例:

// 指定消息key,即倒数第二个参数,当有相同的两条消息先后存储同一个key,消费者可按顺序消费到

RdKafka::ErrorCode errorCode = m_producer->produce(
		m_topic,                      // 指定发送到的主题
		RdKafka::Topic::PARTITION_UA, // 指定分区,如果为PARTITION_UA则通过
		// partitioner_cb的回调选择合适的分区
		RdKafka::Producer::RK_MSG_COPY, // 消息拷贝
		payload,                        // 消息本身
		len,                            // 消息长度
		&key,                           // 消息key
		NULL
		);

Main线程与Sender线程

Main线程

流程

  • 创建消息
// librdkafka源码 rdkafka_msg.c

/* Create message */
rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags, payload, len,
                        key, keylen, msg_opaque, &err, &errnox, NULL, 0,
                        rd_clock());
if (unlikely(!rkm)) {
    /* errno is already set by msg_new() */
    rd_kafka_set_last_error(err, errnox);
    return -1;
}
  • 选择分区
/* Partition the message */
err = rd_kafka_msg_partitioner(rkt, rkm, 1);
if (likely(!err)) {
    rd_kafka_set_last_error(0, 0);
    return 0;
}
  • 调用拦截器
/* Interceptor: unroll failing messages by triggering on_ack.. */
rkm->rkm_err = err;
rd_kafka_interceptors_on_acknowledgement(rkt->rkt_rk,
                                         &rkm->rkm_rkmessage);

Sender线程

参数说明

batch.size缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。
acks见“消息可靠性”章节
max.in.flight.requests.per.connection允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是100ms。
enable.idempotence是否开启幂等性,默认true,开启幂等性。

流程

  • 达到batch.size大小或满足linger.ms时间发送消息;
  • 消息发送至的kafka服务器后,如果kafka没有应答,默认每个broker节点队列最多缓存 5 个请求,与“max.in.flight.requests.per.connection”参数有关;
  • 如配置了“retries”、“ retry.backoff.ms”参数,消息发送失败由kafka内部自动重试,无需手动在回调函数中重试;

同步和异步流程

同步流程

流程说明

  • 通过produce方法将消息推送至双端队列;
  • 通过flush方法等待发送结果,如outq_len()大于0,说明存在未发送成功的消息;

代码示例

int KafkaProducer::PushMessage(const std::string &str, const std::string &key)
{
	int32_t len = (int32_t)str.length();
	void *payload = const_cast<void *>(static_cast<const void *>(str.data()));

	// produce 方法,生产和发送单条消息到 Broker
	// 如果不加时间戳,内部会自动加上当前的时间戳
	RdKafka::ErrorCode errorCode = m_producer->produce(
		m_topic,                      // 指定发送到的主题
		RdKafka::Topic::PARTITION_UA, // 指定分区,如果为PARTITION_UA则通过
		// partitioner_cb的回调选择合适的分区
		RdKafka::Producer::RK_MSG_COPY, // 消息拷贝
		payload,                        // 消息本身
		len,                            // 消息长度
		&key,                           // 消息key
		NULL
		);

	if (RdKafka::ERR_NO_ERROR != errorCode) 
	{
		// kafka 队列满,等待 100 ms
		if (RdKafka::ERR__QUEUE_FULL == errorCode) 
		{
			m_producer->poll(100);
		}
        
        return -1;
	}
    
    // 同步等待200ms
    m_producer->flush(200);
    if(m_producer->outq_len() > 0)  // 用于调试
    {
        printf("Existed not send message.size:%d\n", m_producer->outq_len());
        return -1;
    }
    
    return 0;
}

异步流程

流程说明

  • 设置生产者投递报告回调
  • 设置生产者自定义分区策略回调
  • 消息发送

代码示例

  • 设置生产者投递回调
// 生产者投递报告回调
class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb 
{
public:
	void dr_cb(RdKafka::Message& message)
	{	
		if (message.err())   // 出错回调
		{
			// TODO
		} 
		else                 // 正常回调
		{  
			// TODO
		}
	}
};

// 设置生产者投递报告回调
m_dr_cb = new ProducerDeliveryReportCb; // 创建投递报告回调
errCode = m_config->set("dr_cb", m_dr_cb, errorStr);    // 异步方式发送数据
if (RdKafka::Conf::CONF_OK != errCode) 
{
    printf("Conf set(dr_cb) failed, errorStr:%s", errorStr.c_str());
    break;
}
  • 设置生产者自定义分区策略回调
// 生产者自定义分区策略回调:partitioner_cb
class HashPartitionerCb : public RdKafka::PartitionerCb 
{
public:
	// @brief 返回 topic 中使用 key 的分区,msg_opaque 置 NULL
	// @return 返回分区,(0, partition_cnt)
	int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,
		int32_t partition_cnt, void *msg_opaque) 
	{
		// 用于自定义分区策略:这里用 hash。例:轮询方式:p_id++ % partition_cnt
		int32_t partition_id = generate_hash(key->c_str(), key->size()) % partition_cnt;
		return partition_id;
	}

private:
	// 自定义哈希函数 
	static inline unsigned int generate_hash(const char *str, size_t len) 
	{
		unsigned int hash = 5381;
		for (size_t i = 0; i < len; i++)
			hash = ((hash << 5) + hash) + str[i];
		return hash;
	}
};

// 设置生产者自定义分区策略回调
m_partitioner_cb = new HashPartitionerCb; // 创建自定义分区投递回调
errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errCode) 
{
    printf("Conf set(partitioner_cb) failed, errorStr:%s", errorStr.c_str());
    break;
}
  • 消息发送

注意:此处produce执行成功不代表消息发送成功,需根据dr_cb消息回调结果判断消息是否发送成功。

int KafkaProducer::PushMessage(const std::string &str, const std::string &key)
{
	int32_t len = (int32_t)str.length();
	void *payload = const_cast<void *>(static_cast<const void *>(str.data()));

	// produce 方法,生产和发送单条消息到 Broker
	// 如果不加时间戳,内部会自动加上当前的时间戳
	RdKafka::ErrorCode errorCode = m_producer->produce(
		m_topic,                      // 指定发送到的主题
		RdKafka::Topic::PARTITION_UA, // 指定分区,如果为PARTITION_UA则通过
		// partitioner_cb的回调选择合适的分区
		RdKafka::Producer::RK_MSG_COPY, // 消息拷贝
		payload,                        // 消息本身
		len,                            // 消息长度
		&key,                           // 消息key
		NULL
		);

	// 轮询处理
	m_producer->poll(0);
	if (RdKafka::ERR_NO_ERROR != errorCode) 
	{
		// kafka 队列满,等待 100 ms
		if (RdKafka::ERR__QUEUE_FULL == errorCode) 
		{
			m_producer->poll(100);
		}
        
        return -1;
	}
    
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/527562.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JetBrains IDE(IDEA/WebStorm)配置GitHub Copilot

关于 GitHub Copilot 和 JetBrains IDE GitHub Copilot 在编写代码时提供 AI 对程序员的自动完成样式的建议。 有关详细信息&#xff0c;请参阅“关于 GitHub Copilot Individual”。 如果使用 JetBrains IDE&#xff0c;可以直接在编辑器中查看并合并来自 GitHub Copilot 的…

DXP学习002-PCB编辑器的环境参数及电路板参数相关设置

目录 一&#xff0c;dxp的pcb编辑器环境 1&#xff0c;创建新的PCB设计文档 2&#xff0c;PCB编辑器界面 1&#xff09;布线工具栏 2&#xff09;公用工具栏 3&#xff09;层标签栏 ​编辑 3&#xff0c;PCB设计面板 1&#xff09;打开pcb设计面板 4&#xff0c;PCB观…

线程的666种状态

文章目录 在Java中&#xff0c;线程有以下六种状态&#xff1a; NEW&#xff1a;新建状态&#xff0c;表示线程对象已经被创建但还未启动。RUNNABLE&#xff1a;可运行状态&#xff0c;表示线程处于就绪状态&#xff0c;等待系统分配CPU资源执行。BLOCKED&#xff1a;阻塞状态…

抖音在线点赞任务发布接单运营平台PHP网站源码 多个支付通道+分级会员制度

源码介绍 1、三级代理裂变&#xff0c;静态返佣/动态返佣均可设置。&#xff08;烧伤制度&#xff09;。 2、邀请二维码接入防红跳转。 3、自动机器人做任务&#xff0c;任务时间可设置&#xff0c;机器人价格时间可设置。 4、后台可设置注册即送X天机器人。 5、不同级别会…

LLM大语言模型助力DataEase小助手,新增气泡地图,DataEase开源数据可视化分析平台v2.5.0发布

2024年4月8日&#xff0c;DataEase开源数据可视化分析平台正式发布v2.5.0版本。 这一版本的功能升级包括&#xff1a;新增DataEase小助手支持&#xff0c;通过结合智能算法和LLM&#xff08;即Large Language Model&#xff0c;大语言模型&#xff09;能力&#xff0c;DataEas…

Servlet(一)

文章目录 1.Servlet整体框架2.Servlet快速入门1.创建项目配置基本环境2.添加jar包1.在WEB-INF下创建目录lib&#xff0c;添加文件2.添加到项目中3.配置代码提示 3.src下创建文件4.实现Servlet接口5.在web.xml配置HelloServlet6.通过浏览器访问HelloServlet 3.浏览器访问Servlet…

axios是什么?axios使用axios和ajax

Axios 是一个基于 Promise 的 HTTP 客户端&#xff0c;用于浏览器和 Node.js 环境。它是由 GitHub 用户 mzabriskie 开发的&#xff0c;并且得到了广泛的社区支持。Axios 的设计目标是提供一种简洁、易用且功能强大的 HTTP 请求方式&#xff0c;以替代传统的 Ajax&#xff08;A…

ssm031社区管理与服务的设计与实现+jsp

社区管理与服务系统的设计与实现 摘 要 Abstract 第一章 绪论 1.1研究背景 1.2 研究现状 1.3 研究内容 第二章 系统关键技术 2.1 Java简介 2.2 MySql数据库 2.3 B/S结构 2.4 Tomcat服务器 第三章 系统分析 3.1可行性分析 3.1.1技术可行性 3.1.2经济可行性 3.1.…

usbserial驱动流程解析_Part3_write_read数据流分析

usb转串口设备需要进行收发&#xff0c;主机接到uart rx线上的数据&#xff0c;把主机数据发送到 uart tx线上&#xff0c;对应的就是read函数和write函数&#xff0c;以上两个Part提到了tty有关的函数&#xff0c;在以下结构体被定义&#xff0c;write被指定为serial_write&a…

在 VS Code 中使用 GitHub Copilot

Code 结合使用。 GitHub Copilot 是什么 GitHub Copilot 是一个可以帮助你更简单、更快速地编写代码的工具&#xff0c;由 GPT-3 提供支持。你只需编写所需代码的描述——例如&#xff0c;编写一个函数来生成一个随机数&#xff0c;或对一个数组进行排序——Copilot 就会为你…

ATM04-6P 安费诺汽车连接器6芯压线端子胶壳

ATM04-6P是一款压线端子胶壳&#xff0c;属于Amphenol&#xff08;安费诺&#xff09;品牌 ATM04-6P 规格信息&#xff1a; 制造商:Amphenol 产品种类:汽车连接器 RoHS:是 产品:Connectors 位置数量:6 Position 型式:Receptacle (Female) 线规量程:22 AWG to 16 AWG 系列:ATM 颜…

蓝桥杯真题Day48 倒计时7天 练了几道真题小程序+回溯剪枝应用一个小程序

[蓝桥杯 2023 省 A] 更小的数 题目描述 小蓝有一个长度均为 n 且仅由数字字符 0∼9 组成的字符串&#xff0c;下标从0到 n−1&#xff0c;你可以将其视作是一个具有n位的十进制数字num&#xff0c;小蓝可以从num 中选出一段连续的子串并将子串进行反转&#xff0c;最多反转一次…

RuleEngine规则引擎底层改造AviatorScript 之函数执行

https://gitee.com/aizuda/rule-engine-open 需求&#xff1a;使用上述开源框架进行改造&#xff0c;底层更换成AviatorScript &#xff0c;函数实现改造。 原本实现方式 Overridepublic Object run(ExecuteFunctionRequest executeTestRequest) {Integer functionId executeT…

5G如何助力物流智能化转型

导语 大家好&#xff0c;我是智能仓储物流技术研习社的社长&#xff0c;你的老朋友&#xff0c;老K。行业群 新书《智能物流系统构成与技术实践》人俱乐部 整版PPT和更多学习资料&#xff0c;请球友到知识星球 【智能仓储物流技术研习社】自行下载 智能制造-话题精读 1、西门子…

移植内核linux-2.6.32.24遇见的问题和解决方法

目录 概述 1 配置编译环境 2 编译内核 2.1 配置内核 2.2 编译存在的问题 2.3 验证zImage 3 移植 yaffs2 3.1 下载yaffs2 3.2 为内核打上 yaffs2 补丁 3.3 配置和编译带 YAFFS2 支持的内核 3.3.1 配置 YAFFS2内核 3.3.2 编译带YAFFS2 支持的内核 3.4 验证带YAFFS2 支…

Mudbus协议CRC校验码C#

Mudbus协议CRC校验码C# 什么是modbus协议特点协议格式modbus-crc16校验原理方法帧校验CRC计算方法&#xff1a;例子 C#代码Demo源码下载 什么是modbus Modbus是一种串行通信协议&#xff0c;最初由Modicon&#xff08;目前属于施耐德电气公司&#xff09;于1979年开发 Modbus协…

机器学习知识点

1鸢尾花分类 鸢尾花分类问题是一个经典的机器学习问题&#xff0c;旨在根据鸢尾花的花萼长度、花萼宽度、花瓣长度和花瓣宽度等特征&#xff0c;将鸢尾花分成三个品种&#xff1a;山鸢尾&#xff08;setosa&#xff09;、变色鸢尾&#xff08;versicolor&#xff09;和维吉尼亚…

使用 proxySQL 来代理 Mysql

我有若干台云主机&#xff0c; 但是只有1个台vm 具有外部ip 而在另1台vm上我安装了1个mysql instance, 正常来讲&#xff0c; 我在家里的电脑是无法连接上这个mysql 尝试过用nginx 代理&#xff0c; 但是nginx只能代理http协议的&#xff0c; mysql 3306 并不是http协议 解决…

Leetcode面试经典150_Q14最长公共前缀

题目&#xff1a; 编写一个函数来查找字符串数组中的最长公共前缀。如果不存在公共前缀&#xff0c;返回空字符串 ""。 思路A&#xff1a;横向/纵向扫描 Python&#xff1a; class Solution:def longestCommonPrefix(self, strs: List[str]) -> str:s "…

Mac 每次重启终端都要重新配置mysql环境变量解决办法

1、问题 Mac 每次关闭终端后&#xff0c;mysql环境配置就失效了&#xff0c;需要重新配置mysql环境变量 2、解决方法 在 " ~/.zshrc "文件添加" source ~/.bash_profile "即可 vim ~/.zshrc source ~/.bash_profile 3、验证 退出终端后重新打开终端 mys…