Go操作各大消息队列教程(RabbitMQ、Kafka)

Go操作各大消息队列教程

1 RabbitMQ

1.1 概念

①基本名词

当前市面上mq的产品很多,比如RabbitMQ、Kafka、ActiveMQ、ZeroMQ和阿里巴巴捐献给Apache的RocketMQ。甚至连redis这种NoSQL都支持MQ的功能。

在这里插入图片描述

  1. Broker:表示消息队列服务实体
  2. Virtual Host:虚拟主机。标识一批交换机、消息队列和相关对象。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。
    • AMQP(Advanced Message Queuing Protocol)高级消息队列协议
  3. Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  4. Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

②常见模式

1. simple简单模式

在这里插入图片描述

消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)

2. worker工作模式

在这里插入图片描述

多个消费者从一个队列中争抢消息

  • (隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样) 保证一条消息只能被一个消费者使用)
  • 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)
3. publish/subscribe发布订阅(共享资源)

在这里插入图片描述

消费者订阅消息,然后从订阅的队列中获取消息进行消费。

  • X代表交换机rabbitMQ内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
  • 相关场景:邮件群发,群聊天,广播(广告)
4. routing路由模式

在这里插入图片描述

  • 交换机根据路由规则,将消息路由到不同的队列中
  • 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
5. topic主题模式(路由模式的一种)

在这里插入图片描述

  • 星号井号代表通配符
  • 星号代表多个单词,井号代表一个单词
  • 路由功能添加模糊匹配
  • 消息产生者产生消息,把消息交给交换机
  • 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

1.2 搭建(docker方式)

①拉取镜像

# 拉取镜像
docker pull rabbitmq:3.7-management

②创建并启动容器

# 创建并运行容器
docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.7-management
#5672是项目中连接rabbitmq的端口(我这里映射的是5672),15672是rabbitmq的web管理界面端口(我映射为15672)

# 输入网址http://ip:15672即可进入rabbitmq的web管理页面,账户密码:guest / guest

③web界面创建用户和virtual host

在这里插入图片描述

下面为了我们后续的操作,首先我们新建一个Virtual Host并且给他分配一个用户名,用来隔离数据,根据自己需要自行创建

  1. 新增virtual host
    在这里插入图片描述
  2. 新增用户
    在这里插入图片描述
  3. 点击新建好的用户,设置其host
    在这里插入图片描述
    在这里插入图片描述
  4. 最终效果
    在这里插入图片描述

1.3 代码操作

①RabbitMQ struct:包含创建、消费、生产消息

package RabbitMQ

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

//amqp:// 账号 密码@地址:端口号/vhost
const MQURL = "amqp://ziyi:ziyi@10.253.50.145:5672/ziyi"

type RabbitMQ struct {
	//连接
	conn *amqp.Connection
	//管道
	channel *amqp.Channel
	//队列名称
	QueueName string
	//交换机
	Exchange string
	//key Simple模式 几乎用不到
	Key string
	//连接信息
	Mqurl string
}

//创建RabbitMQ结构体实例
func NewRabbitMQ(queuename string, exchange string, key string) *RabbitMQ {
	rabbitmq := &RabbitMQ{QueueName: queuename, Exchange: exchange, Key: key, Mqurl: MQURL}
	var err error
	//创建rabbitmq连接
	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
	rabbitmq.failOnErr(err, "创建连接错误!")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "获取channel失败")
	return rabbitmq
}

//断开channel和connection
func (r *RabbitMQ) Destory() {
	r.channel.Close()
	r.conn.Close()
}

//错误处理函数
func (r *RabbitMQ) failOnErr(err error, message string) {
	if err != nil {
		log.Fatalf("%s:%s", message, err)
		panic(fmt.Sprintf("%s:%s", message, err))
	}
}

//简单模式step:1。创建简单模式下RabbitMQ实例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
	return NewRabbitMQ(queueName, "", "")
}

//订阅模式创建rabbitmq实例
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
	//创建rabbitmq实例
	rabbitmq := NewRabbitMQ("", exchangeName, "")
	var err error
	//获取connection
	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
	rabbitmq.failOnErr(err, "failed to connecct rabbitmq!")
	//获取channel
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "failed to open a channel!")
	return rabbitmq
}

//订阅模式生成
func (r *RabbitMQ) PublishPub(message string) {
	//尝试创建交换机,不存在创建
	err := r.channel.ExchangeDeclare(
		//交换机名称
		r.Exchange,
		//交换机类型 广播类型
		"fanout",
		//是否持久化
		true,
		//是否字段删除
		false,
		//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
		false,
		//是否阻塞 true表示要等待服务器的响应
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an excha"+"nge")

	//2 发送消息
	err = r.channel.Publish(
		r.Exchange,
		"",
		false,
		false,
		amqp.Publishing{
			//类型
			ContentType: "text/plain",
			//消息
			Body: []byte(message),
		})
}

//订阅模式消费端代码
func (r *RabbitMQ) RecieveSub() {
	//尝试创建交换机,不存在创建
	err := r.channel.ExchangeDeclare(
		//交换机名称
		r.Exchange,
		//交换机类型 广播类型
		"fanout",
		//是否持久化
		true,
		//是否字段删除
		false,
		//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
		false,
		//是否阻塞 true表示要等待服务器的响应
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an excha"+"nge")
	//2试探性创建队列,创建队列
	q, err := r.channel.QueueDeclare(
		"", //随机生产队列名称
		false,
		false,
		true,
		false,
		nil,
	)
	r.failOnErr(err, "Failed to declare a queue")
	//绑定队列到exchange中
	err = r.channel.QueueBind(
		q.Name,
		//在pub/sub模式下,这里的key要为空
		"",
		r.Exchange,
		false,
		nil,
	)
	//消费消息
	message, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	forever := make(chan bool)
	go func() {
		for d := range message {
			log.Printf("Received a message:%s,", d.Body)
		}
	}()
	fmt.Println("退出请按 Ctrl+C")
	<-forever
}

//话题模式 创建RabbitMQ实例
func NewRabbitMQTopic(exchagne string, routingKey string) *RabbitMQ {
	//创建rabbitmq实例
	rabbitmq := NewRabbitMQ("", exchagne, routingKey)
	var err error
	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
	rabbitmq.failOnErr(err, "failed     to connect rabbingmq!")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "failed to open a channel")
	return rabbitmq
}

//话题模式发送信息
func (r *RabbitMQ) PublishTopic(message string) {
	//尝试创建交换机,不存在创建
	err := r.channel.ExchangeDeclare(
		//交换机名称
		r.Exchange,
		//交换机类型 话题模式
		"topic",
		//是否持久化
		true,
		//是否字段删除
		false,
		//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
		false,
		//是否阻塞 true表示要等待服务器的响应
		false,
		nil,
	)
	r.failOnErr(err, "topic failed to declare an excha"+"nge")
	//2发送信息
	err = r.channel.Publish(
		r.Exchange,
		//要设置
		r.Key,
		false,
		false,
		amqp.Publishing{
			//类型
			ContentType: "text/plain",
			//消息
			Body: []byte(message),
		})
}

//话题模式接收信息
//要注意key
//其中* 用于匹配一个单词,#用于匹配多个单词(可以是零个)
//匹配 表示匹配imooc.* 表示匹配imooc.hello,但是imooc.hello.one需要用imooc.#才能匹配到
func (r *RabbitMQ) RecieveTopic() {
	//尝试创建交换机,不存在创建
	err := r.channel.ExchangeDeclare(
		//交换机名称
		r.Exchange,
		//交换机类型 话题模式
		"topic",
		//是否持久化
		true,
		//是否字段删除
		false,
		//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
		false,
		//是否阻塞 true表示要等待服务器的响应
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an exchange")
	//2试探性创建队列,创建队列
	q, err := r.channel.QueueDeclare(
		"", //随机生产队列名称
		false,
		false,
		true,
		false,
		nil,
	)
	r.failOnErr(err, "Failed to declare a queue")
	//绑定队列到exchange中
	err = r.channel.QueueBind(
		q.Name,
		//在pub/sub模式下,这里的key要为空
		r.Key,
		r.Exchange,
		false,
		nil,
	)
	//消费消息
	message, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	forever := make(chan bool)
	go func() {
		for d := range message {
			log.Printf("Received a message:%s,", d.Body)
		}
	}()
	fmt.Println("退出请按 Ctrl+C")
	<-forever
}

//路由模式 创建RabbitMQ实例
func NewRabbitMQRouting(exchagne string, routingKey string) *RabbitMQ {
	//创建rabbitmq实例
	rabbitmq := NewRabbitMQ("", exchagne, routingKey)
	var err error
	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
	rabbitmq.failOnErr(err, "failed     to connect rabbingmq!")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "failed to open a channel")
	return rabbitmq
}

//路由模式发送信息
func (r *RabbitMQ) PublishRouting(message string) {
	//尝试创建交换机,不存在创建
	err := r.channel.ExchangeDeclare(
		//交换机名称
		r.Exchange,
		//交换机类型 广播类型
		"direct",
		//是否持久化
		true,
		//是否字段删除
		false,
		//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
		false,
		//是否阻塞 true表示要等待服务器的响应
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an excha"+"nge")
	//发送信息
	err = r.channel.Publish(
		r.Exchange,
		//要设置
		r.Key,
		false,
		false,
		amqp.Publishing{
			//类型
			ContentType: "text/plain",
			//消息
			Body: []byte(message),
		})
}

//路由模式接收信息
func (r *RabbitMQ) RecieveRouting() {
	//尝试创建交换机,不存在创建
	err := r.channel.ExchangeDeclare(
		//交换机名称
		r.Exchange,
		//交换机类型 广播类型
		"direct",
		//是否持久化
		true,
		//是否字段删除
		false,
		//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
		false,
		//是否阻塞 true表示要等待服务器的响应
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an excha"+"nge")
	//2试探性创建队列,创建队列
	q, err := r.channel.QueueDeclare(
		"", //随机生产队列名称
		false,
		false,
		true,
		false,
		nil,
	)
	r.failOnErr(err, "Failed to declare a queue")
	//绑定队列到exchange中
	err = r.channel.QueueBind(
		q.Name,
		//在pub/sub模式下,这里的key要为空
		r.Key,
		r.Exchange,
		false,
		nil,
	)
	//消费消息
	message, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	forever := make(chan bool)
	go func() {
		for d := range message {
			log.Printf("Received a message:%s,", d.Body)
		}
	}()
	fmt.Println("退出请按 Ctrl+C")
	<-forever
}

//简单模式Step:2、简单模式下生产代码
func (r *RabbitMQ) PublishSimple(message string) {
	//1、申请队列,如果队列存在就跳过,不存在创建
	//优点:保证队列存在,消息能发送到队列中
	_, err := r.channel.QueueDeclare(
		//队列名称
		r.QueueName,
		//是否持久化
		false,
		//是否为自动删除 当最后一个消费者断开连接之后,是否把消息从队列中删除
		false,
		//是否具有排他性 true表示自己可见 其他用户不能访问
		false,
		//是否阻塞 true表示要等待服务器的响应
		false,
		//额外数据
		nil,
	)
	if err != nil {
		fmt.Println(err)
	}

	//2.发送消息到队列中
	r.channel.Publish(
		//默认的Exchange交换机是default,类型是direct直接类型
		r.Exchange,
		//要赋值的队列名称
		r.QueueName,
		//如果为true,根据exchange类型和routkey规则,如果无法找到符合条件的队列那么会把发送的消息返回给发送者
		false,
		//如果为true,当exchange发送消息到队列后发现队列上没有绑定消费者,则会把消息还给发送者
		false,
		//消息
		amqp.Publishing{
			//类型
			ContentType: "text/plain",
			//消息
			Body: []byte(message),
		})
}

func (r *RabbitMQ) ConsumeSimple() {
	//1、申请队列,如果队列存在就跳过,不存在创建
	//优点:保证队列存在,消息能发送到队列中
	_, err := r.channel.QueueDeclare(
		//队列名称
		r.QueueName,
		//是否持久化
		false,
		//是否为自动删除 当最后一个消费者断开连接之后,是否把消息从队列中删除
		false,
		//是否具有排他性
		false,
		//是否阻塞
		false,
		//额外数据
		nil,
	)
	if err != nil {
		fmt.Println(err)
	}
	//接收消息
	msgs, err := r.channel.Consume(
		r.QueueName,
		//用来区分多个消费者
		"",
		//是否自动应答
		true,
		//是否具有排他性
		false,
		//如果设置为true,表示不能同一个connection中发送的消息传递给这个connection中的消费者
		false,
		//队列是否阻塞
		false,
		nil,
	)
	if err != nil {
		fmt.Println(err)
	}
	forever := make(chan bool)

	//启用协程处理
	go func() {
		for d := range msgs {
			//实现我们要处理的逻辑函数
			log.Printf("Received a message:%s", d.Body)
			//fmt.Println(d.Body)
		}
	}()

	log.Printf("【*】warting for messages, To exit press CCTRAL+C")
	<-forever
}

func (r *RabbitMQ) ConsumeWorker(consumerName string) {
	//1、申请队列,如果队列存在就跳过,不存在创建
	//优点:保证队列存在,消息能发送到队列中
	_, err := r.channel.QueueDeclare(
		//队列名称
		r.QueueName,
		//是否持久化
		false,
		//是否为自动删除 当最后一个消费者断开连接之后,是否把消息从队列中删除
		false,
		//是否具有排他性
		false,
		//是否阻塞
		false,
		//额外数据
		nil,
	)
	if err != nil {
		fmt.Println(err)
	}
	//接收消息
	msgs, err := r.channel.Consume(
		r.QueueName,
		//用来区分多个消费者
		consumerName,
		//是否自动应答
		true,
		//是否具有排他性
		false,
		//如果设置为true,表示不能同一个connection中发送的消息传递给这个connection中的消费者
		false,
		//队列是否阻塞
		false,
		nil,
	)
	if err != nil {
		fmt.Println(err)
	}
	forever := make(chan bool)

	//启用协程处理
	go func() {
		for d := range msgs {
			//实现我们要处理的逻辑函数
			log.Printf("%s Received a message:%s", consumerName, d.Body)
			//fmt.Println(d.Body)
		}
	}()

	log.Printf("【*】warting for messages, To exit press CCTRAL+C")
	<-forever
}

②测试代码

1. simple简单模式

consumer.go

func main() {
	//消费者
	rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple")
	rabbitmq.ConsumeSimple()
}

producer.go

func main() {
	//Simple模式 生产者
	rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple")
	for i := 0; i < 5; i++ {
		time.Sleep(time.Second * 2)
		rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i))
	}
}
2. worker模式

consumer.go

func main() {
	/*
		worker模式无非就是多个消费者去同一个队列中消费消息
	*/
	//消费者1
	rabbitmq1 := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
	go rabbitmq1.ConsumeWorker("consumer1")
	//消费者2
	rabbitmq2 := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
	rabbitmq2.ConsumeWorker("consumer2")
}

producer.go

func main() {
	//Worker模式 生产者
	rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
	for i := 0; i < 100; i++ {
		//time.Sleep(time.Second * 2)
		rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i))
	}
}
3. publish/subscribe模式

consumer.go:

func main() {
	//消费者
	rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")
	rabbitmq.RecieveSub()
}

producer.go

func main() {
	//订阅模式发送者
	rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")
	for i := 0; i <= 20; i++ {
		rabbitmq.PublishPub("订阅模式生产第" + strconv.Itoa(i) + "条数据")
		fmt.Println(i)
		time.Sleep(1 * time.Second)
	}
}
4. router模式

consumer.go

func main() {
	//消费者
	rabbitmq := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one")
	rabbitmq.RecieveRouting()
}

producer.go

func main() {
	//路由模式生产者
	imoocOne := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one")
	imoocTwo := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_two")

	for i := 0; i <= 10; i++ {
		imoocOne.PublishRouting("hello imooc one!" + strconv.Itoa(i))
		imoocTwo.PublishRouting("hello imooc two!" + strconv.Itoa(i))
		time.Sleep(1 * time.Second)
		fmt.Println(i)
	}
}
5. topic模式

consumer.go

func main() {
	/*
		星号井号代表通配符
		星号代表多个单词,井号代表一个单词
		路由功能添加模糊匹配
		消息产生者产生消息,把消息交给交换机
		交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
	*/
	//Topic消费者
	//rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "#") //匹配所有的key:topic88和topic99
	rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three") //只匹配topic88的
	rabbitmq.RecieveTopic()
}

producer.go

func main() {
	//Topic模式生产者
	imoocOne := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three")
	imoocTwo := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic99.four")

	for i := 0; i <= 10; i++ {
		imoocOne.PublishTopic("hello imooc topic three!" + strconv.Itoa(i))
		imoocTwo.PublishTopic("hello imooc topic four!" + strconv.Itoa(i))
		time.Sleep(1 * time.Second)
		fmt.Println(i)
	}
}

2 Kafka

2.1 基本概念

在这里插入图片描述
Kafka是分布式的,其所有的构件borker(server服务端集群)、producer(消息生产)、consumer(消息消费者)都可以是分布式的。
producer给broker发送数据,这些消息会存到kafka server里,然后consumer再向kafka server发起请求去消费这些数据。
kafka server在这个过程中像是一个帮你保管数据的中间商。所以kafka服务器也可以叫做broker(broker直接翻译可以是中间人或者经纪人的意思)。

在消息的生产时可以使用一个标识topic来区分,且可以进行分区;每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。
同时为发布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡

参考:https://blog.csdn.net/lingfy1234/article/details/122900348

  • 应用场景
    • 监控
    • 消息队列
    • 流处理
    • 日志聚合
    • 持久性日志
  • 基础概念
    • topic:话题
    • broker:kafka服务集群,已发布的消息保存在一组服务器中,称之为kafka集群。集群中的每一个服务器都是一个代理(broker)
    • partition:分区,topic物理上的分组
    • message:消息,每个producer可以向一个topic主题发布一些消息

在这里插入图片描述
1.⽣产者从Kafka集群获取分区leader信息
2.⽣产者将消息发送给leader
3.leader将消息写入本地磁盘
4.follower从leader拉取消息数据
5.follower将消息写入本地磁盘后向leader发送ACK
6.leader收到所有的follower的ACK之后向生产者发送ACK

2.2 常见模式

①点对点模式:火车站出租车抢客

发送者将消息发送到消息队列中,消费者去消费,如果消费者有多个,他们会竞争地消费,也就是说对于某一条消息,只有一个消费者能“抢“到它。类似于火车站门口的出租车抢客的场景。

在这里插入图片描述

②发布订阅模式:组间无竞争,组内有竞争

消费者订阅对应的topic(主题),只有订阅了对应topic消费者的才会接收到消息。

例如:

  • 牛奶有很多种,光明牛奶,希望牛奶等,只有你订阅了光明牛奶,送奶工才会把光明牛奶送到对应位置,你也才会有机会消费这个牛奶

注意:为了提高消费者的消费能力,kafka中引入了消费者组的概念。相当于是:不同消费者组之间因为订阅的topic不同,不会有竞争关系。但是消费者组内是有竞争关系。

例如:

  • 成都、厦门的出租车司机分别组成各自的消费者组。
  • 成都的出租车司机只拉成都的人,厦门的只拉厦门的人。(因此他们两个消费者组不是竞争关系)
  • 成都市内的出租车司机之间是竞争关系。(消费者组内是竞争关系)

2.3 docker-compose部署

 vim docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:6.2.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      #KAFKA_ADVERTISED_LISTENERS后面改为自己本地宿主机的ip,例如我本地mac的ip为192.168.0.101
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.101:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
# 进入到docker-compose.yml所在目录,执行下面命令
docker-compose up -d
# 查看部署结果,状态为up表明部署成功
docker-compose ps 

在这里插入图片描述

2.4 代码操作

# 1. 创建对应topic
docker-compose exec kafka kafka-topics --create --topic test-topic --partitions 1 --replication-factor 1 --bootstrap-server 192.168.0.101:9092

# 2. 查看topic列表
docker-compose exec kafka kafka-topics --list --zookeeper zookeeper:2181

在这里插入图片描述

①producer.go

package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

// 基于sarama第三方库开发的kafka client

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

	// 构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "web_log"
	msg.Value = sarama.StringEncoder("this is a test log")
	// 连接kafka
	client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	defer client.Close()
	// 发送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

②consumer.go

package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

// kafka consumer

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList)
	for partition := range partitionList { // 遍历所有的分区
		// 针对每个分区创建一个对应的分区消费者
		pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		// 异步从每个分区消费信息
		go func(sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
			}
		}(pc)
	}
	//演示时使用
	select {}
}

③运行效果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/95727.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java生成二维码(前后端分离项目实战)

&#x1f4cd; 本文代码已放置 github&#xff1a;Mr-Write/SpringbootDemo: 各种demo案例 (github.com) 文章目录 1.ZXing1.1 概念1.2 ZXing 相关依赖1.3 zxing常用API&#x1f340; EncodeHintType&#xff08;编码提示类型&#xff09;&#x1f340; MultiFormatWriter&…

Android Activity启动过程一:从Intent到Activity创建

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、人工智能等&#xff0c;希望大家多多支持。 目录 一、概览二、应用内启动源码流程 (startActivity)2.1 startActivit…

财务数据分析?奥威BI数据可视化工具很擅长

BI数据可视化工具通常是可以用户各行各业&#xff0c;用于不同主题的数据可视化分析&#xff0c;但面对财务数据分析这块难啃的骨头&#xff0c;能够好好地完成的&#xff0c;还真不多。接下来要介绍的这款BI数据可视化工具不仅拥有内存行列计算模型这样的智能财务指标计算功能…

我们的第一个 Qt 窗口程序

Qt 入门实战教程&#xff08;目录&#xff09; Windows Qt 5.12.10下载与安装 为何使用Qt Creator开发QT 本文介绍用Qt自带的集成开发工具Qt Creator创建Qt默认的窗口程序。 本文不需要你另外安装Visual Studio 2022这样的集成开发环境&#xff0c;也不需要你再在Visual St…

精准高效农业作业,植保无人机显身手

中国作为农业大国&#xff0c;拥有约18亿亩的农田&#xff0c;每年都需要进行种子喷洒和农药施用等农业作业&#xff0c;对于普通农户来说&#xff0c;这是一项耗时耗力的工程&#xff0c;同时&#xff0c;人工喷洒农药极易造成农药慢性中毒&#xff0c;对农民的身体健康产生极…

摄像头的调用和视频识别

CV_tutorial3 摄像头调用实时播放保存视频 运动目标识别帧差法背景减除法 摄像头调用 创建视频捕捉对象&#xff1a;cv2.VideoCapture() 参数为视频设备的索引号&#xff0c;就一个摄像投的话写0默认&#xff1b; 或者是指定要读取视频的路径。 实时播放 import cv2 import …

基于亚马逊云科技无服务器服务快速搭建电商平台——部署篇

受疫情影响消费者习惯发生改变&#xff0c;刺激了全球电商行业的快速发展。除了依托第三方电商平台将产品销售给消费者之外&#xff0c;企业通过品牌官网或者自有电商平台销售商品也是近几年电商领域快速发展的商业模式。独立站电商模式可以进行多方面、全渠道的互联网市场拓展…

clickhouse 系列2:clickhouse 离线安装

1.下载rpm包 Altinity/clickhouse - Packages packagecloud 使用wget下载到本地目录 wget --content-disposition https://packagecloud.io/Altinity/clickhouse/packages/el/7/clickhouse-common-static-20.8.3.18-1.el7.x86_64.rpm/download.rpm wget

如何通过四个步骤清理网络防火墙规则

组织必须确保适当的安全策略到位&#xff0c;以保护其投资并优化其安全有效性。然而&#xff0c;随着网络的扩展和复杂性的增加&#xff0c;网络运营团队面临着管理来自多个供应商的大量防火墙和网络设备的挑战。他们必须解决分散的基础设施、职能孤岛、人员配置问题、分散的管…

强化自主可控,润开鸿发布基于RISC-V架构的开源鸿蒙终端新品

2023 RISC-V中国峰会于8月23日至25日在北京召开,峰会以“RISC-V生态共建”为主题,结合当下全球新形势,把握全球新时机,呈现RISC-V全球新观点、新趋势。本次大会邀请了RISC-V国际基金会、业界专家、企业代表及社区伙伴等共同探讨RISC-V发展趋势与机遇,吸引超过百余家业界企业、高…

指针(个人学习笔记黑马学习)

1、指针的定义和使用 #include <iostream> using namespace std;int main() {int a 10;int* p;p &a;cout << "a的地址为&#xff1a;" << &a << endl;cout << "a的地址为&#xff1a;" << p << endl;…

Doris数据库BE——rowset版本追踪

rowset代码位置be/src/olap/version_graph.cpp&#xff0c;字面意思行集合&#xff0c;由一行或多行组成。rowset版本简单理解为rowset编号&#xff0c;每次导入生成一个rowset&#xff0c;比如insert执行10次会生成10个rowset&#xff0c;一次streamload生成一个rowset。 版本…

Mac OS 13.4.1 搜狗输入法导致的卡顿问题

一、Mac OS 系统版本 搜狗输入法已经更新到最新 二、解决方案 解决方案一 在我的电脑上面需要关闭 VSCode 和 Chrmoe 以后&#xff0c;搜狗输入法回复正常。 解决方案二 强制重启一下搜狗输入法。 可以用 unix 定时任务去隔 2个小时自动 kill 掉一次进程 # kill 掉 mac …

【JS案例】JS实现手风琴效果

JS案例手风琴 &#x1f31f;效果展示 &#x1f31f;HTML结构 &#x1f31f;CSS样式 &#x1f31f;实现思路 &#x1f31f;具体实现 1.绑定事件 2.自定义元素属性 3.切换菜单 &#x1f31f;完整JS代码 &#x1f31f;写在最后 &#x1f31f;效果展示 &#x1f31f;HTML…

excel功能区(ribbonx)编程笔记--2 button控件与checkbox控件

我们上一章简单先了解了ribbonx的基本内容,以及使用举例实现自己修改ribbox的内容,本章紧接上一章,先讲解一下ribbonx的button控件。 在功能区的按钮中,可以使用内置图像或提供自已的图像,可以指定大按钮或者更小的形式,添加少量的代码甚至可以同时提供标签。此外,可以利…

C++ 多线程编程

C 多线程编程 点击获取更多的C学习笔记 1. 线程库的基本使用 创建线程 要创建线程&#xff0c;我们需要一个可调用的函数或函数对象&#xff0c;作为线程的入口点。在C11中&#xff0c;我们可以使用函数指针、函数对象或lambda表达式来实现。创建线程的基本语法如下&#x…

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…

开发工具——IDE安装 / IDEA子module依赖导入失败编译提示xx找不到符号 / IDEA在Git提交时卡顿

近期换了工作电脑&#xff0c;公司的IT团队不够给力&#xff0c;不能复制电脑系统&#xff0c;所以又到了需要重装IDE配置开发环境的时候了&#xff1b;在安装和导入Java编译器IDEA的时候遇到一些"棘手"问题&#xff0c;这里整理下解决方法以备不时之需&#xff1b; …

R语言空气污染数据的地理空间可视化和分析:颗粒物2.5(PM2.5)和空气质量指数(AQI)...

原文链接&#xff1a;http://tecdat.cn/?p23800 由于空气污染对公众健康的不利影响&#xff0c;人们一直非常关注。世界各国的环境部门都通过各种方法&#xff08;例如地面观测网络&#xff09;来监测和评估空气污染问题&#xff08;点击文末“阅读原文”获取完整代码数据&…

ChatGPT⼊门到精通(4):ChatGPT 为何⽜逼

⼀、通⽤型AI 在我们原始的幻想⾥&#xff0c;AI是基于对海量数据的学习&#xff0c;锻炼出⼀个⽆所不知⽆所不能的模 型&#xff0c;并借助计算机的优势&#xff08;计算速度、并发可能&#xff09;等碾压⼈类。 但我们⽬前的AI&#xff0c;不管是AlphaGo还是图像识别算法&am…