本篇文章学习于 bilibili黑马 的视频 (狗头保命)
同步通讯 & 异步通讯 (RabbitMQ 的前置知识)
- 同步通讯:类似打电话,只有对方接受了你发起的请求,双方才能进行通讯, 同一时刻你只能跟一个人打视频电话。
- 异步通讯:类似发信息,不用对方接受,你就可以直接发信息,而且可以多线操作,同时跟多人发信息。
同步调用
发送方直接把消息传递给接收者, 如果中间有很多环节, 有一步出错, 那么所有操作都要回滚
同步调用的缺点:
- 拓展性差
- 性能下降
- 级联失败
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
消息发送者:投递消息的人,就是原来的调用方
消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
消息接收者:接收和处理消息的人,就是原来的服务提供方
异步调用中, 发送方把消息发送给消息broker, 就算完成发送任务.
接收者从消息 breker 那里订阅消息
这样,发送消息的人和接收消息的人就完全解耦了。
此时, 如果发送方发送消息出错, 不需要全部回滚, 只需要将错误信息重新发布给消息代理
如果接受方接受消息过程出现错误, 那么消息代理重传就好, 发送方还是可以正常进行其他操作
异步调用的优势:
- 耦合度更低
- 性能更好
- 业务拓展性强
- 故障隔离,避免级联失败
异步通信的缺点:
- 完全依赖于Broker的可靠性、安全性和性能
- 架构复杂,后期维护和调试麻烦
消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ.
RabbitMQ是基于Erlang语言开发的开源消息通信中间件.
开启 RabbitMQ
打开这个目录(我的电脑是)
C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.7.4\sbin
双击这个文件
等待命令框为这个状态
点击跳转官网地址
RabbitMQ
RabbitMQ对应的架构
- publisher:生产者,也就是发送消息的一方
- consumer:消费者,也就是消费消息的一方
- queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
- exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
- virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
简单的收发消息 (exchange & queue)
1. 添加 exchange
2. 添加 queue
3. 绑定 exchange 和 queue
点击之前创建的 exchange
添加绑定信息
4. 发送消息
输入信息, 点击发送
5. 查看接受的消息
点击被绑定的队列
获取消息
数据隔离
基于不同用户,将管理权限分离。
基于不同的 virtual host ,将每个项目的数据隔离。
创建用户
此时用户没有 virtral hosts
退出, 重新登录
创建 virtual hosts
此时用户 zrj 已经拥有 virtual hosts 了
将 virtual hosts 切换到 /zrj 之后, 查看 queue, 会发现之前创建的 test.queue 已经看不见了, 这就是基于 用户和 virtual hosts 的数据隔离效果
Spring AMQP
RabbitMQ 基于 AMQP 协议, 因此具有跨语言的特性.
Spring 官方基于 RabbitMQ 提供了一套消息收发管理工具 ---- Spring AMQP
Spring AMQP 提供的功能
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
项目中使用 RabbitMQ
如果你很懒, 不想自己创建, 就去bilibili黑马的课程里, 找资源吧
(毕竟我也是在那里学的)
发送消息
创建项目, 导入依赖
项目目录结构
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
添加一下配置
Spring 中发送消息
RabbitMQ 接收到消息
上面是使用管道简单的发送消息, 结构大概是这样
我们查看 的 convertAndSend 方法可以看到, 由于多态, 参数的选择有很多方案, 上面使用的是
convertAndSend(String routingKey, Object object)
因为所有的数据发送都是 Object , 因此我们接收方那里收到的, 也是 Object 类型, 如果想取出使用, 记得转换类型
接收消息
增加配置
编写监听代码
启动 consumer 类
publisher 中进行消息发送, 此时 consumer 中可以接收到消息
WorkQueues模型
简单的说就是多个消费者消费同一个队列中的消息
通常情况下, 队列会采用轮询的方式, 每个消费者均分 队列中的消息
两个消费者共同消费同一个队列
连续发送 50 条消息
查看结果会发现, 两个消费者虽然消费速率不同, 但是最终都消费了同样数量 (25条) 的消息
如何才能使得能者多劳, 不会浪费效率呐?
修改配置信息, 设定么个消费者每次获取消息的最大数量 (原先是 几个消费者 平均分掉 所有消息), 消费完成获取的消息之后才能获取下一条消息
引入交换机之后的 生产者消费者模型
注意: Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型:
Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
Headers:头匹配,基于MQ的消息头匹配,用的较少。
Fanout 交换机
交换机把消息发送给 所有 绑定该交换机的队列
创建交换机
创建两个队列并绑定
发送消息, 并接收处理
消息发送方
消息接收方
消息处理
可以看到, Fanout 交换机把消息 广播 给了绑定它的所有队列
Direct 交换机
Direct 模型下, 队列与交换机的绑定会指定一个 RoutingKey(路由key)
消息的发送方在向 Exchange 发送消息时吗也必须指定消息的 RoutingKey
Exchange 会将消息交给具有相同 RoutingKey 的队列
创建交换机
创建队列
将交换机和队列进行绑定 (填写 RoutingKey)
消息接收
此时会发现, 对于不同的消息发送, 会有不同的处理
当 RoutingKey 为 red 时, 两个消费者都会进行消息处理
当 RoutingKey 为 green / blue 时, 只有绑定对应的 RoutingKey 的消费者都会进行消息处理
Tocpic 交换机
和 Direct 交换机基本类似, 但是绑定的 RoutingKey 可以使用通配符
通配符规则:
- #:匹配一个或多个词
- *:匹配不多不少恰好1个词
创建 tocpic 交换机
创建队列
将 topic 交换机与 队列 进行路由绑定
消息发送
消息接收
消息处理
声明队列和交换机
使用 Spring AMQP 提供的 API 来声明队列和交换机, 而不是使用 RabbitMQ 控制台来操作
使用 ExchangeBuilder 来创建队列和交换机
绑定队列和交换机
Fanout 实例
写一个类声明交换机和队列
运行 publisher 后, 可以在控制台看到 队列, 交换机, 绑定 都已完成
Direct示例 (Topic 与其几乎完全相同)
写一个类声明交换机和队列
运行 publisher 后, 可以在控制台看到 队列, 交换机, 绑定 都已完成
基于注解声明
由于要绑定多个 key 的情况, 基于 @Bean 的方式声明队列和交换机会很麻烦, 因此 Spring AMQP 提供了注解的方式来声明 交换机和队列
Fanout 示例
在消费的同时声明 Fanout 交换机和队列
运行 publisher 后查看 RabbitMQ 控制台
Direct示例 (Topic 与其几乎完全相同)
在消费的同时声明 Fanout 交换机和队列
运行 publisher 后查看 RabbitMQ 控制台