目录
1. RabbitMQ 的 发布订阅模式
2. GRPC 服务间的实体同步
2.1 生产者服务
2.2 消费者服务
3. 可靠性
3.1 生产者丢失消息
3.2 消费者丢失消息
3.3 RabbitMQ 中间件丢失消息
1. RabbitMQ 的 发布订阅模式
https://www.rabbitmq.com/tutorials/tutorial-three-go
- P 生产者:发送消息到一个特定的交换机(交换机类型是
fanout
),不需指定具体的目标队列 - X 交换机:将消息分发给所有绑定到它的队列
- C 消费者:订阅主题,通过绑定到交换机的队列接收消息
2. GRPC 服务间的实体同步
考虑以下业务需求——
- 模拟消费者组机制:
- 同一消费者组下的消费者(即一个服务的多个实例)监听同一个队列,是竞争关系
- 不同消费者组(即不同服务)监听不同队列,这些队列绑定到同一个交换机,不同消费者组可以独立消费相同的数据
- 消费历史数据:当生产者先启动,生产了一部分数据,消费者后启动时,也能消费到历史数据
服务之间的实体数据同步方案:
2.1 生产者服务
(1) 初始化
生产者初始化时需要负责绑定 RabbitMQ 的交换机和队列关系,即显式声明自己的实体有哪些消费者在消费。比如:
- 声明交换机 exchange_user、exchange_group
- 声明消费者 consumer_user_rpc、consumer_org_rpc
- 创建队列 exchange_user_consumer_user_rpc、exchange_user_consumer_org_rpc、exchange_group_consumer_user_rpc、exchange_group_consumer_org_rpc,也就是对每个 topic-consumer 组合,创建一个相应的队列
- 将交换机和队列绑定
(2) 实体变更时发送消息
发送消息到交换机,交换机会自动分发给所有绑定到它的队列,也就是发送一条消息至 exchange_user 交换机,那么消息会被投递给队列 exchange_user_consumer_user_rpc 和 队列 exchange_user_consumer_org_rpc。
2.2 消费者服务
消费者订阅一个 topic,处理 rabbitMQ 队列发来的消息。
- 若消息处理成功(业务流程成功),发送 Ack 给 rabbitMQ 确认消费
- 若消息处理失败(业务流程失败),发送 Nack 通知 rabbitMQ 处理失败,消息将放回队列等待下次消费
Ack 时 rabbitMQ 会记录消费者消费的 offset,下次会基于 offset 继续消费~
3. 可靠性
3.1 生产者丢失消息
(1) 生产者绑定交换机和队列
在生产者初始化时,需要先将交换机和队列的关系绑定好,以避免此场景发生:生产者先启动,未绑定交换机和队列,发送了消息到交换机,此时无法投递到具体队列。消费者后启动,即便做了交换机和队列绑定,也无法消费到历史消息。
func NewMQ(rabbitMQCfg *Config, option Option) (*RabbitMQ, error) {
// ...
// 初始化交换机和队列
for topic, consumerGroups := range option.TopicConsumerGroupsBinding {
err = initExchange(topic, consumerGroups, mq)
if err != nil {
return nil, err
}
}
return mq, nil
}
func initExchange(exchange, consumerGroups string, mq *RabbitMQ) error {
// 1. 创建发送通道
pch, err := mq.conn.Channel()
if err != nil {
return err
}
mq.produceChannels[exchange] = pch
// 2. 开启消息确认机制
if err := pch.Confirm(false); err != nil {
return err
}
// 3. 创建交换机
// 参数 exchange:交换机名称, kind:交换机类型 fanout为发布订阅模式, durable:是否持久化, autoDelete:是否自动删除, internal:是否内部交换机, noWait:是否等待服务器确认, args:额外参数
err = pch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)
if err != nil {
return err
}
slog.Info("rabbitmq declared exchange", "exchange_name", exchange)
// 4. 创建队列并绑定到交换机
for _, consumerGroup := range strings.Split(consumerGroups, ",") {
consumerGroup = strings.TrimSpace(consumerGroup)
if consumerGroup == "" {
continue
}
queue := queueName(exchange, consumerGroup)
// 创建队列
// 参数 queue:队列名称, durable:是否持久化, autoDelete:是否自动删除, exclusive:是否独占, noWait:是否等待服务器确认, args:额外参数
_, err = pch.QueueDeclare(queue, true, false, false, false, nil)
if err != nil {
return err
}
// 将队列绑定到交换机
// 参数 queue:队列名称, key:路由键, exchange:交换机名称, noWait:是否等待服务器确认, args:额外参数
err = pch.QueueBind(queue, "", exchange, false, nil)
if err != nil {
return err
}
slog.Info("rabbitmq declared and bind queue", "queue", queue, "bind_exchange", exchange)
// 创建接收通道
cch, err := mq.conn.Channel()
if err != nil {
return err
}
mq.consumeChannels[queue] = cch
}
// 5. 开启消息确认事件监听、消息投递事件监听
mq.publishWatcher[exchange] = &watcher{
returnCh: pch.NotifyReturn(make(chan amqp.Return)),
confirmCh: pch.NotifyPublish(make(chan amqp.Confirmation)),
}
// 监听未被交换机投递的消息
go func() {
for ret := range mq.publishWatcher[exchange].returnCh {
// 尝试重新投递
ctx, _ := context.WithTimeout(context.Background(), mq.config.Timeout)
if err := mq.publish(ctx, ret.Exchange, ret.MessageId, ret.Body, ret.Timestamp); err != nil {
slog.Error("rabbitmq republish failed.", "exchange", ret.Exchange, "msgID", ret.MessageId, "err", err)
} else {
slog.Warn("rabbitmq got exchange undelivered msg, republished.", "exchange", ret.Exchange, "msgID", ret.MessageId)
}
time.Sleep(time.Second * 3)
}
}()
return nil
}
(2) 发送重试
发送消息时增加重试机制。若超过重试上限,需记录日志或报警。
func (r *RabbitMQ) Produce(ctx context.Context, topic string, data map[string]any) error {
body, _ := json.Marshal(data)
msgID := uuid.New()
var retried int
for {
err := r.publish(ctx, topic, msgID, body, time.Now())
if err == nil {
return nil
}
retried++
if retried > r.option.RetryNum {
return err
}
time.Sleep(r.option.RetryInterval)
}
}
(3) confirm 消息确认机制
生产端投递消息到 RabbitMQ 后,RabbitMQ 将发送一个确认事件,让生产端知晓消息已发送成功。监听 confirm 事件以确认消息的发送状态:
func initExchange(exchange string, mq *RabbitMQ) error {
// ...
// 开启消息确认机制
if err := pch.Confirm(false); err != nil {
return err
}
// 创建监听器
mq.publishWatcher[exchange] = &watcher{
confirmCh: pch.NotifyPublish(make(chan amqp.Confirmation)),
}
// ...
}
func (r *RabbitMQ) publish(ctx context.Context, ...) error {
// publish发送消息
// ...
// 等待rabbitmq返回消息确认
select {
case confirm := <-r.publishWatcher[exchange].confirmCh:
if !confirm.Ack {
return errors.New("publish failed, got nack from rabbitmq")
}
case <-ctx.Done():
return errors.New("context deadline, publish to rabbitmq timeout")
case <-time.After(r.config.Timeout):
return errors.New("publish to rabbitmq timeout")
}
return nil
}
3.2 消费者丢失消息
消费者消费完成后,必须手动 Ack 通知 MQ,表示已经消费成功:
func (r *RabbitMQ) Ack(topic, consumerGroup, msgID string) error {
// ...
return consumeChannel.Ack(deliveryTag, false)
}
如果消费失败,需要手动 Nack,那此条消息会重新入队,等待下次消费:
func (r *RabbitMQ) Nack(topic, consumerGroup, msgID string) error {
// ...
return consumeChannel.Nack(deliveryTag, false, true)
}
3.3 RabbitMQ 中间件丢失消息
(1) 数据持久化到磁盘
交换机持久化(durable=true):
// 创建交换机
// 参数 exchange:交换机名称, kind:交换机类型 fanout为发布订阅模式, durable:是否持久化, autoDelete:是否自动删除, internal:是否内部交换机, noWait:是否等待服务器确认, args:额外参数
err = pch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)
队列持久化(durable=true):
// 创建队列
// 参数 queue:队列名称, durable:是否持久化, autoDelete:是否自动删除, exclusive:是否独占, noWait:是否等待服务器确认, args:额外参数
_, err = pch.QueueDeclare(queue, true, false, false, false, nil)
消息持久化(DeliveryMode=Persistent):
err := ch.Publish(
exchange, // 交换机名称
"", // 路由键
true, // 如果消息不能路由到任何队列,是否返回未处理的消息. true将返回未处理通知, false将丢弃消息
false, // 是否立即交付给消费者. 若为true, 当队列中没有等待中的消费者时,消息会被丢弃
amqp.Publishing{
MessageId: msgID, // 消息ID
ContentType: "application/json", // 消息内容类型
Body: body, // 消息内容
DeliveryMode: amqp.Persistent, // 消息需要持久化
Timestamp: t, // 消息时间
},
)
(2) RabbitMQ 本身的数据一致性保证
RabbitMQ 使用 raft 共识算法保证数据一致性:
https://www.rabbitmq.com/docs/clustering#replica-placement