目录
1.MQ的相关概念
1.1 什么是MQ消息中间件
1.2 为什么使用MQ
(1) 应用解耦
(2) 异步提速
(3)削峰填谷
1.3 使用MQ的劣势
1.4 常见的MQ组件
2. RabbitMQ的概述
2.1 RabbitMQ的概念
2.2 RabbitMQ的原理
2.3 安装RabbitMQ
3. RabbitMQ 的工作模式
3.1 simple (简单模式)
3.2 Work queues(工作模式)
3.3.Publish/Subscribe(发布订阅模式)
3.4.Routing(路由模式)
3.5.Topics(主题模式)
4.springboot整合RabbitMQ
4.1.生产方
4.2.消费方
4.3.通过代码创建交换机和队列
1.MQ的相关概念
1.1 什么是MQ消息中间件
MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。它是应用程序和应用程序之间的通信方法。
1.2 为什么使用MQ
在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
MQ总结为三个好处:
(1) 应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内容被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中间用户感受不到物流系统的故障,提升系统的可用性。
(2) 异步提速
上面要完成下单需要花费的时间: 20 + 300 + 300 + 300 = 920ms 用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!
使用MQ可以解决上述问题
用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。 提升用户体验和系统吞吐量(单位时间内处理请求的数目)
(3)削峰填谷
举个例子,如果订单系统最多能处理一千次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两千次下单操作系统是处理不了的,只能限制订单超过一千后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。 简单来说: 就是在访问量剧增的情况下,但是应用仍然不能停,比如“双十一”下单的人多,但是淘宝这个应用仍然要运行,所以就可以使用消息中间件采用队列的形式减少突然访问的压力
使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。
使用MQ后,可以提高系统稳定性
1.3 使用MQ的劣势
- 系统可用性降低 系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
- 系统复杂度提高 MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
- 一致性问题 A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?
1.4 常见的MQ组件
目前业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征
2. RabbitMQ的概述
2.1 RabbitMQ的概念
- 2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
- RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,由于erlang 语言的高并发特性,性能较好,本质是个队列,FIFO 先入先出,里面存放的内容是message
- RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮你传递快件。RabbitMQ与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
2.2 RabbitMQ的原理
名词解释:
- Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
- Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
- Connection:publisher/consumer 和 broker 之间的 TCP 连接
- Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销.
- Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
- Queue:消息最终被送到这里等待 consumer 取走
- Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
2.3 安装RabbitMQ
安装详情------>虚拟机安装RabbitMQ
3. RabbitMQ 的工作模式
RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。 官网对应模式介绍:RabbitMQ Tutorials — RabbitMQ
3.1 simple (简单模式)
在上图的模型中,有以下概念:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接收者,会一直等待消息到来
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息
演示:
创建工程
加依赖
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.14.2</version> </dependency> </dependencies>
生产者:
package com.wqg.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @ fileName:SimpleProducer * @ description:简单模式-生产者 * @ author:wqg * @ createTime:2023/7/12 16:26 */ public class SimpleProducer { public static void main(String[] args) throws Exception { //创建连接工厂对象并设置连接信息 -----获取连接对象(指定rabbitMQ服务端的信息) ConnectionFactory connectionFactory = new ConnectionFactory(); //rabbitMQ服务端的地址 默认localhost connectionFactory.setHost("192.168.75.129"); //设置端口号 connectionFactory.setPort(5672); //设置账号 默认guest connectionFactory.setUsername("guest"); //设置密码 默认guest connectionFactory.setPassword("guest"); //设置虚拟主机 默认/ connectionFactory.setVirtualHost("/"); //获取连接对象 Connection connection = connectionFactory.newConnection(); //获取Channel信道对象 Channel channel = connection.createChannel(); //创建一个队列对象 /** * String queue, 队列的名称. 如果该名称不存在 则创建 如果存在则不创建 * boolean durable, 该对象是否持久化 当rabbitmq重启后 队列就会消失 * boolean exclusive, 该队列是否被一个消费者独占 * boolean autoDelete,当没有消费者时,该队列是否被自动删除 * Map<String, Object> arguments: 额外参数的设置 * */ channel.queueDeclare("simple-queue",true,false,false,null); //发送信息 /** * String exchange, 交换机的名称 简单模式没有交换机使用""表示采用默认交换机 * String routingKey, 路由标识 如果是简单模式起名为队列的名称 * BasicProperties props, 消息的属性设置。 设置为null * byte[] body: 消息的内容 */ String msg = "Hello RabbitMQ~~~"; channel.basicPublish("","simple-queue",null,msg.getBytes()); //关闭资源 channel.close(); connectionFactory.clone(); } }
消费者:
3.2 Work queues(工作模式)
Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
生产者:
package com.wqg.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @ fileName:SimpleProducer * @ description:工作模式-生产者 * @ author:wqg * @ createTime:2023/7/12 16:26 */ public class WorkProducer { public static void main(String[] args) throws Exception { //创建连接工厂对象并设置连接信息 -----获取连接对象(指定rabbitMQ服务端的信息) ConnectionFactory connectionFactory = new ConnectionFactory(); //rabbitMQ服务端的地址 默认localhost connectionFactory.setHost("192.168.75.129"); //设置端口号 connectionFactory.setPort(5672); //设置账号 默认guest connectionFactory.setUsername("guest"); //设置密码 默认guest connectionFactory.setPassword("guest"); //设置虚拟主机 默认/ connectionFactory.setVirtualHost("/"); //获取连接对象 Connection connection = connectionFactory.newConnection(); //获取Channel信道对象 Channel channel = connection.createChannel(); //创建一个队列对象 /** * String queue, 队列的名称. 如果该名称不存在 则创建 如果存在则不创建 * boolean durable, 该对象是否持久化 当rabbitmq重启后 队列就会消失 * boolean exclusive, 该队列是否被一个消费者独占 * boolean autoDelete,当没有消费者时,该队列是否被自动删除 * Map<String, Object> arguments: 额外参数的设置 * */ channel.queueDeclare("Work-Queue", true, false, false, null); //发送信息 /** * String exchange, 交换机的名称 简单模式没有交换机使用""表示采用默认交换机 * String routingKey, 路由标识 如果是简单模式起名为队列的名称 * BasicProperties props, 消息的属性设置。 设置为null * byte[] body: 消息的内容 */ for (int i=0; i<10; i++){ String msg = "Hello RabbitMQ~~~工作模式"; channel.basicPublish("", "Work-Queue", null, msg.getBytes()); } //关闭资源 channel.close(); connectionFactory.clone(); } }
消费者:
package com.wqg.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * @ fileName:SimpleConsumer * @ description:工作模式-消费者 * @ author:WQG * @ createTime:2023/7/12 16:25 */ public class WorkConsumer01 { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.75.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //channel.queueDeclare("Work-queue", true, false, false, null); //接受队列中的信息 Consumer consumer = new DefaultConsumer(channel) { /** * * @param consumerTag: 消费者的标签 * @param envelope : 设置 拿到你的交换机 路由key等信息 * @param properties: 消息的属性对象 * @param body: 消息的内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接受的内容===" + new String(body)); } }; /** * String queue, 队列名 * boolean autoAck,是否自动确认。 当rabbitmq把消息发送给消费后,消费端自动确认消息。 * Consumer callback:回调。 当rabbitmq队列中存在消息 则触发该回调 */ channel.basicConsume("Work-Queue", true, consumer); } }
总结: 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
例如:短信服务部署多个,只需要有一个节点成功发送即可。
3.3.Publish/Subscribe(发布订阅模式)
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息、缓存消息
Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
生产者:
package com.wqg.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @ fileName:SimpleProducer * @ description:发布订阅模式-生产者 * @ author:wqg * @ createTime:2023/7/12 16:26 */ public class PublishProducer { public static void main(String[] args) throws Exception { //创建连接工厂对象并设置连接信息 -----获取连接对象(指定rabbitMQ服务端的信息) ConnectionFactory connectionFactory = new ConnectionFactory(); //rabbitMQ服务端的地址 默认localhost connectionFactory.setHost("192.168.75.129"); //设置端口号 connectionFactory.setPort(5672); //设置账号 默认guest connectionFactory.setUsername("guest"); //设置密码 默认guest connectionFactory.setPassword("guest"); //设置虚拟主机 默认/ connectionFactory.setVirtualHost("/"); //获取连接对象 Connection connection = connectionFactory.newConnection(); //获取Channel信道对象 Channel channel = connection.createChannel(); //创建交换机 /** * String exchange, 交换机的名称 如果不存在则创建 存在则不创建 * BuiltinExchangeType type, 交换机的类型 * boolean durable: 是否持久化。 */ channel.exchangeDeclare("Publish-exchange", BuiltinExchangeType.FANOUT,true); //创建队列 channel.queueDeclare("Publish-Queue01", true, false, false, null); channel.queueDeclare("Publish-Queue02", true, false, false, null); //队列和交换机绑定 /** * String queue, * String exchange, * String routingKey: 发布订阅模式 没有routingKey 则写为"" */ channel.queueBind("Publish-Queue01","Publish-exchange",""); channel.queueBind("Publish-Queue02","Publish-exchange",""); //发送信息 String msg = "Hello RabbitMQ~~~发布订阅模式"; channel.basicPublish("Publish-exchange","", null, msg.getBytes()); //关闭资源 channel.close(); connectionFactory.clone(); } }
消费者:
package com.wqg.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * @ fileName:SimpleConsumer * @ description:发布订阅模式-消费者 * @ author:WQG * @ createTime:2023/7/12 16:25 */ public class PublishConsumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.75.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //channel.queueDeclare("Work-queue", true, false, false, null); //接受队列中的信息 Consumer consumer = new DefaultConsumer(channel) { /** * * @param consumerTag: 消费者的标签 * @param envelope : 设置 拿到你的交换机 路由key等信息 * @param properties: 消息的属性对象 * @param body: 消息的内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接受的内容===" + new String(body)); } }; /** * String queue, 队列名 * boolean autoAck,是否自动确认。 当rabbitmq把消息发送给消费后,消费端自动确认消息。 * Consumer callback:回调。 当rabbitmq队列中存在消息 则触发该回调 */ //Publish-Queue02和Publish-Queue01只有一个里面有数据 channel.basicConsume("Publish-Queue02", true,consumer); } }
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:
- 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
- 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)
- 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机
3.4.Routing(路由模式)
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
生产者:
package com.wqg.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @ fileName:SimpleProducer * @ description:路由模式-生产者 * @ author:wqg * @ createTime:2023/7/12 16:26 */ public class RouterProducer { public static void main(String[] args) throws Exception { //创建连接工厂对象并设置连接信息 -----获取连接对象(指定rabbitMQ服务端的信息) ConnectionFactory connectionFactory = new ConnectionFactory(); //rabbitMQ服务端的地址 默认localhost connectionFactory.setHost("192.168.75.129"); //设置端口号 connectionFactory.setPort(5672); //设置账号 默认guest connectionFactory.setUsername("guest"); //设置密码 默认guest connectionFactory.setPassword("guest"); //设置虚拟主机 默认/ connectionFactory.setVirtualHost("/"); //获取连接对象 Connection connection = connectionFactory.newConnection(); //获取Channel信道对象 Channel channel = connection.createChannel(); //创建交换机 channel.exchangeDeclare("Router-exchange", BuiltinExchangeType.DIRECT,true); //创建队列 channel.queueDeclare("Router-queue001",true,false,false,null); channel.queueDeclare("Router-queue002",true,false,false,null); //队列和交换机绑定 channel.queueBind("Router-queue001","Router-exchange","error"); channel.queueBind("Router-queue002","Router-exchange","error"); channel.queueBind("Router-queue002","Router-exchange","info"); channel.queueBind("Router-queue002","Router-exchange","warning"); String msg = "Hello RabbitMQ~~~路由模式"; channel.basicPublish("Router-exchange","info",null,msg.getBytes()); channel.close(); connectionFactory.clone(); } }
消费者:
package com.wqg.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * @ fileName:SimpleConsumer * @ description:路由模式-消费者 * @ author:WQG * @ createTime:2023/7/12 16:25 */ public class RouterConsumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.75.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //channel.queueDeclare("Work-queue", true, false, false, null); //接受队列中的信息 Consumer consumer = new DefaultConsumer(channel) { /** * * @param consumerTag: 消费者的标签 * @param envelope : 设置 拿到你的交换机 路由key等信息 * @param properties: 消息的属性对象 * @param body: 消息的内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接受的内容===" + new String(body)); } }; /** * String queue, 队列名 * boolean autoAck,是否自动确认。 当rabbitmq把消息发送给消费后,消费端自动确认消息。 * Consumer callback:回调。 当rabbitmq队列中存在消息 则触发该回调 */ channel.basicConsume("Router-queue002", true,consumer); } }
总结:
Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。
3.5.Topics(主题模式)
- Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定Routing key 的时候使用通配符!
- Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.” 分割,例如: item.insert
- 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词, 例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
生产者:
package com.wqg.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @ fileName:SimpleProducer * @ description:主题模式-生产者 * @ author:wqg * @ createTime:2023/7/12 16:26 */ public class TopicsProducer { public static void main(String[] args) throws Exception { //创建连接工厂对象并设置连接信息 -----获取连接对象(指定rabbitMQ服务端的信息) ConnectionFactory connectionFactory = new ConnectionFactory(); //rabbitMQ服务端的地址 默认localhost connectionFactory.setHost("192.168.75.129"); //设置端口号 connectionFactory.setPort(5672); //设置账号 默认guest connectionFactory.setUsername("guest"); //设置密码 默认guest connectionFactory.setPassword("guest"); //设置虚拟主机 默认/ connectionFactory.setVirtualHost("/"); //获取连接对象 Connection connection = connectionFactory.newConnection(); //获取Channel信道对象 Channel channel = connection.createChannel(); //创建交换机 channel.exchangeDeclare("Topics-exchange", BuiltinExchangeType.TOPIC,true); //创建队列 channel.queueDeclare("Topics-queue001",true,false,false,null); channel.queueDeclare("Topics-queue002",true,false,false,null); //队列和交换机绑定 channel.queueBind("Topics-queue001","Topics-exchange","*.orange.*"); channel.queueBind("Topics-queue002","Topics-exchange","*.*.rabbit"); channel.queueBind("Topics-queue002","Topics-exchange","lazy.#"); String msg = "Hello RabbitMQ~~~主题模式"; channel.basicPublish("Topics-exchange","lazy.rabbit.orange",null,msg.getBytes()); channel.close(); connectionFactory.clone(); } }
消费者:
package com.wqg.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * @ fileName:SimpleConsumer * @ description:路由模式-消费者 * @ author:WQG * @ createTime:2023/7/12 16:25 */ public class TopicsConsumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.75.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //channel.queueDeclare("Work-queue", true, false, false, null); //接受队列中的信息 Consumer consumer = new DefaultConsumer(channel) { /** * * @param consumerTag: 消费者的标签 * @param envelope : 设置 拿到你的交换机 路由key等信息 * @param properties: 消息的属性对象 * @param body: 消息的内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接受的内容===" + new String(body)); } }; /** * String queue, 队列名 * boolean autoAck,是否自动确认。 当rabbitmq把消息发送给消费后,消费端自动确认消息。 * Consumer callback:回调。 当rabbitmq队列中存在消息 则触发该回调 */ channel.basicConsume("Topics-queue002", true,consumer); } }
Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,
只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。
4.springboot整合RabbitMQ
4.1.生产方
创建springboot项目----生产者(1) 引入 rabbitmq 整合的依赖<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
(2) 再配置文件中添加 rabbit 服务的信息#rabbitmq的配置 spring.rabbitmq.host=192.168.75.129 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
(3) 调用 RabbitTemplate 中发送消息的方法@SpringBootTest class RabbitmqSpringbootApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { rabbitTemplate.convertAndSend("Topics-exchange","lazy.aaa","Hello RabbitMQ..."); } }
4.2.消费方
创建springboot项目----消费者
(1)引入rabbitmq整合的依赖------同上
(2)再配置文件中添加rabbit服务的信息------同上
(3) 创建类 --- 创建监听方法即可 @RabbitListener@Component public class MyListener { //queues:表示你监听的队列名 @RabbitListener(queues = {"Topics-queue002"}) public void h(Message message){ //把监听到的消息封装到Message类对象中 byte[] body = message.getBody(); System.out.println("消息内容==="+new String(body)); } }
4.3.通过代码创建交换机和队列
package com.wqg.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @ fileName:RabbitConfig * @ description: * @ author:wqg * @ createTime:2023/7/12 19:41 */ @Configuration public class RabbitConfig { //定义了一个名为EXCHANGE_NAME的常量,用于表示交换器的名称 public static final String EXCHANGE_NAME = "Topics-queue002"; @Bean public Exchange exchange() { // 创建一个Topic类型的Exchange,名称为EXCHANGE_NAME,持久化 Exchange topic_exchange02 = ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); return topic_exchange02; } @Bean public Queue queue() { // 创建一个持久化的队列,名称为"Topics-queue003" Queue topic_queue03 = QueueBuilder.durable("Topics-queue003").build(); return topic_queue03; } @Bean public Binding binding() { // 创建一个绑定关系,将队列绑定到Exchange上,并指定routing key为"qy165.#",不使用任何参数 Binding noargs = BindingBuilder.bind(queue()).to(exchange()).with("qy165.#").noargs(); return noargs; } //如果交换机要绑定多个队列 需要再写一个bind方法 }