Go微服务: 基于rocketmq:server和rocketmq:broker搭建RocketMQ环境,以及生产消息和延迟消费消息的实现

RocketMQ 的搭建


1 ) 配置 docker-compose.yaml 文件

version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    volumes:
      - ./logs:/opt/logs
      - ./store:/opt/store
    networks:
      rmq:
        aliases:
          - rmqnamesrv

  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - "10909:10909"
      - "10911:10911"
    volumes:
      - ./logs:/opt/logs
      - ./store:/opt/store
      - ./conf/broker.conf:/etc/rocketmq/broker.conf
    environment:
        NAMESRV_ADDR: "rmqnamesrv:9876"
        JAVA_OPTS: " -Duser.home=/opt"
        JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker

  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - "8080:8080"
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole

networks:
  rmq:
    name: rmq
    driver: bridge

2 ) 配置文件 conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
brokerIP1=192.168.124.6
defaultTopicQueueNums = 4
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true
listenPort = 10911
deleteWhen = 04
fileReservedTime = 120
mapedfileSizeCommitLog = 1073741824
mapedfileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio = 88
maxMessageSize=65536
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
  • 注意,需要指定 brokerIP1 且不能使用 0.0.0.0 也不能不指定,否则无法通信
  • fileReservedTime 默认是 48h

3 ) 拉取镜像

  • $ docker pull foxiswho/rocketmq:server
  • $ docker pull foxiswho/rocketmq:broker
  • $ docker pull styletang/rocketmq-console-ng

4 )启动和检查

  • 启动 $ docker compose up -d
  • 检查状态 $ docker compose ps

打开 UI 界面验证

  • 访问:http://127.0.0.1:8080

上面这个就是和上面的 brokerIP1 对应

编写程序验证生产和消费消息

  • 现在简述下场景
    • 生产5条消息
    • 10s 后进行消费

代码实现

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

const groupName = "BBS_SHOP_GROUP_123"

func GetMqAddr() string {
	mqAddr := "127.0.0.1:9876" // 这里填入你的 NameServer 端口
	return mqAddr
}

func ProduceMsg(mqAddr string, topic string) {
	p, err := rocketmq.NewProducer( // 普通消息生产者
		producer.WithGroupName(groupName),
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),
		producer.WithRetry(2),
	)
	if err != nil {
		panic(err)
	}
	err = p.Start()
	if err != nil {
		log.Fatal()
		fmt.Println("生产者错误: %v", err.Error())
		os.Exit(1)
	}
	for i := 0; i < 5; i++ {
		msg := &primitive.Message{
			Topic: topic,
			Body:  []byte("Hello XProjectOrder " + strconv.Itoa(i)),
		}
		msg.WithDelayTimeLevel(3)
		r, err := p.SendSync(context.Background(), msg)
		if err != nil {
			fmt.Println("发送消息错误: %v", err.Error())
		} else {
			fmt.Println("生产消息成功: " + r.String() + "-" + r.MsgID)
		}
	}
	err = p.Shutdown()
	if err != nil {
		fmt.Println("生产者shutdown: %v", err.Error())
		os.Exit(1)
	}
}

func ComsumeMsg(mqAddr string, topic string) {
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName(groupName),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),
	)
	if err != nil {
		panic(err)
	}
	err = c.Subscribe(topic, consumer.MessageSelector{},
		func(ctx context.Context, msgList ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
			for i := range msgList {
				fmt.Printf("订阅消息,消费%v \n", msgList[i])
			}
			return consumer.ConsumeSuccess, nil
		})
	if err != nil {
		fmt.Println("消费消息错误: %v", err.Error())
	}
	err = c.Start()
	if err != nil {
		fmt.Println("开启消费这错误: %v", err.Error())
	}
	time.Sleep(time.Hour)
	err = c.Shutdown()
	if err != nil {
		fmt.Println("shutdown消费者错误: %v", err.Error())
	}
}

func main() {
	topic := "BBS_SHOP_TOPIC_123"
	mqAddr := GetMqAddr()
	ProduceMsg(mqAddr, topic)
	ComsumeMsg(mqAddr, topic)
}
  • 以上是一个 demo,在真实场景,自行进行封装处理

  • 这里定义了一个 主题 BBS_SHOP_TOPIC_123 和 一个订阅组 BBS_SHOP_GROUP_123

  • 查看生产消息输出

    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80001, offsetMsgId=C0A87C0600002A9F0000000000000000, queueOffset=0, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]]-C0A87C06209F0000000017eef7e80001
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80002, offsetMsgId=C0A87C0600002A9F00000000000000DE, queueOffset=1, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=2]]-C0A87C06209F0000000017eef7e80002
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80003, offsetMsgId=C0A87C0600002A9F00000000000001BC, queueOffset=2, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=3]]-C0A87C06209F0000000017eef7e80003
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80004, offsetMsgId=C0A87C0600002A9F000000000000029A, queueOffset=3, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=0]]-C0A87C06209F0000000017eef7e80004
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80005, offsetMsgId=C0A87C0600002A9F0000000000000378, queueOffset=4, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]]-C0A87C06209F0000000017eef7e80005
    
  • 查看消费 (10s 之后)

    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 0, Flag=0, properties=map[CONSUME_START_TIME:1717572747702 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:1 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80001], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80001, OffsetMsgId=C0A87C0600002A9F0000000000000456,QueueId=1, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737685, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747694, StoreHost=192.168.124.6:10911, CommitLogOffset=1110, BodyCRC=407480418, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 1, Flag=0, properties=map[CONSUME_START_TIME:1717572747706 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:2 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80002], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80002, OffsetMsgId=C0A87C0600002A9F0000000000000533,QueueId=2, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737698, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747701, StoreHost=192.168.124.6:10911, CommitLogOffset=1331, BodyCRC=1867421940, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 2, Flag=0, properties=map[CONSUME_START_TIME:1717572747710 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:3 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80003], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80003, OffsetMsgId=C0A87C0600002A9F0000000000000610,QueueId=3, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737702, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747704, StoreHost=192.168.124.6:10911, CommitLogOffset=1552, BodyCRC=1984416078, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=0]" consumerGroup=BBS_SHOP_GROUP_123 offset=0
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 3, Flag=0, properties=map[CONSUME_START_TIME:1717572747713 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:0 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80004], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80004, OffsetMsgId=C0A87C0600002A9F00000000000006ED,QueueId=0, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737706, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747709, StoreHost=192.168.124.6:10911, CommitLogOffset=1773, BodyCRC=21035480, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]" consumerGroup=BBS_SHOP_GROUP_123 offset=1
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=2]" consumerGroup=BBS_SHOP_GROUP_123 offset=1
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=3]" consumerGroup=BBS_SHOP_GROUP_123 offset=1
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 4, Flag=0, properties=map[CONSUME_START_TIME:1717572747717 DELAY:3 MAX_OFFSET:2 MIN_OFFSET:0 REAL_QID:1 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80005], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80005, OffsetMsgId=C0A87C0600002A9F00000000000007CA,QueueId=1, StoreSize=221, QueueOffset=1, SysFlag=0, BornTimestamp=1717572737709, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747711, StoreHost=192.168.124.6:10911, CommitLogOffset=1994, BodyCRC=522480763, ReconsumeTimes=0, PreparedTransactionOffset=0]
    
  • 以上就是生产和消费的主要过程

效果

总结

  • 以上是简单的环境搭建和生产消息,以及延迟消费消息的 demo 示例
  • 实际场景中,结合以上demo,对一些异步发送消息的场景进行灵活运用和升级

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/683401.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Vue3实战笔记(58)—从零开始掌握Vue3插槽机制,基础入门

文章目录 前言插槽基础入门总结 前言 不论是组件封装还是分析源码&#xff0c;实际开发中经常接触插槽&#xff0c;插槽是干什么用的呢&#xff1f;组件之间能够接收任意类型的 JavaScript 值作为 props&#xff0c;但组件要如何接收模板内容呢&#xff1f;在某些场景中&#…

伏图(Simdroid)5.0 电子散热模块 | 可试用

伏图-电子散热模块&#xff08;Simdroid-EC&#xff09;是云道智造基于通用多物理场仿真PaaS平台伏图开发的针对电子元器件、设备等散热的专用热仿真模块&#xff0c;内置电子产品专用零部件模型库&#xff0c;支持用户通过“搭积木”的方式快速建立电子产品的热分析模型&#…

【13】Camunda7-第一个BPMN流程

在前一篇博文里&#xff0c;我们介绍了Camunda Modeler&#xff0c;那么接下来&#xff0c;我们就使用这个设计器工具绘制我们的第一个BPMN流程。 1. 场景概述 关于第一个流程&#xff0c;这里计划以我们最常见的请假审批流程为例&#xff0c;它涉及到表单设计、人工审批任务等…

从0开始用TCN预测股价

源码地址 由于用的是tensflow&#xff0c;其实和pytorch差不多看懂就行&#xff08;pytorch版本我还在改&#xff0c;有些难度&#xff09;因为这里面的tcn结构是用的 别人的代码https://github.com/philipperemy/keras-tcn 首先说一下什么是TCN 首先我想说 我们预测股票价格…

Science项目文章 | 中国农科院作科所研究团队解析“复粒稻”多粒簇生的机制

2024年3月8日&#xff0c;由中国农业科学院作物科学研究所童红宁研究员领衔的研究团队在Science发表题为“Enhancing rice panicle branching and grain yield through tissue-specific brassinosteroid inhibition”的研究论文。该研究报道了复粒稻多粒簇形成的机制&#xff0…

python-flask项目的服务器线上部署

在部署这部分我首先尝试了宝塔面板&#xff0c;始终连接失败 换了一种思路选择了Xshell成功连接 首先我们需要下载个免费版本的Xshell 免费的&#xff1a;家庭/学校免费 - NetSarang Website 下载完毕打开 1新建-> 输入服务器的账号密码&#xff1a; 在所有会话中点击自…

WalleWeb简化你的DevOps部署流程

walle-web&#xff1a;简化部署流程&#xff0c;提升开发效率&#xff0c;Walle Web让DevOps触手可及 - 精选真开源&#xff0c;释放新价值。 概览 Walle Web是一个功能强大且免费开源的DevOps平台&#xff0c;旨在简化和自动化代码部署流程。它支持多种编程语言&#xff0c;包…

学习axios拦截器

axios拦截器的作用&#xff1a;用于在请求发送前和响应返回后对请求和响应进行统一处理&#xff0c;例如添加公共请求头、处理请求参数、统一处理错误信息等。拦截器提供了一种灵活、高效的方式来管理HTTP请求和响应&#xff0c;帮助在前端开发中更好地处理数据交互。 这是一个…

Commons-Collections篇-CC3链

前言 我们分析前两条链CC1和CC6时&#xff0c;都是利用invoke反射调用的Runtime().getRuntime().exec()来执行命令。而很多时候服务器的代码当中的黑名单会选择禁用Runtime CC3链主要通过动态加载类加载机制来实现自动执行恶意类代码 1.环境安装 可以接着使用我们之前分析C…

植物大战僵尸杂交版2.0.88最新版安装包

游戏简介 游戏中独特的杂交植物更是为游戏增添了不少亮点。这些杂交植物不仅外观独特&#xff0c;而且拥有更强大的能力&#xff0c;能够帮助玩家更好地应对游戏中的挑战。玩家可以通过一定的条件和方式&#xff0c;解锁并培养这些杂交植物&#xff0c;从而不断提升自己的战斗…

重大变化,2024软考!

根据官方发布的2024年度计算机技术与软件专业技术资格&#xff08;水平&#xff09;考试安排&#xff0c;2024年软考上、下半年开考科目有着巨大变化&#xff0c;我为大家整理了相关信息&#xff0c;大家可以看看&#xff01; &#x1f3af;2024年上半年&#xff1a;5月25日&am…

【动手学深度学习】卷积神经网络(AlexNet)的研究详情

目录 &#x1f30a;1. 研究目的 &#x1f30a;2. 研究准备 &#x1f30a;3. 研究内容 &#x1f30d;3.1 卷积神经网络&#xff08;AlexNet&#xff09; &#x1f30d;3.2 练习 &#x1f30a;4. 研究体会 &#x1f30a;1. 研究目的 多层感知机模型选择&#xff1a;比较不同…

【devops】 Bytebase 一站式开源 数据库DevOps平台

初识 Bytebase 1、安装 安装地址 https://www.bytebase.com/docs/get-started/self-host/#docker 安装指令 docker run --init \--name bytebase \--publish 8080:8080 --pull always \--volume ~/.bytebase/data:/var/opt/bytebase \bytebase/bytebase:2.18.02、登录-dashboa…

山东大学软件学院项目实训-创新实训-基于大模型的旅游平台(二十七)- 微服务(7)

11.1 : 同步调用的问题 11.2 异步通讯的优缺点 11.3 MQ MQ就是事件驱动架构中的Broker 安装MQ docker run \-e RABBITMQ_DEFAULT_USERxxxx \-e RABBITMQ_DEFAULT_PASSxxxxx \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management 浏览器访问1…

gkuubibiih

c语言中的小小白-CSDN博客c语言中的小小白关注算法,c,c语言,贪心算法,链表,mysql,动态规划,后端,线性回归,数据结构,排序算法领域.https://blog.csdn.net/bhbcdxb123?spm1001.2014.3001.5343 给大家分享一句我很喜欢我话&#xff1a; 知不足而奋进&#xff0c;望远山而前行&am…

SwiftUI中Menu和ControlGroup的使用

本篇文章主要介绍一下Menu组件和ControlGroup组件的使用。Menu组件是在iOS 14&#xff08;tvOS 17&#xff09;推出的一个组件&#xff0c;点击后提供一个可选择的操作列表。ControlGroup组件是一个容器视图&#xff0c;以视觉上适当的方式为给定的上下文显示语义相关的控件&am…

钢基础知识介绍

钢铁是一种铁碳合金&#xff0c;含有一定量的碳和其他合金元素&#xff0c;如硅、锰等。而钢材则是经过加工处理后的钢铁材料&#xff0c;具有更高的强度、硬度、塑性和韧性。钢铁的硬度、强度和耐磨性相对较低&#xff0c;而钢材经过加工处理后&#xff0c;其硬度、强度和耐磨…

训练营第二十七天 | 491.递增子序列46.全排列47.全排列 II332.重新安排行程51. N皇后

491.递增子序列 力扣题目链接(opens new window) 给定一个整型数组, 你的任务是找到所有该数组的递增子序列&#xff0c;递增子序列的长度至少是2。 示例: 输入: [4, 6, 7, 7]输出: [[4, 6], [4, 7], [4, 6, 7], [4, 6, 7, 7], [6, 7], [6, 7, 7], [7,7], [4,7,7]] 说明: …

企业(园区)智慧能源双碳平台解决方案

园区作为工业企业集聚区&#xff0c;在提供了大量基础设施和公共服务的同时也成为了碳排放的主要源头。工业园区的耗能约占全社会总耗能的69%&#xff0c;碳排放占全国总排放约31%。工业园区节能、减耗、提质、减碳工作的落实&#xff0c;是我国实现碳达峰碳中和目标的必然要求…

关系代数与规范化

本文是根据自己的理解&#xff0c;结合实践整理所得&#xff0c;有兴趣的可以参考学习。