文章目录
- 1、RabbitMQ的概述
- 1.1、什么是消息队列?
- 1.2、为什么要使用消息队列?
- 1.3、RabbitMQ的特点:
- 2、RabbitMQ的安装
- 2.1 下载与安装
- 2.2 常用命令
- 3、RabbitMQ消息发送和接受
- 3.1 消息发送和接受机制
- 3.2 AMQP的消息路由
- 3.3 Exchange(交换机)的类型
- 3.4 Java发送和接收 Queue的消息
- 3.5 Java绑定Exchange 发送和接受消息
- Direct类型的交换机:
- Fanout类型的交换机:
- Topic类型的交换机:
- 3.6 消息的事务
- 3.6 消息的发送者确认模式
- 方式一:channel.waitForConfirms()普通发送方确认模式:
- 方式二:channel.waitForConfirmsOrDie() 批量确认模式:
- 方式三:channel.addConfirmListener() 异步监听发送确认模式:
- 3.7 消息的消费者确认模式
- 手动确认消息:
- 开启事务,必须提交事务,消息才会被确认:
- 消息的防重复确认:
- 4、SpringBoot继承RabbitMQ
- 发送者配置:分别对direct、fanout、topic类型的交换机做测试
- 接受者配置:
- 5、RabbitMQ集群
1、RabbitMQ的概述
1.1、什么是消息队列?
1.2、为什么要使用消息队列?
1.3、RabbitMQ的特点:
2、RabbitMQ的安装
2.1 下载与安装
…
2.2 常用命令
…
3、RabbitMQ消息发送和接受
3.1 消息发送和接受机制
3.2 AMQP的消息路由
3.3 Exchange(交换机)的类型
3.4 Java发送和接收 Queue的消息
导入依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
发送消息模块:
/**
* 消息生产者,发送者
*/
public class SendMsg {
//队列的名称
final static String QUEUE_NAME = "myQueue";
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置mq的连接信息
factory.setHost("192.168.65.128"); //主机ip
factory.setPort(5672); //端口,此端口是
factory.setUsername("root"); //账号
factory.setPassword("root"); //密码
factory.setVirtualHost("/mq"); //虚拟主机,没有设置可以不写
//定义连接
Connection conn = null;
//定义通道
Channel channel = null;
try {
conn = factory.newConnection(); //获取连接
channel = conn.createChannel(); //创建通道
/**
* 声明一个队列
* 参数1:队列的名称
* 参数2:是否为持久化的队列
* 参数3:是否为排外,如果是排外,则该队列只允许一个消费者监听
* 参数4:是否自动删除队列,如果队列中没有消息,也没有消费者连接时,就会删除该队列
* 参数5:为队列的一些其他属性设置,一般为null即可
* 注意事项:
* 1、队列存在,则不创建声明,队列不存在,则会创建一个新的队列
* 2、队列名可以是任意值,但接收消息必须保持和队列名一致,否则消息不知道发送到哪去了
* 3、下面这行代码可有可无,但是必须确保队列是存在的,否则消息不知道发送到哪去了
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//要发送的消息
String message = "hello RabbitMQ, this is my first use1111111";
/**
* 发送消息到MQ
* 参数1:交换机的名称,为空字符串表示不使用交换机
* 参数2:为队列或者routingKey,当指定了交换机,这个值就是RoutingKey
* 参数3:为消息属性信息,一般为null即可
* 参数4:为具体的消息数据的字节数组
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf8"));
System.out.println("消息发送完毕......");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(conn != null){
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
接收消息模块:
/**
* 接收消息
*/
public class ReceiveMsg {
final static String QUEUE_NAME = "myQueue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/mq");
try {
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
/**
* 接收消息
* 参数1:当前监听的队列名称,此名字必须与发送时的队列名称保持一致,不然接收不到消息
* 参数2:是否自动确认消息,true:会自动确认消息,并将消息从消息队列中移除
* 参数3:消息接收者的标签,用于当多个消费者同时监听一个队列时,用于区分不同的消费者,通常为空串即可
* 参数4:消息接收的回调函数,用于对接收的消息进行处理
* 注意:使用basicConsume方法后,会自动启动一个线程持续监听队列,如果队列中有消息会自动接收,所以不能关闭连接和通道对象
*/
channel.basicConsume(QUEUE_NAME, true, "",new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf8");
System.out.println("接收的消息: => " + msg);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
3.5 Java绑定Exchange 发送和接受消息
Direct类型的交换机:
发送消息:
/**
* 发送消息到direct类型的交换机,交换机与队列绑定通过routingKey,然后交换机把消息发送给队列
*/
public class DirectExchangeSendMsg {
//队列的名称
final static String QUEUE_NAME = "myDirectQueue";
//交换机的名称
final static String EXCHANGE_NAME = "directExchange";
//消息的routingKey
final static String ROUTING_KEY = "directRoutingKey";
public static void main(String[] args) {
send();
}
public static void send(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/mq");
Connection conn = null;
Channel channel = null;
try {
conn = factory.newConnection();
channel = conn.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
/**
* 声明一个交换机
* 参数1:交换机的名称
* 参数2:交换机的类型
* 参数3:是否为持久化交换机
* 注意:声明一个交换机,存在则不声明,不存在则声明,可有可无
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
/**
* 将队列绑定到交换机
* 参数1:队列名称
* 参数2:交换机名称
* 参数3:消息的routingKey,就是BindingKey,虽然参数名为routingKey
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
/**
* 发送消息到指定的队列
* 参数1:交换机的名称
* 参数2:routingKey,如果这个routingKey与某个队列绑定的交换机的routingKey一致,则消息就会被发送到这个队列中
*/
String msg = "要发送的消息...";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("utf8"));
System.out.println("消息发送完毕....");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(conn != null){
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
接收消息:
/**
* 消费者消费队列中的消息
*/
public class DirectExchangeReceiveMsg {
//队列的名称
final static String QUEUE_NAME = "myDirectQueue";
//交换机的名称
final static String EXCHANGE_NAME = "directExchange";
//消息的routingKey
final static String ROUTING_KEY = "directRoutingKey";
public static void main(String[] args) {
receive();
}
public static void receive(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/mq");
Connection conn = null;
Channel channel = null;
try {
conn = factory.newConnection();
channel = conn.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
//将队列绑定到交换机上
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//接收消息
channel.basicConsume(QUEUE_NAME, true, "", new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf8");
System.out.println("消费 ---> " + msg);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
Fanout类型的交换机:
发送消息:
/**
* 发送消息到fanout类型的交换机
*/
public class FanoutExchangeSendMsg {
//交换机的名称
final static String EXCHANGE_NAME = "fanOutExchange";
public static void main(String[] args) {
send();
}
public static void send(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/mq");
Connection conn = null;
Channel channel = null;
try {
conn = factory.newConnection();
channel = conn.createChannel();
//channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
/**
* 由于是fanout类型的交换机,所以存在多个消费者,因此不建议在发送消息的时候创建队列,以及绑定交换机
* 建议在消费者中创建队列并绑定交换机,但必须在发送消息时确保队列存在,所以上面的代码不写
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
String msg = "要发送的消息...";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes("utf8"));
System.out.println("消息发送完毕....");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(conn != null){
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
接收消息:
/**
* 消费者消费队列中的消息
*/
public class FanOutExchangeReceiveMsg {
//交换机的名称
final static String EXCHANGE_NAME = "fanOutExchange";
public static void main(String[] args) {
receive();
}
public static void receive(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/mq");
Connection conn = null;
Channel channel = null;
try {
conn = factory.newConnection();
channel = conn.createChannel();
/**
* 声明队列,由于fanout类型的交换机类似于广播的模式,会有多个消费者来接收交换机中的数据,
* 所以创建一个随机的队列名称
*
* channel.queueDeclare():创建一个随机名称的队列,不是持久化,排外的,自动删除的
* getQueue():获取队列的名称
*/
String queueName = channel.queueDeclare().getQueue();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
//将队列绑定到交换机上,fanout类型的交换机没有routingKey
channel.queueBind(queueName, EXCHANGE_NAME, "");
//接收消息
channel.basicConsume(queueName, true, "", new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf8");
System.out.println("消费 ---> " + msg);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
Topic类型的交换机:
发送消息:
/**
* 发送消息到topic类型的交换机
*/
public class TopicExchangeSendMsg {
//交换机的名称
final static String EXCHANGE_NAME = "topicExchange";
public static void main(String[] args) {
send();
}
public static void send(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/mq");
Connection conn = null;
Channel channel = null;
try {
conn = factory.newConnection();
channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
String msg = "要发送的消息...";
channel.basicPublish(EXCHANGE_NAME, "aa.bb.cc", null, msg.getBytes("utf8"));
System.out.println("消息发送完毕....");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(conn != null){
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
接收消息:
/**
* 消费者消费队列中的消息
*/
public class TopicExchangeReceiveMsg01 {
//交换机的名称
final static String EXCHANGE_NAME = "topicExchange";
//队列的名称
final static String QUEUE_NAME = "topicQueue01";//取topicQueue01、topicQueue02、topicQueue03
//topic交换机的routingKey可以包含通配符, .分割单词 # 匹配0个或者多个单词 * 匹配恰好一个单词
final static String ROUTING_KEY = "aa"; //取aa、aa.*、aa.# 进行测试
public static void main(String[] args) {
receive();
}
public static void receive(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/mq");
Connection conn = null;
Channel channel = null;
try {
conn = factory.newConnection();
channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false, null);
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
//将队列绑定到交换机上,fanout类型的交换机没有routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//接收消息
channel.basicConsume(QUEUE_NAME, true, "", new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf8");
System.out.println(QUEUE_NAME + "消费 ---> "+ROUTING_KEY+" --> " + msg);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
总结:
Topic类型的交换机也是消息一对多的一种交换机类型,它和fanout都能实现一个消息同时发送给多个队列;
fanout更适用于使用在一个功能不同的进程来获取数据,例如,手机App中的消息推送,一个App可能会还有很多个用户来进行安装,然后他们都会启动一个随机的队列来接收者自己的数据;
Topic更适用于不同功能模块来接收同一个消息,例如商城下单成功后需要发送消息到队列中,例如:RoutingKey为 order.success, 物流系统监听订单 order.*
, 发票系统监听 order. *
Topic可以使用农随机的队列名也可以使用一个明确的队列名,但是如果应用在和订单有关的功能中,建议是有个名明确的队列 并且要求持久化的队列。
3.6 消息的事务
事务消息与数据库的事务类似,只是MQ中的消息是要保证消息是否会全部发送成功,防止丢失消息的一种策略。
RabbitMQ有两种方式来解决这个问题:
- 通过AMQP提供的事务机制实现
- 使用发送者确认模式实现
事务的使用:
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
- channel.txSelect()声明启动事务模式
- channel.txCommit()提交事务
- channel.txRollback()回滚事务
发送者
在声明事务后没有提交,队列中不会有消息存在,只有事务提交后,才会将内存中的消息写入队列。
消费者
在声明事务后,即使没有提交,也可以获取队列中的消息,并将消息从队列中移除。
回滚事务,就是将消息事务中的操作,进行撤销到事务开始前的状态。
发送者:
/**
* 消息生产者,发送者
*/
public class TransactionSendMsg {
final static String QUEUE_NAME = "myTransactionQueue";
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置mq的连接信息
factory.setHost("192.168.65.128"); //主机ip
factory.setPort(5672); //端口
factory.setUsername("root"); //账号
factory.setPassword("root"); //密码
factory.setVirtualHost("/mq"); //虚拟主机
//定义连接
Connection conn = null;
//定义通道
Channel channel = null;
try {
conn = factory.newConnection(); //获取连接
channel = conn.createChannel(); //创建通道
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//要发送的消息
String message = "hello RabbitMQ, this is my first use1111111";
//声明启动事务模式,只声明启动,如果没有提交事务,则事务中要发送的消息不会写入到队列中
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf8"));
//int i = 10/0;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf8"));
channel.txCommit();
System.out.println("消息发送完毕......");
} catch (Exception e){
channel.txRollback(); //事务回滚,放弃当前事务中所有要提交的消息,释放内存
} finally{
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(conn != null){
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
接受者:
/**
* 接收消息
*/
public class TransactionReceiveMsg {
final static String QUEUE_NAME = "myTransactionQueue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/mq");
try {
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//消费端声明开启事务,不需要提交,也可以获取消息
channel.txSelect();
channel.basicConsume(QUEUE_NAME, true, "",new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf8");
System.out.println("接收的消息: => " + msg);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
3.6 消息的发送者确认模式
事务适用于经常发生异常的情况下,当发生异常,会拒绝提交事务,而消息的发送者确认模式,适用于发生少量异常的情况,当发送消息发生异常或者消息的丢失,它会补发消息,即重新发送消息,来保证消息的一致性和正确性。
方式一:channel.waitForConfirms()普通发送方确认模式:
/**
* 发送消息到direct类型的交换机------发送者普通确认模式
*/
public class ConfirmDirectExchangeSendMsg {
//队列的名称
final static String QUEUE_NAME = "myConfirmDirectQueue";
//交换机的名称
final static String EXCHANGE_NAME = "confirmDirectExchange";
//消息的routingKey
final static String ROUTING_KEY = "confirmDirectRoutingKey";
public static void main(String[] args) {
send();
}
public static void send(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/mq");
Connection conn = null;
Channel channel = null;
try {
conn = factory.newConnection();
channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//启动发送者确认模式
channel.confirmSelect();
String msg = "要发送的消息...";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("utf8"));
//阻塞线程等待服务返回响应,用于判断消息是否发送成功,如果服务端确认消息发送完成,则返回true
//还可以为这个方法指定毫秒值,用于确认我们需要等待服务端响应的超时时间,
//如果消息超过时间,则会抛出异常InterruptedException,表示服务器出现问题,需要补发消息
//或将消息缓存到redis中,稍后利用定时任务补发。
//无论是返回false,还是抛出异常,消息都有可能发送成功和失败
//如果我们的消息一定要发送到队列中,例如:订单数据,那么我们可以采用消息补发
//就是重新发送一次消息,可以使用递归或者redis + 定时任务完成补发
boolean b = channel.waitForConfirms();
System.out.println("消息发送完毕...." + b);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(conn != null){
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
方式二:channel.waitForConfirmsOrDie() 批量确认模式:
/**
* 发送消息到direct类型的交换机------发送者批量确认模式
*/
public class BatchConfirmDirectExchangeSendMsg {
//队列的名称
final static String QUEUE_NAME = "myBatchConfirmDirectQueue";
//交换机的名称
final static String EXCHANGE_NAME = "batchConfirmDirectExchange";
//消息的routingKey
final static String ROUTING_KEY = "batchConfirmDirectRoutingKey";
public static void main(String[] args) {
send();
}
public static void send(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/mq");
Connection conn = null;
Channel channel = null;
try {
conn = factory.newConnection();
channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//启动发送者确认模式
channel.confirmSelect();
String msg = "要发送的消息...";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("utf8"));
//waitForConfirmsOrDie:批量消息确认,它会同时向服务中确认之前通道中发送的所有消息是否已经全部成功写入
//这个方法没有返回值,如果服务器中有一条消息没有能够成功或向服务器发送确认时不可访问,都被认定为消息确认失败
//可能有消息没有发送成功,我们需要进行消息的补发。
//如果无法向服务器获取确认消息,那么方法抛出异常InterruptedException,这是就需要补发消息到队列中
//waitForConfirmsOrDie可以指定一个参数timeout,用于等服务器的确认时间,如果超过了这个时间也会抛出异常,表示消息需要补发。
//注意;
//批量消息的确认速度比普通的消息确认要快,但是如果一旦出现了消息补发的情况,我们不能确定具体是哪条消息没有完成发送,
//需要将本次发送的所有消息全部都进行补发。
channel.waitForConfirmsOrDie();
System.out.println("消息发送完毕....");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(conn != null){
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
方式三:channel.addConfirmListener() 异步监听发送确认模式:
/**
* 发送消息到direct类型的交换机------发送者异步监听确认模式
*/
public class AsyBatchConfirmDirectExchangeSendMsg {
//队列的名称
final static String QUEUE_NAME = "myAsyBatchConfirmDirectQueue";
//交换机的名称
final static String EXCHANGE_NAME = "asyBatchConfirmDirectExchange";
//消息的routingKey
final static String ROUTING_KEY = "asyBatchConfirmDirectRoutingKey";
public static void main(String[] args) {
send();
}
public static void send(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/mq");
Connection conn = null;
Channel channel = null;
try {
conn = factory.newConnection();
channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//启动发送者确认模式
channel.confirmSelect();
//添加监听器
channel.addConfirmListener(new ConfirmListener() {
//消息确认后的回调函数
//deliveryTag:确认的消息编号,从1开始,依次递增
//multiple: 消息是否同时确认多个,true:同时确认多条 false:确认当前一条信息
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息 -- " + deliveryTag + " --- " + multiple);
}
//消息没有别确认的回调函数
//deliveryTag: 没没有确认的消息编号,从1开始,依次递增
//multiple: 消息是否同时没有确认多个
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息 -- " + deliveryTag + " --- " + multiple);
}
});
String msg = "要发送的消息...";
//批量发送消息
for (int i = 0; i < 10000; i++) {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes("utf8"));
}
System.out.println("消息发送完毕....");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(conn != null){
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
3.7 消息的消费者确认模式
手动确认消息:
/**
* 消费者消费队列中的消息,
*/
public class AsyBatchConfirmDirectExchangeReceiveMsg {
//队列的名称
final static String QUEUE_NAME = "myAsyBatchConfirmDirectQueue";
//交换机的名称
final static String EXCHANGE_NAME = "asyBatchConfirmDirectExchange";
//消息的routingKey
final static String ROUTING_KEY = "asyBatchConfirmDirectRoutingKey";
public static void main(String[] args) {
receive();
}
public static void receive(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/mq");
Connection conn = null;
Channel channel = null;
try {
conn = factory.newConnection();
channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//消息的手动确认
//参数2:false:消息的手动确认 true:消息自动确认
//自动确认,如果在处理消息的过程中出现错误,会导致消息的丢失,所以改为手动确认
//判断消息是否手动确认,如果手动确认了,表示消息被处理了,需要从队列中移除
channel.basicConsume(QUEUE_NAME, false, "", new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf8");
System.out.println("消费 ---> " + msg);
//获取channel对象
Channel c = this.getChannel();
//获取消息的编号
long tag = envelope.getDeliveryTag();
//消息的手动确认
//参数1:消息的编号
//参数2:true:确认多条消息,表示 编号<=tag的消息都已经被确认了
c.basicAck(tag, true);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
开启事务,必须提交事务,消息才会被确认:
/**
* 消费者消费队列中的消息,
*/
public class AsyBatchConfirmDirectExchangeReceiveMsg2 {
//队列的名称
final static String QUEUE_NAME = "myAsyBatchConfirmDirectQueue";
//交换机的名称
final static String EXCHANGE_NAME = "asyBatchConfirmDirectExchange";
//消息的routingKey
final static String ROUTING_KEY = "asyBatchConfirmDirectRoutingKey";
public static void main(String[] args) {
receive();
}
public static void receive(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/mq");
Connection conn = null;
Channel channel = null;
try {
conn = factory.newConnection();
channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//声明事务
channel.txSelect();
//消息的手动确认
//参数2:false:消息的手动确认 true:消息自动确认
//自动确认,如果在处理消息的过程中出现错误,会导致消息的丢失,所以改为手动确认
//判断消息是否手动确认,如果手动确认了,表示消息被处理了,需要从队列中移除
channel.basicConsume(QUEUE_NAME, false, "", new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf8");
System.out.println("消费 ---> " + msg);
//获取channel对象
Channel c = this.getChannel();
//获取消息的编号
long tag = envelope.getDeliveryTag();
//消息的手动确认
//参数1:消息的编号
//参数2:true:确认多条消息,表示 编号<=tag的消息都已经被确认了
c.basicAck(tag, true);
//当确认消息时,开启了事务,如果没有提交,那么消息不会被确认,也就不会从队列中移除,所以需要手动提交事务
c.txCommit();
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
消息的防重复确认:
接收和处理完成两个状态。
/**
* 消费者消费队列中的消息,
*/
public class AsyBatchConfirmDirectExchangeReceiveMsg3 {
//队列的名称
final static String QUEUE_NAME = "myAsyBatchConfirmDirectQueue";
//交换机的名称
final static String EXCHANGE_NAME = "asyBatchConfirmDirectExchange";
//消息的routingKey
final static String ROUTING_KEY = "asyBatchConfirmDirectRoutingKey";
public static void main(String[] args) {
receive();
}
public static void receive(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/mq");
Connection conn = null;
Channel channel = null;
try {
conn = factory.newConnection();
channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//声明事务
channel.txSelect();
//消息的手动确认
//参数2:false:消息的手动确认 true:消息自动确认
//自动确认,如果在处理消息的过程中出现错误,会导致消息的丢失,所以改为手动确认
//判断消息是否手动确认,如果手动确认了,表示消息被处理了,需要从队列中移除
channel.basicConsume(QUEUE_NAME, false, "", new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//获取channel对象
Channel c = this.getChannel();
//获取消息的编号
long tag = envelope.getDeliveryTag();
//false:表示消息没有被接收过
//true:表示消息被接收过了,或者被处理完成了,因此需要进行消息的防重复确认
boolean redeliver = envelope.isRedeliver();
if(!redeliver){ //消息没有被处理过
String msg = new String(body, "utf8");
System.out.println("消费 ---> " + msg);
//消息的手动确认
//参数1:消息的编号
//参数2:true:确认多条消息,表示 编号<=tag的消息都已经被确认了,false:表示确认当前一条消息
//c.basicAck(tag, true);
//当确认消息时,开启了事务,如果没有提交,那么消息不会被确认,也就不会从队列中移除,所以需要手动提交事务
//c.txCommit();
}else {
//消息被接收或处理过,则防止重复确认
//如果查询数据库是否已经添加了或者修改了记录,即表示消息是否被接收过一次
//如果经确认,该消息被接收过,但没被处理完成,则需要重新处理该消息,并确认该消息
//如果该消息已经处理完成,则不需要进行其他处理操作,直接调用下面的代码
//c.basicAck(tag, false);
}
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
4、SpringBoot继承RabbitMQ
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring.rabbitmq.host=192.168.65.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/mq
发送者配置:分别对direct、fanout、topic类型的交换机做测试
配置:
@Configuration
public class RabbitMQConfig {
/**
* 配置一个direct类型的交换机
* @return
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange("bootDirectExchange");
}
/**
* 配置一个fanout类型的交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("bootFanoutExchange");
}
/**
* 配置一个topic类型的交换机
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("bootTopicExchange");
}
/**
* 配置一个队列
* @return
*/
@Bean
public Queue queue(){
return new Queue("bootQueue");
}
/**
* 配置一个队列和交换机绑定
* @param queue 队列
* @param directExchange 交换机
* @return
*/
@Bean
public Binding binding(Queue queue, DirectExchange directExchange){
//绑定一个队列到交换机
return BindingBuilder.bind(queue).to(directExchange).with("bootRoutingKey");
}
}
serviceIMpl:
@Service("sendService")
public class SendMsgImpl implements SendService {
@Autowired
private AmqpTemplate amqpTemplate;
/**
* 发送消息到direct类型的交换机
* @param message
*/
@Override
public void sendMsg(String message) {
/**
* 参数1:交换机名称
* 参数2:routingKey
* 参数3:发送的消息对象
*/
amqpTemplate.convertSendAndReceive("bootDirectExchange","bootRoutingKey", message);
System.out.println("消息发送完毕....");
}
/**
* 发送消息到fanout类型的交换机
* @param message
*/
@Override
public void sendFanoutMsg(String message) {
amqpTemplate.convertAndSend("bootFanoutExchange","",message);
System.out.println("消息发送完毕....");
}
/**
* 发送消息到topic类型的交换机
* @param message
*/
@Override
public void sendTopicMsg(String message, String key) {
amqpTemplate.convertAndSend("bootTopicExchange",key,message);
System.out.println("消息发送完毕....");
}
}
启动类:
@SpringBootApplication
public class SpringbootMqSendApplication {
public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(SpringbootMqSendApplication.class, args);
SendService sendService = context.getBean(SendService.class);
//sendService.sendMsg("springboot继承rabbitMq发送消息、.....");
//sendService.sendFanoutMsg("springboot继承rabbitMq发送消息、.....");
sendService.sendTopicMsg("springboot继承rabbitMq发送消息","aa.bb.cc");
}
}
接受者配置:
@Configuration
public class RabbitMQConfig {
/**
* 配置一个direct类型的交换机
* @return
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange("bootDirectExchange");
}
/**
* 配置一个队列
* @return
*/
@Bean
public Queue queue(){
return new Queue("bootQueue");
}
/**
* 配置一个队列和交换机绑定
* @param queue 队列
* @param directExchange 交换机
* @return
*/
@Bean
public Binding binding(Queue queue, DirectExchange directExchange){
//绑定一个队列到交换机
return BindingBuilder.bind(queue).to(directExchange).with("bootRoutingKey");
}
}
serviceIMpl:
@Service("receiveService")
public class ReceiveMsgImpl implements ReceiveService {
@Autowired
private AmqpTemplate amqpTemplate;
/**
* 注意,此方法不是持续接收消息的,每启动一次就接收一次消息,因此不使用这种方式
*/
/* @Override
public void receiveMsg() {
String msg = (String) amqpTemplate.receiveAndConvert("bootQueue");
System.out.println("接收到的消息 --- > " + msg);
}*/
/**
* direct类型的交换机
* @RabbitListener注解:作用是标记这是一个RabbitMQ的消息监听方法,用于持续的接收消息
* 被标记的方法不需要手动调用,由spring自动调用。
* queues:指定监听的消息队列
* message:接收到的具体的消息
*
* 注意:这个方法如果发生异常,能接受到消息,但是消息没有别确认,需要进行消息的防重复确认
* 如果正常,spring会确认消息,将消息从队列中移除
*/
@RabbitListener(queues = "bootQueue")
public void receiveMsg(String message) {
System.out.println("接收到的消息 --- > " + message);
//int i = 1/0;
}
//接收fanout类型绑定队列的消息,有两个,fanout类型的交换机为一对多
//fanout类型的交换机,@QueueBinding完成队列和交换机的绑定,@Queue:用于生成一个随机的队列,@Exchange:创建一个交换机
@RabbitListener(bindings = @QueueBinding(value = @Queue(),
exchange = @Exchange(name = "bootFanoutExchange",type = ExchangeTypes.FANOUT)))
public void fanoutReceiveMsg01(String message) {
System.out.println("fanoutReceiveMsg01==》接收到的消息 --- > " + message);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(),
exchange = @Exchange(name = "bootFanoutExchange",type = ExchangeTypes.FANOUT)))
public void fanoutReceiveMsg02(String message) {
System.out.println("fanoutReceiveMsg02==》接收到的消息 --- > " + message);
}
//接收topic类型的绑定的队列的消息
@RabbitListener(bindings = @QueueBinding(value=@Queue(name = "topicReceiveMsg01"),key = "aa",
exchange = @Exchange(value = "bootTopicExchange",type = ExchangeTypes.TOPIC)))
public void topicReceiveMsg01(String message){
System.out.println("topicReceiveMsg01==》接收到的消息 aa --- > " + message);
}
@RabbitListener(bindings = @QueueBinding(value=@Queue(name = "topicReceiveMsg02"),key = "aa.*",
exchange = @Exchange(value = "bootTopicExchange",type = ExchangeTypes.TOPIC)))
public void topicReceiveMsg02(String message){
System.out.println("topicReceiveMsg02==》接收到的消息 aa.* --- 1 > " + message);
}
@RabbitListener(bindings = @QueueBinding(value=@Queue(name = "topicReceiveMsg03"),key = "aa.#",
exchange = @Exchange(value = "bootTopicExchange",type = ExchangeTypes.TOPIC)))
public void topicReceiveMsg03(String message){
System.out.println("topicReceiveMsg03==》接收到的消息 aa.# --- 0/n > " + message);
}
}
5、RabbitMQ集群
=》接收到的消息 — > " + message);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(),
exchange = @Exchange(name = "bootFanoutExchange",type = ExchangeTypes.FANOUT)))
public void fanoutReceiveMsg02(String message) {
System.out.println("fanoutReceiveMsg02==》接收到的消息 --- > " + message);
}
//接收topic类型的绑定的队列的消息
@RabbitListener(bindings = @QueueBinding(value=@Queue(name = "topicReceiveMsg01"),key = "aa",
exchange = @Exchange(value = "bootTopicExchange",type = ExchangeTypes.TOPIC)))
public void topicReceiveMsg01(String message){
System.out.println("topicReceiveMsg01==》接收到的消息 aa --- > " + message);
}
@RabbitListener(bindings = @QueueBinding(value=@Queue(name = "topicReceiveMsg02"),key = "aa.*",
exchange = @Exchange(value = "bootTopicExchange",type = ExchangeTypes.TOPIC)))
public void topicReceiveMsg02(String message){
System.out.println("topicReceiveMsg02==》接收到的消息 aa.* --- 1 > " + message);
}
@RabbitListener(bindings = @QueueBinding(value=@Queue(name = "topicReceiveMsg03"),key = "aa.#",
exchange = @Exchange(value = "bootTopicExchange",type = ExchangeTypes.TOPIC)))
public void topicReceiveMsg03(String message){
System.out.println("topicReceiveMsg03==》接收到的消息 aa.# --- 0/n > " + message);
}
}
### 5、RabbitMQ集群