go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

环境的搭建

Kafka以及相关组件的下载

我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境,这里我们个大家找了一篇博文供大家参考:
Windows下安装Kafka(图文记录详细步骤)

sarama包的安装

今天我们所时机的内容需要用到go语言的第三方包sarama,由于1.19版本后添加了ztcd压缩算法,需要用到cgo,这里我们为了方便考虑选择下载sarama v1.19.0,所以这里我们不能直接使用go get'命令来安装第三方包,我们要使用/go mod文件来实现,下面是主要步骤:

  • 在项目中创建文件夹(博主的是Kafkademo)
  • 打开终端,输入go mod init,进行go.mod文件的初始化:
    在这里插入图片描述
    在这里插入图片描述
  • 我们在.mod文件内指定第三方包及其版本:
module Kafkademo

require (
	github.com/Shopify/sarama v1.19
)

go 1.21.6

其实这是已经可以使用命令go mod tidy了,但是博主在做的时候发现,这样会直接清除掉.mod文件里面的内容,所以建议先创建一个producer文件,在文件里面写:

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll                                
}

这时候再打开终端输入go mod tidy
在这里插入图片描述
等待命令运行完毕,打开.mod文件,看到如下内容就OK了:
在这里插入图片描述

利用sarama向Kafka发送消息(消息的生产)

代码

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()                              //创建config实例
	config.Producer.RequiredAcks = sarama.WaitForAll          //发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner //创建随机分区
	config.Producer.Return.Successes = true                   //成功交付的消息将在success channel返回

	//创建信息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "web.log"
	msg.Value = sarama.StringEncoder("this is a test log")

	//连接KafKa
	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	defer client.Close()

	//发送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed,err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行过程

  • 首先我们打开终端开起ZooKepper服务
    zkServer
    
    在这里插入图片描述
  • 然后再Kafka所在文件夹下输入命令运行Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties

在这里插入图片描述
最后运行程序即可,输出结果为:
在这里插入图片描述

补充:消息的消费

代码

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"time"
)

func main() {
	customer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		fmt.Println("failed init customer,err:", err)
		return
	}

	partitionlist, err := customer.Partitions("web.log-0") //获取topic的所有分区
	if err != nil {
		fmt.Println("failed get partition list,err:", err)
		return
	}

	fmt.Println("partitions:", partitionlist)

	for partition := range partitionlist { // 遍历所有分区
		//根据消费者对象创建一个分区对象
		pc, err := customer.ConsumePartition("web.log", int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Println("failed get partition consumer,err:", err)
			return
		}
		defer pc.Close() // 移动到这里

		go func(consumer sarama.PartitionConsumer) {
			defer pc.AsyncClose() // 移除这行,因为已经在循环结束时关闭了
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
		time.Sleep(time.Second * 10)
	}
}

不过这个不能实现直接的消费,后续我们会对这个进行补充,这里仅作介绍。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/552824.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

STM32芯片flash被锁导致Error Flash Download failed Cortex-M4,解决办法(全)亲测有效

STM32芯片flash被锁导致Error: Flash Download failed - "Cortex-M4",解决办法(全)亲测有效🤩! 方法1:由于Keil 中debug的仿真器配置出错导致的下载失败(这种问题虽然是低级错误&…

友思特应用 | 红外视角的延伸:短波红外相机的机器视觉应用

导读 短波红外SWIR在不同波段针对不同材料的独特成像特征为各领域检测应用的拓宽提供了基础。本文将展现短波红外成像技术在水分检测、塑料检测、太阳能电池板检查和矿场开采等领域的丰富应用案例,讨论短波红外相机在未来的发展方向。 SWIR 背景简介 短波红外 &am…

基于SpringBoot+Vue的IT技术交流平台(源码+文档+包运行)

一.系统概述 我国科学技术的不断发展,计算机的应用日渐成熟,其强大的功能给人们留下深刻的印象,它已经应用到了人类社会的各个层次的领域,发挥着重要的不可替换的作用。信息管理作为计算机应用的一部分,使用计算机进行…

PHP货运搬家/拉货小程序二开源码搭建的功能

运搬家/拉货小程序的二次开发可以添加许多功能,以增强用户体验和提高业务效率。以下是一些可能的功能: 用户端功能: 注册登录:允许用户创建个人账户并登录以使用应用程序。货物发布:允许用户发布他们需要搬运的货物信息…

OpenHarmony实战开发-如何通过分割swiper区域,实现指示器导航点位于swiper下方的效果。

介绍 本示例介绍通过分割swiper区域,实现指示器导航点位于swiper下方的效果。 效果预览图 使用说明 1.加载完成后swiper指示器导航点,位于显示内容下方。 实现思路 1.将swiper区域分割为两块区域,上方为内容区域,下方为空白区…

HAL STM32 I2C方式读取MT6701磁编码器获取角度例程

HAL STM32 I2C方式读取MT6701磁编码器获取角度例程 📍相关篇《Arduino通过I2C驱动MT6701磁编码器并读取角度数据》🎈《STM32 软件I2C方式读取MT6701磁编码器获取角度例程》📌MT6701当前最新文档资料:https://www.magntek.com.cn/u…

生产服务器变卡怎么排查

服务器变卡怎么排查,可以从以下四个方面去考虑 生产服务器变卡怎么排查 1、网络2、cpu的利用率3、io效率4、内存瓶颈 1、网络 可以使用netstat、iftop等工具查看网络流量和网络连接情况,检查是否网络堵塞、丢包等问题 2、cpu的利用率 1、用top命令定…

VMWare Ubuntu压缩虚拟磁盘

VMWare中ubuntu会越用越大,直到占满预分配的空间 即使系统里没有那么多东西 命令清理 开机->open Terminal sudo vmware-toolbox-cmd disk shrink /关机-> 编辑虚拟机设置->硬盘->碎片整理&压缩 磁盘应用 开机->disk usage analyzer(应用) …

【LLM】认识LLM

文章目录 1.LLM1.1 LLM简介1.2 LLM发展1.3 市面常见的LLM1.4 LLM涌现的能力 2.RAG2.1 RAG简介2.2 RAG 的工作流程2.3 RAG 和 Finetune 对比2.4 RAG的使用场景分析 3. LangChain3.1 LangChain简介3.2 LangChain的核心组件3.3 LangChain 入门 4.开发 RAG 应用的整体流程5. 环境配…

stm32中的中断优先级

在工作中使用到多个定时器中断,由于中断的中断优先级不熟悉导致出错,下面来写一下中断的一些注意事项。 一、中断的分类 1、EXTI外部中断:由外部设备或外部信号引发,例如按键按下、外部传感器信号变化等。外部中断用于响应外部事件,并及时处理相关任务。 2、内部中断:…

搭建Zookeeper完全分布式集群(CentOS 9 )

ZooKeeper是一个开源的分布式协调服务,它为分布式应用提供了高效且可靠的分布式协调服务,并且是分布式应用保证数据一致性的解决方案。该项目由雅虎公司创建,是Google Chubby的开源实现。 分布式应用可以基于ZooKeeper实现诸如数据发布/订阅…

Jmeter 测试-跨线程调用变量

1、Jmeter中线程运行规则 ①各个线程组是完全独立的,每个线程组是不同的业务,互不影响 ②线程组中的每个线程也是完全独立 ③线程组中的每个线程,都是从上往下执行,完成一轮循环后,继续下一轮循环 ④存在业务流或者…

考察自动化立体库应注意的几点

导语 大家好,我是智能仓储物流技术研习社的社长,老K。专注分享智能仓储物流技术、智能制造等内容。 整版PPT和更多学习资料,请球友到知识星球 【智能仓储物流技术研习社】自行下载 考察自动化立体仓库的关键因素: 仓库容量&#x…

python爬虫之爬取微博评论(4)

一、获取单页评论 随机选取一个微博,例如下面这个 【#出操死亡女生家属... - 冷暖视频的微博 - 微博 (weibo.com) 1、fnf12,然后点击网络,搜索评论内容,然后预览,就可以查看到网页内容里面还有评论内容 2、编写代码…

稀碎从零算法笔记Day51-LeetCode:最小路径和

题型:DP、数组、矩阵 链接:64. 最小路径和 - 力扣(LeetCode) 来源:LeetCode 题目描述 给定一个包含非负整数的 m x n 网格 grid ,请找出一条从左上角到右下角的路径,使得路径上的数字总和为…

适用于Windows电脑的最佳数据恢复软件是哪些?10佳数据恢复软件

丢失我们系统中可用的宝贵信息是很烦人的。我们可以尝试几种手动方法来重新获取丢失的数据。然而,当我们采用非自动方法来恢复数据时,这是一项令人厌烦和乏味的工作。在这种情况下,我们可以尝试使用一些正版硬盘恢复软件进行数据恢复。此页面…

Dual-AMN论文阅读

Boosting the Speed of Entity Alignment 10: Dual Attention Matching Network with Normalized Hard Sample Mining 将实体对齐速度提高 10 倍:具有归一化硬样本挖掘的双重注意力匹配网络 ABSTRACT 寻找多源知识图谱(KG)中的等效实体是知识图谱集成的关键步骤&…

TRIZ理论下攀爬机器人的创新设计与研究

随着科技的飞速发展,机器人技术已广泛应用于各个领域。特别是在复杂环境下的作业,如灾难救援、太空探测等,对机器人的移动能力和适应性提出了更高要求。在这样的背景下,基于TRIZ理论的攀爬机器人设计与研究应运而生,它…

分类算法——朴素贝叶斯(四)

概率基础 1概率定义 概率定义为一件事情发生的可能性 扔出一个硬币,结果头像朝上 P(X):取值在[0,1] 2女神是否喜欢计算案例 在讲这两个概率之前我们通过一个例子,来计算一些结果: 问题如下: 1、女神喜欢…

Python pyglet制作彩色圆圈“连连看”游戏

原文链接: Python 一步一步教你用pyglet制作“彩色方块连连看”游戏(续)-CSDN博客文章浏览阅读1.6k次,点赞75次,收藏55次。上期讲到相同的色块连接,链接见: Python 一步一步教你用pyglet制作“彩色方块连连看”游戏-…