rocketMQ5.0顺序消息golang接入

本人理解,顺序消息如果不分消息组,那么会影响并行处理速度,所以尽量消息组分的散一些
首先上要求,官方文档如下:
在这里插入图片描述
总结:
1.必须同一个消息组,消息组和消费组不是一个概念,不要混
2.必须单一生产者,也就是说线上生产只能开一个 pod,感觉局限有点高,无法多 pod 接入
3.必须串行发送,这个点也不太好,限制过高
以上三点
在我的案例中(企业微信回调消息)
那么只能开一个接口服务Pod 来接收微信回调,如果挂了,那完蛋,开多个 pod的话,那只有把请求放队列,通过队列pop进行消费再生产消息到 mq,这样同时也解决了第三点的串行发送。

生产的时候如何确定是顺序消息,只需要生产消息的时候给设定一个消息组

msg := &rmq_client.Message{
Topic: Topic,
	Body:  []byte("this is a message : " + strconv.Itoa(i)),
}
// set keys and tag
msg.SetKeys("a", "b")
msg.SetTag("ab")
// 这里设置消息组
msg.SetMessageGroup("fifo")

分组分的越细越好

提高消费速度

当拿到消息后,根据消息分组来进行并发处理,每个分组内进行串行处理,关键代码如下


func (m *RocketMq) ConsumerOrderly(funcMap map[string]func([]byte) error) {
	var err error

	m.consumerOnce.Do(func() {
		Log().Info("##############pro consume orderly start#############", m.MqConfig)
		errTmp := m.proSimConsumer.Start()
		if errTmp != nil {
			Log().Panic("MQ启动失败", errTmp)
			return
		}
	})
	defer m.proSimConsumer.GracefulStop()
	// 总体保证有N个在运行
	var ch = make(chan int, m.MaxGoroutine)
	for {
		fmt.Println("start receive message")
		mvs, errReceive := m.proSimConsumer.Receive(context.TODO(), 8, 20*time.Second)
		if errReceive != nil {
			if strings.Contains(errReceive.Error(), "no new message") {
				Log().Info(errReceive)
			} else {
				Log().Error("顺序消息,拉取MQ消息失败:", errReceive)
			}
			time.Sleep(time.Second * 2)
			continue
		}
	
		var msgGroupMap = make(map[string][]*rmq_client.MessageView, 0)
		for _, v := range mvs {
			if _, ok := funcMap[*v.GetTag()]; !ok {
				Log().Error(v.GetTag(), ": action do not exist")
				continue
			} else {
				// 根据msgGroup汇总
				msgGroup := v.GetMessageGroup()
				msgGroupMap[*msgGroup] = append(msgGroupMap[*msgGroup], v)
			}
		}
		// 最大程度多线程消费
		for _, item := range msgGroupMap {
			ch <- 1
			go func(item []*rmq_client.MessageView) {
				for _, v := range item {
					fmt.Println(*v.GetTag(), *v.GetMessageGroup(), string(v.GetBody()))
					action, _ := funcMap[*v.GetTag()]
					if errTmp := action(v.GetBody()); errTmp != nil {
						Log().Error("mq顺序消费失败", errTmp)
						break
					} else {
						m.proSimConsumer.Ack(context.TODO(), v)
					}
				}
				<-ch
			}(item)
		}
	}
}

最后需要注意的点:

同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。否则您的消息可能会丢失
请保证订阅一致性
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/183947.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

首个央企量子云计算项目,中标!

6月29日&#xff0c;北京玻色量子科技有限公司&#xff08;简称“玻色量子”&#xff09;成功中标中国移动云能力中心“2023—2024年量子算法及光量子算力接入关键技术研究项目”&#xff0c;这是玻色量子继与移动云签订“五岳量子云计算创新加速计划”后&#x1f517;&#xf…

Qt5.15编译工程报APK 的 API 级别设定低于套件所需的最低要求

APK 的 API 级别设定低于套件所需的最低要求。 套件所需的最低 API 级别是 21。 Error while building/deploying project qtpdfium (kit: 安卓 Qt 5.15.2 Clang Multi-Abi) When executing step "构建安卓 APK" 修改xml 工程中是16 修改为21 重新编译&#xff0c;问…

JSP:JDBC

JDBC&#xff08;Java Data Base Connectivity的缩写&#xff09;是Java程序操作数据库的API&#xff0c;也是Java程序与数据库相交互的一门技术。 JDBC是Java操作数据库的规范&#xff0c;由一组用Java语言编写的类和接口组成&#xff0c;它对数据库的操作提供基本方法&#…

YaRN方法:无需微调,高效扩展语言模型上下文窗口/蚂蚁集团与浙大发布原生安全框架v1.0,引领企业网络安全新时代 |魔法半周报

我有魔法✨为你劈开信息大海❗ 高效获取AIGC的热门事件&#x1f525;&#xff0c;更新AIGC的最新动态&#xff0c;生成相应的魔法简报&#xff0c;节省阅读时间&#x1f47b; &#x1f525;资讯预览 YaRN方法&#xff1a;无需微调&#xff0c;高效扩展语言模型上下文窗口 蚂蚁…

【Python】itertools模块,补充:可迭代对象、迭代器

Python中 itertools模块创建高效迭代器、处理序列数据集。 此模块所有函数返回迭代器&#xff0c;可用for循环获取迭代器中的内容&#xff0c;也可用list(...)用列表形式显示内容。 import itertools[ x for x in dir(itertools) if not x.startswith(_)] # 结果&#xff1a;…

从零开始,用Docker-compose打造SkyWalking、Elasticsearch和Spring Cloud的完美融合

&#x1f38f;&#xff1a;你只管努力&#xff0c;剩下的交给时间 &#x1f3e0; &#xff1a;小破站 "从零开始&#xff0c;用Docker-compose打造SkyWalking、Elasticsearch和Spring Cloud的完美融合 前言准备工作编写docker-compose.yml文件为什么使用本机ip为什么skywa…

Java 简单配置环境变量,切换多个jdk版本

文章目录 前言一、jdk下载二、配置环境变量三、查看jdk版本四、配置多个jdk五、切换jdk 前言 windows 配置jdk环境变量&#xff0c;如果项目没有规定使用的jdk版本的话&#xff0c;建议使用jdk8&#xff0c;这是最常用也是最稳定的版本 一、jdk下载 https://www.oracle.com/ja…

技术部工作职能规划分析

前言 技术部的职能。以下是一个基本的框架,其中涵盖了技术部在公司中的关键职能和子职能。 主要职能 技术部门的主要职能分为以下几个板块: - 技术规划与战略: 制定技术规划和战略,与业务团队合作确定技术需求。 研究和预测技术趋势,引领公司在技术创新和数字化转型方…

PC分页操作以及loading效果

page-size 每页显示条目个数 current-page 当前页数 total 数据总数 current-change【currentPage 改变时会触发】 切换分页时会先加载&#xff0c;等在接口数据&#xff0c;接口返回&#xff0c;加载会关闭&#xff08;在获取接口数据完毕哪里加上this.loadingfalse&#xff0…

苹果引导式访问是什么意思?怎么使用?这里有答案!

可能还有很多小伙伴都不知道&#xff0c;苹果手机里有一个隐藏的功能叫做引导式访问。引导式访问是什么意思&#xff1f;引导式访问是一种特殊的功能&#xff0c;可以帮助用户限制在某个应用程序中的操作&#xff0c;以保护隐私或防止误操作。今天&#xff0c;小编将给大家详细…

【免费】小傅哥 DDD 开发小册

作者&#xff1a;小傅哥 博客&#xff1a;https://bugstack.cn 沉淀、分享、成长&#xff0c;让自己和他人都能有所收获&#xff01;&#x1f604; 大家好&#xff0c;我是技术UP主小傅哥。 如果在面试的时候&#xff0c;面试官问你DDD是什么&#xff0c;你怎么解释&#xff1…

某上市证券公司:管控文件交换行为 保护核心数据资产

客户简介 某上市证券公司成立于2001年&#xff0c;经营范围包括&#xff1a;证券经纪、证券投资咨询、证券承销与保荐、证券自营等。经过多年发展&#xff0c;在北京、上海、深圳、重庆、杭州、厦门等国内主要中心城市及甘肃省内各地市设立了15家分公司和80余家证券营业部。20…

F盘满了变成红色怎么清理?这4个简单方法记得收藏!

“因为我电脑的磁盘比较多&#xff0c;我通常会把一些比较重要的文件放在F盘中。但是很奇怪&#xff0c;我的F盘用着用着就满成红色了&#xff0c;这该怎么办呢&#xff1f;应该怎么进行清理呢&#xff1f;” 我们在使用电脑时都会发现&#xff0c;电脑上有很多的磁盘。我们可以…

python 爬百度热搜并生成词云

1、爬取百度body存入txt def get_baidu_hot():url "https://top.baidu.com/board?tabrealtime"headers {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3&…

Mysql 锁机制分析

整体业务代码精简逻辑如下&#xff1a; Transaction public void service(Integer id) {delete(id);insert(id); }数据库实例监控&#xff1a; 当时通过分析上游问题流量限流解决后&#xff0c;后续找时间又重新分析了下问题发生的根本原因&#xff0c;现将其总结如下&#xf…

Android : AlertDialog对话框、单选、多选、适配器-简单应用

示例图&#xff1a; 1 &#xff1a;创建 AlertDialog.Builder 对象&#xff1b; 2 &#xff1a;调用 setIcon() 设置图标&#xff0c; setTitle() 或 setCustomTitle() 设置标题&#xff1b; 3 &#xff1a;设置对话框的内容&#xff1a; setMessage() 还有其他方法来指定显示…

成为AI产品经理——模型构建流程(下)

目录 1.模型训练 2.模型验证 3.模型融合 4.模型部署 上节课我们讲了模型设计、特征工程&#xff0c;这节课我们来讲模型构建剩下的三个部分&#xff1a;模型训练、模型验证和模型融合。 1.模型训练 模型训练就是要不断地训练、验证、调优直至让模型达到最优。 那么怎么达…

沃趣班11月月考题目解析

沃趣班11月月考题目解析 1.在oracle中创建用户时&#xff0c;若未设置default tablespace关键字&#xff0c;则oracle将哪个表空间分配给用户作为默认表空间 答案&#xff1a;D.user SQL> create user mytest identified by 123456; SQL> grant connect to mytest; SQL…

【解决】HDFS JournalNode启动慢问题排查

文章目录 一. 问题描述二. 问题分析1. 排查机器性能2. DNS的问题 三. 问题解决 一句话&#xff1a;因为dns的问题导致journalnode启动时很慢&#xff0c;通过修复dns对0.0.0.0域名解析&#xff0c;修复此问题。 一. 问题描述 从journalnode启动到服务可用&#xff0c;完成RPC…

又一重量级RTOS及组件开源,免费商用,支持更宽松 MIT 协议

关注星标公众号&#xff0c;不错过精彩内容 作者 | strongerHuang 微信公众号 | strongerHuang 不知道大家有没有发现&#xff1a;面向大众的软件代码&#xff0c;开源才是“王道”&#xff1f; FreeRTOS之所以这么流行&#xff0c;很大程度在于它免费开源&#xff08;遵循MIT开…