这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
RocketMQ版本
- 5.1.0
背景
我们都知道RocketMQ
5.x
新增了proxy
模式部署方式,也就是支持了GRPC
的消费方式消费,所以今天我们来试试
本次使用的开发语言是goland
前置条件
这里默认我们已经部署了RocketMQ
proxy
,如果不会部署的可以参考我之前的文章
依赖管理
本次使用的依赖管理方式是go.mod
使用的goland
sdk是github.com/apache/rocketmq-clients/golang
也就是这个开源项目
我们直接执行
go get github.com/apache/rocketmq-clients/golang@master
以master
分支作为我们的依赖
发送消息
package main
import (
"context"
"fmt"
"log"
"os"
"strconv"
"time"
rmq_client "github.com/apache/rocketmq-clients/golang"
"github.com/apache/rocketmq-clients/golang/credentials"
)
const (
Topic = "xiao-zou-topic"
Endpoint = "127.0.0.1:8081"
AccessKey = "xxxxxx"
SecretKey = "xxxxxx"
)
func main() {
os.Setenv("mq.consoleAppender.enabled", "true")
rmq_client.ResetLogger()
// new producer instance
producer, err := rmq_client.NewProducer(&rmq_client.Config{
Endpoint: Endpoint,
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
AccessSecret: SecretKey,
},
},
rmq_client.WithTopics(Topic),
)
if err != nil {
log.Fatal(err)
}
// start producer
err = producer.Start()
if err != nil {
log.Fatal(err)
}
// graceful stop producer
defer producer.GracefulStop()
for i := 0; i < 10; i++ {
// new a message
msg := &rmq_client.Message{
Topic: Topic,
Body: []byte("this is a message : " + strconv.Itoa(i)),
}
// set keys and tag
msg.SetKeys("a", "b")
msg.SetTag("ab")
// send message in sync
resp, err := producer.Send(context.TODO(), msg)
if err != nil {
log.Fatal(err)
}
for i := 0; i < len(resp); i++ {
fmt.Printf("%#v\n", resp[i])
}
// wait a moment
time.Sleep(time.Second * 1)
}
}
我们可以直接运行,然后看到消息发送成功了
消息消费
package main
import (
"context"
"fmt"
"log"
"os"
"time"
rmq_client "github.com/apache/rocketmq-clients/golang"
"github.com/apache/rocketmq-clients/golang/credentials"
)
const (
Topic = "xiao-zou-topic"
ConsumerGroup = "gid-xiaozou-grpc"
Endpoint = "127.0.0.1:8081"
AccessKey = "xxxxxx"
SecretKey = "xxxxxx"
)
var (
// maximum waiting time for receive func
awaitDuration = time.Second * 5
// maximum number of messages received at one time
maxMessageNum int32 = 16
// invisibleDuration should > 20s
invisibleDuration = time.Second * 20
// receive messages in a loop
)
func main() {
// log to console
os.Setenv("mq.consoleAppender.enabled", "true")
rmq_client.ResetLogger()
// new simpleConsumer instance
simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{
Endpoint: Endpoint,
ConsumerGroup: ConsumerGroup,
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
AccessSecret: SecretKey,
},
},
rmq_client.WithAwaitDuration(awaitDuration),
rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{
Topic: rmq_client.SUB_ALL,
}),
)
if err != nil {
log.Fatal(err)
}
// start simpleConsumer
err = simpleConsumer.Start()
if err != nil {
log.Fatal(err)
}
// graceful stop simpleConsumer
defer simpleConsumer.GracefulStop()
go func() {
for {
fmt.Println("start receive message")
mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
if err != nil {
fmt.Println(err)
}
// ack message
for _, mv := range mvs {
simpleConsumer.Ack(context.TODO(), mv)
msg := string(mv.GetBody())
fmt.Println(msg)
}
fmt.Println("wait a moment")
fmt.Println()
time.Sleep(time.Second * 3)
}
}()
select {}
}
执行结果:
源码
相关源码已上传到github,需要可以自取
https://github.com/weihubeats/java-to-go-learning/tree/main/student/rocketmq-demo
总结
可以看到我们使用GRPC
的方式消费和发送消息都成功了,但是需要注意的是目前rocketmq-clients
还不是很稳定,有一些bug,生产使用还是需要谨慎