Windows10配置C++版本的Kafka,并进行发布和订阅测试

配置的环境为:Release x64下的环境

完整项目:https://gitee.com/jiajingong/kafka-publisher

1、首先下载相应的库文件(.lib,.dll)

参考链接:

GitHub - eStreamSoftware/delphi-kafka

GitHub - cloader/KafkaCPP-win32-dll: KafkaCpp-win32-dll

2、新建一个新的命令行C++工程

建完工程后,选择Release x64,并在生成中执行重新生成解决方案,这样会在项目目录下生成x64/Release文件夹

3、通过VS2017配置附加库目录和附加依赖项

所有的.lib、.dll等库文件均在下图x64/Release目录下

附加依赖项加入:librdkafka.lib;librdkafkacpp.lib,如下图:

4、发布端:

将主函数的CPP文件改为:

#include <iostream>
#include <thread>
#include "rdkafkacpp.h"

int main()
{
	std::string brokers = "172.18.4.96:9092";
	std::string errorStr;
	RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
	RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
	if (!conf) {
		std::cout << "Create RdKafka Conf failed" << std::endl;
		return -1;
	}

	conf->set("message.max.bytes", "10240000", errorStr); //最大字节数
	conf->set("replica.fetch.max.bytes", "20485760", errorStr);
	conf->set("bootstrap.servers", brokers, errorStr);

	RdKafka::Producer *producer = RdKafka::Producer::create(conf, errorStr);
	if (!producer) {
		std::cout << "Create Producer failed" << std::endl;
		return -1;
	}
	//创建Topic
	RdKafka::Topic *topic = RdKafka::Topic::create(producer, "koala-stqf-03", tconf, errorStr);
	if (!topic) {
		std::cout << "Create Topic failed" << std::endl;
	}
	int count = 0;
	while (true)
	{   //发送消息
		RdKafka::ErrorCode resCode = producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, (char *)"123456789", 10, nullptr, nullptr);
		std::cout << "Count:" << count << ",has publish:" << (char *)"123456789" << std::endl;
		if (resCode != RdKafka::ERR_NO_ERROR) {
			std::cerr << "Produce failed: " << RdKafka::err2str(resCode) << std::endl;
		}
		count += 1;
		std::this_thread::sleep_for(std::chrono::seconds(1));
	}

	delete conf;
	delete tconf;
	delete topic;
	delete producer;

	RdKafka::wait_destroyed(5000);
	return 0;
}

5、订阅端

新建一个同样的订阅端工程,同样将主函数的代码改为:

#include "rdkafkacpp.h"
#include <chrono>
#include <time.h>
#include <sstream>
#include <iomanip>
#include <iostream>
#include <algorithm>
#include <iterator>

void consume_cb(RdKafka::Message &message, void *opaque)
{
	switch (message.err()) {
	case RdKafka::ERR__TIMED_OUT:
		std::cout << "RdKafka::ERR__TIMED_OUT" << std::endl;
		break;
	case RdKafka::ERR_NO_ERROR:
		/* Real message */

		RdKafka::MessageTimestamp ts;
		ts = message.timestamp();
		if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
			std::string timeprefix;
			if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) {
				timeprefix = "created time";
			}
			else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME) {
				timeprefix = "log append time";
			}


			unsigned long long milli = ts.timestamp + (unsigned long long)8 * 60 * 60 * 1000;//此处转化为东八区北京时间,如果是其它时区需要按需求修改
			auto mTime = std::chrono::milliseconds(milli);
			auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(mTime);
			auto tt = std::chrono::system_clock::to_time_t(tp);
			tm timeinfo;
			::gmtime_s(&timeinfo, &tt);

			//char s[60]{ 0 };
			//::sprintf(s, "%04d-%02d-%02d %02d:%02d:%02d", timeinfo.tm_year + 1900, timeinfo.tm_mon + 1, timeinfo.tm_mday, timeinfo.tm_hour, timeinfo.tm_min, timeinfo.tm_sec);
			// std::cout << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;
#if 0
			std::stringstream ss;
			std::string dateStr;

			ss << timeinfo.tm_year + 1900 << "-"
				<< timeinfo.tm_mon + 1 << "-"
				<< timeinfo.tm_mday;
			ss >> dateStr;

			ss.clear();
			ss << timeinfo.tm_hour << ":"
				<< timeinfo.tm_min << ":"
				<< timeinfo.tm_sec;
			std::string timeStr;
			ss >> timeStr;

			std::string dateTimeStr;
			dateTimeStr += dateStr;
			dateTimeStr.push_back(' ');
			dateTimeStr += timeStr;
#endif // 0

			//std::cout << "TimeStamp" << timeprefix << " " << s << std::endl;
			std::cout << "TimeStamp   " << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;
		}

		std::cout << message.topic_name() << " offset" << message.offset() << "  partion " << message.partition() << " message: " << reinterpret_cast<char*>(message.payload()) << std::endl;
		break;

	case RdKafka::ERR__PARTITION_EOF:
		/* Last message */
		std::cout << "EOF reached for" << std::endl;
		break;

	case RdKafka::ERR__UNKNOWN_TOPIC:
	case RdKafka::ERR__UNKNOWN_PARTITION:
		std::cout << "Consume failed: " << message.errstr();
		break;

	default:
		/* Errors */
		std::cout << "Consume failed: " << message.errstr();
		break;
	}
}
int main()
{
	std::string brokers = "172.18.4.96:9092";
	std::string errstr;
	std::vector<std::string> topics{ "koala-stqf-03",
		"klai-seim-alert-koala-test-03"
	};
	std::string group_id = "whl-consumer-group";

	RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
	if (conf->set("group.id", group_id, errstr)) {
		std::cout << errstr << std::endl;
		return -1;
	}

	conf->set("bootstrap.servers", brokers, errstr);
	conf->set("max.partition.fetch.bytes", "1024000", errstr);
	//conf->set("enable-auto-commit", "true", errstr);
	RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
	tconf->set("auto.offset.reset", "latest", errstr);
	conf->set("default_topic_conf", tconf, errstr);

	RdKafka::KafkaConsumer *m_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
	if (!m_consumer) {
		std::cout << "failed to create consumer " << errstr << std::endl;
		return -1;
	}

#if 0 //从上一次消费结束的位置开始消费
	RdKafka::ErrorCode err = m_consumer->subscribe(topics);
	if (err != RdKafka::ERR_NO_ERROR) {
		std::cout << RdKafka::err2str(err) << std::endl;
		return -1;
	}
#else //指定每个topic的每个分区开始消费的位置

	//基本思路为先获取server端的状态信息,将与订阅相关的topic找出来,根据分区,创建TopicPartion;最后使用assign消费
	RdKafka::Metadata *metadataMap{ nullptr };
	RdKafka::ErrorCode err = m_consumer->metadata(true, nullptr, &metadataMap, 2000);
	if (err != RdKafka::ERR_NO_ERROR) {
		std::cout << RdKafka::err2str(err) << std::endl;
	}
	const RdKafka::Metadata::TopicMetadataVector *topicList = metadataMap->topics();
	std::cout << "broker topic size: " << topicList->size() << std::endl;
	RdKafka::Metadata::TopicMetadataVector subTopicMetaVec;
	std::copy_if(topicList->begin(), topicList->end(), std::back_inserter(subTopicMetaVec), [&topics](const RdKafka::TopicMetadata* data) {
		return std::find_if(topics.begin(), topics.end(), [data](const std::string &tname) {return data->topic() == tname; }) != topics.end();
	});
	std::vector<RdKafka::TopicPartition*> topicpartions;
	std::for_each(subTopicMetaVec.begin(), subTopicMetaVec.end(), [&topicpartions](const RdKafka::TopicMetadata* data) {
		auto parVec = data->partitions();
		std::for_each(parVec->begin(), parVec->end(), [&](const RdKafka::PartitionMetadata *value) {
			std::cout << data->topic() << " has partion: " << value->id() << " Leader is : " << value->leader() << std::endl;
			topicpartions.push_back(RdKafka::TopicPartition::create(data->topic(), value->id(), RdKafka::Topic::OFFSET_END));
		});
	});
	m_consumer->assign(topicpartions);
#endif // 0
	RdKafka::ErrorCode errccc = m_consumer->subscribe(topics);
	if (errccc != RdKafka::ERR_NO_ERROR) {
		std::cout << RdKafka::err2str(errccc) << std::endl;
		return -1;
	}

	while (true)
	{
		RdKafka::Message *msg = m_consumer->consume(6000);
		consume_cb(*msg, nullptr); //消息一条消息
		delete msg;
	}
	return 0;
}

6、发布 订阅展示:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/977401.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Deepseek引爆AI热潮 防静电地板如何守护数据中心安全

近期&#xff0c;Deepseek的爆火将人工智能推向了新的高度&#xff0c;也引发了人们对AI背后基础设施的关注。作为AI运行的“大脑”&#xff0c;数据中心承载着海量数据的存储、处理和传输&#xff0c;其安全稳定运行至关重要。而在这背后&#xff0c;防静电地板扮演着不可或缺…

Spring框架基本使用(Maven详解)

前言&#xff1a; 当我们创建项目的时候&#xff0c;第一步少不了搭建环境的相关准备工作。 那么如果想让我们的项目做起来方便快捷&#xff0c;应该引入更多的管理工具&#xff0c;帮我们管理。 Maven的出现帮我们大大解决了管理的难题&#xff01;&#xff01; Maven&#xf…

QSplashScreen --软件启动前的交互

目录 QSplashScreen 类介绍 使用方式 项目中使用 THPrinterSplashScreen头文件 THPrinterSplashScreen实现代码 使用代码 使用效果 QSplashScreen 类介绍 QSplashScreen 是 Qt 中的一个类&#xff0c;用于显示启动画面。它通常在应用程序启动时显示&#xff0c;以向用户显…

【Vscode 使用】集合1

一、使用make工具管理工程 windows下&#xff0c;下载mingw64&#xff0c;配置好mingw64\bin 为 Win10系统全局变量后。 在mingw64/bin目录下找到mingw32-make.exe工具。复制一份改名为&#xff1a;make.exe&#xff0c;没错&#xff0c;就是那么简单&#xff0c;mingw64自带m…

PHP-create_function

[题目信息]&#xff1a; 题目名称题目难度PHP-create_function2 [题目考点]&#xff1a; create_function ( string args , string args , string code )[Flag格式]: SangFor{wWx5dEGHHhDUwmST4bpXwfjSzq43I6cz}[环境部署]&#xff1a; docker-compose.yml文件或者docker …

golang内存泄漏

golang也用了好几年了&#xff0c;趁着有空 整理归纳下&#xff0c;以后忘了好看下 一般认为 Go 10次内存泄漏&#xff0c;8次goroutine泄漏&#xff0c;1次是真正内存泄漏&#xff0c;还有1次是cgo导致的内存泄漏 1:环境 go1.20 win10 2:goroutine泄漏 单个Goroutine占用内存&…

Python Seaborn库使用指南:从入门到精通

1. 引言 Seaborn 是基于 Matplotlib 的高级数据可视化库,专为统计图表设计。它提供了更简洁的 API 和更美观的默认样式,能够轻松生成复杂的统计图表。Seaborn 在数据分析、机器学习和科学计算领域中被广泛使用。 本文将详细介绍 Seaborn 的基本概念、常用功能以及高级用法,…

修改与 Git 相关的邮箱

要修改与 Git 相关的邮箱信息&#xff0c;需要区分以下两种情况&#xff1a; 1. 修改 Git 提交时使用的邮箱&#xff08;影响提交记录&#xff09; Git 提交记录中的邮箱由本地 Git 配置的 user.email 决定&#xff0c;与 SSH 密钥无关。修改方法如下&#xff1a; 全局修改&a…

用PyTorch从零构建 DeepSeek R1:模型架构和分步训练详解

DeepSeek R1 的完整训练流程核心在于&#xff0c;在其基础模型 DeepSeek V3 之上&#xff0c;运用了多种强化学习策略。 本文将从一个可本地运行的基础模型起步&#xff0c;并参照其技术报告&#xff0c;完全从零开始构建 DeepSeek R1&#xff0c;理论结合实践&#xff0c;逐步…

基于SpringBoot的“流浪动物救助系统”的设计与实现(源码+数据库+文档+PPT)

基于SpringBoot的“流浪动物救助系统”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringBoot 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统功能结构图 局部E-R图 系统首页界面 系统…

从零开始玩转TensorFlow:小明的机器学习故事 5

图像识别的挑战 1 故事引入&#xff1a;小明的“图像识别”大赛 小明从学校里听说了一个有趣的比赛&#xff1a;“美食图像识别”。参赛者需要训练计算机&#xff0c;看一张食物照片&#xff08;例如披萨、苹果、汉堡等&#xff09;&#xff0c;就能猜出这是什么食物。听起来…

学习笔记--电磁兼容性EMC

一、基本概念 电磁兼容性&#xff08;Electromagnetic Compatibility&#xff0c;EMC&#xff09;是电子电气设备在特定电磁环境中正常工作的能力&#xff0c;同时不会对其他设备产生不可接受的电磁干扰。其核心目标是确保设备在共享的电磁环境中既能抵抗干扰&#xff0c;又能避…

unity学习51:所有UI的父物体:canvas画布

目录 1 下载资源 1.1 在window / Asset store下下载一套免费的UI资源 1.2 下载&#xff0c;导入import 1.3 导入后在 project / Asset下面可以看到 2 画布canvas&#xff0c;UI的父物体 2.1 创建canvas 2.1.1 画布的下面是 event system是UI相关的事件系统 2.2 canvas…

ArcGIS Pro中创建最低成本路径的详尽教程

一、引言 在地理信息系统&#xff08;GIS&#xff09;的应用场景中&#xff0c;路径分析扮演着至关重要的角色。而最低成本路径分析&#xff0c;则是路径分析中的一种高级应用&#xff0c;它综合考虑了地形、植被、土地利用类型等多种因素&#xff0c;通过加权计算得出一条从起…

地铁站内导航系统:基于蓝牙Beacon与AR技术的动态路径规划技术深度剖析

本文旨在分享一套地铁站内导航系统技术方案&#xff0c;通过蓝牙Beacon技术与AI算法的结合&#xff0c;解决传统导航定位不准确、路径规划不合理等问题&#xff0c;提升乘客出行体验&#xff0c;同时为地铁运营商提供数据支持与增值服务。 如需获取校地铁站内智能导航系统方案文…

在VSCode中接入deepseek

注册就送14元2000万tokens。 https://cloud.siliconflow.cn/i/rnbA6i6U各种大模型 下面介绍我是如如接入vscode的 左边生成一个key&#xff0c;呆会vscode要用&#xff0c;不然401. 打开vscod&#xff0c;电脑能上网。下插件。 下好要配置 点它一下。 要配置&#xff0c;全…

【Java项目】基于Spring Boot的简历系统

【Java项目】基于Spring Boot的简历系统 技术简介&#xff1a;采用Spring Boot框架、Java技术、MySQL数据库等实现。 系统简介&#xff1a;系统主要实现了管理员模块、用户模块二大部分。管理员登录进入简历系统可以查看首页、个人中心、用户管理、简历模板管理、模板类型管理、…

汽车零部件工厂如何通过ESD监控系统闸机提升产品质量

在汽车零部件工厂的生产过程中&#xff0c;静电带来的危害不容小觑。从精密的电子元件到复杂的机械部件&#xff0c;静电都可能成为影响产品质量的 “隐形杀手”。而 ESD 监控系统闸机的出现&#xff0c;为汽车零部件工厂解决静电问题、提升产品质量提供了关键的技术支持。 一、…

记录:Docker 安装记录

今天在安装 ollama 时发现无法指定安装目录&#xff0c;而且它的命令行反馈内容很像 docker &#xff0c;而且它下载的模型也是放在 C 盘&#xff0c;那么如果我 C 盘空间不足&#xff0c;就装不了 deepseek-r1:70b &#xff0c;于是想起来之前安装 Docker 的时候也遇到过类似问…

DPVS-5: 后端服务监控原理与测试

后端监控原理 被动监测 DPVS自带了被动监控&#xff0c;通过监控后端服务对外部请求的响应情况&#xff0c;判断服务器是否可用。 DPVS的被动监测&#xff0c;并不能获取后端服务器的详细情况&#xff0c;仅仅通过丢包/拒绝情况来发觉后端服务是否可用。 TCP session state…