前言
最近有个想法想整理一个内容比较完整springboot项目初始化Demo。
SpringBoot集成RabbitMQ
RabbitMQ中的一些角色:
-
publisher:生产者
-
consumer:消费者
-
exchange个:交换机,负责消息路由
-
queue:队列,存储消息
-
virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
junit用于测试。
一、pom引入依赖amqp
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<!--rabbitmq-->
二、application-dev.yaml 增加RabbitMQ相关配置
spring:
#RabbitMQ服务器配置,地址账号密码,virtualhost等配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: murg
password: 123456
virtual-host: murg-host
#队列中没有消息,阻塞等待时间
template:
receive-timeout: 2000
logging:
level:
org.springframework.security: debug
三、发布/订阅
RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型。此处只列举发布/订阅模式。
此模式下根据交换机类型又分为三种。
1.Fanout类型: 广播模式 把消息交给所有绑定到交换机的队列
2.Direct类型: 路由模式 把消息交给符合指定routing key 的队列
3Topic类型: 主题模式 把消息交给符合主题通配符的队列
3.1 Fanout类型
在广播模式下,消息发送流程是这样的:
1) 可以有多个队列
2) 每个队列都要绑定到Exchange(交换机)
3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
4) 交换机把消息发送给绑定过的所有队列
5) 订阅队列的消费者都能拿到消
3.1.1 声明Fanout类型交换机和队列 将交换机和队列绑定在一起
创建配置类FanoutConfig
声明一个Fanout类型交换机命名为murg.fanout
声明两个Queue队列分别为fanout.queue1和fanout.queue2
分别将两个队列和交换机绑定,后续用于消费消息。
package com.murg.bootdemo.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Fanout 广播模式下
*声明交换机和队列
*/
@Configuration
public class FanoutConfig {
/**
* 声明交换机和队列 将交换机和队列绑定在一起
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("murg.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
3.1.2创建消息生产服务
创建消息生产服务MessageProducerService ,注入RabbitTemplate 用于发送消息。
注意一点是rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列
新增测试广播模式下发送消息方法
package com.murg.bootdemo.rabbitmq;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
/**
* Rabbitmq消息生产者
*/
@Service
@RequiredArgsConstructor
public class MessageProducerService {
private final RabbitTemplate rabbitTemplate;
/**
* 测试广播模式下发送消息
* @param msg
*/
public void testFanoutExchange(String msg) {
// 发送消息
//murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了
//rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列
rabbitTemplate.convertAndSend("murg.fanout","",msg);
}
}
3.13创建消息消费服务
创建消息生产服务MessageConsumerService
@RabbitListener是 SpringAMQP AMQP提供的注解,用于简化 RabbitMQ 消息监听器的创建。通过在方法上添加 @RabbitListener
注解,可以将方法注册为消息监听器用于处理从 RabbitMQ 中接收到的消息。queues 参数定义队列名字 ,此处创建两个监听 在上述FanoutConfig 中已经将这两个队列和交换机murg.fanout绑定,所有可同时消费murg.fanout交换机的消息。
package com.murg.bootdemo.rabbitmq;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
/**
* 消息消费者
*
*/
@Service
@RequiredArgsConstructor
public class MessageConsumerService {
/**
* 下面是Fanout,广播模式的监听
* 通过RabbitListener监听队列名字FanoutConfig 中定义的 fanout.queue1 和fanout.queue2
* @param msg
*/
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
}
3.14修改测试类进行测试
修改创建项目时生成的 BootdemoApplicationTests.class
增加以下注解
@ActiveProfiles("dev")
指定运行环境为开发环境
@RunWith(SpringRunner.class)
指定测试类的运行器(Runner)。其主要作用是将Spring的测试支持集成到JUnit测试中,使得在运行JUnit测试时,Spring的上下文可以被正确地加载和配置。
@SpringBootTest(classes={BootdemoApplication.class})
指定启动类
package com.murg.bootdemo;
import com.murg.bootdemo.rabbitmq.MessageProducerService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Profile;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
@ActiveProfiles("dev")
@RunWith(SpringRunner.class)
@SpringBootTest(classes={BootdemoApplication.class})// 指定启动类
public class BootdemoApplicationTests {
@Autowired
MessageProducerService messageProducerService;
@Test
public void contextLoads() {
System.out.printf("aaaaaaaaaaaaa");
}
//测试
@Test
public void testFanoutExchange(){
String msg = "遍身罗绮者,不是养蚕人";
for (int i=0;i<10;i++){
messageProducerService.testFanoutExchange(msg);
}
}
}
3.15 增加一个测试方法调用消息生产服务,发送 Fanout类型的消息。运行测试控制台输出结果
两个队列都可消费。
3.2 Direct类型
3.2.1MessageProducerService生产服务增加方法testDirectExchange
在Direct模型下:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
/**
* 测试Direct模式下发送消息
* 在Direct模型下:
*
* 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
*
* 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
*
* Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
* murg.Direct
* @param msg
*/
public void testDirectExchange(String msg,String rountingkey) {
// 发送消息
//murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了
//rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列
rabbitTemplate.convertAndSend("murg.direct",rountingkey,msg);
}
3.2.2MessageConsumerService消费服务增加方法testDirectExchange
Direct模式的消费监听修改队列和交换机的方式改为注解方式。
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:@Exchange(name = "murg.direct",type = ExchangeTypes.DIRECT)声明交换机名字及类型
通过key的值声明接收不同路由的消息
direct.queue1 消费 rountingkey为“蚕妇”的消息
direct.queue2 消费 rountingkey为“自京赴奉先县咏怀五百字”的消息
/**
*
* 下面是rountingKey 路由key 模式的消费监听
* 基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
*
* 在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:
*
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),//定义队列名字 direct.queue1
exchange = @Exchange(name = "murg.direct", type = ExchangeTypes.DIRECT),//指定交换机和交换机类型
key = {"蚕妇"} //指定消费rountingkey
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),//定义队列名字 direct.queue2
exchange = @Exchange(name = "murg.direct", type = ExchangeTypes.DIRECT),//指定交换机和交换机类型
key = {"自京赴奉先县咏怀五百字"}//指定消费rountingkey
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
3.2.3测试类增加测试方法
@Test
public void testDirectExchange() throws InterruptedException {
String msg1 = "遍身罗绮者,不是养蚕人";
String key1 ="蚕妇";
String msg2 = "朱门酒肉臭,路有冻死骨";
String key2 ="自京赴奉先县咏怀五百字";
for (int i=0;i<10;i++){
if (i % 2 == 0){
messageProducerService.testDirectExchange(msg2,key2);
Thread.sleep(1000);
}else {
messageProducerService.testDirectExchange(msg1,key1);
Thread.sleep(1000);
}
}
}
调用消息生产服务,发送 Direct类型的消息。运行测试控制台输出结果
3.3 Topic类型
3.3.1MessageProducerService生产服务增加方法testTopicExchange
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
demo.#:能够匹配demo.spu.insert 或者 demo.spu #.demo写法也可以
demo.*:只能匹配demo.spu
public void testTopicExchange(String msg, String key) {
rabbitTemplate.convertAndSend("murg.topic",key,msg);
}
3.3.2MessageConsumerService消费服务增加方法testDirectExchange
@Exchange(name = "murg.topic",type = ExchangeTypes.TOPIC)声明交换机名字及类型
通过key的值声明接收不同路由的消息
topic.queue1 消费 rountingkey为“罗隐.*”的消息
topic.queue2 消费 rountingkey为“#.贫女”的消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),//定义队列名字 topic.queue1
exchange = @Exchange(name = "murg.topic", type = ExchangeTypes.TOPIC),//指定交换机和交换机类型
key = {"罗隐.*"} //指定消费rountingkey
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),//定义队列名字 topic.queue2
exchange = @Exchange(name = "murg.topic", type = ExchangeTypes.TOPIC),//指定交换机和交换机类型
key = {"#.贫女"}//指定消费rountingkey
))
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
3.3.3测试类增加测试方法
@Test
public void testTopicExchange() throws InterruptedException {
String msg = "采得百花成蜜后,为谁辛苦为谁甜";
String key ="罗隐.蜂";
String msg2 = "苦恨年年压金线,为他人作嫁衣裳";
String key2 ="秦韬玉.贫女";
for (int i=0;i<10;i++){
if (i % 2 == 0){
messageProducerService.testTopicExchange(msg,key);
Thread.sleep(1000);
}else {
messageProducerService.testTopicExchange(msg2,key2);
Thread.sleep(1000);
}
}
}
调用消息生产服务,发送 Topic类型的消息。运行测试控制台输出结果