一、MQ
高性能的异步通讯组件
课程背景
同步通讯:并发能力弱
异步通讯:并发能力强
1. 初始MQ
1.1 同步调用
以黑马商城的余额支付为例:
(1)同步调用的优势是什么?
- 时效性强,等待到结果后才返回
(2)同步调用的问题是什么?
- 拓展性差
- 性能下降
- 级联失败问题
1.2 异步调用
异步调用通常是基于消息通知的方式,包含三个角色:
- 消息发送者:投递消息的人,就是原来的调用者
- 消息接收者:接收和处理消息的人,就是原来的服务提供者
- 消息代理:管理、暂存、转发消息,可以理解为微信服务器。
支付服务不再同步调用业务关联度低的服务,而是发送消息通知到Broker。
有点类似于观察者模式
具备下列优势:
- 解除耦合,拓展性强
- 无需等待,性能好
- 故障隔离
- 缓存消息,流量削峰填谷
异步调用的问题:
- 不能立即得到调用结果,时效性差
- 不确定下游业务执行是否成功
- 业务安全依赖于Broker(消息代理)的可靠性
1.3 MQ技术选项
MQ(MessageQueue),消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scale&Java |
协议支持 | AMQP、XMPP、SMTP、STOMP | OpenWire、STOMP、REST、XMPP、AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
二、RabbitMQ
RabbitMQ: One broker to queue them all | RabbitMQ
1. 安装部署
①基于Docker来安装RabbitMQ
把day06资料中提供的mq.tar上传到虚拟机root/目录下,加载镜像
docker load -i mq.tar
删除tar包
rm -rf *.tar
创建并运行容器
docker run \
-e RABBITMQ_DEFAULT_USER=itheima \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hm-net\
-d \
rabbitmq:3.8-management
查看mq的运行日志
docker logs -f mq
②登录查看 http://192.168.126.151:15672/
用户名:itheima,密码:123321
基础介绍
RabbitMQ的整体架构及和核心概念:
- publisher:消息发送者
- consumer:消息的消费者
- queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
- exchange:交换机,负责路由消息。生产者发的消息由交换机决定投递到哪个队列
- virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
2. 快速入门
需求:在RabbitMQ的控制台完成下列操作:
- 新建队列hello.queue1和hello.queue2
- 向默认的amp.fanout交换机发送一条消息
- 查看消息是否到达hello.queue1和hello.queue2
- 总结规律
①添加队列
②发送消息
没配置路由
③在交换机和队列之间建立关系
发送消息
消息到达hello.queue1和hello.queue2
④消息发送的注意事项有哪些?
- 交换机只能路由消息,无法存储消息
- 交换机只会路由消息给预期绑定的队列,因此队列必须与交换机绑定
3. 数据隔离
需求:在RabbitMQ的控制台完成下列操作:
- 新建一个用户hmall
- 为hmall用户创建一个virtual host
- 测试不同virtual host之间的数据隔离现象
对于小型企业而言,出于成本考虑,我们通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host
的隔离特性,将不同项目隔离。一般会做两件事情:
-
给每个项目创建独立的运维账号,将管理权限分离。
-
给每个项目创建不同的
virtual host
,将每个项目的数据隔离。
步骤:
①新建一个用户hmall
②用hmall用户登录,添加虚拟主机
③添加sample.queue队列
④交换机与队列绑定
⑤发送消息
⑥获取消息
三、Java客户端
1. 快速入门
SpringAmqp的官方地址:Spring AMQP
导入课前资料提供的Demo工程来测试:
案例需求:
- 利用控制台创建队列simple.queue
- 在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
- 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列
(1)创建队列simple.queue
(2)在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
步骤:
①引入spring-amqp依赖
在父工程中引入spring-amqp依赖,这样publisher和consumer服务都可以使用:
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
②配置RabbitMQ服务端信息
在每个微服务中引入MQ服务端信息,这样微服务才能连接到RabbitMQ
spring:
rabbitmq:
host: 192.168.126.151 # 改为自己的主机名
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
③发送消息
SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。发送消息代码如下:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
创建单元测试类
执行单元测试
④接收消息
SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法
package com.itheima.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
log.info("spring 消费者接收到消息:[" + message + "]");
}
}
把父工程的pom.xml里lombok依赖的版本指定为1.18.30
启动ComsumerApplication
总结
SpringAMQP如何收发消息?
①引入spring-boot-starter-amqp依赖;
②配置rabbitamqp服务端信息;
③利用RabbitTemplate发送消息;
④利用@RabbitListener注解声明要监听的队列,监听消息
2. Work Queues
Work queues,任务模型。简单来说,就是让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
案例:模拟WorkQueue,实现一个队列绑定多个消费者
基本思路如下:
1. 在RabbitMQ的控制台创建一个队列,名为work.queue
2. 在publisher服务中定义测试方法,发送50条消息到work.queue
3. 在consumer服务中定义两个消息监听者,都监听work.queue队列
4. 消费者1每秒处理40条消息,消费者2每秒处理5秒消息
步骤:
①添加一个新队列work.queue
②在publisher服务中定义测试方法,发送50条消息到work.queue
package com.itheima.publisher;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 1. 队列名
String queueName = "simple.queue";
// 2. 消息
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
}
@Test
public void testWorkQueue() {
// 1. 队列名
String queueName = "work.queue";
// 2. 消息
for (int i = 1; i <= 50; i++) {
String message = "hello, spring amqp_" + i;
rabbitTemplate.convertAndSend(queueName, message);
}
}
}
③在consumer服务中定义两个消息监听者,都监听work.queue队列
实际:多实例部署
package com.itheima.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
log.info("监听到simple.queue的消息:[" + message + "]");
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message) {
System.out.println("消费者1接收到消息:" + message + ", " + LocalTime.now());
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message) {
System.err.println("消费者2......接收到消息:" + message + ", " + LocalTime.now());
}
}
④先重启ConsumerApplication,然后运行单元测试testWorkQueue()
⑤加上延时
package com.itheima.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
log.info("监听到simple.queue的消息:[" + message + "]");
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message) throws InterruptedException {
System.out.println("消费者1接收到消息:" + message + ", " + LocalTime.now());
Thread.sleep(25);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message) throws InterruptedException {
System.err.println("消费者2......接收到消息:" + message + ", " + LocalTime.now());
Thread.sleep(200);
}
}
⑥消费者消息推送限制
默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此,我们需要修改application.yml(消费者),设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:(能者多劳)
spring:
rabbitmq:
listener:
simple:
prefetch: 1
总结:
Work模型的使用:
- 多个消费者绑定到一个队列,可以加快消息处理速度
- 同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳。
3. Fanout交换机
交换机的作用主要是接收发送者发送的消息,并将消息路由到与其绑定的队列。
常见的交换机的类型有以下三种:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,基于RoutingKey(路由key)发送给订阅了消息的队列
Topic:通配符订阅,与Direct类似,不过RoutingKey可以使用通配符
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式
- 可以有多个队列
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机
- 交换机把消息发送给绑定过的所有队列
- 订阅队列的消费者都能拿到消息
案例:利用SpringAMQP演示FanoutExchange的使用
实现思路如下:
1. 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
2. 在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定
3. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
4. 在publisher中编写测试方法,向hmall.fanout发送消息
步骤:
①在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
②在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定
③hmall.fanout交换机与两个队列fanout.queue1、fanout.queue2绑定
④在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
package com.itheima.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) {
log.info("消费者1监听到fanout.queue1的消息:[{}]", message);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) {
log.info("消费者1监听到fanout.queue2的消息:[{}]", message);
}
}
⑤在publisher中编写测试方法,向hmall.fanout发送消息
package com.itheima.publisher;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testFanoutQueue() {
// 1. 交换机名
String exchangeName = "hmall.fanout";
// 2. 消息
String message = "hello, everyone!";
// 发送消息,参数:交换机名称、RoutingKey(暂时为空)、message
rabbitTemplate.convertAndSend(exchangeName, null, message);
}
}
⑥先启动ConsumerApplication,后运行单元测试
4. Direct交换机
Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此被称为定向路由。
每一个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
案例:利用SpringAMQP演示DirectExchange的使用
需求如下:
- 1. 在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2
- 2. 在RabbitMQ控制台中,声明交换机hmall.direct,将两个队列与其绑定
- 3. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
- 4. 在publisher中编写测试方法,利用不同的RoutingKey向hmall、direct发送消息
步骤:
①在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2
②在RabbitMQ控制台中,声明交换机hmall.direct,将两个队列与其绑定
③在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
package com.itheima.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String message) {
log.info("消费者1监听到direct.queue1的消息:[{}]", message);
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String message) {
log.info("消费者2监听到direct.queue2的消息:[{}]", message);
}
}
④ 在publisher中编写测试方法,利用不同的RoutingKey向hmall、direct发送消息
package com.itheima.publisher;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testDirectQueue() {
// 1. 交换机名
String exchangeName = "hmall.direct";
// 2. 消息
String message = "红色!";
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
}
RoutingKey为 "red":
RoutingKey为 "blue":
RoutingKey为 "yellow":
总结
Direct交换机与Fanout交换机的差异?
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
5. Topic交换机
TopicExchange也是基于RoutingKey做消息路由,但是RoutingKey通常是多个单词的组合,并且以.分割。Queue与Exchange指定BindingKey时可以使用通配符。
通配符规则:
-
#
:匹配0个或多个词 -
*
:匹配不多不少恰好1个单词
举例:
-
item.#
:能够匹配item.spu.insert
或者item.spu
-
item.*
:只能匹配item.spu
假如此时publisher发送的消息使用的RoutingKey
共有四种:
-
china.news
代表有中国的新闻消息; -
china.weather
代表中国的天气消息; -
japan.news
则代表日本新闻 -
japan.weather
代表日本的天气消息;
案例:利用SpringAMQP演示DirectExchange的使用
需求如下:
1. 在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2
2. 在RabbitMQ控制台中,声明交换机hmall.topic,将两个队列与其绑定
3. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
4. 在publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息
步骤:
①在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2
②在RabbitMQ控制台中,声明交换机hmall.topic,将两个队列与其绑定
③在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
package com.itheima.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String message) {
log.info("消费者1监听到topic.queue1的消息:[{}]", message);
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String message) {
log.info("消费者2监听到topic.queue2的消息:[{}]", message);
}
}
④在publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息
package com.itheima.publisher;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testTopicQueue() {
// 1. 交换机名
String exchangeName = "hmall.topic";
// 2. 消息
String message = "新闻!";
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
}
先重启ConsumerApplication,后启动单元测试
-
message = "今天天气不错!",routingKey:"china.weather"
- message = "八嘎!",routingKey:"japan.news"
6. 声明队列和交换机
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类QueueBuilder构建
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
例如,声明一个Fanout类型的交换机,并且创建队列与其绑定:
@Configuration
public class FanoutConfig {
// 声明FanoutExchange交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("hmall.fanout");
}
// 声明第1队列
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
// 绑定队列1和交换机
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
// ... ...,以相同方式声明第2个队列,并完成绑定
}
步骤:
①在RabbitMQ控制台把之前创建的fanout.queue1和fanout.queue2以及hmall.fanout删除
②在consumer模块新增config.FanoutConfiguration
package com.itheima.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfiguration {
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("hmall.fanout");
// return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
}
@Bean
public Queue fanoutQueue1() {
// return QueueBuilder.durable("fanout.queue1").build();
return new Queue("fanout.queue1");
}
@Bean
public Binding fanoutQueue1Binding(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Queue fanoutQueue2() {
// return QueueBuilder.durable("fanout.queue2").build();
return new Queue("fanout.queue2");
}
@Bean
public Binding fanoutQueue2Binding(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
③启动ConsumerApplication,在RabbitMQ控制台查看
案例:利用SpringAMQP声明DirectExchange并与队列绑定
需求如下:
1. 在consumer服务中,声明队列direct.queue1和direct.queue2
2. 在consumer服务中,声明交换机hmall.direct,将两个队列与其绑定
3. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:
步骤:
①SpringAmqpTest
package com.itheima.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1", durable = "true"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String message) {
log.info("消费者1监听到direct.queue1的消息:[{}]", message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2", durable = "true"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String message) {
log.info("消费者2监听到direct.queue2的消息:[{}]", message);
}
}
7. 消息转换器
案例:消息转换器
需求:测试利用SpringAMQP发送对象类型的消息
①声明一个队列,名为object.queue
②编写单元测试,向队列中直接发送一条消息,消息类型为Map
③在控制台查看消息,总结发现的问题
步骤:
①声明一个队列,名为object.queue
②编写单元测试,向队列中直接发送一条消息,消息类型为Map
package com.itheima.publisher;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendObject() {
// 1. 准备消息
Map<String, Object> msg = new HashMap<>(2);
msg.put("name", "Jack");
msg.put("age", 21);
// 2. 发送消息
rabbitTemplate.convertAndSend("object.queue", msg);
}
}
Spring对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
存在下列问题:
- JDK的序列化有安全风险
- JDK序列化的消息太大
- JDK序列化的消息可读性差
建议采用JSON序列化代替默认的JDK序列化,要做两件事情:
在publisher和consumer中都要引入jackson依赖:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
在publisher和consumer中都要配置MessageConverter:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
改进步骤:
①在父工程中引入依赖pom.xml(mp-demo)
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
②publisherApplication
package com.itheima.publisher;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class PublisherApplication {
public static void main(String[] args) {
SpringApplication.run(PublisherApplication.class);
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
运行单元测试testSendObject()
③consumerApplication
package com.itheima.consumer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
④SpringRabbitListener
package com.itheima.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
import java.util.Map;
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String, Object> message) {
log.info("消费者2监听到object.queue的消息:[{}]", message);
}
}
⑤运行ConsumerApplication
四、黑马商城业务改造
需求:改造余额支付功能,不再同步调用交易服务的OpenFeign接口,而是采用异步MQ通知交易服务更新订单状态。
步骤:
①在trade-service和pay-service模块的pom.xml中引入依赖
<!--消息发送-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
②在trade-service和pay-service模块的application.yaml中配置MQ地址
spring:
rabbitmq:
host: 192.168.126.151 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
③在hm-common模块的config包下新增一个MqConfig
package com.hmall.common.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
同时,在该模块下跌的resources包下的META-INF.spring.factories中让springBoot扫描到MqConfig,使其生效
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.hmall.common.config.MyBatisConfig,\
com.hmall.common.config.MvcConfig,\
com.hmall.common.config.JsonConfig,\
com.hmall.common.config.MqConfig
④在trade-service模块下新增listener.PayStatusListener(消费者)
package com.hmall.trade.listener;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class PayStatusListener {
private final IOrderService orderService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "trade.pay.success.queue", durable = "true"),
exchange = @Exchange(name = "pay.direct"),
key = "pay.success"
))
public void listenPaySuccess(Long orderId) {
orderService.markOrderPaySuccess(orderId);
}
}
⑤在hm-common模块中引amqp依赖,否则会报以下错
<!--消息发送-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
重启TradeApplication
⑥pay-service模块下的PayOrderServiceImpl
package com.hmall.pay.service.impl;
// ... ...
/**
* <p>
* 支付订单 服务实现类
* </p>
*
* @author 虎哥
* @since 2023-05-16
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class PayOrderServiceImpl extends ServiceImpl<PayOrderMapper, PayOrder> implements IPayOrderService {
private final UserClient userClient;
private final TradeClient tradeClient;
private final RabbitTemplate rabbitTemplate;
@Override
@GlobalTransactional
public void tryPayOrderByBalance(PayOrderFormDTO payOrderFormDTO) {
// 1.查询支付单
PayOrder po = getById(payOrderFormDTO.getId());
// 2.判断状态
if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){
// 订单不是未支付,状态异常
throw new BizIllegalException("交易已支付或关闭!");
}
// 3.尝试扣减余额
userClient.deductMoney(payOrderFormDTO.getPw(), po.getAmount());
// 4.修改支付单状态
boolean success = markPayOrderSuccess(payOrderFormDTO.getId(), LocalDateTime.now());
if (!success) {
throw new BizIllegalException("交易已支付或关闭!");
}
// 5. TODO 修改订单状态
// tradeClient.markOrderPaySuccess(po.getBizOrderNo());
try {
rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());
} catch (Exception e) {
log.error("发送支付状态通知失败,订单id:{}", po.getBizOrderNo(), e);
}
}
}
重启PayApplication
⑦在RabbitMQ控制台查看交换机和队列是否创建成功
⑧去黑马商城下单进行测试
查看数据库表