RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递

文章目录

  • 一、RabbitMQ消息可靠性概述
    • 1、引出问题
    • 2、RabbitMQ消息可靠性保证的四个环节
  • 二、保证生产者消息发送到RabbitMQ服务器
    • 1、服务端确认:Transaction模式
      • (1)JavaAPI
      • (2)springbootAPI
    • 2、服务端确认:Confirm模式
      • (1)JavaAPI
      • (2)springbootAPI
  • 三、保证消息从交换机路由到队列
    • 1、消息回发
      • (1)JavaAPI
      • (2)SpringbootAPI
    • 2、消息路由到备份交换机
      • (1)JavaAPI
  • 四、保证消息在队列中可靠存储
    • 1、队列持久化
    • 2、交换机持久化
    • 3、消息持久化
    • 4、搭建集群
  • 五、保证消费者成功消费消息
    • 1、自动ACK
    • 2、手动ACK
  • 六、保证消息一致之消费者回调
    • 1、调用生产者API
    • 2、发送响应消息给生产者
  • 七、保证消息一致性之补偿机制
    • 1、消息结果查证
    • 2、消息重发
    • 3、消息幂等性
    • 4、最终一致性
  • 八、消息的顺序性

一、RabbitMQ消息可靠性概述

1、引出问题

我们先看一串代码,并思考一下为什么要先入库然后发MQ:

public int add(Merchant merchant) {
    int k = merchantMapper.add(merchant);
    System.out.println("aaa : "+merchant.getId());
    JSONObject title = new JSONObject();
    String jsonBody = JSONObject.toJSONString(merchant);
    title.put("type","add");
    title.put("desc","新增商户");
    title.put("content",jsonBody);
    rabbitTemplate.convertAndSend(topicExchange,topicRoutingKey, title.toJSONString());
    return k;
}

如果先发MQ的话,如果入库失败,就会导致MQ消息无法回滚了。今天我们就好好聊一聊RabbitMQ消息可靠投递的问题。

2、RabbitMQ消息可靠性保证的四个环节

在这里插入图片描述
① 消息从生产者发送到Broker
生产者把消息发送到Broker之后,如何知道自己的消息有没有被Broker成功接收?如果Broker不给应答,生产者发送的消息也无法知道成功还是失败。

② 消息从Exchange路由到Queue
Exchange交换机维护一个与Queue的绑定列表,它的职责是分发消息。如果交换机处理消息的分发出现问题怎么办?

③ 消息存储在Queue
RabbitMQ的队列有自己的数据库(Mnesia),它是真正用来存储消息的。如果还没有消费者来消费,消息要一直存储在队列中。队列中的消息有没有丢失的可能?怎么保证消息在队列中稳定的存储呢?

④ 消费者订阅Queue并消费消息
队列是先进先出的,消息投递给消费者是一条一条投递的,如何能保证消费者正确地消费了消息?

二、保证生产者消息发送到RabbitMQ服务器

生产者发送RabbitMQ消息时,如果遇到了网络问题或者Broker的问题(硬盘故障、磁盘写满等),就会导致消息发送失败,生产者无法确定Broker是否正确地接收到消息。

在RabbitMQ中提供了两种生产者确认机制,也就是说生产者发送消息给RabbitMQ时,服务端会通过某种方式返回一个应答,只要生产者收到了这个应答,就知道消息发送成功了。

1、服务端确认:Transaction模式

在事务模式里面,只有收到了服务端的Commit-OK指令,才能提交成功。所以可以解决生产者和服务端确认的问题。但是事务模式有一个缺点,它是阻塞的,一条消息没有发送完毕,不能发送下一条消息,它会榨干RabbitMQ服务器的性能。所以不建议在生产环境使用。
在这里插入图片描述

(1)JavaAPI

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消息生产者,测试事务模式。发送消息的效率比较低,建议使用Confirm模式
 * 参考文章:https://www.cnblogs.com/vipstone/p/9350075.html
 */
public class TransactionProducer {
    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ";
        // 声明队列(默认交换机AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        try {
        	// 开启事务
            channel.txSelect();
            // 发送消息,发布了4条,但只确认了3条
            // String exchange, String routingKey, BasicProperties props, byte[] body
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit(); // 提交
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit();
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit();
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            int i =1/0;
            channel.txCommit();
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit();
            System.out.println("消息发送成功");
        } catch (Exception e) {
            channel.txRollback(); // 回滚
            System.out.println("消息已经回滚");
        }

        channel.close();
        conn.close();
    }
}

(2)springbootAPI

rabbitTemplate.setChannelTransacted(true);

2、服务端确认:Confirm模式

Confirm模式有三种,单个确认、批量确认、异步确认。

在这里插入图片描述

(1)JavaAPI

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 普通确认模式
 */
public class NormalConfirmProducer {

    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ ,Normal Confirm";
        // 声明队列(默认交换机AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 开启发送方确认模式
        channel.confirmSelect();

        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        // 普通Confirm,发送一条,确认一条
        if (channel.waitForConfirms()) {
            System.out.println("消息发送成功" );
        }else{
            System.out.println("消息发送失败");
        }

        channel.close();
        conn.close();
    }
}

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消息生产者,测试批量Confirm模式
 */
public class BatchConfirmProducer {
    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ ,Batch Confirm";
        // 声明队列(默认交换机AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        try {
            // 开启confirm模式
            channel.confirmSelect();
            for (int i = 0; i < 5; i++) {
                // 发送消息
                // String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());
            }
            // 批量确认结果,ACK如果是Multiple=True,代表ACK里面的Delivery-Tag之前的消息都被确认了
            // 比如5条消息可能只收到1个ACK,也可能收到2个(抓包才看得到)
            // 直到所有信息都发布,只要有一个未被Broker确认就会IOException
            channel.waitForConfirmsOrDie();
            System.out.println("消息发送完毕,批量确认成功");
        } catch (Exception e) {
            // 发生异常,可能需要对所有消息进行重发
            e.printStackTrace();
        }

        channel.close();
        conn.close();
    }
}

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * 消息生产者,测试异步Confirm模式
 *  参考文章:https://www.cnblogs.com/vipstone/p/9350075.html
 */
public class AsyncConfirmProducer {
    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ, Async Confirm";
        // 声明队列(默认交换机AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 用来维护未确认消息的deliveryTag
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

        // 这里不会打印所有响应的ACK;ACK可能有多个,有可能一次确认多条,也有可能一次确认一条
        // 异步监听确认和未确认的消息
        // 如果要重复运行,先停掉之前的生产者,清空队列
        channel.addConfirmListener(new ConfirmListener() {
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Broker未确认消息,标识:" + deliveryTag);
                if (multiple) {
                    // headSet表示后面参数之前的所有元素,全部删除
                    confirmSet.headSet(deliveryTag + 1L).clear();
                } else {
                    confirmSet.remove(deliveryTag);
                }
                // 这里添加重发的方法
            }
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                // 如果true表示批量执行了deliveryTag这个值以前(小于deliveryTag的)的所有消息,如果为false的话表示单条确认
                System.out.println(String.format("Broker已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
                if (multiple) {
                    // headSet表示后面参数之前的所有元素,全部删除
                    confirmSet.headSet(deliveryTag + 1L).clear();
                } else {
                    // 只移除一个元素
                    confirmSet.remove(deliveryTag);
                }
                System.out.println("未确认的消息:"+confirmSet);
            }
        });

        // 开启发送方确认模式
        channel.confirmSelect();
        for (int i = 0; i < 10; i++) {
            long nextSeqNo = channel.getNextPublishSeqNo();
            // 发送消息
            // String exchange, String routingKey, BasicProperties props, byte[] body
            channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());
            confirmSet.add(nextSeqNo);
        }
        System.out.println("所有消息:"+confirmSet);

        // 这里注释掉的原因是如果先关闭了,可能收不到后面的ACK
        //channel.close();
        //conn.close();
    }
}

(2)springbootAPI

Confirm模式是在Channel上开启的,RabbitTemplate对Channel进行了封装。

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("--------收到服务端异步确认--------");
        System.out.println("received ack: "+ack);
        System.out.println("cause: "+cause);
        System.out.println("correlationId: "+correlationData.getId());
        if (!ack) {
            System.out.println("发送消息失败:" + cause);
            throw new RuntimeException("发送异常:" + cause);
            // 做一些回滚、重试、记录处理
        } else {
			System.out.println("消息确认成功");
		}
    }
});

三、保证消息从交换机路由到队列

如果routingkey错误,或者队列不存在(但是生产环境基本不会出现这么低级的错误),就会导致消息从交换机无法路由到队列。

我们有两种方式处理无法路由的消息,一种是服务端回发给生产者,一种是让交换机路由到另一个备份的交换机。

1、消息回发

(1)JavaAPI

channel.addReturnListener(new ReturnListener() {
	   public void handleReturn(int replyCode,
	                            String replyText,
	                            String exchange,
	                            String routingKey,
	                            AMQP.BasicProperties properties,
	                            byte[] body)
	           throws IOException {
	       System.out.println("=========监听器收到了无法路由,被返回的消息============");
	       System.out.println("replyText:"+replyText);
	       System.out.println("exchange:"+exchange);
	       System.out.println("routingKey:"+routingKey);
	       System.out.println("message:"+new String(body));
	   }
});

(2)SpringbootAPI

Springboot消息回发是使用mandatory参数和ReturnListener(在SPringAMQP中是ReturnCallback)

// 为RabbitTemplate设置ReturnCallback
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        try {
            System.out.println("--------收到无法路由回发的消息--------");
            System.out.println("replyCode:" + replyCode);
            System.out.println("replyText:" + replyText);
            System.out.println("exchange:" + exchange);
            System.out.println("routingKey:" + routingKey);
            System.out.println("properties:" + message.getMessageProperties());
            System.out.println("body:" + new String(message.getBody(), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
});

2、消息路由到备份交换机

在创建交换机时,从属性中指定备份交换机。

(1)JavaAPI

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).
        contentEncoding("UTF-8").build();

// 备份交换机
channel.exchangeDeclare("ALTERNATE_EXCHANGE","topic", false, false, false, null);
channel.queueDeclare("ALTERNATE_QUEUE", false, false, false, null);
channel.queueBind("ALTERNATE_QUEUE","ALTERNATE_EXCHANGE","#");

// 在声明交换机的时候指定备份交换机
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE");
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);


// 发送到了默认的交换机上,由于没有任何队列使用这个关键字跟交换机绑定,所以会被退回
// 第三个参数是设置的mandatory,如果mandatory是false,消息也会被直接丢弃
channel.basicPublish("TEST_EXCHANGE","error.routingKey",true, properties,"只为更好的你".getBytes());

四、保证消息在队列中可靠存储

如果消息一直没有被消费者消费,消息会一直存储在队列中,并且队列中的数据存储在数据库中。

如果RabbitMQ服务或者硬件发生了故障,比如系统宕机、重启、关闭等等,可能会导致内存中的消息丢失,所以我们要把消息本身和元数据(队列、交换机、绑定)都保存到磁盘中。

1、队列持久化

声明队列时可以设置几个重要的参数:

// 声明队列
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// springboot中
@Bean("reliableQueue")
public Queue queue() {
	// public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
    return new Queue("RELIABLE_QUEUE", true, false, false, new HashMap<>());
}

参数解析:
durable:没有持久化的队列,保存在内存中,服务重启后队列和消息都会丢失。设置为true可以保证消息持久化
exclusive:排他性队列的特点是:只对首次声明它的连接(Connection)可见;会在其连接断开的时候自动删除。
autoDelete:没有消费者连接的时候,自动删除。

2、交换机持久化

声明交换机时可以设置这几个重要的参数,和队列类似:

// 声明交换机
// String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
channel.exchangeDeclare(EXCHANGE_NAME,"direct",false, false, null);

// Springboot中
@Bean("directExchange")
public DirectExchange exchange() {
	// public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
    return new DirectExchange("RELIABLE_EXCHANGE", true, false, new HashMap<>());
}

3、消息持久化

在生产者发送消息时,可以指定消息的配置:

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("name", "gupao");
headers.put("level", "top");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)   // 2代表持久化
        .contentEncoding("UTF-8")  // 编码
        .expiration("10000")  // TTL,过期时间
        .headers(headers) // 自定义属性
        .priority(5) // 优先级,默认为5,配合队列的 x-max-priority 属性使用
        .messageId(String.valueOf(UUID.randomUUID()))
        .build();
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());

// springboot中
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
Message message = new Message("一条正常的消息".getBytes(), messageProperties);

4、搭建集群

如果只有一个RabbitMQ节点,即使交换机、队列、消息做了持久化,如果服务崩溃或者硬件故障,RabbitMQ的服务一样是不可以的。

所以为了提高MQ服务的可用性,保证消息的传输,我们需要有多个RabbitMQ的节点。

RabbitMQ集群搭建与高可用实现

五、保证消费者成功消费消息

如果消费者收到消息后没来得及处理或者发生了一场,就会导致消费失败。RabbitMQ提供了消费者的消息确认机制(message acknowledgement),消费者可以自动或者手动地发送ACK给服务端。

如果消费者拿到消息没有ACK会怎么样?
没有收到ACK的消息,消费者断开连接之后,RabbitMQ会把这条消息发送给其他消费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑。

1、自动ACK

默认就是自动ACK。消费者收到消息的时候就会自动发送ACK,而不是方法执行成功的时候发送ACK。这种情况RabbitMQ只会关心消费者是否接收到了消息,对消息的处理结果是不关心的,所以通常来说我们会选择手动ACK。

// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("Received message : '" + msg + "'");
        System.out.println("consumerTag : " + consumerTag );
        System.out.println("deliveryTag : " + envelope.getDeliveryTag() );
    }
};
// 开始获取消息 autoAck参数为true代表自动ACK
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, true, consumer);

同样的,在Springboot中如果没有特殊配置,默认的就是自动ACK。

2、手动ACK

// 创建消费者,并接收消息
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("Received message : '" + msg + "'");

        if (msg.contains("拒收")){
            // 拒绝消息
            // requeue:是否重新入队列,true:是,会再发给其他消费者;false:直接丢弃,相当于告诉队列可以直接删除掉
            // TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费
            // basicReject(long deliveryTag, boolean requeue)
            channel.basicReject(envelope.getDeliveryTag(), false);
        } else if (msg.contains("异常")){
            // 批量拒绝
            // requeue:是否重新入队列
            // TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费
            // basicNack(long deliveryTag, boolean multiple, boolean requeue)
            channel.basicNack(envelope.getDeliveryTag(), true, false);
        } else {
            // 手工应答
            // 如果不应答,队列中的消息会一直存在,重新连接的时候会重复消费
            // basicAck(long deliveryTag, boolean multiple)
            channel.basicAck(envelope.getDeliveryTag(), true);
        }
    }
};

// 开始获取消息,注意这里开启了手工应答
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, false, consumer);

在Springboot中,可以这样配置应答方式:

spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

或者在SimpleMessageListenerContainer或者SimpleRabbitListenerContainerFactory中这样设置:

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

消费者中获取到Channel对象,就可以进行应答了:

@Component
@PropertySource("classpath:mq.properties")
@RabbitListener(queues = "${com.queue}", containerFactory="rabbitListenerContainerFactory")
public class SecondConsumer {
    @RabbitHandler
    public void process(String msgContent,Channel channel, Message message) throws IOException {
        System.out.println("Second Queue received msg : " + msgContent );
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

注意这个枚举值:
NONE:自动ACK
MANUAL:手动ACK
AUTO:如果方法未抛出异常,则发送ACK。如果方法抛出异常,并且不是AmqpRejectAndDontRequeueException则发送nack,并且重新入队。如果抛出异常是AmqpRejectAndDontRequeueException则发送nack并不会重新入队。

注意!拒绝消息时如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者处理,如果只有一个消费者时,这种方式可能会出现无限循环重复消费的情况。

六、保证消息一致之消费者回调

消费者成功消费了消息之后,如何告知生产者该消息已经被成功消费了?

虽然说使用MQ的目的之一是解耦,但是某些一致性要求很高的场景,比如说金融业务,还是很有必要通知生产者的。

1、调用生产者API

例如,提单系统给其他系统发送了保险消息后,其他系统必须在处理完消息之后调用提单系统提供的API,来修改提单系统中这笔数据的状态。只要API没有被调用,数据状态没有被修改,提单系统就认为下游系统没有收到这条消息。

但是这种方式又从解耦的状态变成了耦合状态,还是需要根据实际情况来判断是否要采用这种方式。

2、发送响应消息给生产者

例如商业银行与人民银行二代支付系统(使用IBM MQ),无论是人行收到了商业银行的消息,还是商业银行收到了人行的消息,都必须发送一条响应消息(叫做回执报文)。

整个通信的流程设计非常复杂,但是对于金融场景下的消息可靠性保证,是很有用的。
在这里插入图片描述

七、保证消息一致性之补偿机制

如果消费者消费了消息之后,一直迟迟没有通知给生产者,我们如何知道消费者已经成功消费了消息?
就像是银行A给银行B发起了一笔转账,银行B一直没有给银行A回调,此时我们如何确定银行B确定是已经处理了该消息?

此时生产者与消费者之间应该约定一个超时时间,对于超出这个时间没有得到响应的消息,才确定为消费失败,比如说5分钟。

1、消息结果查证

假如说生产者5分钟内没有收到消费者消费成功的回调,生产者可以主动发起一次结果查证,通过业务要素或者唯一流水号,查证该业务在消费者是否正常消费。

2、消息重发

假如说消息一直没有结果,就需要考虑消息重发了。

可以启动一个定时任务,比如30秒跑一次,查询业务表业务状态是中间状态的记录,查询出来,构建MQ消息,重新发送。也可以单独设计一张消息表,把本系统所有发送出去的消息全部异步登记,状态是未回复的消息,进行重发(这种方式会对数据库性能造成一定损耗)。

重发机制也不可能一直重复发,如果消费者确实是有bug或者其他问题,如果一直重复发送会导致死循环了。我们可以设置一个衰减机制,第一次间隔一分钟,第二次间隔2分钟,最终发送三次,三次过后如果还没有收到回复,就需要将消息设置为特殊状态,进行人工干预。

3、消息幂等性

消息重发就意味着要处理消息的幂等。

什么是服务的幂等?为什么要实现幂等?
接口的幂等性——详细谈谈接口的幂等即解决方案

4、最终一致性

在一些一致性很强的接口调用中,比如说转账操作,通常会在一天的业务结束之后,第二天营业之前,生产者和消费者之间会进行一次消息对账。生成一个对账文件,两者分别解析对方的文件进行对账,如果确实有消息不一致的情况,会通过短信或者邮件的方式通知业务人员进行手动处理,要么把钱退回,要么把钱补上。

八、消息的顺序性

在RabbitMQ中,一个队列有多个消费者时,由于不同的消费者消费消息的速度是不一样的,顺序无法保证。只有一个队列仅有一个消费者的情况才能保证顺序消费(不同的业务消息发送到不同的专用队列)。

除非负载的场景,不要用多个消费者消费消息,可以保证消息的顺序性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/38674.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

laravel 的SQL使用正则匹配

案例场景 精准正则匹配 查询结果 代码如下 $regexp ^ . $new_str . [^0-9];$info Test::query()->where(is_del, 0)->whereRaw("name REGEXP $regexp")->pluck(name, id)->toArray();字符 “^” 匹配以特定字符或者字符串开头的文本 name 字段值包含…

【kubernetes系列】Kubernetes之调度器和调度过程

Kubernetes之调度器和调度过程 概述 当用户请求向API server创建新的Pod时&#xff0c;API server检查授权、权限等没有任何问题的话&#xff0c;他会把这个请求交由Scheduler&#xff0c;由Scheduler检查所有符合该Pod要求的列表&#xff0c;开始执行Pod调度逻辑&#xff0c…

AI智能助手的未来:与人类互动的下一代人工智能技术

自我介绍⛵ &#x1f4e3;我是秋说&#xff0c;研究人工智能、大数据等前沿技术&#xff0c;传递Java、Python等语言知识。 &#x1f649;主页链接&#xff1a;秋说的博客 &#x1f4c6; 学习专栏推荐&#xff1a;人工智能&#xff1a;创新无限、MySQL进阶之路、C刷题集、网络安…

WEB APIs day4 (1)

一、日期对象 1.实例化 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevi…

数据结构——六大排序 (插入,选择,希尔,冒泡,堆,快速排序)

1. 插入排序 1.1基本思路 把待排序的记录按其关键码值的大小逐个插入到一个已经排好序的有序序列中&#xff0c;直到所有的记录插入完为止&#xff0c;得到一个新的有序序列 我们熟知的斗地主就是一个插入排序 1.2 代码实现 我们这里将一个无序数组变成有序数组 插入排序时…

antd-React Table 中文转化

1.首先需要进行中文包导入 2.引入标签对Table进行包裹即可 import zh_CN from antd/lib/locale-provider/zh_CN;import {ConfigProvider} from antd;<ConfigProvider locale{zh_CN}><Tablecolumns{columns}rowKey{record > record.id}dataSource{data}pagination{p…

SQL进阶(2)——SQL语句类型 增删改查CRUD 事务初步 表关联关系 视图 +索引

目录 引出SQL语句类型1.DML数据操纵语言&#xff08;重点&#xff09;2.DQL数据查询语言&#xff08;重点&#xff09;3.DDL(Data Definition Language了解)4.DCL(Data Control Language了解)5.TCL 事务控制语言 运算符和其他函数1.运算符2.其它函数增删改查CRUD 视图索引事务1…

如何克服Leetcode做题的困境

文章目录 如何克服Leetcode做题的困境问题背景克服困境的建议实践与理论结合切忌死记硬背分析解题思路不要过早看答案迭代式学习寻求帮助坚持与耐心查漏补缺 结论 如何克服Leetcode做题的困境 问题背景 明明自觉学会了不少知识&#xff0c;可真正开始做Leetcode题目时&#x…

基于单片机的恒温恒湿温室大棚温湿度控制系统的设计与实现

功能介绍 以51单片机作为主控系统&#xff1b;液晶显示当前温湿度按键设置温湿度报警上限和下限&#xff1b;温度低于下限继电器闭合加热片进行加热&#xff1b;温度超过上限继电器闭合开启风扇进行降温湿度低于下限继电器闭合加湿器进行加湿湿度高于上限继电器闭合开启风扇进行…

JavaWeb(3)——HTML、CSS、JS 快速入门

一、JavaScript 运算符 • 赋值运算符&#xff08; &#xff09; 赋值运算符执行过程&#xff1f; 将等号右边的值赋予给左边, 要求左边必须是一个容器 出现是为了简化代码, 比如让 let age 18 &#xff0c;age 加 2 怎么写呢 let age 18age 2console.log(age)age * 2con…

苹果APP安装包ipa如何安装在手机上

苹果APP安装包ipa如何安装在手机上 苹果APP的安装比安卓复杂且困难&#xff0c;很多人不知道如何将ipa文件安装到手机上。以下是几种苹果APP安装在iOS设备的方式&#xff0c;供大家参考。 一、上架App Store 这是最正规的方式。虽然审核过程复杂、时间较长&#xff0c;且审核…

关于电脑显示器屏幕看不出灰色,灰色和白色几乎一样无法区分,色彩调整方法

问题&#xff1a; 电脑显示器屏幕看不出灰色&#xff0c;灰色和白色几乎一样无法区分。白色和灰色有色差。 解决方法&#xff1a; 打开“控制面板” ->“色彩管理” ->“高级” ->“校正显示器” 在下一步调节中调成中间这一个实例的样子就可以了 进行微调&#x…

基于linux下的高并发服务器开发(第二章)- 2.9 waitpid 函数

#include <sys/types.h> #include <sys/wait.h>pid_t waitpid(pid_t pid, int *wstatus, int options); 功能&#xff1a;回收指定进程号的子进程&#xff0c;可以设置是否阻塞。 参数&#xff1a; - pid: pid > 0…

小白到运维工程师的自学之路 第五十四集 (ansible自动化运维工具)

一、概述 Ansible是一种开源的自动化工具&#xff0c;用于自动化任务的执行、配置管理和应用部署。它采用基于Python编写的简单、轻量级的语法&#xff0c;可以通过SSH协议远程管理和配置多台计算机。 Ansible的主要特点包括&#xff1a; 1、简单易用&#xff1a;设计简单&a…

(33)接收信号强度指示(RSSI)

文章目录 前言 33.1 在你的自动驾驶仪上设置RSSI 33.2 在MissionPlanner的HUD中显示RC接收器的RSSI值 33.3 连接实例 33.4 特殊用例 前言 本文介绍了如何获取自动驾驶仪的接收信号强度指示&#xff08;RSSI&#xff09;。 33.1 在你的自动驾驶仪上设置RSSI RSSI 可通过一…

ChatGPT变现五个思路

一、前言 ChatGPT是一款AI聊天机器人&#xff0c;发布于2022年11月。凭借着在广泛的知识领域为消费者问题做出清晰、详尽解答的出色能力&#xff0c;其一经推出就引发全球轰动&#xff0c;自然也得到零售行业的高度关注。例如&#xff0c;消费者只要询问ChatGPT如何布置一个梦…

flink1.16读取hive数据存到es 本地和服务器上遇到的问题和解决思路

话不多说 直接上官网 Overview | Apache Flink hive版本 3.1.3000 ​ hadoop 版本 3.1.1.7.1.7 ​ flink 1.16.2 ​ 代码 很简单我还是贴下 import com.fasterxml.jackson.databind.ObjectMapper import com.typesafe.config.{Config, ConfigFactory} import org.apache…

Redis可视化工具 - Another Redis Desktop Manager 安装与使用详细步骤

一、下载安装 Another Redis Desktop Manager AnotherRedisDesktopManager 发行版 - Gitee.com&#xff08;gitee&#xff09; 2. 安装 以管理员身份运行下载的安装包 选择是为所有用户还是当前用户安装&#xff0c;按需选择 选择安装位置&#xff0c;点击安装进行安装 安装…

Unity UnityWebRequest使用http与web服务器通讯

一、搭建客户端与服务器http通讯 1.在Nodejs中文官网Node.js 中文网 (nodejs.com.cn)&#xff0c;下载并安装Nodejs 2.在项目文件夹下新建WebServer文件夹&#xff0c;打开CMD窗口&#xff0c;在WebServer文件夹路径下安装express 3.在WebServer文件夹中新建main.js文件&#…

Unity游戏源码分享-迷你高尔夫球游戏MiniGolfConstructionKitv1.1

Unity游戏源码分享-迷你高尔夫球游戏MiniGolfConstructionKitv1.1 有多个游戏关卡 工程地址&#xff1a;https://download.csdn.net/download/Highning0007/88052881