文章目录
- 1、前情提要【RabbitMQ】
- 2、RabbitMQ-SpringBoot案例 -fanout模式
- 2.1 实现架构总览
- 2.2 具体实现
- 2.2.1生产者
- 2.2.1消费者
1、前情提要【RabbitMQ】
【RabbitMQ】消息队列-RabbitMQ篇章
RabbitMQ实现流程
2、RabbitMQ-SpringBoot案例 -fanout模式
2.1 实现架构总览
实现步骤:
1:创建生产者工程:sspringboot-rabbitmq-fanout-producer
2:创建消费者工程:springboot-rabbitmq-fanout-consumer
3:引入spring-boot-rabbitmq的依赖
4:进行消息的分发和测试
5:查看和观察web控制台的状况
2.2 具体实现
2.2.1生产者
在这之前提前开好服务器,并且启动mq的服务,可参考上面的链接
- 1、创建生产者springboot工程:sspringboot-rabbitmq-fanout-producer
- 2、导入启动(web、mq)依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
- 3、在application.yml进行配置
# 服务端口
server:
port: 8080
# 配置rabbitmq服务
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
# ip地址为开启mq服务的服务器地址
host: 47.104.141.27
port: 5672
- 4:定义订单的生产者
package com.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @author: 学相伴-飞哥
* @description: OrderService
* @Date : 2021/3/4
*/
@Component
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 1: 定义交换机
private String exchangeName = "fanout_order_exchange";
// 2: 路由key
private String routeKey = "";
public void makeOrder(Long userId, Long productId, int num) {
// 1: 模拟用户下单
String orderNumer = UUID.randomUUID().toString();
// 2: 根据商品id productId 去查询商品的库存
// int numstore = productSerivce.getProductNum(productId);
// 3:判断库存是否充足
// if(num > numstore ){ return "商品库存不足..."; }
// 4: 下单逻辑
// orderService.saveOrder(order);
// 5: 下单成功要扣减库存
// 6: 下单完成以后
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
// 发送订单信息给RabbitMQ fanout
rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
}
}
- 4、配置类绑定交换机和队列的关系
package com.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer.service;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class DirectRabbitConfig {
//队列 起名:TestDirectQueue
@Bean
public Queue emailQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue smsQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue weixinQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("weixin.fanout.queue", true);
}
//Direct交换机 起名:TestDirectExchange
@Bean
public DirectExchange fanoutOrderExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("fanout_order_exchange", true, false);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
public Binding bindingDirect1() {
return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");
}
@Bean
public Binding bindingDirect2() {
return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");
}
@Bean
public Binding bindingDirect3() {
return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");
}
}
- 6.测试
向队列发送10条消息
package com.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer;
import com.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootRabbitmqFanoutProducerApplicationTests {
@Autowired
OrderService orderService;
@Test
public void contextLoads() throws Exception {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
Long userId = 100L + i;
Long productId = 10001L + i;
int num = 10;
orderService.makeOrder(userId, productId, num);
}
}
}
启动测试,此时进入mq的web页面,查看交换机和队列是否绑定上了,查看是否有队列消息
向所有队列中都加入10条消息—说明交换机和队列绑定没问题
2.2.1消费者
参照生产者的创建方法,选择在平级目录下创建:
springboot-order-rabbitmq-consumber
2. 修改配置文件
# 服务器
server:
# 端口要改成不冲突的
port: 8081/
# rabbitmq配置
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 192.168.157.128 #127.0.0.1
port: 5672
- 创建FanoutEmailConsumer、FanoutNoteConsumer、FanoutSMSConsumer消费者接收
4.运行测试