1.单个确认
单个确认发布是一种同步确认发布方式,也就是发布一个消息后只有它被确认发布,后续的消息才能继续发布。
缺点:发布速度特别慢,因为若是没有确认发布的消息会阻塞所有后续消息的发布
package com.hong.rabbitmq5;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.util.UUID;
/**
* @Description: 发布确认-单个确认发布
* 单个确认发布是一种同步确认发布方式,也就是发布一个消息后只有它被确认发布,后续的消息才能继续发布。
* 缺点:发布速度特别慢,因为若是没有确认发布的消息会阻塞所有后续消息的发布
* @Author: hong
* @Date: 2023-12-21 20:52
* @Version: 1.0
**/
public class Task5 {
public static void main(String[] args) throws Exception{
String queueName = UUID.randomUUID().toString();
Channel channel = RabbitMQUtil.getChannel();
//队列持久化 true持久化
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认 默认关闭
channel.confirmSelect();
long startTime = System.currentTimeMillis();
for(int i = 0; i <= 1000; i++){
String message = i + "";
//消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("",queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
//发布就确认
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("第"+i+"个消息发布成功!");
}
}
long endTime = System.currentTimeMillis();
System.out.println("发布1000个消息单个确认发布总耗时:"+(endTime - startTime)+"ms");
}
}
2. 批量确认
批量确认发布也是一种同步确认发布方式,一批确认一次,相比单个确认发布极大地提升了吞吐量
缺点:一旦出故障难以确认到底是哪个消息出问题了
package com.hong.rabbitmq5;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.util.UUID;
/**
* @Description: 批量确认发布
* 批量确认发布也是一种同步确认发布方式,一批确认一次,相比单个确认发布极大地提升了吞吐量
* 缺点:一旦出故障难以确认到底是哪个消息出问题了
* @Author: hong
* @Date: 2024-01-03 17:44
* @Version: 1.0
**/
public class BatchConfirmPublish {
public static void main(String[] args) throws Exception {
String queueName = UUID.randomUUID().toString();
Channel channel = RabbitMQUtil.getChannel();
//队列持久化 true持久化
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认 默认关闭
channel.confirmSelect();
//100个确认一次
int batchSize = 100;
long startTime = System.currentTimeMillis();
for(int i = 1; i <= 1000; i++){
String message = i + "";
//消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("",queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
if(i%batchSize == 0){
//发布就确认
channel.waitForConfirms();
}
}
long endTime = System.currentTimeMillis();
System.out.println("发布1000个消息,批量确认发布总耗时:"+(endTime - startTime)+"ms");
}
}
3. 异步确认
package com.hong.rabbitmq5;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.MessageProperties;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
/**
* @Description: 异步确认发布
* @Author: hong
* @Date: 2024-01-06 20:41
* @Version: 1.0
**/
public class AsynConfirmPublish {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
//队列的声明
String queueName = UUID.randomUUID().toString();
//队列持久化 true持久化
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认 默认关闭
channel.confirmSelect();
ConcurrentSkipListMap<Long,String> concurrentSkipListMap =
new ConcurrentSkipListMap<>();
//消息确认回调的函数
ConfirmCallback ackCallback = (deliveryTag, multiple) ->{
if(multiple) {
//2.删除掉已经确认的消息 剩下的就是未确认的消息
ConcurrentNavigableMap<Long, String> confirmed =
concurrentSkipListMap.headMap(deliveryTag);
confirmed.clear();
}else {
concurrentSkipListMap.remove(deliveryTag);
}
System.out.println("确认的消息:" + deliveryTag);
};
//消息确认失败回调函数
ConfirmCallback nackCallback= (deliveryTag,multiple) ->{
//3.打印一下未确认的消息都有哪些
String message = concurrentSkipListMap.get(deliveryTag);
System.out.println("未确认的消息tag:" + deliveryTag + "-----------未确认的消息是:" + message);
};
/*
* 消息的监听器 异步通知
*/
channel.addConfirmListener(ackCallback,nackCallback);
long begin = System.currentTimeMillis();
//批量发送消息
for (int i = 1; i <= 1000; i++) {
String message= "消息" + i;
//消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("",queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
//1.此处记录下所有要发送的消息 消息的总和
concurrentSkipListMap.put(channel.getNextPublishSeqNo(),message);
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("发布1000个消息,异步确认消息总耗时:"+(end-begin)+"ms");
}
}