【消息队列】RabbitMQ实现消费者组机制

目录

1. RabbitMQ 的 发布订阅模式

2. GRPC 服务间的实体同步

2.1 生产者服务

2.2 消费者服务

3. 可靠性

3.1 生产者丢失消息

3.2 消费者丢失消息

3.3 RabbitMQ 中间件丢失消息


1. RabbitMQ 的 发布订阅模式

https://www.rabbitmq.com/tutorials/tutorial-three-go

  • P 生产者:发送消息到一个特定的交换机(交换机类型是fanout),不需指定具体的目标队列
  • X 交换机:将消息分发给所有绑定到它的队列
  • C 消费者:订阅主题,通过绑定到交换机的队列接收消息

2. GRPC 服务间的实体同步

考虑以下业务需求——

  • 模拟消费者组机制:
    • 同一消费者组下的消费者(即一个服务的多个实例)监听同一个队列,是竞争关系
    • 不同消费者组(即不同服务)监听不同队列,这些队列绑定到同一个交换机,不同消费者组可以独立消费相同的数据
  • 消费历史数据:当生产者先启动,生产了一部分数据,消费者后启动时,也能消费到历史数据

服务之间的实体数据同步方案:

2.1 生产者服务

(1) 初始化

生产者初始化时需要负责绑定 RabbitMQ 的交换机和队列关系,即显式声明自己的实体有哪些消费者在消费。比如:

  • 声明交换机 exchange_user、exchange_group
  • 声明消费者 consumer_user_rpc、consumer_org_rpc
  • 创建队列 exchange_user_consumer_user_rpc、exchange_user_consumer_org_rpc、exchange_group_consumer_user_rpc、exchange_group_consumer_org_rpc,也就是对每个 topic-consumer 组合,创建一个相应的队列
  • 将交换机和队列绑定

(2) 实体变更时发送消息

发送消息到交换机,交换机会自动分发给所有绑定到它的队列,也就是发送一条消息至 exchange_user 交换机,那么消息会被投递给队列 exchange_user_consumer_user_rpc 和 队列 exchange_user_consumer_org_rpc。

2.2 消费者服务

消费者订阅一个 topic,处理 rabbitMQ 队列发来的消息。

  • 若消息处理成功(业务流程成功),发送 Ack 给 rabbitMQ 确认消费
  • 若消息处理失败(业务流程失败),发送 Nack 通知 rabbitMQ 处理失败,消息将放回队列等待下次消费

Ack 时 rabbitMQ 会记录消费者消费的 offset,下次会基于 offset 继续消费~

3. 可靠性

3.1 生产者丢失消息

(1) 生产者绑定交换机和队列

在生产者初始化时,需要先将交换机和队列的关系绑定好,以避免此场景发生:生产者先启动,未绑定交换机和队列,发送了消息到交换机,此时无法投递到具体队列。消费者后启动,即便做了交换机和队列绑定,也无法消费到历史消息。

func NewMQ(rabbitMQCfg *Config, option Option) (*RabbitMQ, error) {
	// ...

	// 初始化交换机和队列
	for topic, consumerGroups := range option.TopicConsumerGroupsBinding {
		err = initExchange(topic, consumerGroups, mq)
		if err != nil {
			return nil, err
		}
	}
	return mq, nil
}

func initExchange(exchange, consumerGroups string, mq *RabbitMQ) error {
	// 1. 创建发送通道
	pch, err := mq.conn.Channel()
	if err != nil {
		return err
	}
	mq.produceChannels[exchange] = pch

	// 2. 开启消息确认机制
	if err := pch.Confirm(false); err != nil {
		return err
	}

	// 3. 创建交换机
	// 参数 exchange:交换机名称, kind:交换机类型 fanout为发布订阅模式, durable:是否持久化, autoDelete:是否自动删除, internal:是否内部交换机, noWait:是否等待服务器确认, args:额外参数
	err = pch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)
	if err != nil {
		return err
	}
	slog.Info("rabbitmq declared exchange", "exchange_name", exchange)

	// 4. 创建队列并绑定到交换机
	for _, consumerGroup := range strings.Split(consumerGroups, ",") {
		consumerGroup = strings.TrimSpace(consumerGroup)
		if consumerGroup == "" {
			continue
		}
		queue := queueName(exchange, consumerGroup)

		// 创建队列
		// 参数 queue:队列名称, durable:是否持久化, autoDelete:是否自动删除, exclusive:是否独占, noWait:是否等待服务器确认, args:额外参数
		_, err = pch.QueueDeclare(queue, true, false, false, false, nil)
		if err != nil {
			return err
		}

		// 将队列绑定到交换机
		// 参数 queue:队列名称, key:路由键, exchange:交换机名称, noWait:是否等待服务器确认, args:额外参数
		err = pch.QueueBind(queue, "", exchange, false, nil)
		if err != nil {
			return err
		}
		slog.Info("rabbitmq declared and bind queue", "queue", queue, "bind_exchange", exchange)

		// 创建接收通道
		cch, err := mq.conn.Channel()
		if err != nil {
			return err
		}
		mq.consumeChannels[queue] = cch
	}

	// 5. 开启消息确认事件监听、消息投递事件监听
	mq.publishWatcher[exchange] = &watcher{
		returnCh:  pch.NotifyReturn(make(chan amqp.Return)),
		confirmCh: pch.NotifyPublish(make(chan amqp.Confirmation)),
	}
	// 监听未被交换机投递的消息
	go func() {
		for ret := range mq.publishWatcher[exchange].returnCh {
			// 尝试重新投递
			ctx, _ := context.WithTimeout(context.Background(), mq.config.Timeout)
			if err := mq.publish(ctx, ret.Exchange, ret.MessageId, ret.Body, ret.Timestamp); err != nil {
				slog.Error("rabbitmq republish failed.", "exchange", ret.Exchange, "msgID", ret.MessageId, "err", err)
			} else {
				slog.Warn("rabbitmq got exchange undelivered msg, republished.", "exchange", ret.Exchange, "msgID", ret.MessageId)
			}
			time.Sleep(time.Second * 3)
		}
	}()
	return nil
}

(2) 发送重试

发送消息时增加重试机制。若超过重试上限,需记录日志或报警。

func (r *RabbitMQ) Produce(ctx context.Context, topic string, data map[string]any) error {
	body, _ := json.Marshal(data)
	msgID := uuid.New()

	var retried int
	for {
		err := r.publish(ctx, topic, msgID, body, time.Now())
		if err == nil {
			return nil
		}

		retried++
		if retried > r.option.RetryNum {
			return err
		}
		time.Sleep(r.option.RetryInterval)
	}
}

(3) confirm 消息确认机制

生产端投递消息到 RabbitMQ 后,RabbitMQ 将发送一个确认事件,让生产端知晓消息已发送成功。监听 confirm 事件以确认消息的发送状态:

func initExchange(exchange string, mq *RabbitMQ) error {
	// ...
	// 开启消息确认机制
	if err := pch.Confirm(false); err != nil {
		return err
	}
	
	// 创建监听器
	mq.publishWatcher[exchange] = &watcher{
		confirmCh: pch.NotifyPublish(make(chan amqp.Confirmation)),
	}
	// ...
}

func (r *RabbitMQ) publish(ctx context.Context, ...) error {
	// publish发送消息
	// ...
	
	// 等待rabbitmq返回消息确认
	select {
	case confirm := <-r.publishWatcher[exchange].confirmCh:
		if !confirm.Ack {
			return errors.New("publish failed, got nack from rabbitmq")
		}
	case <-ctx.Done():
		return errors.New("context deadline, publish to rabbitmq timeout")
	case <-time.After(r.config.Timeout):
		return errors.New("publish to rabbitmq timeout")
	}
	return nil
}

3.2 消费者丢失消息

消费者消费完成后,必须手动 Ack 通知 MQ,表示已经消费成功:

func (r *RabbitMQ) Ack(topic, consumerGroup, msgID string) error {
	// ...

	return consumeChannel.Ack(deliveryTag, false)
}

如果消费失败,需要手动 Nack,那此条消息会重新入队,等待下次消费:

func (r *RabbitMQ) Nack(topic, consumerGroup, msgID string) error {
	// ...

	return consumeChannel.Nack(deliveryTag, false, true)
}

3.3 RabbitMQ 中间件丢失消息

(1) 数据持久化到磁盘

交换机持久化(durable=true):

// 创建交换机
// 参数 exchange:交换机名称, kind:交换机类型 fanout为发布订阅模式, durable:是否持久化, autoDelete:是否自动删除, internal:是否内部交换机, noWait:是否等待服务器确认, args:额外参数
err = pch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)

队列持久化(durable=true):

// 创建队列
// 参数 queue:队列名称, durable:是否持久化, autoDelete:是否自动删除, exclusive:是否独占, noWait:是否等待服务器确认, args:额外参数
_, err = pch.QueueDeclare(queue, true, false, false, false, nil)

消息持久化(DeliveryMode=Persistent):

err := ch.Publish(
	exchange, // 交换机名称
	"",       // 路由键
	true,     // 如果消息不能路由到任何队列,是否返回未处理的消息. true将返回未处理通知, false将丢弃消息
	false,    // 是否立即交付给消费者. 若为true, 当队列中没有等待中的消费者时,消息会被丢弃
	amqp.Publishing{
		MessageId:    msgID,              // 消息ID
		ContentType:  "application/json", // 消息内容类型
		Body:         body,               // 消息内容
		DeliveryMode: amqp.Persistent,    // 消息需要持久化
		Timestamp:    t,                  // 消息时间
	},
)

(2) RabbitMQ 本身的数据一致性保证

RabbitMQ 使用 raft 共识算法保证数据一致性:

https://www.rabbitmq.com/docs/clustering#replica-placement

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/898709.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

winUI3 c++ 入门 2、 样式

目录 一、winUI3 基本概念及样式 1、边距 2、如何使用样式 1)、布局控件内定义样式 2)、APP.xmal定义全局样式 3)、单独的样式文件 3.1)、新增字典资源 xmal 3.2)、在里面设置样式 3.3)、引用样式 3、更多样式修改 1)、修改默认属性 2)、修改所有的默认颜色…

垃圾收集器与内存分配机制(一)

目录 一、为什么我们要去了解垃圾收集和内存分配 二、对象已死&#xff1f; 1. 引用计数算法 2. 可达性分析算法 3. 再谈引用 4. 生存还是死亡 5. 回收方法区 三、垃圾收集算法 1. 简介 2. 分代收集理论 2.1. 弱分代/强分代假说 2.2. 前面两代假说的缺陷 3. 标记-清…

智能去毛刺:2D视觉引导机器人如何重塑制造业未来

机器人技术已经深入到各个工业领域中&#xff0c;为制造业带来了前所未有的变革。其中&#xff0c;2D视觉引导机器人技术以其精准、高效的特点&#xff0c;在去毛刺工艺中发挥着越来越重要的作用。本文将为您介绍2D视觉引导机器人技术的基本原理及其在去毛刺工艺中的应用&#…

blender 理解 积木组合 动画制作 学习笔记

一、学习blender视频教程链接 案例2&#xff1a;积木组合_动画制作_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1Bt4y1E7qn?vd_sourced0ea58f1127eed138a4ba5421c577eb1&p10&spm_id_from333.788.videopod.episodes 二、说明 之前已经学习了如何制作积木组…

20 Shell Script输入与输出

标出输入、标准输出、错误输出 一、程序的基本三个IO流 一&#xff09;文件描述符 ​ 任何程序在Linux系统中都有3个基本的文件描述符 ​ 比如: ​ cd/proc/$$/fd ​ 进入当前shell程序对于内核在文件系统的映射目录中: [rootlocalhost ~]# cd /proc/$$/fd [rootlocalhos…

Ubuntu22.04环境搭建MQTT服务器

官网&#xff1a; https://mosquitto.org 1.引入库 sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa2.升级安装工具 sudo apt-get update 3.安装 sudo apt-get install mosquitto 4.安装客户端 sudo apt-get install mosquitto-clients5.添加修改配置文件 进…

微信小程序上传图片添加水印

微信小程序使用wx.chooseMedia拍摄或从手机相册中选择图片并添加水印&#xff0c; 代码如下&#xff1a; // WXML代码&#xff1a;<canvas canvas-id"watermarkCanvas" style"width: {{canvasWidth}}px; height: {{canvasHeight}}px;"></canvas&…

【Linux】冯诺依曼体系结构 OS的概念

&#x1fa90;&#x1fa90;&#x1fa90;欢迎来到程序员餐厅&#x1f4ab;&#x1f4ab;&#x1f4ab; 主厨&#xff1a;邪王真眼 主厨的主页&#xff1a;Chef‘s blog 所属专栏&#xff1a;青果大战linux 总有光环在陨落&#xff0c;总有新星在闪烁 前言废话&#xff1a…

将java项目jar包打包成exe服务

1.结构展示 2.注意事项 前提: 环境准备:jdk8 和 .net支持 { 1.控制面板》程序和功能》启用和关闭windows功能》.net的勾选》2.jdk8自行百度安装环境3.其他项目必须的软件环境安装等&#xff08;数据库...&#xff09; }第一次准备: 1.将打包好的jar包放到premiumServices.exe…

销冠教你如何转化观望客户

在销售实践中&#xff0c;常会遇到这样的场景&#xff1a;客户对我们的提案表现出极大的兴趣&#xff0c;但在执行阶段却显得迟疑&#xff0c;频繁表示“还需观望&#xff0c;再考虑”。这种态度不仅拖慢了项目进度&#xff0c;甚至可能导致项目完全停滞&#xff0c;从而错失宝…

Spring Boot技术栈在论坛网站开发中的应用

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统&#xff0c;它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等&#xff0c;非常…

FLUX.1-dev-LoRA模型:用一张卡通图片总结4张真实人物照片One-Click-Creative-Template

在这个数字时代&#xff0c;创意图像生成正成为我们生活中不可或缺的一部分。Shakker Labs 推出的 FLUX.1-dev-LoRA-One-Click-Creative-Template 为我们带来了一个全新的体验。这款创新的模板模型专为文本生成图像设计&#xff0c;让您能够轻松创造出令人惊艳的照片。 模型功能…

听泉鉴宝在三个月前已布局商标注册!

近日“听泉鉴宝”以幽默的风格和节目效果迅速涨粉至2500多万&#xff0c;连线出现“馆藏文物”和“盗墓现场”等内容&#xff0c;听泉鉴宝早在几个月前已布局商标注册。 据普推知产商标老杨在商标局网站检索发现&#xff0c;“听泉鉴宝”的主人丁某所持股的江苏灵匠申请了三十…

qt QNetworkProxy详解

一、概述 QNetworkProxy通过设置代理类型、主机、端口和认证信息&#xff0c;可以使应用程序的所有网络请求通过代理服务器进行。它支持为Qt网络类&#xff08;如QAbstractSocket、QTcpSocket、QUdpSocket、QTcpServer、QNetworkAccessManager等&#xff09;配置网络层代理支持…

mysql innodb 引擎如何直接复制数据库文件?

mysql innodb 引擎如何直接复制数据库文件&#xff1f;介绍如下&#xff1a; 1、首先找到数据库文件所在位置 一般可以看my.conf/my.ini配置的文件的“datadir” 看示例&#xff1a; “MAMP”在Macos下的数据库文件位置&#xff1a; /Library/Application Support/appsolu…

展会亮点回顾|HMS汽车工业通信解决方案

2024 汽车测试及质量监控博览会&#xff08;中国&#xff09;&#xff08;Testing Expo China – Automotive&#xff09;于 8 月 28 日至 30 日在上海世博展览馆顺利举行。作为汽车测试技术领域的顶级盛会&#xff0c;来自全球的行业领袖和技术专家齐聚一堂&#xff0c;共同探…

值得细读的8个视觉大模型生成式预训练方法

大语言模型的进展催生出了ChatGPT这样的应用&#xff0c;让大家对“第四次工业革命”和“AGI”的来临有了一些期待&#xff0c;也作为部分原因共同造就了美股2023年的繁荣。LLM和视觉的结合也越来越多&#xff1a;比如把LLM作为一种通用的接口&#xff0c;把视觉特征序列作为文…

高速数字化仪,4路每路20M同步AD模拟信号采集卡——PCIe8531B/8532B

阿尔泰科技 型号&#xff1a;PCIe8531B/8532B 简介&#xff1a; PCIe8531B/8532B&#xff0c; 是4 通道12/14 位20Ms/s 采样数字化仪&#xff0c;专为输入信号高达 10M 的高频和高动态范国的信号而设计。模拟输入范围可以通过软件编程设罝为1V或者主SV。配备了容量高达256MB…

Java的评论大冒险:用代码征服API数据

在一个充满数字奥秘的虚拟世界里&#xff0c;Java勇士正准备踏上他的新征程&#xff1a;获取商品评论的API数据。这不仅是一次技术的挑战&#xff0c;更是一次与时间赛跑的较量。Java勇士&#xff0c;这位编程界的探险家&#xff0c;打开了他的IDE&#xff0c;准备开始这场冒险…

Qt贪吃蛇-游戏房间窗口(3)

目录 游戏房间 房间渲染 房间背景设置 设置房间标题和尺寸 运动逻辑 定义可能会用到的对象 向上移动逻辑 向下移动 向右移动 判断失败 绘制蛇 初始化贪吃蛇 创建食物 蛇的吃吃吃 绘制食物 连接上文 游戏房间 房间渲染 房间背景设置 背景图比例 绘制背景 设置…