文章目录
- 同步通讯和异步通讯
- 什么是MQ?
- 一.RabbitMQ概述
- 二.SpringAMQP
- 1.Work Queue 工作队列
- 2.发布订阅-Fanout Exchange
- 3.发布订阅-DirectExchange
- 4.发布订阅-TopicExchange
- 3.消息转换器
同步通讯和异步通讯
其实在之前的JAVASE中就已经在线程一章中接触过了同步和异步的概念,那么同步通讯和异步通讯有什么异同呢?
1.时序(Timing):
同步通讯: 在同步通讯中,发送方发出请求后会等待接收方的响应,直到接收到响应或者发生超时。这意味着发送和接收两个操作是顺序执行的。
异步通讯: 在异步通讯中,发送方发送消息后不会等待接收方的响应,而是继续执行其他操作。接收方在接收到消息后会进行处理,但发送方不会阻塞等待。
2.阻塞(Blocking):
同步通讯: 在同步通讯中,发送方通常会被阻塞,直到接收到响应或者发生超时。这意味着发送方需要等待,直到接收方完成处理。
异步通讯: 在异步通讯中,发送方通常不会被阻塞,可以继续执行其他任务。这使得异步通讯适用于需要同时处理多个任务的场景。
3.响应方式(Response):
同步通讯: 在同步通讯中,发送方需要等待接收方的响应,响应通常是直接返回给发送方的。
异步通讯: 在异步通讯中,发送方不会直接等待接收方的响应。通常使用回调函数、事件或者轮询等方式来处理接收方的响应。
4.可靠性(Reliability):
同步通讯: 由于同步通讯需要等待接收方的响应,所以相对容易处理错误和异常情况,可以更容易地实现重试机制。
异步通讯: 异步通讯中,由于发送方不等待响应,错误处理可能更为复杂,需要采用其他机制来处理错误,例如超时处理、回调错误处理等。
5.适用场景(Use Cases):
同步通讯: 适用于简单的请求-响应场景,例如传统的函数调用、HTTP 请求等。
异步通讯: 适用于需要提高系统的并发性、响应性和吞吐量的场景,例如事件驱动架构、消息队列系统等。
同步调用存在的问题
总而言之,同步调用的优点:时效性较强,可以立即得到结果
同步调用的问题:1.耦合度高2.性能和吞吐能力下降3.有额外的资源消耗4.有级联失败问题
异步调用方案
异步调用常见实现就是事件驱动模式
事件驱动优势:
优势一:服务解耦
当后期想要增加服务,不需要与服务调用者打交道, 只需要订阅事件到Broker(消息队列即可)
优势二:性能提升,吞吐量提高
当来自网关的请求调用到了服务,这个服务需要调用其他服务时,只需要通知相应的消息队列即可,服务耗时减少,其次,由于消息队列采用的是异步通讯,所以耗时相应减少
优势三:服务没有强依赖,不担心级联失败问题
当消息队列的某个服务阻塞后,也不影响整个消息的通知以及其他服务和消息队列的阻塞
优势四:流量削峰
当有高并发请求时,消息队列可以分批次发送消息,进行流量削峰
总结:
异步通信的优点:
-
耦合度低
-
吞吐量提升
-
故障隔离
-
流量削峰
异步通信的缺点:
-
依赖于Broker的可靠性、安全性、吞吐能力
-
架构复杂了,业务没有明显的流程线,不好追踪管理
什么是MQ?
MQ (MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
市面上常见的MQ对比:
一.RabbitMQ概述
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)
RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
安装: 通过Docker镜像安装即可
RabbitMQ的结构和概念
RabbitMQ中的几个概念:
channel:操作MQ的工具
exchange:路由消息到队列中
queue:缓存消息
virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
常见消息模型
MQ的官方文档中给出了5个MQ的Demo示例,对应了几种不同的用法:
基本消息队列(BasicQueue)
工作消息队列(WorkQueue)
发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:
Fanout Exchange:广播
Direct Exchange:路由
Topic Exchange:主题
由于使用官网提供的api编写代码操作RabbitMQ过于复杂,下面引出SpringAMQP
二.SpringAMQP
什么是SpringAMQP?
案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能
流程如下:
1.在父工程中引入spring-amqp的依赖
因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程mq-demo中:
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
3.在consumer服务中编写消费逻辑,绑定simple.queue这个队列
总结:
1.SpringAMQP如何发送消息?
-
引入amqp的starter依赖
-
配置RabbitMQ地址
-
利用RabbitTemplate的convertAndSend方法
2.SpringAMQP如何接收消息?
-
引入amqp的starter依赖
-
配置RabbitMQ地址
-
定义类,添加@Component注解
-
类中声明方法,添加@RabbitListener注解,方法参数就时消息
注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能
1.Work Queue 工作队列
Work queue工作队列,可以提高消息处理速度,避免队列消息堆积
消费预取限制(能者多劳)
修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限:
发布( Publish )、订阅( Subscribe )
发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。
常见exchange类型包括:
-
Fanout:广播
-
Direct:路由
-
Topic:话题
2.发布订阅-Fanout Exchange
Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue
使用方法:
案例:利用SpringAMQP演示FanoutExchange的使用
实现思路如下:
1.在consumer服务中,利用代码声明队列、交换机,并将两者绑定
@Configuration
public class FanoutConfig {
//声明FanoutExchange交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("root.fanout");
}
//声明第一个队列
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
//绑定队列一和交换机
@Bean
public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
//声明第二个队列
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
//绑定队列二和交换机
@Bean
public Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
2.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) {
System.out.println("spring 消费者接收到消息 :【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg){
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg){
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
}
3.在publisher中编写测试方法,向itcast.fanout发送消息
@Test
void testFanoutExchange() {
// 队列名称
String exchangeName = "root.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
Q&A
1.交换机的作用是什么?
-
接收publisher发送的消息
-
将消息按照规则路由到与之绑定的队列
-
不能缓存消息,路由失败,消息丢失
-
FanoutExchange的会将消息路由到每个绑定的队列
2.声明队列、交换机、绑定关系的Bean是什么?
-
Queue
-
FanoutExchange
-
Binding
3.发布订阅-DirectExchange
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。
每一个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
案例:利用SpringAMQP演示DirectExchange的使用
实现思路如下:
1.利用@RabbitListener声明Exchange、Queue、RoutingKey并且同时实现监听direct.queue1和direct.queue2
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "root.direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到Direct消息:【"+msg+"】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "root.direct",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到Direct消息:【"+msg+"】");
}
2.在publisher中编写测试方法,向itcast. direct发送消息
@Test
void testDirectExchange() {
// 队列名称
String exchangeName = "root.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息,参数依次为:交换机名称,RoutingKey,消息
rabbitTemplate.convertAndSend(exchangeName,"red",message);
}
Q&A
1.描述下Direct交换机与Fanout交换机的差异?
-
Fanout交换机将消息路由给每一个与之绑定的队列
-
Direct交换机根据RoutingKey判断路由给哪个队列
-
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
2.基于@RabbitListener注解声明队列和交换机有哪些常见注解?
-
@Queue
-
@Exchange
4.发布订阅-TopicExchange
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。
Queue与Exchange指定BindingKey时可以使用通配符:
-
#:代指0个或多个单词
-
*:代指一个单词
区别在于Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
3.消息转换器
在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:
总结:SpringAMQP中消息的序列化和反序列化是怎么实现的?
- 利用MessageConverter实现的,默认是JDK的序列化
- 注意发送方与接收方必须使用相同的MessageConverter