目录
一、介绍
1. 概述
2. 作用及优势
3. 工作原理
二、交换机Exchange
1. Direct
2. Topic
3. Fanout
三、代码案例
消费者代码
1. 直连direct
生产者代码
测试
2. 主题topic
生产者代码
测试
3. 扇形fanout
生产者代码
测试
每篇一获
一、介绍
1. 概述
RabbitMQ中的交换机(exchange)是消息的分发中心,它接收来自生产者的消息,并将这些消息路由到一个或多个队列中。交换机根据消息的路由键(routing key)将消息发送到相应的队列中。
四型交换机
-
直连交换机(direct exchange):直连交换机根据消息的路由键将消息发送到与之匹配的队列中。如果消息的路由键与队列的绑定键(binding key)完全匹配,那么消息将被发送到该队列中。
-
主题交换机(topic exchange):主题交换机根据消息的路由键和队列的绑定键的模式进行匹配。可以使用通配符(*和#)来匹配多个路由键,从而实现更灵活的消息路由。
-
扇形交换机(fanout exchange):扇出交换机将消息发送到所有与之绑定的队列中,无论消息的路由键是什么。它实现了一对多的消息分发。
-
头部交换机(headers exchange):头部交换机根据消息的头部信息进行匹配,而不是路由键。可以根据消息的头部属性来决定消息的路由。
交换机和队列之间通过绑定(binding)进行关联,生产者将消息发送到交换机,交换机根据路由键将消息发送到相应的队列中。交换机和队列的绑定关系可以通过管理界面或者命令行工具进行配置。
总的来说,交换机在RabbitMQ中扮演着非常重要的角色,它负责将消息路由到相应的队列中,实现了灵活的消息分发机制。不同类型的交换机可以满足不同的业务需求,开发者可以根据实际情况选择合适的交换机类型来实现消息的路由和分发。
2. 作用及优势
2.1 作用
交换机在项目中的主要作用包括:
1. 消息路由:交换机负责将消息路由到一个或多个队列中,根据消息的路由键和交换机的类型进行匹配和分发,确保消息能够准确地到达目标队列。
2. 消息分发:交换机可以根据不同的规则将消息分发到不同的队列中,实现灵活的消息分发机制。这对于实现消息的多播、广播等场景非常有用。
3. 解耦:通过交换机,生产者和消费者之间可以完全解耦,生产者只需要将消息发送到交换机中,而不需要关心消息具体发送到哪个队列中,消费者也只需要从队列中接收消息,而不需要关心消息的来源。
4. 消息过滤:通过不同类型的交换机和绑定规则,可以实现消息的过滤和选择性接收,确保消费者只接收到其关心的消息。
5. 实现消息通道:交换机是消息在RabbitMQ中的通道,通过交换机可以将消息从生产者传递给消费者,实现了消息的传递和通信。
总的来说,交换机在项目中起到了消息路由、分发、解耦和过滤等重要作用,是实现消息传递和通信的关键组件。通过合理使用不同类型的交换机,可以实现灵活、高效的消息传递机制,满足不同业务场景的需求。
2.2 优势
交换机在消息传递系统中具有以下优势:
1. 灵活的消息路由:交换机可以根据消息的路由键将消息发送到不同的队列中,实现了灵活的消息路由机制。这样可以根据消息的不同属性将消息发送到不同的消费者或处理逻辑中,提高了系统的灵活性和可扩展性。
2. 解耦和分布式系统支持:通过交换机,生产者和消费者之间可以完全解耦,生产者只需要将消息发送到交换机中,而不需要关心消息具体发送到哪个队列中,消费者也只需要从队列中接收消息,而不需要关心消息的来源。这对于构建分布式系统和微服务架构非常有用。
3. 多播和广播支持:通过扇出交换机(fanout exchange),交换机可以将消息发送到所有与之绑定的队列中,实现了一对多的消息分发,支持了多播和广播的消息传递方式。
4. 消息过滤和选择性接收:通过不同类型的交换机和绑定规则,可以实现消息的过滤和选择性接收,确保消费者只接收到其关心的消息,提高了系统的效率和性能。
5. 实现消息通道:交换机是消息在消息队列系统中的通道,通过交换机可以将消息从生产者传递给消费者,实现了消息的传递和通信,为系统中的消息传递提供了可靠的通道。
总的来说,交换机在消息传递系统中具有灵活的消息路由、解耦和分布式系统支持、多播和广播支持、消息过滤和选择性接收等优势,为构建高效、灵活的消息传递系统提供了重要的支持。
3. 工作原理
RabbitMQ的交换机(Exchange)是消息路由的核心组件,负责消息的分发和路由。下面是RabbitMQ交换机的工作原理:
1. 发布消息:生产者将消息发送到RabbitMQ的交换机中,同时指定一个路由键(Routing Key)。
2. 交换机根据类型进行路由:RabbitMQ的交换机有四种类型,分别是直连交换机(direct exchange)、扇出交换机(fanout exchange)、主题交换机(topic exchange)和头部交换机(headers exchange)。不同类型的交换机根据不同的路由规则进行消息的路由和分发。
3. 路由规则:直连交换机根据消息的路由键将消息发送到与之绑定的队列中;扇出交换机将消息发送到所有与之绑定的队列中;主题交换机根据消息的路由键和队列的绑定规则进行匹配,将消息发送到匹配的队列中;头部交换机根据消息的头部属性进行匹配,将消息发送到匹配的队列中。
4. 绑定队列:交换机需要和队列进行绑定,指定绑定的路由键或者其他条件,确保消息能够被正确地路由到目标队列中。
5. 发送到队列:一旦消息被交换机路由到目标队列,消费者就可以从队列中接收并处理消息。
总的来说,RabbitMQ的交换机根据不同的类型和路由规则,将消息发送到目标队列中,实现了消息的路由和分发。通过合理使用不同类型的交换机和绑定规则,可以实现灵活、高效的消息传递机制,满足不同业务场景的需求。
二、交换机Exchange
1. Direct
直连交换机(Direct Exchange)是RabbitMQ中最简单的交换机类型之一,它使用消息的路由键(Routing Key)来决定将消息发送到哪个队列。
案例及实现流程:
假设有一个简单的电子商务系统,其中包括订单服务和库存服务。订单服务负责接收和处理用户的订单,库存服务负责管理商品的库存信息。这两个服务之间需要通过消息队列进行通信。
-
首先在RabbitMQ中创建一个名为"order-exchange"的直连交换机,类型为direct。
-
订单服务和库存服务分别创建自己的队列,并将它们分别绑定到"order-exchange"交换机上。订单服务将队列绑定键设置为"order.create",库存服务将队列绑定键设置为"stock.update"。
-
当用户下单时,订单服务将订单信息发送到"order-exchange"交换机,并指定路由键为"order.create"。
-
直连交换机根据消息的路由键进行匹配,发现匹配了"order.create"的绑定键,将消息发送到订单服务的队列中。同时,库存服务的队列不会收到该消息。
-
订单服务从队列中接收订单信息,并处理订单;库存服务则不会收到该消息,因为它的队列没有匹配的路由键。
通过上述实现,订单服务和库存服务之间实现了解耦合,订单服务只需要将订单相关的消息发送到"order-exchange"交换机,而不需要直接关心库存服务的处理逻辑。同时,库存服务也只需要关注与自己相关的消息,通过设置不同的绑定键,实现了消息的精确路由和分发。
2. Topic
主题交换机(Topic Exchange)是RabbitMQ中一种灵活且强大的交换机类型,它使用消息的路由键和通配符模式来决定将消息发送到哪个队列。
案例及实现流程:
假设我们有一个电子商务平台,需要实现一个基于主题交换机的消息路由系统。具体的场景是用户下单后,需要根据订单的类型和地区信息,将订单信息发送给不同的处理服务进行处理。
-
在RabbitMQ中创建一个名为"order-topic-exchange"的主题交换机,类型为topic。
-
我们有三个处理服务,分别是订单处理服务、支付处理服务和物流处理服务。我们为每个服务创建一个队列,并将它们分别绑定到"order-topic-exchange"交换机上。订单处理服务的队列绑定键为"order.",支付处理服务的队列绑定键为"payment.",物流处理服务的队列绑定键为"shipping.*"。这样,订单处理服务将接收所有以"order."开头的消息,支付处理服务将接收所有以"payment."开头的消息,物流处理服务将接收所有以"shipping."开头的消息。
-
当用户下单后,订单服务将订单信息发送到"order-topic-exchange"交换机,并指定路由键为"order.create.us",其中"us"表示美国地区。
-
主题交换机根据消息的路由键进行匹配,发现匹配了"order.*"的绑定键,将消息发送到订单处理服务的队列中。同时,支付处理服务和物流处理服务的队列不会收到该消息。
-
订单处理服务从队列中接收订单信息,并处理订单;支付处理服务和物流处理服务则不会收到该消息,因为它们的队列没有匹配的路由键。
通过上述实现,我们实现了基于主题交换机的灵活消息路由系统。订单服务只需要将订单相关的消息发送到"order-topic-exchange"交换机,而不需要直接关心支付处理服务和物流处理服务的处理逻辑。同时,支付处理服务和物流处理服务也只需要关注与自己相关的消息,通过设置不同的绑定键,实现了消息的精确路由和分发。
3. Fanout
扇形交换机是RabbitMQ中的一种消息路由方式,它将消息发送到与之绑定的所有队列中,不管消息的路由键是什么。
案例及实现流程:
假设我们有一个在线商城的系统,需要实现一个基于扇out交换机的消息通知系统。具体的场景是,当用户下单后,需要通知多个服务进行处理,比如订单处理服务、支付处理服务和物流处理服务。
-
在RabbitMQ中创建一个名为"order-fanout-exchange"的扇out交换机,类型为fanout。
-
我们有三个处理服务,分别是订单处理服务、支付处理服务和物流处理服务。我们为每个服务创建一个队列,并将它们分别绑定到"order-fanout-exchange"交换机上。
-
当用户下单后,订单服务将订单信息发送到"order-fanout-exchange"交换机。
-
扇out交换机会将消息发送到所有与之绑定的队列中,不管消息的路由键是什么。因此,订单处理服务、支付处理服务和物流处理服务都会接收到相同的消息。
-
订单处理服务、支付处理服务和物流处理服务分别从各自的队列中接收订单信息,并进行相应的处理。
通过上述实现,我们实现了基于扇out交换机的消息通知系统。订单服务只需要将订单相关的消息发送到"order-fanout-exchange"交换机,而不需要关心具体的处理服务。同时,处理服务也只需要关注与自己相关的消息,而不需要关心其他处理服务的处理逻辑。这样,实现了消息的广播通知和解耦。
三、代码案例
以下代码都是基于我博客中的:
RabbitMQ的基本使用,进行实例案例的消息队列https://blog.csdn.net/SAME_LOVE/article/details/135704883?spm=1001.2014.3001.5501
有了前面的安装部署,我们只需要开启docker服务,如果是开机自启就可以不用输入以下
命令(开启docker服务):
systemctl start docker
docker run -d \ --name my-rabbitmq \ -p 5672:5672 -p 15672:15672 \ --hostname my-rabbitmq-host \ -e RABBITMQ_DEFAULT_VHOST=my_vhost \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin \ --restart=always \ rabbitmq:management
以上是我们创建的容器,如果有了就可以直接启动容器即可:
docker start my-rabbitmq
消费者代码
创建ReceiverQ1接收交换机中Queue01队列消息的方法
ReceiverQ1:
package com.cloudjun.consumer; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @SuppressWarnings("all") @Slf4j @RabbitListener(queues = "Queue01") public class ReceiverQ1 { // 接收directExchange01交换机中Queue01队列消息的方法 @RabbitHandler public void Queue01(String msg) { log.warn("Queue01,接收到信息:" + msg); } }
创建ReceiverQ2接收交换机中Queue02队列消息的方法
ReceiverQ2:
package com.cloudjun.consumer; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @SuppressWarnings("all") @Slf4j @RabbitListener(queues = "Queue02") public class ReceiverQ2 { // 接收directExchange01交换机中Queue02队列消息的方法 @RabbitHandler public void Queue02(String msg) { log.warn("Queue02,接收到信息:" + msg); } }
1. 直连direct
生产者代码
在生产者项目中的RabbitConfig中增加以下代码:
/** * 直连交换机 * / * 创建两个Binding Bean,分别与Queue01和Queue02队列进行绑定 * 并都指向directExchange01(直连交换机),键分别为Key01和Key02 */ // 创建队列 @Bean public Queue Queue01() { return new Queue("Queue01"); } @Bean public Queue Queue02() { return new Queue("Queue02"); } // 创建直连(direct)交换机 @Bean public DirectExchange directExchange01() { return new DirectExchange("directExchange01"); } // 创建Binding Bean,与Queue01和directExchange01绑定,键为Key01 @Bean public Binding binding01() { return BindingBuilder .bind(Queue01()) .to(directExchange01()) .with("Key01"); } // 创建Binding Bean,与Queue02和directExchange01绑定,键为Key02 @Bean public Binding binding02() { return BindingBuilder .bind(Queue02()) .to(directExchange01()) .with("Key02"); }
在生产者项目中的TestController中增加以下代码:
@RequestMapping("test03") public String test03() { // 发送消息到名为directExchange01的交换机,路由键为key01,信息内容为:Hello, direct exchange! // 这里的directExchange01是RabbitMQ中定义的交换机名称 // 这里的key01是RabbitMQ中定义的路由键名称 template.convertAndSend("directExchange01","Key01", "Hello, direct exchange!"); return "🙊👌"; } @RequestMapping("test04") public String test04() { // 发送消息到名为directExchange01的交换机,路由键为key02,信息内容为:Hello, direct exchange! // 这里的directExchange01是RabbitMQ中定义的交换机名称 // 这里的key02是RabbitMQ中定义的路由键名称 template.convertAndSend("directExchange01","Key02", "Hello, direct exchange!"); return "🙊👌"; }
测试
测试一(test03):
测试二(test04):
2. 主题topic
生产者代码
在生产者项目中的RabbitConfig中增加以下代码:
/** * 主题交换机 * / * binding03:将Queue01绑定到topicExchange,并使用*.*.Q1作为路由键。 * binding04:将Queue02绑定到topicExchange,并使用*.*.Q2作为路由键。 * binding05:将Queue01绑定到topicExchange,并使用un.#作为路由键。 * binding06:将Queue02绑定到topicExchange,并使用un.#作为路由键。 * '*'代表一个单词, * '#'代表任意数量的字符,也代表0个或多个 */ // 创建主题交换机 @Bean public TopicExchange topicExchange() { return new TopicExchange("topicExchange"); } @Bean public Binding binding03() { return BindingBuilder .bind(Queue01()) .to(topicExchange()) .with("*.*.Q1"); } @Bean public Binding binding04() { return BindingBuilder .bind(Queue02()) .to(topicExchange()) .with("*.*.Q2"); } @Bean public Binding binding05() { return BindingBuilder .bind(Queue01()) .to(topicExchange()) .with("un.#"); } @Bean public Binding binding06() { return BindingBuilder .bind(Queue02()) .to(topicExchange()) .with("un.#"); }
在生产者项目中的TestController中增加以下代码:
@RequestMapping("test05") public String test05(String rex) { template.convertAndSend("topicExchange",rex,"Hello,topicExchange:Queue!"); return "🙊👌"; }
测试
3. 扇形fanout
生产者代码
在生产者项目中的RabbitConfig中增加以下代码:
/** * 扇形交换机 * * 定义了一个FanoutExchange,加上Bean注解 * 定义了两个Binding,加上Bean注解 * 将两个队列绑定到FanoutExchange上,从而实现广播消息的功能 * 扇形交换机会将接收到的消息路由到所有绑定到它上的队列。 */ // 创建扇形交换机 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean public Binding binding07() { return BindingBuilder .bind(Queue01()) .to(fanoutExchange()); } @Bean public Binding binding08() { return BindingBuilder .bind(Queue02()) .to(fanoutExchange()); }
在生产者项目中的TestController中增加以下代码:
@RequestMapping("test06") public String test06() { template.convertAndSend("fanoutExchange","","Hello,fanoutExchange:Queue!"); return "🙊👌"; }
测试
整合代码:
RabbitConfig:
package com.cloudjun.publisher;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@SuppressWarnings("all")
public class RabbitConfig {
// 创建队列
@Bean
public Queue messageQueue() {
return new Queue("messageQueue");
}
@Bean
public Queue messageUser() {
return new Queue("messageUser");
}
/**
* 直连交换机
* /
* 创建两个Binding Bean,分别与Queue01和Queue02队列进行绑定
* 并都指向directExchange01(直连交换机),键分别为Key01和Key02
*/
// 创建队列
@Bean
public Queue Queue01() {
return new Queue("Queue01");
}
@Bean
public Queue Queue02() {
return new Queue("Queue02");
}
// 创建直连(direct)交换机
@Bean
public DirectExchange directExchange01() {
return new DirectExchange("directExchange01");
}
// 创建Binding Bean,与Queue01和directExchange01绑定,键为Key01
@Bean
public Binding binding01() {
return BindingBuilder
.bind(Queue01())
.to(directExchange01())
.with("Key01");
}
// 创建Binding Bean,与Queue02和directExchange01绑定,键为Key02
@Bean
public Binding binding02() {
return BindingBuilder
.bind(Queue02())
.to(directExchange01())
.with("Key02");
}
/**
* 主题交换机
* /
* binding03:将Queue01绑定到topicExchange,并使用*.*.Q1作为路由键。
* binding04:将Queue02绑定到topicExchange,并使用*.*.Q2作为路由键。
* binding05:将Queue01绑定到topicExchange,并使用un.#作为路由键。
* binding06:将Queue02绑定到topicExchange,并使用un.#作为路由键。
* '*'代表一个单词,
* '#'代表任意数量的字符,也代表0个或多个
*/
// 创建主题交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
@Bean
public Binding binding03() {
return BindingBuilder
.bind(Queue01())
.to(topicExchange())
.with("*.*.Q1");
}
@Bean
public Binding binding04() {
return BindingBuilder
.bind(Queue02())
.to(topicExchange())
.with("*.*.Q2");
}
@Bean
public Binding binding05() {
return BindingBuilder
.bind(Queue01())
.to(topicExchange())
.with("un.#");
}
@Bean
public Binding binding06() {
return BindingBuilder
.bind(Queue02())
.to(topicExchange())
.with("un.#");
}
/**
* 扇形交换机
*
* 定义了一个FanoutExchange,加上Bean注解
* 定义了两个Binding,加上Bean注解
* 将两个队列绑定到FanoutExchange上,从而实现广播消息的功能
* 扇形交换机会将接收到的消息路由到所有绑定到它上的队列。
*/
// 创建扇形交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
public Binding binding07() {
return BindingBuilder
.bind(Queue01())
.to(fanoutExchange());
}
@Bean
public Binding binding08() {
return BindingBuilder
.bind(Queue02())
.to(fanoutExchange());
}
}
TestController:
package com.cloudjun.publisher;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author CloudJun
*/
@RestController
public class TestController {
@Autowired
private AmqpTemplate template;
@Autowired
private ObjectMapper objectMapper;
@RequestMapping("test01")
public String test01(){
// 发送消息到名为messageQueue的队列
// 这里的messageQueue是RabbitMQ中定义的队列名称
// 这里的"Hello World!"是发送的消息内容
template.convertAndSend("messageQueue", "HelloWorld!");
return "💖";
}
@RequestMapping("test02")
public String test02() throws Exception {
// 发送消息到名为messageQueue的队列
// 这里的messageQueue是RabbitMQ中定义的队列名称
User user = new User("Jun", "123456");
// 序列化对象转换为JSON字符串
String json = objectMapper.writeValueAsString(user);
template.convertAndSend("messageUser", json);
return "💖";
}
@RequestMapping("test03")
public String test03() {
// 发送消息到名为directExchange01的交换机,路由键为key01,信息内容为:Hello, direct exchange!
// 这里的directExchange01是RabbitMQ中定义的交换机名称
// 这里的key01是RabbitMQ中定义的路由键名称
template.convertAndSend("directExchange01","Key01", "Hello, direct exchange!");
return "🙊👌";
}
@RequestMapping("test04")
public String test04() {
// 发送消息到名为directExchange01的交换机,路由键为key02,信息内容为:Hello, direct exchange!
// 这里的directExchange01是RabbitMQ中定义的交换机名称
// 这里的key02是RabbitMQ中定义的路由键名称
template.convertAndSend("directExchange01","Key02", "Hello, direct exchange!");
return "🙊👌";
}
@RequestMapping("test05")
public String test05(String rex) {
template.convertAndSend("topicExchange",rex,"Hello,topicExchange:Queue!");
return "🙊👌";
}
@RequestMapping("test06")
public String test06() {
template.convertAndSend("fanoutExchange","","Hello,fanoutExchange:Queue!");
return "🙊👌";
}
}
每篇一获
通过学习以上所有的技术,您可能会获得以下收获:
-
理解消息队列的概念和作用:学习了消息队列的基本概念,以及它在系统架构中的作用,能够更好地理解消息队列的优势和适用场景。
-
掌握RabbitMQ的基本原理和使用方法:学习了RabbitMQ的基本原理、交换机类型、队列绑定等概念,以及如何使用RabbitMQ实现消息的发送、接收和路由。
-
理解消息队列在微服务架构中的应用:学习了消息队列在微服务架构中的重要性和应用场景,能够将消息队列应用于实际的微服务系统中,实现服务之间的解耦和异步通信。
-
掌握Docker容器化技术:学习了Docker容器化技术的基本原理和使用方法,能够使用Docker构建、部署和管理容器化的应用程序,实现应用的快速部署和扩展。
-
理解扇out交换机的应用场景:学习了扇out交换机的工作原理和应用场景,能够将其应用于实际的系统中,实现消息的广播通知和解耦。
这些技术的学习和掌握,有助于您更好地理解和应用现代系统架构中的关键技术,提升系统设计和开发的能力,同时也为您在实际项目中的应用提供了技术支持。