一、RabbitMQ的六种模式
- RabbitMQ共有六种工作模式:
- 简单模式(Simple)
- 工作队列模式(Work Queue)
- 发布订阅模式(Publish/Subscribe)
- 路由模式(Routing)
- 通配符模式(Topics)
- 远程调用模式(RPC,不常用,不对此模式进行讲解)
1、RabbitMQ的简单模式
- rabbitmq简单模式的特点:
-
- 一个生产者对应一个消费者,通过队列进行消息传递。
-
- 该模式使用direct交换机,
direct交换机是RabbitMQ默认交换机
。
- 该模式使用direct交换机,
-
JMS
- 由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则——JMS,用于操作消息中间件。
- JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。
- JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ等产品。
- RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。
java操作rabbitmq的简单模式
- 操作之前记得启动rabbitmq服务(rabbitmq的客户端不用开启也行)
docker start rabbitmq
- 注意:启动rabbitmq的web客户端是为了能访问它的客户端界面
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
第一步:创建项目(使用简单的maven项目即可)并添加RabbitMQ依赖
- 依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.0</version>
</dependency>
</dependencies>
第二步:生产者和消费者代码的编写
- 创建生产者(producer)代码
package com.knife.demo01.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
// 生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.70.130"); // 虚拟机ip
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.创建队列,如果队列已存在,则使用该队列
/**
* 参数1:队列名
* 参数2:是否持久化,true表示MQ重启后队列还在。
* 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
* 参数4:是否自动删除,true表示不再使用队列时自动删除队列
* 参数5:其他额外参数
*/
channel.queueDeclare("simple_queue",false,false,false,null);
// 5.发送消息
String message = "hello!rabbitmq!";
/**
* 参数1:交换机名,""表示默认交换机direct
* 参数2:路由键,简单模式就是队列名
* 参数3:其他额外参数
* 参数4:要传递的消息字节数组
*/
channel.basicPublish("","simple_queue",null,message.getBytes());
// 6.关闭信道和连接
channel.close();
connection.close();
System.out.println("===发送成功===");
}
}
运行生产者代码结果:
- 创建消费者(consumer)代码
package com.knife.demo01.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
// 消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.70.130"); // 虚拟机ip
connectionFactory.setPort(5672); // 端口号
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列
/**
* 参数1:监听的队列名
* 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
* 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
*/
channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 接收消息
String message = new String(body, "UTF-8");
System.out.println("接受消息,消息为:"+message);
}
});
}
}
运行消费者代码结果:
2、RabbitMQ的工作队列模式(相比简单模式,处理消息的消费者增多了)
-
工作队列模式(Work Queue)与简单模式相比,多了一些消费者,该模式也使用direct交换机(默认使用的交换机),应用于处理消息较多的情况。特点如下:
-
- 一个队列对应多个消费者。
-
- 一条消息只会被一个消费者消费。
-
- 消息队列默认采用轮询的方式将消息平均发送给消费者。
-
- 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系
-
-
Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可
java操作rabbitmq的工作队列模式
- Work Queues 的入门程序与上面简单模式的代码几乎是一样的。可以完全复制,只是比简单模式的消费者多几个而已,可以让多个消费者同时对消费消息的测试。
- 在简单模式的基础下编写代码(直接用同一个项目)
- 消息生产者代码:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建工厂连接
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.70.130"); //虚拟机地址
cf.setPort(5672); //rabbitmq的端口号
cf.setUsername("admin"); // 用户名
cf.setPassword("admin"); // 密码
cf.setVirtualHost("/");
// 创建连接
Connection connection = cf.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 创建队列
/**
* 参数1:队列名称
* 参数二:是否持久化
* 参数三:是否私有化
* 参数4:队列使用完毕后是否自动删除
*/
channel.queueDeclare("work_queue",true,false,false,null);
// 发送消息
/**
* 参数1:交换机名,""表示默认交换机direct
* 参数2:路由键,简单模式就是队列名
* 参数3:其他额外参数
* 参数4:要传递的消息字节数组
*/
for (int i = 1; i <= 10; i++) {
channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,
("你好,这是今天的第"+i+"条消息").getBytes());
}
// 关闭资源
channel.close();
connection.close();
System.out.println("消息发送成功====");
}
}
- 消息消费者代码(多个消费者可以代码复用,复用下面的代码,多创几个类)
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.70.130");
cf.setPort(5672);
cf.setUsername("admin");
cf.setPassword("admin");
cf.setVirtualHost("/");
//2.创建连接
Connection conn = cf.newConnection();
//3.创建信道
Channel channel = conn.createChannel();
// 4. 接收消息
channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("消费者1 = " + msg);
}
});
channel.close();
conn.close();
}
}
代码运行的效果和上面简单模式的类似。
3、RabbitMQ的发布订阅模式(相比工作队列模式,交换机使用的类型是Fanout(广播类型))
- 图解说明
- P:生产者,向 Exchange 发送消息
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给与交换机绑定的队列
- C1、C2:消费者,其所在队列要与交换机进行绑定
- 在开发过程中,有一些消息
需要不同的消费者
进行不同的处理- 如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe)
- 在订阅模型中,多了一个
Exchange
角色,而且过程略有变化:- Producer:生产者,也就是要发送消息的程序,但是
不再发送到队列中,而是发给X(交换机)
- Consumer:消费者,消息的接收者,会一直等待消息到来
- Queue:消息队列,接收消息、缓存消息
- Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
- Fanout:广播类型,将消息交给所有
绑定到交换机的队列
- Direct:定向类型,把消息交给
符合指定routing key 的队列
- Topic:通配符类型,把消息交给
符合routing pattern(路由模式) 的队列
,Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
- Fanout:广播类型,将消息交给所有
- Producer:生产者,也就是要发送消息的程序,但是
- 特点:
-
- 生产者(Producer)将消息发送给交换机,交换机
将消息转发到绑定此交换机的每个队列中
(即可以转发到多个队列中)。
- 生产者(Producer)将消息发送给交换机,交换机
-
- 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给
多个队列
。发布订阅模式使用fanout交换机
。
- 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给
-
java操作rabbitmq的发布订阅模式
- 在简单模式的基础下编写代码(直接用同一个项目)
其实代码还是差不多的,只是使用到的交换机不一样了而已。
- Producer
public class Producer {
// 生产者
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.70.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.创建交换机
/**
* 参数1:交换机名
* 参数2:交换机类型
* 参数3:交换机持久化
*/
// BuiltinExchangeType.FANOUT:表示广播模式
channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT, true);
// 5.创建队列
// 队列1
channel.queueDeclare("SEND_MAIL", true, false, false, null);
// 队列2
channel.queueDeclare("SEND_MESSAGE", true, false, false, null);
// 队列3
channel.queueDeclare("SEND_STATION", true, false, false, null);
// 6.交换机绑定队列
/**
* 参数1:队列的名称
* 参数2:交换机名
* 参数3:路由关键字,发布订阅模式写""即可
*/
channel.queueBind("SEND_MAIL", "exchange_fanout", "");
channel.queueBind("SEND_MESSAGE", "exchange_fanout", "");
channel.queueBind("SEND_STATION", "exchange_fanout", "");
// 7.发送消息
channel.basicPublish("exchange_fanout", "", null,
("618商品开抢了!").getBytes(StandardCharsets.UTF_8));
// for (int i = 1; i <= 10; i++) {
// channel.basicPublish("exchange_fanout", "", null,
// ("你好,尊敬的用户,秒杀商品开抢了!" + i).getBytes(StandardCharsets.UTF_8));
// }
// 8.关闭资源
channel.close();
connection.close();
}
}
- ConsumerMail(消费者代码都类似)
public class ConsumerMail {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.70.130");
cf.setPort(5672);
cf.setUsername("admin");
cf.setPassword("admin");
cf.setVirtualHost("/");
//2.创建连接
Connection conn = cf.newConnection();
//3.创建信道
Channel channel = conn.createChannel();
//4.监听队列
channel.basicConsume("SEND_MAIL",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("发送邮件消息 = " + msg);
}
});
}
}
- ConsumerMesage
public class ConsumerMessage {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.70.130");
cf.setPort(5672);
cf.setUsername("admin");
cf.setPassword("admin");
cf.setVirtualHost("/");
//2.创建连接
Connection conn = cf.newConnection();
//3.创建信道
Channel channel = conn.createChannel();
//4.监听队列
channel.basicConsume("SEND_MESSAGE",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("发送短信消息 = " + msg);
}
});
}
}
- ConsumerStation
public class ConsumerStation {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.70.130");
cf.setPort(5672);
cf.setUsername("admin");
cf.setPassword("admin");
cf.setVirtualHost("/");
//2.创建连接
Connection conn = cf.newConnection();
//3.创建信道
Channel channel = conn.createChannel();
//4.监听队列
channel.basicConsume("SEND_STATION",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("发送站內信 = " + msg);
}
});
}
}
4、RabbitMQ的路由模式(相比发布订阅模式,多了个Route Key)
- 使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,双十一大促可能会发布到所有队列;
而一些小的促销活动为了节约成本,只发布到站内信队列
。此时需要使用路由模式(Routing)
完成这一需求。
- 图解说明
- P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
- C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
- 队列与交换机的绑定,不能是任意绑定了,而是
要指定一个 RoutingKey(路由key)
,消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKey,Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息。
java操作rabbitmq的路由模式
- 在简单模式的基础下编写代码(直接用同一个项目)
其实代码还是差不多的,只是使用到的交换机不一样了,多了一个route Key。
- Producer
public class Producer {
// 生产者
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.70.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.创建交换机
/**
* 参数1:交换机名
* 参数2:交换机类型
* 参数3:交换机持久化
*/
// BuiltinExchangeType.DIRECT
channel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT, true);
// 5.创建队列
// 队列1
channel.queueDeclare("SEND_MAILRoute", true, false, false, null);
// 队列2
channel.queueDeclare("SEND_MESSAGERoute", true, false, false, null);
// 队列3
channel.queueDeclare("SEND_STATIONRoute", true, false, false, null);
// 6.交换机绑定队列
/**
* 参数1:队列的名称
* 参数2:交换机名
* 参数3:路由关键字(路由名称)
*/
channel.queueBind("SEND_MAILRoute", "routing_exchange", "import");
channel.queueBind("SEND_MESSAGERoute", "routing_exchange", "import");
channel.queueBind("SEND_STATIONRoute", "routing_exchange", "import");
channel.queueBind("SEND_STATIONRoute", "routing_exchange", "normal");
// 7.发送消息
channel.basicPublish("routing_exchange", "import", null,
("618商品开抢了!").getBytes(StandardCharsets.UTF_8));
channel.basicPublish("routing_exchange", "normal", null,
("normal路由的息").getBytes(StandardCharsets.UTF_8));
// for (int i = 1; i <= 10; i++) {
// channel.basicPublish("exchange_fanout", "", null,
// ("你好,尊敬的用户,秒杀商品开抢了!" + i).getBytes(StandardCharsets.UTF_8));
// }
// 8.关闭资源
channel.close();
connection.close();
}
}
- ConsumerMail(消费者代码都类似)
public class ConsumerMail {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.70.130");
cf.setPort(5672);
cf.setUsername("admin");
cf.setPassword("admin");
cf.setVirtualHost("/");
//2.创建连接
Connection conn = cf.newConnection();
//3.创建信道
Channel channel = conn.createChannel();
//4.监听队列
channel.basicConsume("SEND_MAILRoute",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("发送邮件消息 = " + msg);
}
});
}
}
- ConsumerMesage
public class ConsumerMessage {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.70.130");
cf.setPort(5672);
cf.setUsername("admin");
cf.setPassword("admin");
cf.setVirtualHost("/");
//2.创建连接
Connection conn = cf.newConnection();
//3.创建信道
Channel channel = conn.createChannel();
//4.监听队列
channel.basicConsume("SEND_MESSAGERoute",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("发送短信消息 = " + msg);
}
});
}
}
- ConsumerStation
public class ConsumerStation {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.70.130");
cf.setPort(5672);
cf.setUsername("admin");
cf.setPassword("admin");
cf.setVirtualHost("/");
//2.创建连接
Connection conn = cf.newConnection();
//3.创建信道
Channel channel = conn.createChannel();
//4.监听队列
channel.basicConsume("SEND_STATIONRoute",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("发送站內信 = " + msg);
}
});
}
}
5、RabbitMQ的通配符模式(相比路由模式,路由的组成中带上了通配符)
- 通配符模式(Topic)是在路由模式的基础上,
给队列绑定带通配符的路由关键字
,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机
。 - 通配符规则:
-
- 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以.分割。
-
- 队列设置RoutingKey时,
#
可以匹配任意多个单词,*
可以匹配任意一个单词。
- 队列设置RoutingKey时,
-
- 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
- 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配
java操作rabbitmq的通配符模式
- 在简单模式的基础下编写代码(直接用同一个项目)
其实代码还是差不多的,只是route Key的组成多了通配符。
- Producer
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.126.10");
cf.setPort(5672);
cf.setUsername("admin");
cf.setPassword("admin");
cf.setVirtualHost("/");
//2.创建连接
Connection conn = cf.newConnection();
//3.创建信道
Channel channel = conn.createChannel();
//4.创建交换机
/**
* 参数1:交换机名
* 参数2:交换机类型
* 参数3:交换机持久化
*/
channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC, true);
/**
* 参数1:队列名
* 参数2:是否持久化,true表示MQ重启后队列还在。
* 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
* 参数4:是否自动删除,true表示不再使用队列时自动删除队列
* 参数5:其他额外参数
*/
//5.创建队列(举例订单给手机发送,邮箱,站内)发送消息,3个队列
channel.queueDeclare("SEND_MAIL3", true, false, false, null);
channel.queueDeclare("SEND_MESSAGE3", true, false, false, null);
channel.queueDeclare("SEND_STATION3", true, false, false, null);
//6.将队列和交换机绑定
/**
* 参数1:队列名
* 参数2:交换机名
* 参数3:路由关键字,发布订阅模式写""即可
*/
channel.queueBind("SEND_MAIL3", "exchange_topic", "#.mail.#");
channel.queueBind("SEND_MESSAGE3", "exchange_topic", "#.message.#");
channel.queueBind("SEND_STATION3", "exchange_topic", "#.station.#");
//8.发送消息
channel.basicPublish("exchange_topic", "mail.message.station", null, "618大促销活动".getBytes(StandardCharsets.UTF_8));
channel.basicPublish("exchange_topic", "station", null, "618小促销活动".getBytes(StandardCharsets.UTF_8));
//9.关闭资源
channel.close();
conn.close();
System.out.println("发送消息成功");
}
}
- ConsumerMail(消费者代码都类似)
public class ConsumerMail {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.126.10");
cf.setPort(5672);
cf.setUsername("admin");
cf.setPassword("admin");
cf.setVirtualHost("/");
//2.创建连接
Connection conn = cf.newConnection();
//3.创建信道
Channel channel = conn.createChannel();
//4.监听队列
channel.basicConsume("SEND_MAIL3",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("发送邮件消息 = " + msg);
}
});
}
}
- ConsumerMesage
public class ConsumerMessage {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.126.10");
cf.setPort(5672);
cf.setUsername("admin");
cf.setPassword("admin");
cf.setVirtualHost("/");
//2.创建连接
Connection conn = cf.newConnection();
//3.创建信道
Channel channel = conn.createChannel();
//4.监听队列
channel.basicConsume("SEND_MESSAGE3",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("发送短信消息 = " + msg);
}
});
}
}
- ConsumerStation
public class ConsumerStation {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.126.10");
cf.setPort(5672);
cf.setUsername("admin");
cf.setPassword("admin");
cf.setVirtualHost("/");
//2.创建连接
Connection conn = cf.newConnection();
//3.创建信道
Channel channel = conn.createChannel();
//4.监听队列
channel.basicConsume("SEND_STATION3",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("发送站內信 = " + msg);
}
});
}
}
RabbitMQ一、RabbitMQ的介绍与安装(docker)
RabbitMQ三、springboot整合rabbitmq(消息可靠性、高级特性)