在Go项目中二次封装Kafka客户端功能

1.摘要

在上一章节中,我利用Docker快速搭建了一个Kafka服务,并测试成功Kafka生产者和消费者功能,本章内容尝试在Go项目中对Kafka服务进行封装调用, 实现从Kafka自动接收消息并消费。

在本文中使用了Kafka的一个高性能开源库Sarama, Sarama是一个遵循MIT许可协议的Apache Kafka Go客户端库, 该开源库地址为:GitHub - IBM/sarama: Sarama is a Go library for Apache Kafka.。

2.功能结构组织

为了能在项目中快速使用, 我在项目目录中专门新建了一个名为kafka的文件夹,在该文件夹下新建了四个文件,分别为:

kafka (目录)
  |
  ----- consumer.go   (消费者方法实现)
  |
  ----- producer.go   (生产者方法实现)
  |
  ----- kafka.go      (定义接口)
  |
  ----- kafka_test.go (单元功能测试)

为方便项目使用,在此基础上做了二次封装。

3.消费者实现

第一步首先定义了一个结构体, 里面包含了Kafka的主机、topic、接收通道和消费者对象信息:

type KafkaConsumer struct {
	Hosts    string          // Kafka主机IP:端口,例如:192.168.201.206:9092
	Ctopic   string          // topic名称
	Kchan    chan string     // 接收信息通道
	Consumer sarama.Consumer // 消费者对象
}

接下来是消费者初始化函数:

func (k *KafkaConsumer) kafkaInit() {
  // 定义配置选项 
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Version = sarama.V0_10_2_0
	
	// 初始化一个消费对象
	consumer, err := sarama.NewConsumer(k.Hosts, config)
	if err != nil {
		err = errors.New("NewConsumer错误,原因:" + err.Error())
		fmt.Println(err.Error())
		return
	}
	
	// 获取所有Topic
	topics, err := consumer.Topics()
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	
	// 判断是否有自定义的Topic
	var topicsName = ""
	for _, e := range topics {
		if e == k.Ctopic {
			topicsName = e
			break
		}
	}
	
	// 没有自定义的Topic则报错
	if topicsName == "" {
		err = errors.New("找不到topics内容")
		fmt.Println(err.Error())
		return
	}
	
	// 将消费对象保存到结构体以备后面使用
	k.Consumer = consumer
}

在上面的初始化函数中, 首先初始化一个消费对象, 然后获取所有的Topic名称,并判断了在这些Topic名称中是否有我自定义的名称,获取成功后则将消费对象保存到我们绑定的结构体中。

接下来是消费监控函数实现,代码如下:

func (k *KafkaConsumer) kafkaProcess() {
	var wg sync.WaitGroup
	
	// 遍历指定Topic分区持续监控消息
	Partitions, _ := k.Consumer.Partitions(k.Ctopic)
	for _, subPartitions := range Partitions {
		pc, err := k.Consumer.ConsumePartition(k.Ctopic, subPartitions, sarama.OffsetNewest)
		if err != nil {
			continue
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			// 这里进入另一个函数可以过滤消息内容
			k.processPartition(pc)
		}()
	}
	wg.Wait()
}

函数processPartition()的实现代码如下:

func (k *KafkaConsumer) processPartition(pc sarama.PartitionConsumer) {
	defer pc.AsyncClose()
	for msg := range pc.Messages() {
	  // 这里可以过滤不需要的Topic的信息
		if strings.Contains(string(msg.Value), "group_state2") {
			continue
		}
		// 这里将获取到的Topic信息发送到通道
		k.Kchan <- string(msg.Value)
	}
}

4.生产者实现

为了跟消费者代码配套,这里也同步实现了生产者代码,主要功能是完成工作后,给指定Topic的生产方返回一个指定消息。

定义生产者的结构体如下:

type KafkaProducer struct {
	hosts         string               // Kafka主机
	sendmsg       string               // 消费方返回给生产方的消息
	ptopic        string               // Topic
	AsyncProducer sarama.AsyncProducer // Kafka生产者接口对象
}

对应的生产者初始化函数实现如下:

func (k *KafkaProducer) kafkaInit() {
  // 定义配置参数
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 5
	config.Producer.Return.Successes = true
	config.Version = sarama.V0_10_2_0
	
	// 初始化一个生产者对象
	producer, err := sarama.NewAsyncProducer(k.hosts, config)
	if err != nil {
		err = errors.New("NewAsyncProducer错误,原因:" + err.Error())
		fmt.Println(err.Error())
		return
	}
	
	// 保存对象到结构体
	k.AsyncProducer = producer
}

给生产者回复信息的函数实现如下:

func (k *KafkaProducer) kafkaProcess() {
	msg := &sarama.ProducerMessage{
		Topic: k.ptopic,
	}
	// 信息编码
	msg.Value = sarama.ByteEncoder(k.sendmsg)
	
	// 将信息发送给通道
	k.AsyncProducer.Input() <- msg
}

5.接口定义实现

首先对于生产者和消费者,都有对应的初始化和执行操作,因此定义接口函数如下:

// Kafka方法接口
type IKafkaMethod interface {
	kafkaInit()     // 初始化方法
	kafkaProcess()  // 执行方法
}

为了方便管理接口的赋值操作, 这里定义了一个接口管理方法, 并用Set()函数进行接口类型赋值, Run()函数负责运行对应的成员函数:

// 接口管理结构体
type KafkaManager struct {
	kafkaMethod IKafkaMethod  // 接口对象
}

// 定义实现Set方法
func (km *KafkaManager) Set(m IKafkaMethod) {
	km.kafkaMethod = m  // 将指定的方法赋给接口
}

// 定义实现Run方法
func (km *KafkaManager) Run() {
  km.kafkaMethod.kafkaInit()
  go km.kafkaMethod.kafkaProcess()
}

最后一部分是供外部调用的函数,首先定义一个结构体,该结构体中保存了Kafka的基础信息和三个对象指针:

type KafkaMessager struct {
	KafkaManager  *KafkaManager   // 接口管理对象指针
	KafkaProducer *KafkaProducer  // 生产者对象指针
	KafkaConsumer *KafkaConsumer  // 消费者对象指针
	Hosts         string          // Kafka主机
	topic         string          // topic
}

// 供外部调用初始化的函数,传入Kafka主机IP和Topic,返回操作对象指针,并初始化结构体成员变量
func NewKafkaMessager(hosts, topic string) *KafkaMessager {
	km := &KafkaMessager{
		KafkaManager:  new(KafkaManager),
		KafkaProducer: new(KafkaProducer),
		KafkaConsumer: new(KafkaConsumer),
		Hosts:         hosts,
		topic:         topic,
	}
	return km
}

6.功能调用和验证

在Kafka_test.go文件中,定义一个用于单元测试的函数,格式如下:

func TestKafka(t *testing.T) {
    ....
}

使用单元测试函数的好处是可以单独调试, 专注核心功能本身。

我使用的编辑器是Goland, 在TestKafka函数前面有个三角形小图标,点击可以选择各种调试选项,如图:

下面是我模拟用户调用的客户端代码片段:

// 这里选择我自己搭建的Kafka所在服务器,Topic为test123
// 注意:这里的hosts格式是IP:端口的格式,例如:192.168.201.206:9092
hosts := "192.168.201.206:9092"
topic := "test123"

// 调用初始化函数,并将上面的内容作为参数传进去
nkm := NewKafkaMessager(hosts, topic)

// 初始化消费者,当生产者发出消息,消费者自动消费
nkm.KafkaConsumer.Hosts = hosts             // 消费者host赋值
nkm.KafkaConsumer.Ctopic = topic            // 消费者topic赋值
nkm.KafkaConsumer.Kchan = make(chan string) // 初始化消息通道
nkm.KafkaManager.Set(nkm.KafkaConsumer)     // 接口赋值,设置成操作消费者方法
nkm.KafkaManager.Run()                  // 执行消费者初始化方法


// 监听通道,接收生产客户端发过来的消息
recv := <- nkm.KafkaConsumer.Kchan
fmt.Println(recv)  // 打印接收到的消息

现在我们可以选择直接运行程序了,然后在Kafka的生产者控制台中输入字符:Hello,Goland发送:

可以看到,我们的程序成功接收到Kafka生产者发送过来的信息。

--- END --

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/106917.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

禁止chrome浏览器更新方式

1、禁用更新服务 WinR调出运行&#xff0c;输入services.msc&#xff0c;进入服务。 在服务中有两个带有Google Update字样&#xff0c;双击打开后禁用&#xff0c;并把恢复选项设置为无操作。 2、删除计划任务 运行taskschd.msc&#xff0c;打开计划任务程序库&#xff0c;在…

FL Studio21水果编曲软件如何切换成官方中文版

FL studio又被国内网友称之为水果音乐制作软件21版本&#xff0c;是Image-Line公司成立23周年而发布的一个版本&#xff0c;FL studio中文版是目前互联网上最优秀的完整的软件音乐制作环境或数字音频工作站&#xff0c;FL Studio包含了编排&#xff0c;录制&#xff0c;编辑&am…

Leetcode刷题详解——点名

1. 题目链接&#xff1a;LCR 173. 点名 2. 题目描述&#xff1a; 某班级 n 位同学的学号为 0 ~ n-1。点名结果记录于升序数组 records。假定仅有一位同学缺席&#xff0c;请返回他的学号。 示例 1: 输入: records [0,1,2,3,5] 输出: 4示例 2: 输入: records [0, 1, 2, 3, 4,…

MATLAB中polyvalm函数用法

目录 语法 说明 示例 特征多项式的矩阵计算 polyvalm函数的功能是矩阵多项式计算。 语法 Y polyvalm(p,X) 说明 Y polyvalm(p,X) 以矩阵方式返回多项式 p 的计算值。此计算方式等同于使用多项式 p 替换矩阵 X。 示例 特征多项式的矩阵计算 求解 4 阶帕斯卡矩阵的特征…

Android-登录注册页面(第三次作业)

第三次作业 - 登录注册页面 题目要求 嵌套布局。使用线性布局的嵌套结构&#xff0c;实现登录注册的页面。&#xff08;例4-3&#xff09; 创建空的Activity 项目结构树如下图所示&#xff1a; 注意&#xff1a;MainActivity.java文件并为有任何操作&#xff0c;主要功能集中…

docker应用部署---Tomcat的部署配置

1. 搜索tomcat镜像 docker search tomcat2. 拉取tomcat镜像 docker pull tomcat3. 创建容器&#xff0c;设置端口映射、目录映射 # 在/root目录下创建tomcat目录用于存储tomcat数据信息 mkdir ~/tomcat cd ~/tomcatdocker run -id --namec_tomcat \ -p 8080:8080 \ -v $PWD:…

项目管理工具ConceptDraw PROJECT mac中文版自定义列功能

ConceptDraw PROJECT Mac是一款专业的项目管理工具&#xff0c;适用于MacOS平台。它提供了成功规划和执行项目所需的完整功能&#xff0c;包括任务和资源管理、报告和变更控制。 这款软件可以与ConceptDraw office集成&#xff0c;利用思维导图和数据可视化的强大功能来改进项目…

极米科技H6 Pro 4K、H6 4K高亮定焦版——开启家用投影4K普及时代

智能投影产业经过几年发展&#xff0c;市场规模正在快速扩大。洛图数据显示&#xff0c;预计今年中国投影出货量有望超700万台&#xff0c;2027年达950万台&#xff0c;可见智能投影产业规模将逐渐壮大&#xff0c;未来可期。2023年&#xff0c;投影行业呈现出全新面貌&#xf…

2016年亚太杯APMCM数学建模大赛A题基于光学信息数据的温度及关键元素含量预测求解全过程文档及程序

2016年亚太杯APMCM数学建模大赛 A题 基于光学信息数据的温度及关键元素含量预测 原题再现 光含有能量&#xff0c;在一定条件下可以转化为热。燃烧是一种常见的现象&#xff0c;既能发光又能发热。光和热通常是同时存在的&#xff0c;一般来说&#xff0c;光强度越高&#xf…

【ETL工具】Datax-ETL-SqlServerToHDFS

&#x1f984; 个人主页——&#x1f390;个人主页 &#x1f390;✨&#x1f341; &#x1fa81;&#x1f341;&#x1fa81;&#x1f341;&#x1fa81;&#x1f341;&#x1fa81;&#x1f341; 感谢点赞和关注 &#xff0c;每天进步一点点&#xff01;加油&#xff01;&…

ATV32变频器在堆垛机应用

一、机型介绍&#xff1a; 目前国内物流行业发展速度很快&#xff0c;特别是在自动仓库这一块&#xff0c;自动仓库用的最多是堆垛机&#xff0c;自动仓库目前驱动用得基本上变频器。品牌基本是丹佛斯、日系及其他等重载系列变频器。设备主要包括&#xff1a;提升机、货叉及行…

【鸿蒙软件开发】ArkTS基础组件之Gauge(环形图表)、LoadingProgress(动态加载)

文章目录 前言一、Gauge环形图表1.1 子组件1.2 接口参数介绍 1.2 属性1.3 示例代码二、LoadingProgress2.1 子组件2.2 接口2.3 属性2.4 示例代码 总结 前言 Gauge&#xff1a;数据量规图表组件&#xff0c;用于将数据展示为环形图表。 LoadingProgress&#xff1a;用于显示加载…

DevOps持续集成-Jenkins(2)

文章目录 DevOpsDevOps概述Integrate工具&#xff08;centos7-jenkins主机&#xff09;Integrate概述Jenkins介绍CI/CD介绍Linux下安装最新版本的Jenkins⭐Jenkins入门配置安装必备插件⭐安装插件&#xff08;方式一&#xff1a;可能有时会下载失败&#xff09;安装插件&#x…

Python 算法高级篇:归并排序的优化与外部排序

Python 算法高级篇&#xff1a;归并排序的优化与外部排序 引言 1. 归并排序的基本原理2. 归并排序的优化2.1 自底向上的归并排序2.2 最后优化 3. 外部排序4. 性能比较5. 结论 引言 在计算机科学中&#xff0c;排序是一项基本的任务&#xff0c;而归并排序&#xff08; Merge S…

MTK AEE_EXP调试方法及user版本打开方案

一、AEE介绍 AEE (Android Exception Engine)是安卓的一个异常捕获和调试信息生成机制。 手机发生错误(异常重启/卡死)时生成db文件(一种被加密过的二进制文件)用来保存和记录异常发生时候的全部内存信息,经过调试和仿真这些信息,能够追踪到异常的缘由。 二、调试方法…

【C++】Linux下如何查看opencv的版本

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

职业技术认证:《研发效能(DevOps)工程师》——开启职业发展新篇章

在互联网行业中&#xff0c;资质认证可以证明在该领域内的专业能力和知识水平。各种技术水平认证也是层出不穷&#xff0c;而考取具有公信力和权威性的认证是从业者的首选。同时&#xff0c;随着国内企业技术实力的提升和国家对于自主可控的重视程度不断提高&#xff0c;国产证…

buuctf_练[CSCCTF 2019 Qual]FlaskLight

[CSCCTF 2019 Qual]FlaskLight 文章目录 [CSCCTF 2019 Qual]FlaskLight掌握知识解题思路关键paylaod 掌握知识 内置函数的过滤&#xff0c;globals变量的过滤&#xff0c;调用内部变量或函数的OS函数进行命令执行 解题思路 打开题目链接&#xff0c;很明显看标题和内容是fla…

代码随想录算法训练营第三十五天丨 贪心算法part06

738.单调递增的数字 思路 暴力解法 题意很简单&#xff0c;那么首先想的就是暴力解法了【超时】。 贪心算法 题目要求小于等于N的最大单调递增的整数&#xff0c;那么拿一个两位的数字来举例。 例如&#xff1a;98&#xff0c;一旦出现strNum[i - 1] > strNum[i]的情况…