本人理解,顺序消息如果不分消息组,那么会影响并行处理速度,所以尽量消息组分的散一些
首先上要求,官方文档如下:
总结:
1.必须同一个消息组,消息组和消费组不是一个概念,不要混
2.必须单一生产者,也就是说线上生产只能开一个 pod,感觉局限有点高,无法多 pod 接入
3.必须串行发送,这个点也不太好,限制过高
以上三点
在我的案例中(企业微信回调消息)
那么只能开一个接口服务Pod 来接收微信回调,如果挂了,那完蛋,开多个 pod的话,那只有把请求放队列,通过队列pop进行消费再生产消息到 mq,这样同时也解决了第三点的串行发送。
生产的时候如何确定是顺序消息,只需要生产消息的时候给设定一个消息组
msg := &rmq_client.Message{
Topic: Topic,
Body: []byte("this is a message : " + strconv.Itoa(i)),
}
// set keys and tag
msg.SetKeys("a", "b")
msg.SetTag("ab")
// 这里设置消息组
msg.SetMessageGroup("fifo")
分组分的越细越好
提高消费速度
当拿到消息后,根据消息分组来进行并发处理,每个分组内进行串行处理,关键代码如下
func (m *RocketMq) ConsumerOrderly(funcMap map[string]func([]byte) error) {
var err error
m.consumerOnce.Do(func() {
Log().Info("##############pro consume orderly start#############", m.MqConfig)
errTmp := m.proSimConsumer.Start()
if errTmp != nil {
Log().Panic("MQ启动失败", errTmp)
return
}
})
defer m.proSimConsumer.GracefulStop()
// 总体保证有N个在运行
var ch = make(chan int, m.MaxGoroutine)
for {
fmt.Println("start receive message")
mvs, errReceive := m.proSimConsumer.Receive(context.TODO(), 8, 20*time.Second)
if errReceive != nil {
if strings.Contains(errReceive.Error(), "no new message") {
Log().Info(errReceive)
} else {
Log().Error("顺序消息,拉取MQ消息失败:", errReceive)
}
time.Sleep(time.Second * 2)
continue
}
var msgGroupMap = make(map[string][]*rmq_client.MessageView, 0)
for _, v := range mvs {
if _, ok := funcMap[*v.GetTag()]; !ok {
Log().Error(v.GetTag(), ": action do not exist")
continue
} else {
// 根据msgGroup汇总
msgGroup := v.GetMessageGroup()
msgGroupMap[*msgGroup] = append(msgGroupMap[*msgGroup], v)
}
}
// 最大程度多线程消费
for _, item := range msgGroupMap {
ch <- 1
go func(item []*rmq_client.MessageView) {
for _, v := range item {
fmt.Println(*v.GetTag(), *v.GetMessageGroup(), string(v.GetBody()))
action, _ := funcMap[*v.GetTag()]
if errTmp := action(v.GetBody()); errTmp != nil {
Log().Error("mq顺序消费失败", errTmp)
break
} else {
m.proSimConsumer.Ack(context.TODO(), v)
}
}
<-ch
}(item)
}
}
}
最后需要注意的点:
同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。否则您的消息可能会丢失
请保证订阅一致性