Springboot整合RabbitMQ
文章目录
- Springboot整合RabbitMQ
- 1.pom依赖
- 2.yml配置
- 3.配置队列、交换机
- 方式一:直接通过配置类配置bean
- 方式二:消息监听通过注解配置
- 4.编写消息监听发送测试
- 5.其他类型交换机配置
- 1.FanoutExchange
- 2.TopicExchange
- 3.HeadersExchange
- 6.延迟消息处理(TTL)
- 方式一:ttl配置
- 方式二:消息发送设置
- 7.死信队列
1.pom依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
</dependency>
</dependencies>
2.yml配置
#配置使用的配置文件
spring:
#配置rabbitmq
rabbitmq:
host: 47.122.26.28 #主机地址
port: 5672 #端口号
username: xxx #用户名
password: xxx #密码
virtual-host: my_vhost #虚拟主机地址
#开启消息送达提示
publisher-returns: true
# springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果
publisher-confirm-type: correlated
listener: #消息监听配置
type: simple
simple:
acknowledge-mode: manual #manual手动确认消息 auto没有异常时 进行自动确认 (异常类型 消息重新入队)
prefetch: 1 #限制每次发送一条数据。
concurrency: 3 #同一个队列启动几个消费者
max-concurrency: 3 #启动消费者最大数量
#重试策略相关配置
retry:
# 开启消费者(程序出现异常)重试机制,默认开启并一直重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试间隔时间(毫秒)
initial-interval: 3000
server:
port: 18082
address: 127.0.0.1
servlet:
context-path: /
3.配置队列、交换机
方式一:直接通过配置类配置bean
推送消息时不存在创建队列和交换机
/**
* direct模式声明配置
*/
@Configuration
public class RabbitDirectConfig {
public static final String EXCHANGE_NAME="direct-exchange";
public static final String QUEUE_NAME="direct-queue";
public static final String BINDING_KEY="change:direct";
/**
* 声明直连交换机
* name:交换机的名称
* durable 队列是否持久化
* autoDelete:是否自动删除,(当该交换机上绑定的最后一个队列解除绑定后,该交换机自动删除)
* argument:其他一些参数
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(EXCHANGE_NAME,false,false,null);
}
/**
* 声明队列
* queue 队列的名称
* durable 队列是否持久化
* exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
*/
@Bean
public Queue directQueue() {
return new Queue(QUEUE_NAME,false,false,false,null);
}
/**
* 交换机队列绑定
*/
@Bean
public Binding springExchangeBindSpringQueue() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with(BINDING_KEY);
}
}
方式二:消息监听通过注解配置
启动时创建队列和交换机
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "direct1-queue",durable = "true"),
exchange = @Exchange(value = "direct1-exchange",type = ExchangeTypes.DIRECT,durable = "true"),
key = "change1:direct"))
注意:rabbitmq同名的队列只能创建一个,创建多个会报错,推送消息时需确保队列和交换机已存在,
方式一队列和交换机在第一次推送消息时才会自动创建队列和交换机,方式二注解在启动时就会创建
4.编写消息监听发送测试
监听
@Slf4j
@Component
public class RabbitMQListener {
@RabbitListener(queues = "direct-queue")
@RabbitHandler
public void bootMsg(Channel channel, Message message){
String message1 = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(" direct 消费者:'" + message1 + "'");
//手动确认该消息
try {
//消息确认,根据消息序号(false只确认当前一个消息收到,true确认所有比当前序号小的消息(成功消费,消息从队列中删除 ))
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
log.error("执行异常",e);
// 拒绝消息并重新入队
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "direct1-queue",durable = "true"),
exchange = @Exchange(value = "direct1-exchange",type = ExchangeTypes.DIRECT,durable = "true"),
key = "change1:direct"))
@RabbitHandler
public void bootMsg1(Channel channel, Message message){
String message1 = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(" direct 消费者:'" + message1 + "'");
//手动确认该消息
try {
//消息确认,根据消息序号(false只确认当前一个消息收到,true确认所有比当前序号小的消息(成功消费,消息从队列中删除 ))
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
log.error("执行异常",e);
// 拒绝消息并重新入队
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
测试
@Slf4j
@SpringBootTest(classes = RabbitProviderApplication.class)
public class RabbitTest {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void directProvider(){
String message = "direct模式消息推送。。。。。";
/**
* 参数分别为,交换机,路由key,消息体
*/
amqpTemplate.convertAndSend("direct-exchange","change:direct",message);
System.out.println(" 消息发送 :'" +message + "'");
}
@Test
public void directProvider1(){
String message = "direct模式消息推送1。。。。。";
/**
* 参数分别为,交换机,路由key,消息体
*/
amqpTemplate.convertAndSend("direct1-exchange","change1:direct",message);
System.out.println(" 消息发送1 :'" +message + "'");
}
}
5.其他类型交换机配置
1.FanoutExchange
/**
* fanout模式声明配置
*/
@Configuration
public class RabbitFanoutConfig {
public static final String EXCHANGE_NAME="fanout-exchange";
public static final String QUEUE_NAME1="fanout-queue1";
public static final String QUEUE_NAME2="fanout-queue2";
/**
* 声明交换机
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_NAME,false,false,null);
}
/**
* 声明队列
*/
@Bean
public Queue fanoutQueue1() {
return new Queue(QUEUE_NAME1,false,false,false,null);
}
@Bean
public Queue fanoutQueue2() {
return new Queue(QUEUE_NAME2,false,false,false,null);
}
/**
* 交换机队列绑定
*/
@Bean
public Binding springExchangeBindQueue1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
/**
* 交换机队列绑定
*/
@Bean
public Binding springExchangeBindQueue2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
}
监听
@RabbitListener(queues = "fanout-queue1")
public void fanoutMsg1(Channel channel, Message message) {
String message1 = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(" fanout-queue1 消费者:'" + message1 + "'");
}
@RabbitListener(queues = "fanout-queue2")
public void fanoutMsg2(Channel channel, Message message) {
String message1 = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(" fanout-queue2 消费者:'" + message1 + "'");
}
测试
@Test
public void fanoutProvider(){
String message = "fanout模式消息推送。。。。。";
amqpTemplate.convertAndSend("fanout-exchange", "",message);
System.out.println(" 消息发送 :'" +message + "'");
}
2.TopicExchange
/**
* topic模式声明配置
*/
@Configuration
public class RabbitTopicConfig {
public static final String EXCHANGE_NAME="topic-exchange";
public static final String QUEUE_NAME="topic-queue";
public static final String BINDING_KEY="*.orange.#";
/**
* 声明交换机
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(EXCHANGE_NAME,false,false,null);
}
/**
* 声明队列
*/
@Bean
public Queue topicQueue() {
return new Queue(QUEUE_NAME,false,false,false,null);
}
/**
* 交换机队列绑定
*/
@Bean
public Binding topicExchangeBindQueue() {
return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(BINDING_KEY);
}
}
@RabbitListener(queues = "topic-queue")
public void topicMsg2(Channel channel, Message message) {
String message1 = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(" topic-queue2 消费者:'" + message1 + "'");
}
测试
@Test
public void topicProvider(){
String message1 = "topic test模式消息推送。。。。。";
String message2 = "topic test.aaa模式消息推送。。。。。";
amqpTemplate.convertAndSend("topic-exchange", "com.orange.test",message1);
amqpTemplate.convertAndSend("topic-exchange", "com.orange.test.aaa",message2);
System.out.println(" 消息发送");
}
3.HeadersExchange
/**
* headers模式声明配置
* 与路由key无关,只需要消息的头参数匹配即可
* x-match参数代表是全部匹配还是部分匹配
*/
@Configuration
public class RabbitHeadersConfig {
public static final String EXCHANGE_NAME="headers-exchange";
public static final String QUEUE_NAME="headers-queue";
public static final String QUEUE_NAME1="headers-queue1";
/**
* 声明交换机
*/
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange(EXCHANGE_NAME,false,false,null);
}
/**
* 声明队列
*/
@Bean
public Queue headersQueue() {
return new Queue(QUEUE_NAME,false,false,false,null);
}
@Bean
public Queue headersQueue2() {
return new Queue(QUEUE_NAME1,false,false,false,null);
}
/**
* 交换机队列绑定(任意匹配)
* whereAny 等同于x-match = any
*/
@Bean
public Binding headersExchangeBindSpringQueue() {
HashMap<String, Object> header = new HashMap<>();
header.put("test", "111");
header.put("test1", "222");
return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAny(header).match();
}
/**
* 交换机队列绑定(全部匹配)
* whereAny 等同于x-match = all
*/
@Bean
public Binding headersExchangeBindSpringQueue1() {
HashMap<String, Object> header = new HashMap<>();
header.put("test", "111");
header.put("test1", "222");
return BindingBuilder.bind(headersQueue2()).to(headersExchange()).whereAll(header).match();
}
}
发送测试
@Test
public void headerProvider(){
String param = "headers 模式消息推送。。。。。";
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding("utf-8");
messageProperties.setHeader("test","111");
Message message = new Message(param.getBytes(), messageProperties);
amqpTemplate.convertAndSend("headers-exchange", null,message);
System.out.println(" 消息发送");
}
队列queue任意匹配有数据,queue1全部匹配无数据
headers-queue
headers-queue1
消息监听
@RabbitListener(queues = "headers-queue")
public void headersMsg2(Channel channel, Message message) {
String message1 = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(" headers-queue 消费者:'" + message1 + "'");
}
@RabbitListener(queues = "headers-queue1")
public void headers1Msg2(Channel channel, Message message) {
String message1 = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(" headers-queue1 消费者:'" + message1 + "'");
}
6.延迟消息处理(TTL)
-
第一种是使用普通队列和死信队列来模拟实现延迟的效果。将消息放入一个没有被监听的队列上,
设置TTL(一条消息的最大存活时间)为延迟的时间
,时间到了没有被消费,直接成为死信,进入死信队列。后监听私信队列来消息消费 -
第二种是使用rabbitmq官方提供的
delayed插件
来真正实现延迟队列。
方式一:ttl配置
超时自动删除
/**
* rabbitmq的ttl延迟过期时间配置
*/
@Configuration
public class RabbitMQTTLConfig {
/**
* 声明交换机
* @return
*/
@Bean
public DirectExchange ttlDirectExchange(){
return new DirectExchange("ttl-direct-exchange");
}
/**
* 声明队列
* @return
*/
@Bean
public Queue ttlQueue(){
//设置参数
Map<String,Object> args = new HashMap<>();
//设置ttl过期时间,需设置int值
args.put("x-message-ttl",5000);
return new Queue("ttl-direct-queue",true,false,false,args);
}
/**
* 绑定队列
* @return
*/
@Bean
public Binding ttlBingQueue(){
return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("direct:ttl:key");
}
}
测试
@Test
public void ttlSendMessageTest(){
String exchange = "ttl-direct-exchange";
String routingKey = "direct:ttl:key";
String msg = UUID.randomUUID().toString();
//发送并设置
amqpTemplate.convertAndSend(exchange,routingKey,msg);
System.out.println("消息发送成功====="+msg);
}
方式二:消息发送设置
注释掉x-message-ttl
参数,使用普通队列,发送消息时设置过期时间
@Test
public void ttlSendMessageTest(){
String exchange = "ttl-direct-exchange";
String routingKey = "direct:ttl:key";
String msg = UUID.randomUUID().toString();
//设置过期时间
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
//发送并设置
amqpTemplate.convertAndSend(exchange,routingKey,msg,messagePostProcessor);
System.out.println("消息发送成功====="+msg);
}
注意:如果项目中即使用了ttl配置过期时间,有设置了消息过期时间,则执行时以最小的时间为准,ttl过期队列的消息过期会写到死信,而设置方式的普通队列则不会自动写到死信队列
7.死信队列
死信的情况:消息被拒绝,消息过期,队列达到最大长度
死信队列声明
@Configuration
public class RabbitMQDLXConfig {
/**
* 声明死信交换机
* @return
*/
@Bean
public DirectExchange dlxDirectExchange(){
return new DirectExchange("dlx-direct-exchange");
}
/**
* 声明死信队列
* @return
*/
@Bean
public Queue dlxQueue(){ ;
return new Queue("dlx-direct-queue",true);
}
/**
* 绑定队列
* @return
*/
@Bean
public Binding dlxBingQueue(){
return BindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with("direct:dlx:key");
}
}
过期推送到死信设置
/**
* 声明ttl队列
* @return
*/
@Bean
public Queue ttlQueue(){
//设置参数
Map<String,Object> args = new HashMap<>();
//设置ttl过期时间,需设置int值
args.put("x-message-ttl",5000);
args.put("x-max-length",5);//最大长度
//消息过期死信队列入队配置
args.put("x-dead-letter-exchange","dlx-direct-exchange");//设置死信交换机
args.put("x-dead-letter-routing-key","direct:dlx:key");//死信路由key,fanout模式不需要设置路由key
return new Queue("ttl-direct-queue",true,false,false,args);
}
注意:队列参数修改后,不会重新创建覆盖而是会报错,需要手动删除重新创建,生产环境中则可以通过重新创建一个队列,进行转移
测试
消息过期进死信队列