RocketMQ 5.x如何使用GRPC方式发送消费消息

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

RocketMQ版本

  • 5.1.0

背景

我们都知道RocketMQ 5.x新增了proxy模式部署方式,也就是支持了GRPC的消费方式消费,所以今天我们来试试

本次使用的开发语言是goland

前置条件

这里默认我们已经部署了RocketMQ proxy,如果不会部署的可以参考我之前的文章

依赖管理

本次使用的依赖管理方式是go.mod
使用的goland sdk是github.com/apache/rocketmq-clients/golang

也就是这个开源项目

我们直接执行

go get github.com/apache/rocketmq-clients/golang@master

master分支作为我们的依赖

发送消息

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	rmq_client "github.com/apache/rocketmq-clients/golang"
	"github.com/apache/rocketmq-clients/golang/credentials"
)

const (
	Topic     = "xiao-zou-topic"
	Endpoint  = "127.0.0.1:8081"
	AccessKey = "xxxxxx"
	SecretKey = "xxxxxx"
)

func main() {
	os.Setenv("mq.consoleAppender.enabled", "true")
	rmq_client.ResetLogger()
	// new producer instance
	producer, err := rmq_client.NewProducer(&rmq_client.Config{
		Endpoint: Endpoint,
		Credentials: &credentials.SessionCredentials{
			AccessKey:    AccessKey,
			AccessSecret: SecretKey,
		},
	},
		rmq_client.WithTopics(Topic),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start producer
	err = producer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// graceful stop producer
	defer producer.GracefulStop()

	for i := 0; i < 10; i++ {
		// new a message
		msg := &rmq_client.Message{
			Topic: Topic,
			Body:  []byte("this is a message : " + strconv.Itoa(i)),
		}
		// set keys and tag
		msg.SetKeys("a", "b")
		msg.SetTag("ab")
		// send message in sync
		resp, err := producer.Send(context.TODO(), msg)
		if err != nil {
			log.Fatal(err)
		}
		for i := 0; i < len(resp); i++ {
			fmt.Printf("%#v\n", resp[i])
		}
		// wait a moment
		time.Sleep(time.Second * 1)
	}
}

我们可以直接运行,然后看到消息发送成功了

消息消费

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	rmq_client "github.com/apache/rocketmq-clients/golang"
	"github.com/apache/rocketmq-clients/golang/credentials"
)

const (
	Topic         = "xiao-zou-topic"
	ConsumerGroup = "gid-xiaozou-grpc"
	Endpoint      = "127.0.0.1:8081"
	AccessKey     = "xxxxxx"
	SecretKey     = "xxxxxx"
)

var (
	// maximum waiting time for receive func
	awaitDuration = time.Second * 5
	// maximum number of messages received at one time
	maxMessageNum int32 = 16
	// invisibleDuration should > 20s
	invisibleDuration = time.Second * 20
	// receive messages in a loop
)

func main() {
	// log to console
	os.Setenv("mq.consoleAppender.enabled", "true")
	rmq_client.ResetLogger()
	// new simpleConsumer instance
	simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{
		Endpoint:      Endpoint,
		ConsumerGroup: ConsumerGroup,
		Credentials: &credentials.SessionCredentials{
			AccessKey:    AccessKey,
			AccessSecret: SecretKey,
		},
	},
		rmq_client.WithAwaitDuration(awaitDuration),
		rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{
			Topic: rmq_client.SUB_ALL,
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start simpleConsumer
	err = simpleConsumer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// graceful stop simpleConsumer
	defer simpleConsumer.GracefulStop()

	go func() {
		for {
			fmt.Println("start receive message")
			mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
			if err != nil {
				fmt.Println(err)
			}
			// ack message
			for _, mv := range mvs {
				simpleConsumer.Ack(context.TODO(), mv)
				msg := string(mv.GetBody())
				fmt.Println(msg)
			}
			fmt.Println("wait a moment")
			fmt.Println()
			time.Sleep(time.Second * 3)
		}
	}()
	select {}
}

执行结果:

源码

相关源码已上传到github,需要可以自取

https://github.com/weihubeats/java-to-go-learning/tree/main/student/rocketmq-demo

总结

可以看到我们使用GRPC的方式消费和发送消息都成功了,但是需要注意的是目前rocketmq-clients还不是很稳定,有一些bug,生产使用还是需要谨慎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/67203.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

php通过各种函数判断0和空php实例

php通过各种函数判断0和空php实例 本文给大家介绍php同各种函数判断0和空的方法&#xff0c;在文章给大家补充介绍了php 语法里0不等于null为空的解决办法 补充&#xff1a;下面给大家介绍下php 语法里0不等于null为空的解决办法 今天遇到这样一个问题是这样的: php 语句里,我…

Nevron Diagram for .NET Crack

Nevron Diagram for .NET Crack Nevron Diagram for.NET可帮助您快速轻松地在.NET Windows窗体和ASP.NET应用程序中集成和显示复杂的图表。这是一个完整的绘图解决方案&#xff0c;包含许多交互功能、形状、自动布局和令人惊叹的视觉效果&#xff0c;并配备了现成的控件&#x…

Linux常规操作命令

日升时奋斗&#xff0c;日落时自省 目录 1、vim 1.1、工作模式 1.2、末行模式操作相关命令 1.2.1、保存退出操作 1.2.2、查找替换 1.3、输入模式操作相关命令 1.3.1、移动相关命令 1.3.2、删除和剪切命令 1.3.3、复制操作 1.3.4、撤销 2、head 3、tail 4、ps 5、…

策略模式【Strategy Pattern】

刘备要到江东娶老婆了&#xff0c;走之前诸葛亮给赵云&#xff08;伴郎&#xff09;三个锦囊妙计&#xff0c;说是按天机拆开解决棘手问题&#xff0c; 嘿&#xff0c;还别说&#xff0c;真是解决了大问题&#xff0c;搞到最后是周瑜陪了夫人又折兵呀&#xff0c;那咱们先看看…

爬虫学习记录(持续更新)

一、问题记录 1.使用webdriver报错AttributeError: str object has no attribute capabilities 解决&#xff1a;目前使用的selenium版本是4.11.2&#xff0c;可以不必设置driver.exe的路径&#xff0c;selenium可以自己处理浏览器和驱动程序&#xff0c;因此&#xff0c;使用…

Shopify平台Fulfillment业务模块升级

上图是销售订单、发货单与配送之间的关系图&#xff0c;销售订单可以创建多个发货单&#xff0c;多个发货单(不同销售订单)可以合并在一个配送订单进行发货 接口请求错误记录: 1. The api_client does not have the required permission(s). 2. Required parameter missing or…

具有吸引子的非线性系统(MatlabSimulink实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

第三章 图论 No.5最小生成树之虚拟源点,完全图与次小生成树

文章目录 虚拟源点&#xff1a;1146. 新的开始贪心或kruskal性质&#xff1a;1145. 北极通讯网络最小生成树与完全图&#xff1a;346. 走廊泼水节次小生成树&#xff1a;1148. 秘密的牛奶运输 虚拟源点&#xff1a;1146. 新的开始 1146. 新的开始 - AcWing题库 与一般的最小…

(MVC)SpringBoot+Mybatis+Mapper.xml

前言&#xff1a;本篇博客主要对MVC架构、Mybatis工程加深下理解&#xff0c;前面写过一篇博客&#xff1a;SprintBoothtml/css/jsmybatis的demo&#xff0c;里面涉及到了Mybatis的应用&#xff0c;此篇博客主要介绍一种将sql语句写到了配置文件里的方法&#xff0c;即Mybatis里…

将大容量机械硬盘克隆到固态硬盘的简单方法!

可以大容量机械硬盘克隆到固态硬盘吗&#xff1f; 随着硬盘使用时间增长&#xff0c;电脑的性能可能会下降。为了追求更快的读写速度&#xff0c;不少用户将目光投向了固态硬盘。 ​众所周知&#xff0c;固态硬盘的读写速度和启动速度比机械硬盘快。用固态硬盘替…

k8s --pod详解

目录 一、Pod基础概念 1、pod简介 2、在Kubrenetes集群中Pod有如下两种使用方式 3、pause容器使得Pod中的所有容器可以共享两种资源&#xff1a;网络和存储。 &#xff08;1&#xff09;网络 &#xff08;2&#xff09;存储 4、kubernetes中的pause容器主要为每个容器提供…

4.2、Flink任务怎样读取文件中的数据

目录 1、前言 2、readTextFile&#xff08;已过时&#xff0c;不推荐使用&#xff09; 3、readFile&#xff08;已过时&#xff0c;不推荐使用&#xff09; 4、fromSource(FileSource) 推荐使用 1、前言 思考: 读取文件时可以设置哪些规则呢&#xff1f; 1. 文件的格式(tx…

Babylon.js着色器简明简称【Shader】

推荐&#xff1a;用 NSDT设计器 快速搭建可编程3D场景 为了生成 BabylonJS 场景&#xff0c;需要用 Javascript 编写代码&#xff0c;BabylonJS 引擎会处理该代码并将结果显示在屏幕上。 场景可以通过改变网格、灯光或摄像机位置来改变。 为了及时显示可能的变化&#xff0c;屏…

kubernetes中最小组件——Pod

目录 一、Pod简介 二、Pod的使用方式 三、Pause——Pod中底层基础容器 四、为什么kubernetes这样设计Pod 五、Pod的分类 1.自主式Pod 2.控制器管理的Pod 3.静态Pod 六、Pod容器的分类 1. 基础容器&#xff08;infrastructure container&#xff09; 2. 初始化容器&am…

MySQL DAL单表练习一(学生表)

目录 步骤&#xff1a; 1、创建学生表 2、插入数据 1&#xff09;查询表中所有学生的信息 2&#xff09;查询表中所有学生的姓名和英语成绩 3&#xff09; 过滤表中的重复数据 4&#xff09; 统计每个学生的总分 5&#xff09; 所有学生总分上加上10 6&#xff09; 使…

微信个人小程序申请 (AppID 和 AppSecret)

1. 登录微信公众平台 https://mp.weixin.qq.com/cgi-bin/loginpage?url%2Fcgi-bin%2Fhome%3Ft%3Dhome%2Findex%26lang%3Dzh_CN%26token%3D47421820 2. 右上角立即注册 3. 注册类型选择小程序 4. 账号信息 5. 邮箱激活 6. 小程序发布流程 7. 小程序信息 (前往填写) 8. 获取小程…

【Axure高保真原型】JS版日期区间下拉选择器

今天和大家分享JS版日期区间下拉选择器的原型模板&#xff0c;该模板通过调用浏览器的下拉列表&#xff0c;所以可以获取真实的日历效果&#xff0c;具体包括哪一年二月份有29天&#xff0c;几号对应星期几&#xff0c;都是真实的。建议使用谷歌浏览器来演示&#xff0c;其他浏…

Apache2.4源码安装与配置

环境准备 openssl-devel pcre-devel expat-devel libtool gcc libxml2-devel 这些包要提前安装&#xff0c;否则httpd编译安装时候会报错 下载源码、解压缩、软连接 1、wget下载[rootnode01 ~]# wget https://downloads.apache.org/httpd/httpd-2.4.57.tar.gz --2023-07-20 …

leetcode870. 优势洗牌(java)

优势洗牌 leetcode870. 优势洗牌题目描述双指针 排序代码 滑动窗口 leetcode870. 优势洗牌 难度 - 中等 leetcode870. 优势洗牌 题目描述 给定两个长度相等的数组 nums1 和 nums2&#xff0c;nums1 相对于 nums2 的优势可以用满足 nums1[i] > nums2[i] 的索引 i 的数目来描…