一、概念
MQ(消息队列):是指在消息传送过程中保存消息的容器,用于分布式系统之间的通信
生产者:是发送消息的用户应用程序。
队列:是存储消息的缓冲区。
消费者:是接收消息的用户应用程序。
1、优劣势
优势
● 应用解耦:使用MQ使得应用间耦合度降低,提高系统容错性和可维护性
● 异步提速:系统将消息发给MQ之后,就可返回用户信息,后续操作异步执行
● 消峰填谷:并发量高时,将消息全部放到MQ中,限制消费的速度为固定并发量,这样就消掉了高峰期的并发量,这就是消峰;但是因为消息积压,在高峰期过后的一段时间内,消费速度也仍旧保持固定并发量,直到消费完积压的消息,这就是填谷
劣势
● 系统可用性降低:一旦MQ宕机
● 系统复杂度提高:如何保证消息没有重复被消费,消息丢失了怎么办,如何保证消息的顺序等等
● 一致性问题:A系统同时给B、C、D发送消息,BC成功,D失败,如何保证数据一致性
2、工作模式
-
简单工作模式:一个生产者对应一个消费者
-
工作队列模式(Work Queues):一个生产者对应多个消费者,多个消费者之间属于竞争关系,当任务比较重时,可以提高处理速度
-
订阅模式(Publish/Subscribe):一个生产者对应多个消费者,多个消费者之间不是竞争关系,在这种模式中引入交换机的概念,交换机类型为fanout
交换机:接收生产者的消息,并将消息推送给队列;交换机必须知道要如何处理他接收到的消息,类型如下:
○ direct:定向,将消息交给符合指定routing key的队列
○ topic:通配符,将消息交给符合制定routing pattern的队列
○ headers:参数匹配
○ fanout:广播,将收到的所有消息广播到它知道的所有队列。发送消息时不需要指定routing key
-
路由模式(Routing):需要设计交换机类型为 direct,交换机和队列进行绑定,并指定通配符方式的routing key,当发送消息到交换机时,交换机会根据routing key将消息发送给队列
● 队列与交换机的绑定不再是随意绑定,而是指定要routing key;
● 发送方发送消息时,需要指定routing key;
● 只有队列的routing key与消息的routing key一致,才能接收到消息
-
通配符模式(Topics):需要设计交换机类型为Topics,交换机和队列进行绑定,并指定通配符方式的routing key,当发送消息到交换机时,交换机会根据routing key将消息发送给队列
● *(星号)只能代替一个词。
● # (hash) 可以替代零个或多个单词。
二、SpringBoot 整合RabbitMQ
1、引入依赖
implementation 'org.springframework.boot:spring-boot-starter-amqp'
2、配置文件
spring:
rabbitmq:
host: 192.168.252.206
port: 5672
username: admin
password: admin
3、配置类
@Configuration
public class RabbitConfig {
public static String EXCHANGE_NAME = "test_exchange";
public static String QUEUE_NAME = "test_queue";
/**
* 1、交换机
*
* @return
*/
@Bean(name = "testExchange")
public Exchange exchanger() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
}
/**
* 2、队列
*
* @return
*/
@Bean(name = "testQueue")
public Queue queue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
/**
* 3、绑定交换机和队列
*
* @param exchange
* @param queue
* @return
*/
@Bean
public Binding binding(@Qualifier(value = "testExchange") Exchange exchange, @Qualifier(value = "testQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
}
}
4、生产者发送消息
@SpringBootTest
class RabbitConfigTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() {
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
}
}
5、消费者接收消息
@Component
public class RabbitMqListener {
@RabbitListener(queues = "test_queue")
public void testListener(Message message) {
System.out.println(message);
}
}
结果如下:
(Body:'hello world!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test_exchange, receivedRoutingKey=test.kk, deliveryTag=1, consumerTag=amq.ctag-nYz1fLxW0ezTLEcO3W1rVw, consumerQueue=test_queue])
三、特性
1、消息的可靠投递
RabbitMQ为我们提供了两种控制消息可靠性的模式
confirm 确认模式
1、开启确认模式
spring:
rabbitmq:
publisher-confirm-type: correlated
- NONE 禁用发布确认模式,是默认值
- CORRELATED 发布消息成功到交换器后会触发回调方法
- SIMPLE 经测试有两种效果:
- 其一效果和 CORRELATED 值一样会触发回调方法,
- 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;
2、设置ConfirmCallback
@SpringBootTest
class RabbitConfigTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() {
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息接收成功!");
} else {
System.out.println("消息接收失败!" + cause);
}
});
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
}
}
return 退回模式
当消息发送给Exchange时,Exchange路由到Queue失败,才会执行RetureCallback
1、开启回退模式
spring:
rabbitmq:
publisher-returns: true
2、设置RetureCallback
3、设置Exchange消息处理模式
- 如果消息没有路由到Queue,则丢弃消息
- 如果消息没有路由到Queue,将消息返回给发送方
@SpringBootTest
class RabbitConfigTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() {
// 设置交换机失败处理模式,true:返回消息给发送方;默认为false,即丢弃消息
rabbitTemplate.setMandatory(true);
// 设置RetureCallback
rabbitTemplate.setReturnsCallback((returned) -> {
System.out.println("return 执行了");
System.out.println(returned);
});
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test1.kk", "hello world!");
}
}
2、Consumer ack
1、设置模式
spring:
rabbitmq:
listener:
direct:
acknowledge-mode: manual
- none:自动确认
- manual:手动确认
2、设置监听器
- 如果在消费端没有出现异常,就调用basicAck()方法签收消息
- 如果在消费端出现异常,就调用basicNack()方法拒绝消息,让mq重新发送
@Component
public class AckListener implements ChannelAwareMessageListener {
@RabbitListener(queues = "test_queue")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("处理业务逻辑");
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
channel.basicNack(deliveryTag, true, true);
}
}
}
3、消费端限流
1、这是Consumer ack的模式为手动确认
spring:
rabbitmq:
listener:
direct:
acknowledge-mode: manual
prefetch: 2
- none:自动确认
- manual:手动确认
- prefetch:表示消费端每次从mq中拉取多少条消息,直到手动确认消费完,才会拉取下一条消息
2、设置监听器
@Component
public class AckListener implements ChannelAwareMessageListener {
@RabbitListener(queues = "test_queue")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(message.getBody().toString());
System.out.println("处理业务逻辑");
//channel.basicAck(deliveryTag, true);
} catch (Exception e) {
channel.basicNack(deliveryTag, true, true);
}
}
}
4、生产者
class RabbitConfigTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() {
for (int i = 0; i < 3; i++) {
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
}
}
}
4、TTL
TTL全程 time to live,也就是存活时间/过期时间;当消息到达存活时间后,还没有被消费,会被清除;RabbitMQ可以对消息设置存活时间也可以对队列设置存活时间
对队列统一设置:是对 x-message-ttl 参数设置
对消息单独设置:是对 expiration 参数设置
如果两者都设置了,以时间短的为准
设置队列的存活时间
1、配置队列,将ttl设置为10秒
@Configuration
public class RabbitConfig {
public static String EXCHANGE_NAME = "test_exchange";
public static String QUEUE_NAME = "test_queue";
/**
* 1、交换机
*
* @return
*/
@Bean(name = "testExchange")
public Exchange exchanger() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
}
/**
* 2、队列
*
* @return
*/
@Bean(name = "testQueue")
public Queue queue() {
return QueueBuilder.durable(QUEUE_NAME).ttl(10000).build();
}
/**
* 3、绑定交换机和队列
*
* @param exchange
* @param queue
* @return
*/
@Bean
public Binding binding(@Qualifier(value = "testExchange") Exchange exchange, @Qualifier(value = "testQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
}
}
2、发送mq
@SpringBootTest
class RabbitConfigTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() {
for (int i = 0; i < 3; i++) {
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
}
}
}
3、去RabbitMQ界面查看,会发现该队列的ready在10秒之后会置0
设置消息的存活时间
1、生产者发送MQ:只需在发送消息时加上messagePostProcessor即可
@SpringBootTest
class RabbitConfigTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");
return message;
}
};
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!",messagePostProcessor);
}
}
5、死信队列
消息成为死信的条件:
- 消息队列长度达到限制
- 消费者拒收消息:basicNack/basicReject,并且不把消息放回原目标队列:requeue=false
- 原队列存在消息过期设置,消息达到过期时间未被消费
队列绑定死信交换机
- x-dead-letter-exchange
- x-dead-letter-routing-key
1、配置
spring:
rabbitmq:
host: 192.168.252.206
port: 5672
username: admin
password: admin
2、配置类
@Configuration
public class RabbitConfig {
public static String EXCHANGE_NAME = "test_exchange";
public static String QUEUE_NAME = "test_queue";
public static String DEAD_EXCHANGE_NAME = "dead_test_exchange";
public static String DEAD_QUEUE_NAME = "dead_test_queue";
/**
* 1、死信交换机
*
* @return
*/
@Bean(name = "deadTestExchange")
public Exchange deadExchanger() {
return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).build();
}
/**
* 2、死信队列
*
* @return
*/
@Bean(name = "deadTestQueue")
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
}
/**
* 3、绑定死信交换机和死信队列
*
* @param exchange
* @param queue
* @return
*/
@Bean
public Binding bindingDead(@Qualifier(value = "deadTestExchange") Exchange exchange,
@Qualifier(value = "deadTestQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("test.kk.#").noargs();
}
/**
* 1、交换机
*
* @return
*/
@Bean(name = "testExchange")
public Exchange exchanger() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
}
/**
* 2、队列
*
* @return
*/
@Bean(name = "testQueue")
public Queue queue() {
return QueueBuilder.durable(QUEUE_NAME)
.ttl(10000)
.deadLetterExchange(DEAD_EXCHANGE_NAME)
.deadLetterRoutingKey("test.kk")
.maxLength(3)
.build();
}
/**
* 3、绑定交换机和队列
*
* @param exchange
* @param queue
* @return
*/
@Bean
public Binding binding(@Qualifier(value = "testExchange") Exchange exchange, @Qualifier(value = "testQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
}
}
3、测试类
- 超时情况
@SpringBootTest
class RabbitConfigTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() {
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
}
}
- 超出长度
@SpringBootTest
class RabbitConfigTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() {
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
}
}
}
- 消费端拒收
生产端
@SpringBootTest
class RabbitConfigTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() {
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
}
}
消费端
@Component
public class DeadListener implements ChannelAwareMessageListener {
@RabbitListener(queues = "test_queue")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(new String(message.getBody()));
System.out.println("处理业务逻辑");
int i = 3 / 0;
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
channel.basicNack(deliveryTag, true, false);
}
}
}
6、延迟队列
延迟队列:消息进入消费端之后,不会立马被消费,会在指定时间达到后,才会消费。RabbitMQ通过TTL和死信队列实现延迟队列
- 只需设置队列或者消息过期时间,当消息过期后即可进入死信队列
- 消费端监听队列要监听死信队列