初识RabbitMQ:高性能异步通讯组件
同步调用
异步调用
场景:1.对结果不关心时异步。订单状态-异步,查询-同步
2.影响性能。调用链超长,可改成异步
MQ技术对比
kafka日志收集
RabbitMQ整体架构
快速入门
- 交换机只负责路由消息,没有消息保存能⼒
- 发信息前,queue需要binding绑定交换机
数据隔离
给每个项目创建自己的用户user和虚拟主机virtual host。不同virtual host数据隔离
Java客户端
RabbitMQ Tutorials — RabbitMQ
AMQP 和 Spring AMQP
RabbitMQ 中的 AMQP 协议:AMQP 的全称是 Advanced Message Queue Protocol,即高级消息队列协议。RabbitMQ 就是根据这个协议开发的。AMQP 是一个标准的协议,不仅 RabbitMQ,如果你想自己实现一个消息队列,也可以按照这个协议来设计。 AMQP 协议主要由几个部分组成,如下图所示。
Publisher:消息的生产者,它负责发送消息。
Consumer:消息的消费者,它负责接收消息。
中间的部分,用兔子头标识的就是 RabbitMQ,功能如下:
生产者:发消息到某个交换机。
消费者:从某个队列中取消息。
交换机(Exchange):像路由器一样,负责将消息从生产者发送到对应的队列。
队列(Queue):存储消息的地方。
路由(Routes):转发,就是怎么把消息从一个地方转到另一个地方(比如从生产者转发到某个队列)。
快速入门
1.引入依赖
2.配置RabbitMQ信息(application.yml)
3.使⽤RabbitTemplate 发送信息(Publisher)
4.使⽤注解 @RabbitListener 监听
work模型
需求:模拟WorkQueue,实现⼀个队列绑定多个消费者
默认一个消费者一个消费者轮流处理消息,但消费者处理能力不同,消费者1处理块。结果:消费者1率先处理完25条信息,消费者2缓慢处理25条信息。 问题:资源分配不合理。需要考虑队列的处理业务能⼒,能者多劳。
⾯试题:如何处理消息队列中堆积问题
1.绑定多个消费者,加快消息处理速度,解决消息堆积
2.优化业务代码:缓存、池化、异步(MQ、Java代码的异步-事件监听、线程池)
交换机
直发给队列,只能有一个服务接收到消息,其他服务无法获取到,所以需要交换机路由转发。
Fanout交换机
效果:Fanout⼴播交换机,两个队列都收到了信息
交换机作⽤:接收信息,路由信息
Direct交换机
需求:不同服务需要收到不同的消息
效果:根据Key接收消息
Topic交换机
Direct能做的 Topic也能做,Topic在Key上更灵活方便
声明队列和交换机
问题:控制台创建队列和交换机很麻烦且易错(开发、生成、测试都要重新创建一遍) ,需要使用Java代码声明队列和交换机
注意:⼀般将配置类创建在消费者中,因为消费者更加关⼼交换机和队列及其关系,而生产者只要给对应交换机发信息即可。
方式一(基于Bean,麻烦)
代码实现:在Config类中 new 或者 ⽤⼯⼚直接创建交换机与队列,并进⾏绑定
缺点:当创建Direct交换机需要绑定多个Key时,要写多个Bean类,代码冗余
方式二(基于注解,简单)
基于@RabbitListener注解对监听器进⾏绑定
@RabbitListener(bindings = @QueueBinding(xxx))
总结
消息转换器
发信息为⼀个Map 对象,⽽不是String
追源码
解决:使⽤JSON序列化代替默认的JDK序列化(Spring MVC包含Jackson依赖,只需设定Bean即可)
业务改造
1.生产者和消费者都引入amqp依赖
2.生产者和消费者都配置amqp,application.yml
3.定义JSON转化器(common中定义Jackson Bean类:Jackson2JsonMessageConverter())
4.消费者编写监听器
队列名、Key、交换机(Topic)已在需求中定义好
message 观察原先调用支付服务中的已支付接口
5.生产者发消息
写⽣产者接⼝(修改具体业务接⼝的代码逻辑),在业务内使⽤ RabbitTemplate发信息,message为Long类型的订单ID。业务内发信息的⽚段建议try-catch,以免影响业务⽽回滚
参考:MQ基础-01.RabbitMQ课程介绍_哔哩哔哩_bilibili