这里写目录标题
- 一、序言
- 二、配置文件application.yml
- 三、RabbitMQ交换机和队列配置
- 1、定义4个队列
- 2、定义Fanout交换机和队列绑定关系
- 2、定义Direct交换机和队列绑定关系
- 3、定义Topic交换机和队列绑定关系
- 4、定义Header交换机和队列绑定关系
- 四、RabbitMQ消费者配置
- 五、RabbitMQ生产者
- 六、测试用例
- 1、发送到FanoutExchage
- 2、发送到DirectExchage
- 3、发送到TopicExchange
- 4、发动到HeadersExchage
- 七、结语
一、序言
在上一节 RabbitMQ中的核心概念和交换机类型 中我们介绍了RabbitMQ中的一些核心概念,尤其是各种交换机的类型,接下来我们将具体讲解各种交换机的配置和消息订阅实操。
二、配置文件application.yml
我们先上应用启动配置文件application.yml
,如下:
server:
port: 8080
spring:
rabbitmq:
addresses: localhost:5672
username: admin
password: admin
virtual-host: /
listener:
type: simple
simple:
acknowledge-mode: auto
concurrency: 5
max-concurrency: 20
prefetch: 5
备注:这里我们指定了
RabbitListenerContainerFactory
的类型为SimpleRabbitListenerContainerFactory
,并且指定消息确认模式为自动确认。
三、RabbitMQ交换机和队列配置
Spring官方提供了一套 流式API 来定义队列、交换机和绑定关系,非常的方便,接下来我们定义4种类型的交换机和相应队列的绑定关系。
1、定义4个队列
/**
* 定义4个队列
*/
@Configuration
protected static class QueueConfig {
@Bean
public Queue queue1() {
return QueueBuilder.durable("queue-1").build();
}
@Bean
public Queue queue2() {
return QueueBuilder.durable("queue-2").build();
}
@Bean
public Queue queue3() {
return QueueBuilder.durable("queue-3").build();
}
@Bean
public Queue queue4() {
return QueueBuilder.durable("queue-4").build();
}
}
2、定义Fanout交换机和队列绑定关系
/**
* 定义Fanout交换机和对应的绑定关系
*/
@Configuration
protected static class FanoutExchangeBindingConfig {
@Bean
public FanoutExchange fanoutExchange() {
return ExchangeBuilder.fanoutExchange("fanout-exchange").build();
}
/**
* 定义多个Fanout交换机和队列的绑定关系
* @param fanoutExchange
* @param queue1
* @param queue2
* @param queue3
* @param queue4
* @return
*/
@Bean
public Declarables bindQueueToFanoutExchange(FanoutExchange fanoutExchange, Queue queue1, Queue queue2, Queue queue3, Queue queue4) {
Binding queue1Binding = BindingBuilder.bind(queue1).to(fanoutExchange);
Binding queue2Binding = BindingBuilder.bind(queue2).to(fanoutExchange);
Binding queue3Binding = BindingBuilder.bind(queue3).to(fanoutExchange);
Binding queue4Binding = BindingBuilder.bind(queue4).to(fanoutExchange);
return new Declarables(queue1Binding, queue2Binding, queue3Binding, queue4Binding);
}
}
备注:这里我们将4个队列绑定到了名为
fanout-exchange
的交换机上。
2、定义Direct交换机和队列绑定关系
@Configuration
protected static class DirectExchangeBindingConfig {
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange("direct-exchange").build();
}
@Bean
public Binding bindingQueue3ToDirectExchange(DirectExchange directExchange, Queue queue3) {
return BindingBuilder.bind(queue3).to(directExchange).with("queue3-route-key");
}
}
备注:这里我们定义了名为
direct-exchange
的交换机并通过路由keyqueue3-route-key
将queue-3
绑定到了该交换机上。
3、定义Topic交换机和队列绑定关系
@Configuration
protected static class TopicExchangeBindingConfig {
@Bean
public TopicExchange topicExchange() {
return ExchangeBuilder.topicExchange("topic-exchange").build();
}
@Bean
public Declarables bindQueueToTopicExchange(TopicExchange topicExchange, Queue queue1, Queue queue2) {
Binding queue1Binding = BindingBuilder.bind(queue1).to(topicExchange).with("com.order.*");
Binding queue2Binding = BindingBuilder.bind(queue2).to(topicExchange).with("com.#");
return new Declarables(queue1Binding, queue2Binding);
}
}
这里我们定义了名为topic-exchange
类型的交换机,该类型交换机支持路由key通配符匹配,*
代表一个任意字符,#
代表一个或多个任意字符。
备注:
- 通过路由key
com.order.*
将queue-1
绑定到了该交换机上。- 通过路由key
com.#
将queue-2
也绑定到了该交换机上。
4、定义Header交换机和队列绑定关系
@Configuration
protected static class HeaderExchangeBinding {
@Bean
public HeadersExchange headersExchange() {
return ExchangeBuilder.headersExchange("headers-exchange").build();
}
@Bean
public Binding bindQueueToHeadersExchange(HeadersExchange headersExchange, Queue queue4) {
return BindingBuilder.bind(queue4).to(headersExchange).where("function").matches("logging");
}
}
备注:这里我们定义了名为
headers-exchange
类型的交换机,并通过参数function=logging
将queue-4
绑定到了该交换机上。
四、RabbitMQ消费者配置
Spring RabbitMQ中支持注解式监听端点配置,用于异步接收消息,如下:
@Slf4j
@Component
public class RabbitMqConsumer {
@RabbitListener(queues = "queue-1")
public void handleMsgFromQueue1(String msg) {
log.info("Message received from queue-1, message body: {}", msg);
}
@RabbitListener(queues = "queue-2")
public void handleMsgFromQueue2(String msg) {
log.info("Message received from queue-2, message body: {}", msg);
}
@RabbitListener(queues = "queue-3")
public void handleMsgFromQueue3(String msg) {
log.info("Message received from queue-3, message body: {}", msg);
}
@RabbitListener(queues = "queue-4")
public void handleMsgFromQueue4(String msg) {
log.info("Message received from queue-4, message body: {}", msg);
}
}
备注:这里我们分别定义了4个消费者,分别用来接受4个队列的消息。
五、RabbitMQ生产者
@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {
private final RabbitTemplate rabbitTemplate;
public void sendMsgToFanoutExchange(String body) {
log.info("开始发送消息到fanout-exchange, 消息体:{}", body);
Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
rabbitTemplate.send("fanout-exchange", StringUtils.EMPTY, message);
}
public void sendMsgToDirectExchange(String body) {
log.info("开始发送消息到direct-exchange, 消息体:{}", body);
Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
rabbitTemplate.send("direct-exchange", "queue3-route-key", message);
}
public void sendMsgToTopicExchange(String routingKey, String body) {
log.info("开始发送消息到topic-exchange, 消息体:{}", body);
Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
rabbitTemplate.send("topic-exchange", routingKey, message);
}
public void sendMsgToHeadersExchange(String body) {
log.info("开始发送消息到headers-exchange, 消息体:{}", body);
MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setHeader("function", "logging").build();
Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
rabbitTemplate.send("headers-exchange", StringUtils.EMPTY, message);
}
}
六、测试用例
这里写了个简单的Controller用来测试,如下:
@RestController
@RequiredArgsConstructor
public class RabbitMsgController {
private final RabbitMqProducer rabbitMqProducer;
@RequestMapping("/exchange/fanout")
public ResponseEntity<String> sendMsgToFanoutExchange(String body) {
rabbitMqProducer.sendMsgToFanoutExchange(body);
return ResponseEntity.ok("广播消息发送成功");
}
@RequestMapping("/exchange/direct")
public ResponseEntity<String> sendMsgToDirectExchange(String body) {
rabbitMqProducer.sendMsgToDirectExchange(body);
return ResponseEntity.ok("消息发送到Direct交换成功");
}
@RequestMapping("/exchange/topic")
public ResponseEntity<String> sendMsgToTopicExchange(String routingKey, String body) {
rabbitMqProducer.sendMsgToTopicExchange(routingKey, body);
return ResponseEntity.ok("消息发送到Topic交换机成功");
}
@RequestMapping("/exchange/headers")
public ResponseEntity<String> sendMsgToHeadersExchange(String body) {
rabbitMqProducer.sendMsgToHeadersExchange(body);
return ResponseEntity.ok("消息发送到Headers交换机成功");
}
}
1、发送到FanoutExchage
直接访问http://localhost:8080/exchange/fanout?body=hello
,可以看到该消息广播到了4个队列上。
2023-11-07 17:41:12.959 INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer : 开始发送消息到fanout-exchange, 消息体:hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#1-5] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-1, message body: hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#0-4] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-4, message body: hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#3-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-3, message body: hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#2-4] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-2, message body: hello
2、发送到DirectExchage
访问http://localhost:8080/exchange/direct?body=hello
,可以看到消息通过路由keyqueue3-route-key
发送到了queue-3
上。
2023-11-07 17:43:26.804 INFO 39460 --- [nio-8080-exec-1] c.u.r.i.producer.RabbitMqProducer : 开始发送消息到direct-exchange, 消息体:hello
2023-11-07 17:43:26.822 INFO 39460 --- [ntContainer#3-5] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-3, message body: hello
3、发送到TopicExchange
访问http://localhost:8080/exchange/topic?body=hello&routingKey=com.order.create
,路由key为 com.order.create
的消息分别发送到了queue-1
和queue-2
上。
2023-11-07 17:44:45.301 INFO 39460 --- [nio-8080-exec-4] c.u.r.i.producer.RabbitMqProducer : 开始发送消息到topic-exchange, 消息体:hello
2023-11-07 17:44:45.312 INFO 39460 --- [ntContainer#1-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-1, message body: hello
2023-11-07 17:44:45.312 INFO 39460 --- [ntContainer#2-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-2, message body: hello
4、发动到HeadersExchage
访问http://localhost:8080/exchange/headers?body=hello
,消息通过头部信息function=logging
发送到了headers-exchange
上。
2023-11-07 17:47:21.736 INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer : 开始发送消息到headers-exchange, 消息体:hello
2023-11-07 17:47:21.749 INFO 39460 --- [ntContainer#0-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-4, message body: hello
七、结语
下一节我们将会介绍通过两种方式借由RabbitMQ实现延迟消息发送和订阅,敬请期待。