文章目录
- 1、简介
- 2、MQ
- 优点
- 缺点
- MQ的应用场景
- AMQP
- 工作原理
- 市面上常见的MQ
- 3、Linux安装RabbitMQ
- 3.1 版本对应
- 3.2 安装socat
- 3.3 下载 Erlang/OTP、安装、验证 erlang
- 方法一:
- 1. 下载
- 2. 将下载的Erlang服务上传到服务器上面
- 3. 解压
- 4. 编译erlang的依赖环境
- 5. 安装Erlang
- 6. 配置Erlang环境
- 7. 测试Erlang是否安装成功
- 方法二:
- 1. 下载
- 2. 安装
- 3. 验证 erlang 是否安装成功。
- 4. 卸载 erlang(遇到下载的erlang与rabbitmq-server 版本冲突)
- 5. 重新安装 erlang 和验证 erlang
- 3.4 安装、验证rabbitmq-server(rabbitMQ服务器)
- 1. 下载RabbitMQ
- 2. 将RabbitMQ上传到服务器
- 3. 解压RabbitMQ服务
- 4. 配置环境变量
- 5. 开启web管理插件
- 6. 启动RabbitMQ服务
- 7. 访问RabbitMQ管理界面
- 8. 设置允许远程访问
- 方法一——新加用户
- 方法二——设置guest
- 4、RabbitMQ实战
- 4.1 什么是消息队列
- 4.2 RabbitMQ简介
- 4.3 消息队列应用场景
- 1. 任务异步处理:
- 2. 应用程序解耦合:
- 4.4 RabbitMQ的工作原理
- 1. 组成部分说明:
- 2. 生产者发送消息流程:
- 3. 消费者接收消息流程:
- 4.5 六种工作模式
- 4.5.1 基本消息模式(简单消息模式)
- 1.1 案例实战
- 1、 新建一个maven工程
- 2、 添加依赖
- 3、 再到java目录下创建org.example.util包,在此包下创建连接工具类:
- 4、 生产者发送消息
- 4.1 在org.example.simple包下创建Send类,用于生产者发送消息。
- 4.2 运行上述main方法,在控制台打印信息如下所示:
- 4.3 打开浏览器访问:http://IP:15672
- 4.4 如下图点击Queues,可以在队列列表中可以看到名为simple_queue的队列。
- 4.5 点击队列名称simple_queue,进入详情页 --->Get messages,可以查看消息:
- 5、消费者接收消息
- 5.1 在org.example.simple包下创建Receiver类,用于消费者接收消息
- 5.2 运行上述main方法,在控制台打印信息如下:
- 5.3 打开浏览器访问:http://IP:15672
- 5.4 再看看队列的消息,已经被消费了,**Ready值为0,Total值也为0了**。
- 1.2 消息确认机制ACK
- 1、 在org.example.simple包下创建ACKReceiver类,用于消费者接收消息
- 2、自动ACK存在的问题
- 2.1 修改消费者**Receiver类**的代码
- 2.2 生产者Send类不做任何修改,**直接运行Send类中的main方法**,
- 2.3 运行Receiver类消费者中的main方法,程序抛出异常:
- 2.4再查看rabbitmq的web管理界面:
- 3、演示手动ACK
- 3.1 重新运行生产者Send中的main方法,实现发送消息,
- 3.2 再修改ACKReceiver类中的handleDelivery方法
- 3.3 再运行ACKReceiver类中的main方法,程序抛出异常:
- 3.4 查看web管理页面:
- 4、最后消息确认机制的正确做法
- 4.1 我们要在监听队列时设置第二个参数为false,代码中手动进行ACK
- 4.2 最后运行ACKReceiver类中的main方法,查看web管理页面
- 4.5.2 work消息模型
- 2.1 案例实战
- 1、生产者
- 2、消费者1
- 3、消费者2
- 4、进行消息消费
- 2.2 能者多劳
- 2.3 订阅模型分类
- 1. 说明
- 2. Exchange类型有以下几种:
- 4.5.3 Publish/subscribe(交换机类型:Fanout,也称为广播)
- 3.1 案例实战
- 1、生产者
- 2、消费者1(注册成功发给短信服务)
- 3、消费者2(注册成功发给邮件服务)
- 4、进行消息消费
- 5、思考
- 5.1 publish/subscribe与work queues有什么区别。
- 5.2 实际工作用 publish/subscribe还是work queues?
- 4.5.4 Routing 路由模型(交换机类型:direct)
- 4.1 案例实战
- 1、生产者
- 2、消费者1(使用routing key为sms来绑定队列与交换机)
- 3、消费者2(用routing key为email来绑定队列与交换机)
- 4.5.5 Topics通配符模式(交换机类型:topics)
- 4.1 Topics模型示意图:
- 4.2 通配符规则
- 4.3 举例
- 4.4 案例实战
- 1、生产者
- 2、消费者1
- 3、消费者2
- 4.6 SpringBoot整合RabbitMQ
- 4.6.1 创建SpringBoot项目
- 4.6.2 添加依赖
- 4.6.3 添加配置
- 4.6.4 添加配置类
- 4.6.5 在测试类中添加生产者,并发送消息
- 4.6.6 生产者发送消息测试结果
- 4.6.7 创建消息接收器类
- 4.6.8 消费者消费结果
- 5、RabbitMQ问题相关解决方案
- 5.1 生产端可靠性投递方案介绍
- 5.1.1 关于怎么保证生产者的可靠性投递
- 5.1.2 消息发送
- 方案一:消息落库,对消息状态进行标记
- 方案二:消息延迟投递,做二次确认,回调检查
- 5.1.3 消息落库,对消息状态打标的具体实现
- 开启消息回调机制
- 1. 创建数据库
- 2. 创建Spring Boot项目
- 3. 添加依赖
- 4. 添加配置
- 5. 创建实体类
- 6. 创建mapper
- 7. 创建Controller
- 8. 创建配置类
- 9. 进行任务调度
- 10.最终结果
- 1、发送消息成功
- 2、模拟生产者消息首次发送失败
- 3、模拟消息首次发送失败,定时任务重试也失败
- 11.在高并发的场景下是否合适?
- 5.2 [RabbitMQ](https://so.csdn.net/so/search?q=RabbitMQ&spm=1001.2101.3001.7020) 如何避免消息重复消费?
- 5.2.1 幂等性
- 5.2.2 高并发的情况下如何避免消息重复消费
- 5.2.3 解决重复消费的案例代码
- 1. 添加依赖
- 2. 添加配置
- 3. 生产者代码
- 4. 消费者代码
- 5. 最终结果
- 1、消息消费成功
- 2、模拟消息重复消费场景
- 5.3 [RabbitMQ](https://so.csdn.net/so/search?q=RabbitMQ&spm=1001.2101.3001.7020) 如何避免消息积压?
- 5.3.1 解决方案
- 更多命令
1、简介
素材:链接:https://pan.baidu.com/s/1YjVM9WBEIVCbYZmlzowyKw?pwd=lpkz
官网:https://www.rabbitmq.com/
RabbitMQ 是一个开源的消息队列中间件,采用 Erlang 语言编写,支持多种消息协议,如 AMQP、MQTT、STOMP 等。它可以作为消息的中转站,在分布式系统中协调不同组件之间的数据传输,实现松耦合的系统架构。
2、MQ
优点
- 灵活的路由方式,支持消息的广播、点对点、主题订阅等多种路由方式;
- 异步处理消息,提高系统的并发性能;
- 持久化机制,保证在服务器宕机、重启等情况下消息的可靠性;
- 高可用和负载均衡,支持集群和镜像模式,提供高可用和负载均衡的目标;
- 插件机制,支持丰富的插件,如认证授权、可视化管理等。
缺点
- 系统可用性降低: 系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。
- 系统复杂度提高: MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。
- 一致性问题 : A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败,则会造成数据处理的不一致。
MQ的应用场景
- 高峰流量:抢红包、秒杀活动、抢火车票等这些业务场景都是短时间内需要处理大量请求,如果直接连接系统处理业务,会耗费大量资源,有可能造成系统瘫痪。 而使用MQ后,可以先让用户将请求发送到MQ中,MQ会先保存请求消息,不会占用系统资源,且MQ会进行消息排序,先请求的秒杀成功,后请求的秒杀失败。
- 消息分发:如电商网站要推送促销信息,该业务耗费时间较多,但对时效性要求不高,可以使用MQ做消息分发。
- 数据同步:假如我们需要将数据保存到数据库之外,还需要一段时间将数据同步到缓存(如Redis)、搜索引擎(如Elasticsearch)中。此时可以将数据库的数据作为消息发送到MQ中,并同步到缓存、 搜索引擎中。
- 异步处理:在电商系统中,订单完成后,需要及时的通知子系统(进销存系统发货,用户服务积分,发送短信)进行下一步操作。为了保证订单系统的高性能,应该直接返回订单结果,之后让MQ通知子系统做其他非实时的业务操作。这样能保证核心业务的高效及时
- 离线处理:在银行系统中,如果要查询近十年的历史账单,这是非常耗时的操作。如果发送同步请求,则会花费大量时间等待响应。此时使用MQ发送异步请求,等到查询出结果后获取结果即可。
AMQP
1、什么是 AMQP : 即Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,专门为消息中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受不同中间件产品,不同开发语言等条件的限制。2006年AMQP规范发布,类比HTTP。
2、AMQP工作过程: 生产者(Publisher)将消息发布到交换机(Exchange),交换机根据规则将消息分发给交换机绑定的队列(Queue),队列再将消息投递给订阅了此队列的消费者
工作原理
- Producer【消息的生产者】 一个向交换机发布消息的客户端应用程序。
- Connection 【连接】 生产者/消费者和RabbitMQ服务器之间建立的TCP连接。
- Channel【信道】 是TCP里面的虚拟连接。例如:Connection相当于电缆,Channel相当于独立光纤束,一条TCP连接中可以创建多条信道,增加连接效率。无论是发布消息、接收消息、订阅队列都是通过信道完成的。
- Broker 消息队列服务器实体。即RabbitMQ服务器
- Virtual Host【虚拟主机】 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机制。当多个不同的用户使用同一个RabbitMQ服务器时,可以划分出多个虚拟主机。RabbitMQ默认的虚拟主机路径是 /
- Exchange 【交换机】 用来接收生产者发送的消息,并根据分发规则,将这些消息分发给服务器中的队列中。不同的交换机有不同的分发规则。
- Queue【消息队列】 用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。消息一直在队列里面,等待消费者链接到这个 队列将其取走。
- Binding 【绑定】 消息队列和交换机之间的虚拟连接,绑定中包含路由规则,绑定信息保存到交换机的路由表中,作为消息的分发依据。
- Consumer【消息的消费者】 表示一个从消息队列中取得消息的客户端应用程序。
市面上常见的MQ
3、Linux安装RabbitMQ
安装rabbitmq分3个步: 1、先安装socat, ——》2、安装erlang, ——》3、安装rabbitmq-server。
3.1 版本对应
网址:https://www.rabbitmq.com/which-erlang.html
3.2 安装socat
命令:yum -y install socat
3.3 下载 Erlang/OTP、安装、验证 erlang
官网:下载 - Erlang/OTP
方法一:
1. 下载
2. 将下载的Erlang服务上传到服务器上面
cd /home
mkdir /home/rabbitMQ
cd /home/rabbitMQ
3. 解压
tar -zvxf otp_src_24.0.tar.gz
4. 编译erlang的依赖环境
跟大家讲一下,erlang依赖的环境特别特别多,就拿gcc来说,如果以前安装过这个环境还不止,所以我们重新安装一下也无所谓所以我们执行以下的命令:
解压成功,安装编译所需要的依赖文件
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC-devel
等待安装完毕
创建Erlang文件夹
mkdir /home/rabbitMQ/erlang
cd /home/rabbitMQ/otp_src_24.0
然后执行下面的命令
./configure --prefix=/home/rabbitMQ/erlang --without-javac
5. 安装Erlang
make : 编译
make install : 安装
&& : 前面的命令执行成功后面的命令才会执行
make && make install
6. 配置Erlang环境
vi /etc/profile
加入
#set erlang environment
export ERLANG_HOME=/home/rabbitMQ/erlang
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:${ERLANG_HOME}/bin:$PATH
按Esc键 输入 :wq (退出并保存) :q! (退出不保存)
刷新配置文件
7. 测试Erlang是否安装成功
输入命令: erl
如图所示说明已经安装成功了!!
方法二:
1. 下载
下载命令: sudo yum install erlang
2. 安装
接着上一步, 继续回复“y”,
提示:见到Complete!(成功),表示安装erlang 成功了。
3. 验证 erlang 是否安装成功。
命令: yum info erlang
提示: erlang 的版本信息、软件网址、占用大小空间等,就表示安装成功了。
4. 卸载 erlang(遇到下载的erlang与rabbitmq-server 版本冲突)
执行3条命令
yum list | grep erlang
yum -y remove erlang-*
yum remove erlang.x86_64
5. 重新安装 erlang 和验证 erlang
安装已经下载好的erlang包, 文件路径 ./rabbitMQ/ 文件下
安装命令: rpm -ivh erlang-23.3-2.el8.x86_64.rpm
验证erlang命令: yum info erlang
3.4 安装、验证rabbitmq-server(rabbitMQ服务器)
注意:需要下载Linux版本的
官网:https://www.rabbitmq.com/
在RabbitMQ官网可以看到RabbitMQ对应的Erlang版本
1. 下载RabbitMQ
2. 将RabbitMQ上传到服务器
cd /home/rabbitMQ
3. 解压RabbitMQ服务
根据压缩包后缀不同使用不同的命令进行解压
xz -d rabbitmq-server-generic-unix-latest-toolchain-3.9.5.tar.xz
tar -xvf rabbitmq-server-generic-unix-latest-toolchain-3.9.5.tar
4. 配置环境变量
vi /etc/profile
加入
#set rabbitmq environment
export RABBITMQ=/home/rabbitMQ/rabbitmq_server-3.9.5
export PATH=$PATH:${RABBITMQ}/sbin
按Esc键 输入 :wq (退出并保存) :q! (退出不保存)
刷新配置文件
source /etc/profile
5. 开启web管理插件
cd /home/rabbitMQ/rabbitmq_server-3.9.5/sbin
./rabbitmq-plugins enable rabbitmq_management # 启动指定的插件:
启动插件成功
6. 启动RabbitMQ服务
ls
./rabbitmq-server -detached # 以守护进程启动
7. 访问RabbitMQ管理界面
浏览器访问:http://IP:15672
看到如下这个界面就是正常启动了
8. 设置允许远程访问
从上面截图可以看到使用guest登录,提示“User can only log in via localhost”,无法登录,原因是3.3.0版本后禁止用户在除locahost外的访问,只能通过本地主机登录。
方法一——新加用户
新加个用户,设置权限,设置角色。
rabbitmqctl add_user admin admin
:这个命令是用来添加一个新的RabbitMQ用户这个命令将创建一个名为admin
的用户,并设置其密码为admin
请注意,这两个参数(用户名和密码)在你的问题中是硬编码的,这在实际生产环境中并不安全,建议使用更复杂和随机化的用户名和密码rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
:这个命令是设置用户admin
在RabbitMQ的权限这里的-p /
参数表示设置的是全局权限而".*" ".*" ".*"
表示赋予admin
用户所有权限,包括配置权限、写权限和读权限rabbitmqctl set_user_tags admin administrator
:这个命令是为用户admin
添加了一个标签(或者权限等级)在这个例子中,添加的是administrator
标签这个命令可能不是必要的,因为RabbitMQ通常不会直接使用这种用户标签
rabbitmqctl add_user admin admin
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl set_user_tags admin administrator
登录成功
方法二——设置guest
在/home/rabbitMQ/rabbitmq_server-3.9.5/plugins/rabbit-3.9.5/ebin
目录下找到rabbit.app
文件 (find / -name rabbit.app),修改参数。
{loopback_users, [<<"guest">>]}, 修改成{loopback_users, []},
重启服务
rabbitmqctl stop #停止RabbitMQ
cd /home/rabbitMQ/rabbitmq_server-3.9.5/sbin
./rabbitmq-server -detached # 以守护进程启动RabbitMQ
使用guest账号登录成功
4、RabbitMQ实战
4.1 什么是消息队列
MQ全称为Message Queue,即消息队列。”消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
下图中Producer为生产者,Queue为消息队列,Consumer为消费者
4.2 RabbitMQ简介
RabbitMQ是一个开源的消息中间件,它实现了高效、可靠的消息传递机制,主要用于应用程序之间的异步通信。它基于AMQP(高级消息队列协议)规范设计,支持多种编程语言,并提供了丰富的特性和灵活的架构。
RabbitMQ的工作原理是利用队列来存储消息,并通过发布-订阅模式实现消息的发送和接收。在这个模式下,消息的发送者将消息发布到一个交换器,交换器根据预定义的规则将消息路由到一个或多个队列,然后消息的接收者从队列中订阅并消费这些消息。
4.3 消息队列应用场景
1. 任务异步处理:
高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。减少了应用程序的响应时间。
2. 应用程序解耦合:
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合
4.4 RabbitMQ的工作原理
1. 组成部分说明:
· Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
· Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过滤。
· Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的接受者
· Producer:消息生产者,即生产方客户端,生产方客户端将消息发送给消息队列
· Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
2. 生产者发送消息流程:
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立Channel通道(信道)。
3、生产者通过Channel通道(信道)把消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
3. 消费者接收消息流程:
1、消费者和Broker建立TCP连接
2、消费者和Broker建立Channel通道(信道)
3、消费者监听指定的Queue(队列) (每个队列都有一个名字)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
6、ack(消息确认机制)回复
4.5 六种工作模式
RabbitMQ有六种工作模式:基本消息模式、work消息模式、Publish/subscribe (交换机类型:Fanout,也称为广播模式)、Routing 路由模型(交换机类型:direct)、Topics 通配符模式(交换机类型:topics)、RPC
我们这里给大家重点介绍基本消息模式, Routing路由模式(重点)、Topic通配符模式(重点)。
4.5.1 基本消息模式(简单消息模式)
在上图的模型中,有以下概念:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
1.1 案例实战
1、 新建一个maven工程
根据下面的步骤建立maven项目
2、 添加依赖
<!--rabbitmq依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
</dependency>
<!--再额外添加slf4j的依赖-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
3、 再到java目录下创建org.example.util包,在此包下创建连接工具类:
public class ConnectionUtil {
/**
* 建立与RabbitMQ的连接
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址 (因为rabbitmq安装到linux上面,这里填写linux的IP地址)
factory.setHost("192.168.181.128");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码(rabbitmq的用户名和密码)
factory.setUsername("guest");
factory.setPassword("guest");
// 通过工厂获取连接
Connection connection = factory.newConnection();
return connection;
}
}
4、 生产者发送消息
4.1 在org.example.simple包下创建Send类,用于生产者发送消息。
public class Send {
private final static String QUEUE_NAME = "simple_queue"; // 队列名
public static void main(String[] argv) throws Exception {
// 1、获取到连接
Connection connection = ConnectionUtil.getConnection();
// 2、从连接中创建通道,使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
// 3、声明(创建)队列
//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 参数明细
* 1、queue 队列名称
* 2、durable 是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4、消息内容
String message = "Hello World!";
// 向指定的队列中发送消息
//参数:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 参数明细:
* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、props,消息的属性
* 4、body,消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
channel.close();
connection.close();
}
}
4.2 运行上述main方法,在控制台打印信息如下所示:
4.3 打开浏览器访问:http://IP:15672
web管理页面:服务器地址/端口号 默认用户及密码:guest,如果没有配置可根据 设置允许远程访问
中进行用户名密码配置
4.4 如下图点击Queues,可以在队列列表中可以看到名为simple_queue的队列。
4.5 点击队列名称simple_queue,进入详情页 —>Get messages,可以查看消息:
5、消费者接收消息
5.1 在org.example.simple包下创建Receiver类,用于消费者接收消息
public class Receiver{
private final static String QUEUE_NAME = "simple_queue"; //队列名
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 声明队列
//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 参数明细
* 1、queue 队列名称
* 2、durable 是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
/**
* 当接收到消息后此方法将被调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println(" [x] received : " + msg + "!");
}
};
// 监听队列,第二个参数:是否自动进行消息确认(用于监听queue队列中是否收到了消息,如果收到消息自动调用上面DefaultConsumer进行默认消费)。
//参数:String queue, boolean autoAck, Consumer callback
/**
* 参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为true表示会自动回复mq,如果设置为false要通过编程实现回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
5.2 运行上述main方法,在控制台打印信息如下:
5.3 打开浏览器访问:http://IP:15672
web管理页面:服务器地址/端口号 默认用户及密码:guest,如果没有配置可根据 设置允许远程访问
中进行用户名密码配置
5.4 再看看队列的消息,已经被消费了,Ready值为0,Total值也为0了。
我们发现,消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立即打印
1.2 消息确认机制ACK
通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。
那么问题来了:RabbitMQ怎么知道消息被接收了呢?
如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!
因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
Ø 自动ACK:消息一旦被接收,消费者自动发送ACK
Ø 手动ACK:消息接收后,不会发送ACK,需要手动调用
大家觉得哪种更好呢?
这需要看消息的重要性:
Ø 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
Ø 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
之前的测试都是自动ACK的,如果要手动ACK,需要改动我们的代码。
1、 在org.example.simple包下创建ACKReceiver类,用于消费者接收消息
public class ACKReceiver {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 创建通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [x] received : " + msg + "!");
// 手动进行ACK
/*
* void basicAck(long deliveryTag, boolean multiple) throws IOException;
* deliveryTag:用来标识消息的id
* multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 监听队列,第二个参数false,手动进行ACK
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
ACKReceiver类与Receiver类最大的区别就是在消息消费的时候添加了channel.basicAck(envelope.getDeliveryTag(), false);channel.basicConsume(QUEUE_NAME, false, consumer);
2、自动ACK存在的问题
2.1 修改消费者Receiver类的代码
因为Receiver类是采用自动ACK,在handleDelivery方法中添加异常,如下:
2.2 生产者Send类不做任何修改,直接运行Send类中的main方法,
消息发送成功,再访问到RabbitMQ的web界面(注:之前启动的Receiver消费者要停掉服务),
2.3 运行Receiver类消费者中的main方法,程序抛出异常:
2.4再查看rabbitmq的web管理界面:
消费者抛出异常,但是消息依然被消费,实际上我们还没获取到消息。
3、演示手动ACK
注意:先把Receiver消费者服务停止掉
3.1 重新运行生产者Send中的main方法,实现发送消息,
消息发送成功后,再次查看web管理界面,效果如下所示,队列中收到消息一条。
3.2 再修改ACKReceiver类中的handleDelivery方法
增加如下图红框里的异常代码(模拟手动进行ack前抛出异常)。
3.3 再运行ACKReceiver类中的main方法,程序抛出异常:
3.4 查看web管理页面:
消息没有被消费掉!
这是因为虽然我们设置了手动ACK,但是代码中并没有进行消息确认!所以消息并未被真正消费掉。
4、最后消息确认机制的正确做法
4.1 我们要在监听队列时设置第二个参数为false,代码中手动进行ACK
代码如下图红框所示(之前异常的代码需要注释掉)
4.2 最后运行ACKReceiver类中的main方法,查看web管理页面
消费者消费成功!
生产者避免数据丢失:https://www.cnblogs.com/vipstone/p/9350075.html
4.5.2 work消息模型
工作队列或者竞争消费者模式
work queues与入门程序(基本消息模式)相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。
这个消息模型在Web应用程序中特别有用,可以处理短的HTTP请求窗口中无法处理复杂的任务。
接下来我们来模拟这个流程:
P:生产者:任务的发布者
C1:消费者1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)
C2:消费者2:领取任务并且完成任务,假设完成速度较快
2.1 案例实战
1、生产者
在org.example.work包中创建Send类,生产者循环发送50条消息
public class Send {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 循环发布任务
for (int i = 0; i < 50; i++) {
// 消息内容
String message = "task .. " + i;
// 向指定的队列中发送消息
//参数:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 参数明细:
* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、props,消息的属性
* 4、body,消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
// 模拟网络延时
Thread.sleep(i * 2);
}
// 关闭通道和连接
channel.close();
connection.close();
}
}
2、消费者1
在org.example.work包中创建Receiver1类
public class Receiver1 {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println(" [消费者1] received : " + msg + "!");
//模拟任务耗时1s
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
}
};
// 监听队列,第二个参数:是否自动进行消息确认。
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
3、消费者2
在org.example.work包中创建Receiver2类
public class Receiver2 {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println(" [消费者1] received : " + msg + "!");
}
};
// 监听队列,第二个参数:是否自动进行消息确认。
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
4、进行消息消费
接下来,两个消费者一同启动,然后发送50条消息(先将两个消费者一起启动,再启动生产者):
2.2 能者多劳
刚才的实现有问题吗?
Ø 消费者1比消费者2的效率要低,一次任务的耗时较长
Ø 然而两人最终消费的消息数量是一样的
Ø 消费者2大量时间处于空闲状态,消费者1一直忙碌
现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
怎么实现呢?
通过BasicQos方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它。相反,它会将其分派给不是仍然忙碌的下一个Consumer。
值得注意的是:prefetchCount在手动ack的情况下才生效,自动ack不生效。
注意: 需要在Receiver1和Receiver2中添加红框中的代码进行设置
2.3 订阅模型分类
1. 说明
1、一个生产者多个消费者
2、每个消费者都有一个自己的队列
3、生产者没有将消息直接发送给队列,而是发送给exchange(交换机、转发器)
4、每个队列都需要绑定到交换机上
5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者消费
例子:注册->发邮件、发短信
X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
2. Exchange类型有以下几种:
Ø Fanout: 广播,将消息交给所有绑定到交换机的队列 (它是没有routing key路由键)
Ø Direct:定向,把消息交给符合指定routing key 的队列 (重点) (路由键是写死的字符串)
Ø Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列(重点) (路由键是采用通配符#、进行动态匹配)
Ø Header: header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。
Header模式不展开了,感兴趣可以参考这篇文章https://blog.csdn.net/zhu_tianwei/article/details/40923131
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
4.5.3 Publish/subscribe(交换机类型:Fanout,也称为广播)
(广播模式中没有routing key,是从路由模式开始才有)
Publish/subscribe模型示意图 :
3.1 案例实战
1、生产者
和前面两种模式不同:
1) 声明Exchange,不再声明Queue
2) 发送消息到Exchange,不再发送到Queue
在org.example.publishsubscribe
包中创建Send
类
public class Send {
private final static String EXCHANGE_NAME = "test_fanout_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 消息内容
String message = "注册成功!!";
// 发布消息到Exchange (广播模式下是没有routingKey,所以参数二为””)
// 向指定的队列中发送消息
//参数:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 参数明细:
* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、props,消息的属性
* 4、body,消息内容
*/
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [生产者] Sent '" + message + "'");
channel.close();
connection.close();
}
}
2、消费者1(注册成功发给短信服务)
在org.example.publishsubscribe
包中创建Receiver1
类
public class Receiver1 {
private final static String QUEUE_NAME = "fanout_exchange_queue_sms";//短信队列
private final static String EXCHANGE_NAME = "test_fanout_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [短信服务] received : " + msg + "!");
}
};
// 监听队列,自动返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
3、消费者2(注册成功发给邮件服务)
在org.example.publishsubscribe
包中创建Receiver2
类
public class Receiver2 {
//邮件队列
private final static String QUEUE_NAME = "fanout_exchange_queue_email";
private final static String EXCHANGE_NAME = "test_fanout_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [邮件服务] received : " + msg + "!");
}
};
// 监听队列,自动返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
4、进行消息消费
我们运行两个消费者,然后发送1条消息:
注意: 启动有可能会报错:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘test_fanout_exchange’ in vhost ‘/’, class-id=50, method-id=20)
报这个错误,证明我们没有声明交换机,却拿来使用了,所以我们需要先启动生产者进行交换机声明,然后在按照上面的流程走就没有问题了
5、思考
5.1 publish/subscribe与work queues有什么区别。
区别:
1)work queues不用定义交换机,而publish/subscribe需要定义交换机。
2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认交换机)。
3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实际上work queues会将队列绑定到默认的交换机 。
相同点:
所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
5.2 实际工作用 publish/subscribe还是work queues?
建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大(也可以做到同一队列竞争),并且发布订阅模式(广播模式)可以指定自己专用的交换机。
4.5.4 Routing 路由模型(交换机类型:direct)
(路由模式中的routing key路由键格式为写死的字符串,而Topic通配符模式中的routing key是使用通配符#和*来匹配多个或一个routing key,而通过routing key来实现将队列与交换机进行绑定)
Routing模型示意图:
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
4.1 案例实战
1、生产者
在org.example.routingkey
包中创建Send
类
public class Send {
private final static String EXCHANGE_NAME = "test_direct_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 消息内容,
String message = "注册成功!请短信回复[T]退订";
// 发送消息,并且指定routing key 为:sms,只有短信服务能接收到消息
channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
注:上述生产者在发送消息时,是指定routing key为sms,而根据上面提到的Routing模型示意图,生产者将消息发送给exchange交换机,交换机再通过routing key与queue队列进行绑定,我们把sms作为短信的路由键。
在下面的消费者中使用routing key为sms将队列与交换机进行绑定后,就可以接收到生产者routing key为sms的消息了,换句话其他消费者如果没有使用routing key为sms绑定队列与交换机,就获取不到生产者发送的消息了(消费者2就没有使用routing key为sms来绑定队列与交换机)。
2、消费者1(使用routing key为sms来绑定队列与交换机)
在org.example.routingkey
包中创建Receiver1
类
public class Receiver1 {
private final static String QUEUE_NAME = "direct_exchange_queue_sms";//短信队列
private final static String EXCHANGE_NAME = "test_direct_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");//指定接收发送方指定routing key为sms的消息
//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [短信服务] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
3、消费者2(用routing key为email来绑定队列与交换机)
在org.example.routingkey包中创建Receiver2类
public class Receiver2 {
//邮件队列
private final static String QUEUE_NAME = "direct_exchange_queue_email"; private final static String EXCHANGE_NAME = "test_direct_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");//指定接收发送方指定routing key为email的消息
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [邮件服务] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
我们发送sms的RoutingKey,发现结果:只有指定短信的消费者1收到消息了(因为消费者1在绑定交换机的时候使用了sms这个routingkey)
4.5.5 Topics通配符模式(交换机类型:topics)
路由模式中的routing key路由键格式为写死的字符串,而Topic通配符模式中的routing key是使用通配符#和*来匹配多个或一个routing key,而通过routing key来实现将队列与交换机进行绑定
4.1 Topics模型示意图:
每个消费者监听自己的队列,并且设置带通配符的routingkey,生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
Routingkey一般都是有一个或者多个单词组成,多个单词之间以“.”分割,例如:inform.sms
4.2 通配符规则
- 星号(*):匹配不多不少恰好1个词。
- 井号(#):匹配一个或多个词
4.3 举例
如示意图所示
*.orange.* : 只能匹配 test.orange.test (只能匹配一个词)
*.*.rabbit : 只能匹配 test.test.rabbit (只能匹配两个词)
lazy.# : 可以匹配 lazy.test 和 lazy.test.test (可以匹配一个或多个词)
4.4 案例实战
1、生产者
在org.example.topics
包中创建Send
类
public class Send {
private final static String EXCHANGE_NAME = "test_topic_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 消息内容
String message = "这是一只行动迅速的橙色的兔子";
// 发送消息,并且指定routing key为:quick.orange.rabbit
channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes());
System.out.println(" [动物描述:] Sent '" + message + "'");
channel.close();
connection.close();
}
}
2、消费者1
在org.example.topics
包中创建Receiver1
类
public class Receiver1 {
private final static String QUEUE_NAME = "topic_exchange_queue_Q1";
private final static String EXCHANGE_NAME = "test_topic_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。订阅所有的橙色动物
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者1] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
注:上框代码中消费者在将队列与交换机进行绑定时,routing key是使用通配符模式来匹配生产者在发送消息时所指定的routing key,而非之前Routing路由模式是指定具体的某个routing key(之前路由模式是:生产者发送消息并指定routing key为test,这时消费者在将队列与交换机进行绑定时,如果指定了routing key为test,则可以接收到生产者发送的消息,反之不行,之前路由模式中的生产者与消费者的routing key是直接写死,而通配模式中消费者绑定队列与交换机时的routing key为使用通配符的形式进行匹配)
上框代码中*.orange.*是可以匹配到生产者中的quick.orange.rabbit,因此消费者1是可以接收到生产者发送的消息。
3、消费者2
在org.example.topics
包中创建Receiver2
类
public class Receiver2 {
private final static String QUEUE_NAME = "topic_exchange_queue_Q2";
private final static String EXCHANGE_NAME = "test_topic_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。订阅关于兔子以及懒惰动物的消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者2] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
上框代码中*.*.rabbit是可以匹配到生产者中的quick.orange.rabbit,它是可以接收到生产者发送的消息,但是lazy.#是不可以匹配quick.orange.rabbit,故懒兔子是接收不到生产者发送的消息。结果消费者1和消费者2都接收到消息了
4.6 SpringBoot整合RabbitMQ
4.6.1 创建SpringBoot项目
根据下图进行SpringBoot项目创建
注意: 选择JDK1.8
的建议用3.0
以下的SpringBoot
版本,3.0
或以上的SpringBoot
版本建议使用JDK17
或更高版本
4.6.2 添加依赖
在pom文件里面添加如下依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.6.3 添加配置
yml配置文件添加
server:
port: 8080
spring:
rabbitmq:
host: 服务器的主机名或IP地址
port: 5672
username: rabbitMQ账号
password: rabbitMQ密码
# 虚拟主机名,默认为"/"
virtual-host: /
# 发布者确认模式
publisher-confirm-type: correlated
# 是否启用发布者的返回功能
publisher-returns: true
# 模版配置
template:
retry:
# 发布重试,默认false
enabled: true
# 重试时间 默认1000ms
initial-interval: 10000ms
# 重试最大间隔时间
max-interval: 300000ms
# 重试的时间隔乘数,比如配2,0 第一次等于10s,第二次等于20s,第三次等于40s
multiplier: 2
# 交换机类型
exchange: topic.exchange
listener:
# 默认配置是simple
type: simple
simple:
# 手动ack Acknowledge mode of container. auto none
acknowledge-mode: manual
# 消费者调用程序线程的最小数量
concurrency: 10
# 消费者最大数量
max-concurrency: 10
# 限制消费者每次只处理一条信息,处理完在继续下一条
prefetch: 1
# 启动时是否默认启动容器
auto-startup: true
# 被拒绝时重新进入队列
default-requeue-rejected: true
4.6.4 添加配置类
在com.example.rabbitmq.config
包中创建RabbitmqConfig
类
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_EMAIL = "queue_email"; // email队列
public static final String QUEUE_SMS = "queue_sms"; // sms队列
public static final String EXCHANGE_NAME="topic.exchange"; // topics类型交换机
// routingkey的值通常是使用了通配符,#代表匹配一个或多个,*代表匹配一个
public static final String ROUTINGKEY_EMAIL="topic.#.email.#"; // routingkey路由键
public static final String ROUTINGKEY_SMS="topic.#.sms.#";
// 声明交换机(构建topic类型的交换机)
@Bean(EXCHANGE_NAME)
public Exchange exchange(){
// durable(true) 持久化,rabbitmq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
// 声明email队列
/*
* new Queue(QUEUE_EMAIL,true,false,false)
* durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
* auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
* exclusive 表示该消息队列是否只在当前connection生效,默认是false
*/
@Bean(QUEUE_EMAIL)
public Queue emailQueue(){
return new Queue(QUEUE_EMAIL);
}
// 声明sms队列
@Bean(QUEUE_SMS)
public Queue smsQueue(){
return new Queue(QUEUE_SMS);
}
// ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
@Bean
public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,
@Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
// 使用routingkey实现queue与exchange两者间的绑定
// norags()表示无参
//ROUTINGKEY_SMS队列绑定交换机,指定routingKey
@Bean
public Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue,
@Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
4.6.5 在测试类中添加生产者,并发送消息
@Resource
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
/**
* 参数:
* 1、交换机名称
* 2、routingKey 是用来让交换机通过routingKey将消息发送给所对应的队列
* 3、消息内容
*/
for (int i = 0; i < 5; i++) {
String message = "恭喜您,注册成功!userid=" + i;
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "topic.sms.email", message);
// 交换机绑定队列是通过routingkey,而此处代表的routingkey为topic.sms.email,它与可以匹配到RabbitmqConfig类中的ROUTINGKEY_EMAIL和ROUTINGKEY_SMS,
// 即交换机通过ROUTINGKEY_EMAIL和ROUTINGKEY_SMS将message变量的值作为消息发送到queue_email和queue_sms两个队列,因为此处代码是使用for循环,即向这两个队列分别发送了5次消息。
System.out.println(" [x] Sent '" + message + "'");
}
}
启动测试类
4.6.6 生产者发送消息测试结果
web管理界面: 可以看到已经创建了交换机以及queue_email、queue_sms 2个队列,并且向这两个队列分别发送了5条消息:
4.6.7 创建消息接收器类
在com.example.rabbitmq.receiver
包中创建ReceiveHandler
类
@Component
public class ReceiveHandler {
// 监听邮件队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_email", durable = "true"),
exchange = @Exchange(
value = "topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"topic.#.email.#","email.*"}))
public void rece_email(Message msg, Channel channel){
// 消息的唯一标识
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
System.out.println(" [邮件服务] received : " + new String(msg.getBody()) + "!");
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
// 监听短信队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_sms", durable = "true"),
exchange = @Exchange(
value = "topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"topic.#.sms.#"}))
public void rece_sms(Message msg, Channel channel){
// 消息的唯一标识
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
System.out.println(" [短信服务] received : " + new String(msg.getBody()) + "!");
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
我们在配置文件中设置了手动ACK机制,所以我们代码也要进行手动ACK,不然会报错
属性说明:
@Componet: 添加到类上的注解,将注解注解所标识的类注册到Spring容器
@RabbitListener: 方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
bindings: 指定绑定关系,可以有多个。值是@QueueBinding的数组。
@QueueBinding
包含下面属性:
Ø value: 这个消费者关联的队列。值是@Queue,代表一个队列
Ø exchange: 队列所绑定的交换机,值是@Exchange类型
Ø key: 队列和交换机绑定的RoutingKey,可指定多个
Message msg
: 在这个上下文中,Message
是一个RabbitMQ消息的封装。它包含了消息的内容(body)以及一些其他的属性(例如,消息的唯一标识,即deliveryTag
,以及消息的优先级等)。你可以将Message
对象看作是一个封装了消息和其相关属性的对象。Channel channel
:Channel
是RabbitMQ的一个关键组件,它提供了一个高效和可靠的方式来发送和接收消息。在生产者-消费者模型中,生产者通过Channel
将消息发送到RabbitMQ,消费者则从Channel
接收消息。Channel
对象可以创建多个,但是每个Channel
都应该有唯一的标识符。
启动SPringBoot项目
4.6.8 消费者消费结果
效果如下所示
5、RabbitMQ问题相关解决方案
5.1 生产端可靠性投递方案介绍
既然我们项目中用到的RabbitMQ,它有它的优点比如:解耦、异常、流量削峰,但是我们还需要考虑额外的东西,比如消息的可靠性。
什么是消息的可靠性,我们从两个方面来讲解,第一个方面是怎么保证生产者的可靠性投递,即确保生产端发出的消息能真真正正地投递到了队列,最后给消费者消费。第二个消费者怎么去做幂等性的保证,也就是说我们使用RabbitMQ发送消息时,会出现同一条消息会重复多发的情况。(即怎么保存消费者只消费一条消息,另外的重复发多的消息做丢弃)
5.1.1 关于怎么保证生产者的可靠性投递
可以从以下三点去做:
- 怎么去保证消息的成功发出。
- 去保证RabbitMQ成功接收到消息,即队列要成功接收到消息。
- 保证生产者能够接收到RabbitMQ的确认应答,即队列收到了消息需要应答给生产者,生产者就能够知道RabbitMQ是收到了消息,这条消息是发送成功。因为大家都知道RabbitMQ本质上就是生产者—>队列—>消费者,其中生产者只负责发送消息,然后队列只负责消息的中转,而消费者只负责消息的消费。
- 需要完善消息的补偿机制。
关于具体实现保证生产者的可靠性投递,市面上有两种主流的方案。
5.1.2 消息发送
方案一:消息落库,对消息状态进行标记
解析:
如上图所示,MSG_DB为消息数据库,BIZ_DB为业务数据库,
- 第一步就是将业务数据库入库,还有需要发送的消息入库到消息数据库中(入库后的消息其状态为投递中)。
- 第二步是producer生产者再将消息发送给RabbitMQ
- 第三步RabbitMQ会开启确认回调,producer生产者会监听来自RabbitMQ的确认回调。
- 第四步producer生产者监听到了回调表示消息发送成功了,生产者就会更新MSG_DB消息数据库中刚刚发送的消息的状态为投递成功,上图status 1表示成功,0表示投递中。
- 使用分布式定时任务get status:0表示获取状态为投递中的消息
- 分布式定义任务将获取状态为投递中的消息进行Retry Send重发(再执行第2、3、4步)。
- Retry count > 3表示重发次数超过3次,就更新当前消息的状态为2,同时停止重发。
方案二:消息延迟投递,做二次确认,回调检查
解析:
上图中BIZ DB为业务数据库,Upstream service上层业务(看成生产者发送消息),Downstream service下层业务(看成消费者接收消息)
- Upstream service(生产端)将业务数据入库到BIZ DB,再执行first Send即第一次发送消息到RabbitMQ中
- Step2: Second Send Delay Check第二次延迟发送消息(算上第一次,即生产端会向RabbitMQ发送两次消息)
- Step3: Listener Consume 消费者监听队列推送过来的消息然后进行消费。
- 消费者消费完后会生成确认消息发送给RabbitMQ中,即Downstream service不仅仅做消费者,它也可以做生产者去发送消息,将确认消息发送到队列中。
- Callback Service 回调服务,Listener Confirm监听队列推送过来的确认信息。监听到确认信息后,回调函数会将消息入库到消息数据库中。
- Check Detail 如果监听到的是延迟投递的第二次消息,回调函数就会到MSG_DB消息数据库里检查这个消息数据在数据库中是否存在(因为在第五步的时候把消息入库了消息数据库),如果数据库中不存在这个消息就说明消费者没有把确认消息发送给队列(即消费者消费失败了),这时Callback servcie回调服务就会执行RPC ReSend Command(RPC 重发命令)再重新从第一步开始执行。
这个方案相对于第一种方案的优点是:数据库操作减少了。
其流程为:
- 发送消息时,将当前消息数据库存入数据库,投递状态为消息投递中。
- 开启消息确认回调机制。确认成功,更新投递状态为消息投递成功。
开启定时任务,重新投递失败的消息。重试超过3次,更新投递状态为投递失败。
5.1.3 消息落库,对消息状态打标的具体实现
开启消息回调机制
1. 创建数据库
DROP DATABASE IF EXISTS `rabbit_msg_rk`;
CREATE DATABASE `rabbit_msg_rk`;
USE `rabbit_msg_rk`;
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for message_log
-- ----------------------------
DROP TABLE IF EXISTS `message_log`;
CREATE TABLE `message_log` (
`id` int(0) NOT NULL AUTO_INCREMENT,
`message_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`order_id` int(0) NULL DEFAULT NULL,
`try_count` int(0) NULL DEFAULT 0,
`status` int(0) NULL DEFAULT 0,
`create_time` datetime(3) NULL DEFAULT NULL,
`update_time` datetime(3) NULL DEFAULT NULL,
`try_time` datetime(3) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE,
INDEX `order_id`(`order_id`) USING BTREE,
CONSTRAINT `message_log_ibfk_1` FOREIGN KEY (`order_id`) REFERENCES `orders` (`id`) ON DELETE CASCADE ON UPDATE CASCADE
) ENGINE = InnoDB AUTO_INCREMENT = 18 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Table structure for orders
-- ----------------------------
DROP TABLE IF EXISTS `orders`;
CREATE TABLE `orders` (
`id` int(0) NOT NULL AUTO_INCREMENT,
`stock_id` int(0) NOT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
2. 创建Spring Boot项目
如下图所示创建Spring Boot项目
注意: 选择JDK1.8
的建议用3.0
以下的SpringBoot
版本,3.0
或以上的SpringBoot
版本建议使用JDK17
或更高版本
3. 添加依赖
在pom.xml文件中添加如下依赖
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
4. 添加配置
在yml文件中添加如下配置
server:
port: 8080
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://服务器的主机名或IP地址:端口/数据库名?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
username: mysql账号
password: mysql密码
rabbitmq:
host: 服务器的主机名或IP地址
port: 5672
username: rabbitMQ账号
password: rabbitMQ密码
# 虚拟主机名,默认为"/"
virtual-host: /
# 消息确认回调
# none:表示禁用发送方确认机制
# correlated:表示开启发送方确认机制
# simple:表示开启发送方确认机制,并支持 waitForConfirms() 和 waitForConfirmsOrDie() 的调用。
publisher-confirm-type: correlated
# 消息失败回调
publisher-returns: true
# 模版配置
template:
retry:
# 发布重试,默认false
enabled: true
# 重试时间 默认1000ms
initial-interval: 10000ms
# 重试最大间隔时间
max-interval: 300000ms
# 重试的时间隔乘数,比如配2,0 第一次等于10s,第二次等于20s,第三次等于40s
multiplier: 2
exchange: topic.exchange
listener:
# 默认配置是simple
type: simple
simple:
# 手动ack Acknowledge mode of container. auto none
acknowledge-mode: manual
# 消费者调用程序线程的最小数量
concurrency: 10
# 消费者最大数量
max-concurrency: 10
# 限制消费者每次只处理一条信息,处理完在继续下一条
prefetch: 1
# 启动时是否默认启动容器
auto-startup: true
# 被拒绝时重新进入队列
default-requeue-rejected: true
5. 创建实体类
在com.example.rabbitmqmsgrk.model
包里面创建下面两个类
订单类:
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("orders")
public class Order implements Serializable {
/**
* 订单id
*/
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* 库存id
*/
@TableField("stock_id")
private Integer stockId;
}
消息类:
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("message_log")
public class MassageLog implements Serializable {
/**
* 消息uid
*/
@TableField("message_id")
private String messageId;
/**
* 订单id
*/
@TableField("order_id")
private Integer orderId;
/**
* 重试时间
*/
@TableField("try_time")
private LocalDateTime tryTime;
/**
* 重试次数,阈值:3
*/
@TableField("try_count")
private Integer tryCount;
/**
* 消息状态,0:未发送成功、1:发送成功、2:失败消息
*/
private Integer status;
@TableField("create_time")
private LocalDateTime createTime;
@TableField("update_time")
private LocalDateTime updateTime;
}
6. 创建mapper
在com.example.rabbitmqmsgrk.mapper
包下面分别创建MessageLogMapper
、OrderMapper
两个接口
// MessageLog接口
@Mapper
public interface MessageLogMapper extends BaseMapper<MessageLog> {
}
// Order 接口
@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}
在resources
下面创建mapper
包,然后在包下面创建MessageLogMapper
、OrderMapper
这两个接口的接口映射文件
MessageLogMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.rabbitmqmsgrk.mapper.MessageLogMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.example.rabbitmqmsgrk.model.MessageLog">
<id column="message_id" property="messageId" />
<result column="order_id" property="orderId" />
<result column="try_count" property="tryCount" />
<result column="try_time" property="tryTime" />
<result column="status" property="status" />
<result column="create_time" property="createTime" />
<result column="update_time" property="updateTime" />
</resultMap>
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
message_id,order_id,try_count,try_time,status,create_time,update_time
</sql>
</mapper>
OrderMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.rabbitmqmsgrk.mapper.OrderMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.example.rabbitmqmsgrk.model.Order">
<id column="id" property="id" />
<result column="stock_id" property="stockId" />
</resultMap>
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id,stock_id
</sql>
</mapper>
7. 创建Controller
在com.example.rabbitmqmsgrk.web
包下面创建OrderController
类
@RestController
public class OrderController {
@Resource
private MessageLogMapper messageLogMapper;
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/msgTest")
public String msgTest(Integer orderId,Integer stockId){
Order order = new Order();
order.setId(orderId);
order.setStockId(stockId);
//数据库记录发送的消息
String msgId= UUID.randomUUID().toString();
MessageLog messageLog = new MessageLog();
messageLog.setMessageId(msgId);
messageLog.setStatus(0);
messageLog.setOrderId(orderId);
messageLog.setTryCount(0);
messageLog.setTryTime(LocalDateTime.now().plusMinutes(1));
messageLog.setCreateTime(LocalDateTime.now());
messageLogMapper.insert(messageLog);
/**
* 发送消息
* @param exchange 为交换机名字
* @param routingKey 为路由键
* @param object 为需要发送消息的内容
* @param correlationData 为本次消息的ID
*/
rabbitTemplate.convertAndSend("msg.exchange","msg.routing.key", order, new CorrelationData(msgId));
return "成功";
}
}
8. 创建配置类
在com.example.rabbitmqmsgrk.config
包中创建RabbitMQConfig
类
@Configuration
public class RabbitMQConfig {
private static final Logger LOGGER= LoggerFactory.getLogger(RabbitMQConfig.class);
// 注入缓存连接工厂依赖对象
@Resource
private CachingConnectionFactory cachingConnectionFactory;
@Resource
private MessageLogMapper messageLogMapper;
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate=new RabbitTemplate(cachingConnectionFactory);
/**
* 消息确认回调,确认消息是否会到达broker
* data:消息的唯一标识
* ack:确认结果
* cause:失败原因
*/
rabbitTemplate.setConfirmCallback((data,ack,cause)->{
String msgId = data.getId();
if(ack){
LOGGER.info("{}==============>消息发送成功",msgId);
//在生产者发送消息的时候会把消息入库到MSG_DB消息数据库中,此消息的状态status值为0,表示消息投递中。当消费者监听到消息后,这里的setConfirmCallback()方法中实现消息确认回调,更新status的值为1表示投递成功。
messageLogMapper.update(new MessageLog(),new UpdateWrapper<MessageLog>().set("status",1).eq("message_id",msgId));
}else {
LOGGER.error("{}=============>消息发送失败",msgId);
}
});
/**
* 消息失败回调
* msg:消息主题
* repCode:响应码
* repText:响应内容
* exchange:交换机
* routingKey:路由键
*/
rabbitTemplate.setReturnCallback((msg,reCode,repText,exchange,routingKey)->{
LOGGER.error("{}==============>消息发送失败 ",msg.getBody());
});
return rabbitTemplate;
}
// 创建名为msg.queue的队列
@Bean
public Queue queue(){
return new Queue("msg.queue");
}
//声明交换机(构建direct类型的交换机, 交换机名为msg.exchange)
@Bean
public DirectExchange directExchange(){
return new DirectExchange("msg.exchange");
}
// 通过msg.routing.key将队列绑定到交换机
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(directExchange ()).with("msg.routing.key");
}
}
注意: 需要再ym
l中spring.rabbitmq
进行如下设置
# 消息确认回调
publisher-confirm-type: correlated
# 消息失败回调
publisher-returns: true
9. 进行任务调度
在com.example.rabbitmqmsgrk.schedule
包中创建MsgSchedule
类,进行定时任务
@Component
public class MsgSchedule {
@Resource
private MessageLogMapper messageLogMapper;
@Resource
private OrderMapper orderMapper;
@Resource
private RabbitTemplate rabbitTemplate;
// 定时任务,使用cron表达式实现每隔10秒执行一次下面的msgTask()方法
@Scheduled(cron = "0/10 * * * * ?")
public void msgTask(){
// 查询消息状态为0即正在投递中的,并且tryTime重试时间小于当前时间。
List<MessageLog> list = messageLogMapper.selectList(new QueryWrapper<MessageLog>().eq("status", 0).lt("try_time", LocalDateTime.now()));
list.forEach(messageLog -> {
//判断是否尝试次数到3,代表发送失败,修改当前消息的status为2
if(messageLog.getTryCount()>=3){
messageLogMapper.update(new MessageLog(),new UpdateWrapper<MessageLog>().set("status",2).eq("message_id",messageLog.getMessageId()));
}
//没到3,继续发送,并且修改状态
messageLogMapper.update(new MessageLog(),new UpdateWrapper<MessageLog>().set("try_count",messageLog.getTryCount() + 1)
.set("update_time",LocalDateTime.now()).set("try_time", LocalDateTime.now().plusMinutes(1))
.eq("message_id",messageLog.getMessageId()));
Order order = orderMapper.selectById(messageLog.getOrderId());
//重新发送消息
rabbitTemplate.convertAndSend("msg.exchange","msg.routing.key", order, new CorrelationData(messageLog.getMessageId()));
});
}
}
注意:在启动类上面添加@EnableScheduling
注解
- 需要再
orders
表中添加以下数据
-
访问:http://localhost:8080/msgTest?orderId=1&stockId=1
请求接口成功
Cron表达式学习:https://blog.csdn.net/ITKidKid/article/details/126386738
10.最终结果
1、发送消息成功
2、模拟生产者消息首次发送失败
我们将controller
中msgTest
方法里面的converAndSend
中交换机的名字改掉,改成 “msg.exchange.test”,如此生产者发送消息肯定失败,这个消息的状态值为0,后面记得改回来
访问: http://localhost:8080/msgTest?orderId=1&stockId=1
消息发送失败
定时任务去查询消息状态为投递中的消息进行重发
消息重发成功
再查看数据库message_log表,可以看到这个消息的try_count值为1,表示它进行过一次重试,最后status值由0被修改成了1
3、模拟消息首次发送失败,定时任务重试也失败
将定时任务中的converAndSend
中交换机的名字也改掉,改成 “msg.exchange.test”,如此来模拟生产者首次发送的消息失败,定时任务重试也失败。后面记得改回来
访问: http://localhost:8080/msgTest?orderId=1&stockId=1
访问成功立刻查看数据库message_log表,可以看到这个消息的try_count值为0,status值也是0,重试次数为0,即状态是投递中
定时任务MsgSchedule类中的定时任务每隔10秒就会去重试下,当重试次数超过3次,就直接将当前消息的状态值修改为2,再次查看message_log表的数据。
注:等待时间大约3分钟,即3分钟后当前消息的status状态才会变成2
最后不要忘记更正controller中以及MsgSchedule类中交换机的名字。
11.在高并发的场景下是否合适?
第一种方案对数据有两次入库,一次业务数据入库,一次消息入库。这样对数据的入库是一个瓶颈。
其实我们只需要对业务进行入库。
5.2 RabbitMQ 如何避免消息重复消费?
5.2.1 幂等性
消息的幂等性是指一次消息传递可能会发生多次,但最终业务状态只会改变一次。换句话说,即使多次收到了同一消息,也不会导致重复的业务处理。幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。
保证消息的幂等性在开发中是很重要的,例如在客户点击付款的情况下,如果点击了多次,系统也只能扣一次费。此外,实现幂等性操作可以免去因重试等造成系统产生的未知问题。
然而,消息队列如RabbitMQ、RocketMQ、kafka等,都可能出现消息的重复发送,这个是消息队列无法保障的。在这种情况下,我们需要开发人员去保证消息的幂等性。实际上,消息队列没法帮你做到消费端的幂等性,消费端的幂等性得基于业务场景进行实现。但是,至少得保证消息不能丢,且至少被消费一次。
5.2.2 高并发的情况下如何避免消息重复消费
- 唯一id+加指纹码,利用数据库主键去重。优点:实现简单缺点:高并发下有数据写入瓶颈。
- 利用Redis的原子性来实习。使用Redis进行幂等是需要考虑的问题是否进行数据库落库,落库后数据和缓存如何做到保证幂等(Redis和数据库如何同时成功同时失败)?如果不进行落库,都放在Redis中如何这是Redis和数据库的同步策略?还有放在缓存中就能百分之百的成功吗?
5.2.3 解决重复消费的案例代码
1. 添加依赖
在pom.xml
中添加如下依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2. 添加配置
在application.yml
中添加如下配置,记住,一定要设置手动ACK,不然会报错
acknowledge-mode: manual
spring:
# redis配置
redis:
# 超时时间
timeout: 10000ms
# 服务器地址
host: 服务器地址
# 服务器端口
port: 6379
database: 0
lettuce:
pool:
# 连接池最大连接数 默认8 ,负数表示没有限制
max-active: 1024
# 最大连接阻塞等待时间,默认-1
max-wait: 10000ms
# 最大空闲连接
max-idle: 200
# 最小空闲连接
min-idle: 5
password: redis密码
# rabbitmq配置
rabbitmq:
simple:
# 手动ack Acknowledge mode of container. auto none
acknowledge-mode: manual
3. 生产者代码
在com.example.rabbitmqmsgrk.web
包中的OrderController
类中创建repetition
方法用于发送消息
@GetMapping("/repetition")
public String repetition(){
// 给消息封装一个唯一id对象
String msgId= UUID.randomUUID().toString();
/**
* 发送消息
* @param exchange 为交换机名字
* @param routingKey 为路由键
* @param object 为需要发送消息的内容
* @param correlationData 为本次消息的ID
*/
rabbitTemplate.convertAndSend("msg.exchange","msg.routing.key", "消息重复消费问题处理", new CorrelationData(msgId));
return "成功";
}
4. 消费者代码
在com.example.rabbitmqmsgrk.config
中创建ReceiveHandler
消费者接收类,用于进行消息消费
@Component
public class ReceiveHandler {
@Resource
private StringRedisTemplate stringRedisTemplate;
// 监听队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "msg.queue", durable = "true"),
exchange = @Exchange(
value = "msg.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"msg.routing.*", "msg.#"}))
public void repetition(String msg, Channel channel, Message message) throws IOException {
// 1. 消息的唯一标识
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 2. 获取MessageId, 消息唯一id
String messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
// 3. 设置key到Redis
if (stringRedisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS)) {
// 4. 消费消息
System.out.println("接收到消息:" + msg);
// 5. 设置key的value为1
stringRedisTemplate.opsForValue().set(messageId, "1", 10, TimeUnit.SECONDS);
// 6. 手动ack
channel.basicAck(deliveryTag, false);
} else {
// 4. 获取Redis中的value即可 如果是1,手动ack
if ("1".equalsIgnoreCase(stringRedisTemplate.opsForValue().get(messageId))) {
System.out.println("消息:" + messageId + "已消费");
// 5. 手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
}
5. 最终结果
1、消息消费成功
访问:http://localhost:8080/repetition
查看控制台和redis,可以看到redis中的值变成1了,表示消息成功消费了
2、模拟消息重复消费场景
在OrderController
类中repetition`方法添加for循环模拟消息重复消费场景,连续发送3次重复消息
for (int i = 0; i < 3; i++) {
rabbitTemplate.convertAndSend("msg.exchange", "msg.routing.key", "消息重复消费问题处理", new CorrelationData(msgId));
}
访问:http://localhost:8080/repetition
查看控制台,可以看到消息重复进行了消费
5.3 RabbitMQ 如何避免消息积压?
5.3.1 解决方案
- 优化业务流程:检查并优化业务逻辑,确保消费者能够及时处理消息。这可能需要对业务逻辑进行重新设计,或者增加更多的消费者来提高处理速度。
- 增加消费者数量:通过增加消费者的数量,可以并行处理更多的消息,从而减轻单个消费者的负担。
- 调整RabbitMQ的参数:根据实际情况,调整RabbitMQ的参数,如消息的生存时间(TTL)、最大队列长度等。例如,可以设置消息的生存时间为较短的时间,以便消息能够在队列中保留的时间更短,从而减少队列的压力。
- 使用死信队列:在RabbitMQ中,可以使用死信队列来处理无法正常处理的消息。当消息在队列中过期或者被拒绝时,可以将其发送到死信队列中,以便进行后续处理。
- 监控和告警:建立监控系统,实时监控RabbitMQ的运行状态和队列情况。当出现消息积压时,及时发出告警通知,以便能够及时采取措施解决问题。
- 临时扩容:如果以上措施无法满足需求,可以考虑临时扩容,增加更多的消费者和资源来处理积压的消息。但这只是应急措施,需要在后续对系统进行进一步的优化和改进。
更多命令
一、卸载rabbitmq相关的
1、卸载前先停掉rabbitmq服务,执行命令
service rabbitmq-server stop
2、查看rabbitmq安装的相关列表
yum list | grep rabbitmq
3、卸载rabbitmq已安装的相关内容
yum -y remove rabbitmq-server.noarch
二、卸载erlang
1、查看erlang安装的相关列表
yum list | grep erlang
2、卸载erlang已安装的相关内容
yum -y remove erlang-*
yum remove erlang.x86_64
启动服务:rabbitmq-server -detached # 以后台守护进程方式启动
查看状态:rabbitmqctl status
关闭服务:rabbitmqctl stop
列出角色:rabbitmqctl list_users
rabbitmqctl list_permissions # 查看(指定vhostpath)所有用户的权限
rabbitmqctl list_permissions -p / # 查看virtual host为/的所有用户权限
rabbitmqctl list_user_permissions developer # 查看指定用户的权限
rabbitmqctl clear_permissions developer # 清除用户权限
rabbitmqctl delete_user guest # 删除用户
rabbitmqctl change_password developer dev123456 # 修改密码
感谢博主:
https://blog.csdn.net/Bejpse/article/details/126424250
https://blog.csdn.net/hengheng__/article/details/123390048