目录
预备工作和配置
1.发送消息
实现类
控制层
效果
2.收消息
3.异步读取
效果
4.Work queues --工作队列模式
创建队列text2
实体类
效果
5.Subscribe--发布订阅模式
效果
6.Routing--路由模式
效果
7.Topics--通配符模式
效果
异步处理、应用解耦、流量削锋、分布式事务管理等,使用消息服务可以实现一个高性能、高可用、高扩展的系统。
为了高效处理消息,使用第三种方式,在写入消息队列后就响应,存在队列中。
预备工作和配置
打开RabbitMQ和控制页面并登录
新建一个消息队列
创建项目加入依赖并且写好配置类
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# 配置RabbitMQ消息中间件连接配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
1.发送消息
实现类
调用RabbitTemplate类的convertAndSend方法,设置消息内容和消息队列。
@Service
public class DemoService {
@Autowired
RabbitTemplate rabbitTemplate;
public void setRabbitTemplate() {
//convertAndSend("RabbitMQ虚拟主机路径","消息队列","消息内容")
rabbitTemplate.convertAndSend("", "text", "abc123456");
}
}
控制层
@RestController
public class MyCoutrollor {
@Autowired
DemoService demoService;
@RequestMapping("/text")
public String setDemoService() {
demoService.setRabbitTemplate();
return "setRabbitTemplate";
}
效果
有一条消息,未取出,存在队列中
可见其中的值就是所存的值
2.收消息
监听器进行监听,不需要进行调用等,一旦发现该队列,将直接收取。
//监听器 workQueue1队列名 如果队列中有消息 就取走
//@RabbitListener(queues = "text") 接收消息,queues="队列名"
@RabbitListener(queues = "text")
//默认参数Message
public void revcMessage(Message message) {
byte[] body = message.getBody();
String result = new String(body);
System.out.println(result);
}
此时消息已被取走,队列中无消息。
3.异步读取
效果
此时并未取到值,只是存入队列后就立刻返回发送成功
4.Work queues --工作队列模式
指定唯一的消息队列进行消息传递。在这种模式下,多个消息消费者通过轮询的方式依次接收消息队列中存储的消息,一旦消息被某一个消费者接收,消息队列会将消息移除。
@RabbitListener(queues = "text"),指定队列监听。
创建队列text2
实体类
@Data
public class User {
public String name;
public String password;
}
效果
//消息内容为对象
public void setRabbitTemplate2() {
User user = new User();
user.setName("123");
user.setPassword("123456");
rabbitTemplate.convertAndSend("", "text", user);
}
存进指定text2队列中
指定从text2中取出
//所有消息接受时,不管传输的是什么,接受时都转成字节
@RabbitListener(queues = "text")
//默认参数Message
public void revcMessage2(User user) {
System.out.println(user);
}
此时队列中无消息
5.Subscribe--发布订阅模式
配置一个fanout类型的交换器,不需要指定对应的路由键(Routing key),同时会将消息路由到每一个消息队列上,然后每个消息队列都可以对相同的消息进行接收存储,在同一交换器下的所有队列都将收到消息。
@RabbitListener注解--创建队列和交换机并绑定,并且设置交换机类型。
效果
存入交换机中,此时不需要指定对应的键
//广播模式
public void setRabbitTemplate3() {
User user = new User();
user.setName("张三");
user.setPassword("zhangsan123456");
rabbitTemplate.convertAndSend("TextExchange", "", user);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("text3"),//队列名
exchange = @Exchange(
value = "TextExchange",//交换机名
type = "fanout")))//交换机类型 fanout----广播模式
public void revcMessage3(User user) {
System.out.println("fanout TextExchange text3"+user);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("text4"),//队列名
exchange = @Exchange(
value = "TextExchange",//交换机名
type = "fanout")))//交换机类型 fanout----广播模式
public void revcMessage4(User user) {
System.out.println("fanout TextExchange text4"+user);
}
自动创建队列text3和text4
并且绑定了交换机
因为交换机类型为fanout----广播模式,所以该交换机绑定的所有队列均可收到消息。
6.Routing--路由模式
配置一个direct类型的交换器,并指定不同的路由键值(Routing key)将对应的消息从交换器路由到不同的消息队列进行存储,适用于进行不同类型消息分类处理的场合。
效果
指定了能取出消息的队列,所以应只有key = "info"的text5能取出消息。
//路由模式
public void setRabbitTemplate4() {
User user = new User();
user.setName("123");
user.setPassword("123456");
rabbitTemplate.convertAndSend("Text4Exchange", "info", user);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("text5"),//队列名
exchange = @Exchange(
value = "Text4Exchange",//交换机名
type = "direct"),key = "info"))//交换机类型 direct----路由模式
public void revcMessage5(User user) {
System.out.println("fanout Text4Exchange text5 "+user);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("text6"),//队列名
exchange = @Exchange(
value = "Text4Exchange",//交换机名
type = "direct"),key = "error"))//交换机类型 direct----路由模式
public void revcMessage6(User user) {
System.out.println("fanout Text4Exchange text6 "+user);
}
7.Topics--通配符模式
配置一个topic类型的交换器。与Routing模式不同的是:Routing只能指定一个key值,而通配符可以指定一类key值,只要含有指定词的那一类队列都可以收到,加强版路由模式。
效果
指定了一类key值的消息能被取出,所以应只有首位是info并且其中含有email的消息能被取出
//通配符模式
public void setRabbitTemplate5() {
User user = new User();
user.setName("小红");
user.setPassword("xiaohong123456");
rabbitTemplate.convertAndSend("Text5Exchange", "info.email", user);
User user1 = new User();
user1.setName("小绿");
user1.setPassword("xiaolv123456");
rabbitTemplate.convertAndSend("Text5Exchange", "info.sms", user1);
User user2 = new User();
user2.setName("小蓝");
user2.setPassword("xiaolan123456");
rabbitTemplate.convertAndSend("Text5Exchange", "info.email.sms", user2);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("text7"),//队列名
exchange = @Exchange(
value = "Text5Exchange",//交换机名
type = "topic"),key = "info.#.email.#"))//交换机类型 topic----通配符模式
public void revcMessage7(User user) {
System.out.println("topic Text5Exchange text7 "+user);
}
取出满足条件的消息