SpringBoot整合RabbitMQ,三种交换机类型示例
1、流程概括
2、引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3、配置RabbitMQ连接
在application.properties或application.yml中配置RabbitMQ服务器的连接参数:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
4、不同交换机模式下的使用
4.1、DirectExchange(直连交换机)
消费者
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Direct-exchange example: declares a queue, a direct exchange and the binding
 * between them, and registers a listener on that queue.
 */
@Configuration
public class DirectConsumer {

    private static final String QUEUE_NAME = "Direct_Q01";
    private static final String EXCHANGE_NAME = "Direct_E01";
    private static final String ROUTING_KEY = "RK01";
    private static final int MAX_QUEUE_LENGTH = 100;

    /**
     * Declares the durable queue {@code Direct_Q01} with a maximum length of 100.
     *
     * <p>Why restarting the application does not fail: on startup the queue is
     * looked up by name; if it already exists nothing happens, otherwise a new
     * one is created.
     *
     * @return the queue definition
     */
    @Bean
    public Queue queue() {
        return QueueBuilder.durable(QUEUE_NAME)
                .maxLength(MAX_QUEUE_LENGTH)
                .build();
    }

    /**
     * Declares the direct exchange {@code Direct_E01}.
     *
     * <p>As with the queue, the exchange is looked up by name on startup and
     * only created when it does not exist yet.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange exchange() {
        return ExchangeBuilder.directExchange(EXCHANGE_NAME).build();
    }

    /**
     * Binds the queue to the exchange under routing key {@code RK01}.
     *
     * @param queue    the queue to bind
     * @param exchange the direct exchange to bind to
     * @return the binding definition
     */
    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with(ROUTING_KEY);
    }

    /**
     * Consumer for {@code Direct_Q01}; prints every received message to stdout.
     *
     * @param msg the received message body
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String msg) {
        System.out.println("收到消息:" + msg);
    }
}
生产者
/**
 * Producer for the direct-exchange example: publishes to exchange
 * {@code Direct_E01} using routing key {@code RK01}.
 */
@Service
public class DirectProvider {

    private static final String EXCHANGE_NAME = "Direct_E01";
    private static final String ROUTING_KEY = "RK01";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Hands the message to {@code RabbitTemplate.convertAndSend} for the
     * direct exchange and routing key above.
     *
     * @param message the payload to publish
     */
    public void send(Object message) {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message);
    }
}
测试
/**
 * Manual test that publishes ten messages through {@link DirectProvider}.
 */
@SpringBootTest
class TestApp {

    @Autowired
    private DirectProvider directProvider;

    /**
     * Sends ten numbered messages, calling {@code ThreadUtil.safeSleep(1000)}
     * after each one, then blocks on {@code System.in.read()}.
     *
     * <p>NOTE(review): the final read presumably keeps the test (and its
     * listeners) alive until input arrives on stdin; this is a manual demo
     * and may hang in an unattended build — confirm before running in CI.
     *
     * @throws IOException if reading from standard input fails
     */
    @Test
    void test() throws IOException {
        int sent = 0;
        while (sent < 10) {
            directProvider.send("你好呀" + sent);
            System.out.println("发送成功" + sent);
            ThreadUtil.safeSleep(1000);
            sent++;
        }
        System.in.read();
    }
}
一个交换机对多个队列的特点:对于直连交换机,消息会被投递到所有绑定键(binding key)与消息路由键(routing key)相同的队列,每个匹配的队列都会各自收到一份消息副本。
一个队列对多个消费者的特点:多个消费者之间是竞争关系,队列中的每条消息只会被其中一个消费者消费,默认按轮询(round-robin)方式在消费者之间分发。
4.2、FanoutExchange(扇形/广播交换机)
消费者
/*
* Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.
*
*/
package com.fpl.consumers;
import com.fpl.model.OrderingOk;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * <p>Project: spring-rabbitmq - FanoutConsumer</p>
 * <p>Powered by fpl1116 On 2024-04-08 11:35:22</p>
 * <p>Description: fanout-exchange example. Declares queue {@code Fanout_Q01},
 * exchange {@code Fanout_E01} and their binding, and registers two listeners
 * on the same queue.</p>
 *
 * @author penglei
 * @version 1.0
 * @since 1.8
 */
@Configuration
public class FanoutConsumer {

    private static final String QUEUE_NAME = "Fanout_Q01";
    private static final String EXCHANGE_NAME = "Fanout_E01";

    /**
     * Declares the durable queue {@code Fanout_Q01}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue fanoutQueue1() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /**
     * Declares the fanout exchange {@code Fanout_E01}.
     *
     * <p>On startup the exchange is looked up by name; if it already exists
     * nothing happens, otherwise a new one is created.
     *
     * @return the exchange definition
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).build();
    }

    /**
     * Binds the queue to the fanout exchange; no routing key is supplied.
     *
     * @param fanoutQueue1   the queue to bind
     * @param fanoutExchange the fanout exchange to bind to
     * @return the binding definition
     */
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1)
                .to(fanoutExchange);
    }

    /**
     * First listener on {@code Fanout_Q01}; prints the received message.
     *
     * @param msg the received payload
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(OrderingOk msg) {
        System.out.println("FanoutConsumer1 消费者1 收到消息:" + msg);
    }

    /**
     * Second listener on the same queue {@code Fanout_Q01}; prints the
     * received message.
     *
     * @param msg the received payload
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage32(OrderingOk msg) {
        System.out.println("FanoutConsumer1 消费者2 收到消息:" + msg);
    }
}
生产者
/**
 * Producer for the fanout-exchange example: publishes to exchange
 * {@code Fanout_E01} with an empty routing key.
 */
@Service
public class FanoutProvider {

    private static final String EXCHANGE_NAME = "Fanout_E01";
    private static final String NO_ROUTING_KEY = "";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Hands the message to {@code RabbitTemplate.convertAndSend} for the
     * fanout exchange, passing an empty string as the routing key.
     *
     * @param message the payload to publish
     */
    public void send(Object message) {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, NO_ROUTING_KEY, message);
    }
}
4.3、TopicExchange(主题交换机)
消费者
/*
* Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.
*
*/
package com.fpl.consumers;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * <p>Project: spring-rabbitmq - TopicConsumer</p>
 * <p>Powered by fpl1116 On 2024-04-08 11:38:20</p>
 * <p>Description: topic-exchange example. Declares exchange {@code Topic_E01},
 * three durable queues, one binding per queue, and one listener per queue.</p>
 *
 * <p>The class must be a {@code @Configuration} (as the direct and fanout
 * consumers are); with the annotation commented out none of the {@code @Bean}
 * or {@code @RabbitListener} methods below are registered.</p>
 *
 * <p>NOTE(review): the listeners take {@code String} while the matching
 * producer publishes a {@code Girl} object — confirm the configured message
 * converter can deliver that payload as a {@code String}.</p>
 *
 * @author penglei
 * @version 1.0
 * @since 1.8
 */
@Configuration
public class TopicConsumer {

    /**
     * Declares the topic exchange {@code Topic_E01}.
     *
     * @return the exchange definition
     */
    @Bean
    public TopicExchange topicExchange() {
        return ExchangeBuilder.topicExchange("Topic_E01").build();
    }

    /**
     * Declares the first durable queue.
     *
     * @return the queue definition
     */
    @Bean
    public Queue topicQueue1() {
        return QueueBuilder.durable("小龙").build();
    }

    /**
     * Declares the second durable queue.
     *
     * @return the queue definition
     */
    @Bean
    public Queue topicQueue2() {
        return QueueBuilder.durable("小虎").build();
    }

    /**
     * Declares the third durable queue.
     *
     * @return the queue definition
     */
    @Bean
    public Queue topicQueue3() {
        return QueueBuilder.durable("小羊").build();
    }

    /**
     * Binds the first queue with pattern {@code #}, which in topic-exchange
     * matching accepts every routing key.
     *
     * @param topicQueue1   the queue to bind
     * @param topicExchange the topic exchange to bind to
     * @return the binding definition
     */
    @Bean
    public Binding TopicBinding1(Queue topicQueue1, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue1).to(topicExchange).with("#");
    }

    /**
     * Binds the second queue with pattern {@code 1.6.*}.
     *
     * <p>NOTE(review): {@code *} stands for exactly one dot-separated word, so
     * only three-word keys such as {@code 1.6.x} match — verify this against
     * the routing keys the producer actually sends.
     *
     * @param topicQueue2   the queue to bind
     * @param topicExchange the topic exchange to bind to
     * @return the binding definition
     */
    @Bean
    public Binding TopicBinding2(Queue topicQueue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("1.6.*");
    }

    /**
     * Binds the third queue with pattern {@code 1.8.*} (same matching rule as
     * the {@code 1.6.*} binding).
     *
     * @param topicQueue3   the queue to bind
     * @param topicExchange the topic exchange to bind to
     * @return the binding definition
     */
    @Bean
    public Binding TopicBinding3(Queue topicQueue3, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue3).to(topicExchange).with("1.8.*");
    }

    /**
     * Listener for the first queue; prints the received message.
     *
     * @param msg the received message body
     */
    @RabbitListener(queues = "小龙")
    public void receiveMessage(String msg) {
        System.out.println("小龙 收到消息:" + msg);
    }

    /**
     * Listener for the second queue; prints the received message.
     *
     * @param msg the received message body
     */
    @RabbitListener(queues = "小虎")
    public void receiveMessage2(String msg) {
        System.out.println("小虎 收到消息:" + msg);
    }

    /**
     * Listener for the third queue; prints the received message.
     *
     * @param msg the received message body
     */
    @RabbitListener(queues = "小羊")
    public void receiveMessage3(String msg) {
        System.out.println("小羊 收到消息:" + msg);
    }
}
生产者
/**
 * Producer for the topic-exchange example: publishes to exchange
 * {@code Topic_E01}, using the value of {@code girl.getHeight()} as the
 * routing key.
 */
@Service
public class TopicProvider {

    private static final String EXCHANGE_NAME = "Topic_E01";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Hands the object to {@code RabbitTemplate.convertAndSend}, routed by
     * its height value.
     *
     * @param girl the payload to publish; its {@code getHeight()} result is
     *             passed as the routing key
     */
    public void send(Girl girl) {
        rabbitTemplate.convertAndSend(
                EXCHANGE_NAME,
                girl.getHeight(),
                girl);
    }
}