1.SleepUtil线程睡眠工具类
package com.hong.utils;
/**
 * @Description: Thread sleep utility class
 * @Author: hong
 * @Date: 2023-12-16 23:10
 * @Version: 1.0
 **/
public class SleepUtil {
    /**
     * Pauses the current thread for the given number of seconds.
     *
     * <p>If the thread is interrupted while sleeping, the method returns early
     * and restores the thread's interrupt flag so that callers can still
     * observe the interruption.
     *
     * @param second number of seconds to sleep; a negative value is passed
     *               through to {@link Thread#sleep(long)}, which rejects it
     *               with an {@link IllegalArgumentException}
     */
    public static void sleep(int second) {
        try {
            // Multiply as long: 1000 * second in int arithmetic overflows for
            // second > 2_147_483 and would yield a negative or wrong duration.
            Thread.sleep(1000L * second);
        } catch (InterruptedException e) {
            // Re-assert the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}
2.消息生产者
package com.hong.rabbitmq3;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
 * @Description: Producer for the manual-acknowledgement demo: messages are not
 *               lost when a consumer fails to ack; they are put back on the
 *               queue and consumed again
 * @Author: hong
 * @Date: 2023-12-16 22:33
 * @Version: 1.0
 **/
public class Task3 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    /**
     * Reads whitespace-delimited tokens from standard input and publishes each
     * token as one message to {@link #TASK_QUEUE_NAME} through the default
     * exchange, until standard input is exhausted.
     *
     * @param args unused
     * @throws Exception if obtaining the channel, declaring the queue,
     *                   publishing, or closing the connection fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        try {
            // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
            channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
            Scanner scanner = new Scanner(System.in);
            System.out.println("请输入:");
            while (scanner.hasNext()) {
                String message = scanner.next();
                // Empty exchange name = default exchange; the routing key is the queue name.
                channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println("消息发送完成------" + message);
            }
        } finally {
            // Close the underlying connection (which also closes the channel) so the
            // broker connection is not leaked once input ends or an error occurs.
            if (channel.getConnection().isOpen()) {
                channel.getConnection().close();
            }
        }
    }
}
3.两个消费者
模拟一个处理速度快(Worker3),另一个处理速度慢(Worker4)
3.1.处理时间短
package com.hong.rabbitmq3;
import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
 * @Description: Fast consumer for the manual-acknowledgement demo: messages are
 *               not lost when a consumer fails to ack; they are put back on the
 *               queue and consumed again
 * @Author: hong
 * @Date: 2023-12-16 23:05
 * @Version: 1.0
 **/
public class Worker3 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    /**
     * Subscribes to {@link #TASK_QUEUE_NAME} with automatic acknowledgement
     * turned off. Each delivery is handled after a 1-second pause, printed,
     * and then acknowledged individually.
     *
     * @param args unused
     * @throws Exception if obtaining the channel or registering the consumer fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        System.out.println("worker3等待接收消息,处理速度快");

        DeliverCallback onDelivery = (consumerTag, delivery) -> {
            // Simulate a short processing time.
            SleepUtil.sleep(1);
            String body = new String(delivery.getBody(), "UTF-8");
            System.out.println("接收到的消息:" + body);

            // Manual ack: first argument is the delivery tag identifying the message,
            // second is the "multiple" flag (true would ack in batch).
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
        };

        CancelCallback onCancel = consumerTag -> System.out.println(consumerTag + "消息消费被中断!");

        // autoAck = false: acknowledgements are sent explicitly in the callback above.
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, onDelivery, onCancel);
    }
}
3.2.处理时间长
package com.hong.rabbitmq3;
import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
 * @Description: Slow consumer for the manual-acknowledgement demo: messages are
 *               not lost when a consumer fails to ack; they are put back on the
 *               queue and consumed again
 * @Author: hong
 * @Date: 2023-12-16 23:05
 * @Version: 1.0
 **/
public class Worker4 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    /**
     * Subscribes to {@link #TASK_QUEUE_NAME} with automatic acknowledgement
     * turned off. Each delivery is handled after a 20-second pause, printed,
     * and then acknowledged individually.
     *
     * @param args unused
     * @throws Exception if obtaining the channel or registering the consumer fails
     */
    public static void main(String[] args) throws Exception {
        final Channel channel = RabbitMQUtil.getChannel();
        System.out.println("worker4等待接收消息,处理速度慢");

        // Invoked when the consumer is cancelled.
        CancelCallback cancelHandler = tag -> System.out.println(tag + "消息消费被中断!");

        DeliverCallback deliveryHandler = (tag, delivery) -> {
            // Simulate a long processing time.
            SleepUtil.sleep(20);
            final String text = new String(delivery.getBody(), "UTF-8");
            System.out.println("接收到的消息:" + text);

            // Manual ack: delivery tag identifies the message; false = ack only this one
            // (true would ack in batch).
            final long tagToAck = delivery.getEnvelope().getDeliveryTag();
            channel.basicAck(tagToAck, false);
        };

        // Second argument false disables auto-ack; acks are sent by deliveryHandler.
        channel.basicConsume(TASK_QUEUE_NAME, false, deliveryHandler, cancelHandler);
    }
}
4.结果
先启动生产者,再启动2个消费者;等消息bb被接收到后,再发送cc和dd。
等Worker4接收到消息bb后将Worker4关闭,可以发现原本应由Worker4消费的消息dd并未丢失,而是重回队列并被Worker3消费。
5.持久化
5.1.队列持久化
package com.hong.rabbitmq4;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
 * @Description: Queue durability: producer that declares a durable queue
 * @Author: hong
 * @Date: 2023-12-17 22:52
 * @Version: 1.0
 **/
public class Task4 {
    public static final String TASK_QUEUE_NAME = "persist_queue";

    /**
     * Declares {@link #TASK_QUEUE_NAME} as a durable queue, then reads
     * whitespace-delimited tokens from standard input and publishes each token
     * as one message through the default exchange until input is exhausted.
     * Messages are published without any properties (no persistence flag).
     *
     * @param args unused
     * @throws Exception if obtaining the channel, declaring the queue,
     *                   publishing, or closing the connection fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        try {
            // Second argument true = durable queue.
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            Scanner scanner = new Scanner(System.in);
            System.out.println("请输入:");
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println("消息发送完成------" + message);
            }
        } finally {
            // Close the underlying connection (which also closes the channel) so the
            // broker connection is not leaked once input ends or an error occurs.
            if (channel.getConnection().isOpen()) {
                channel.getConnection().close();
            }
        }
    }
}
5.2.消息持久化
package com.hong.rabbitmq4;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.util.Scanner;
/**
 * @Description: Queue durability and message persistence
 * @Author: hong
 * @Date: 2023-12-17 22:52
 * @Version: 1.0
 **/
public class Task4 {
    public static final String TASK_QUEUE_NAME = "persist_queue";

    /**
     * Declares {@link #TASK_QUEUE_NAME} as a durable queue, then reads
     * whitespace-delimited tokens from standard input and publishes each token
     * as one message, using {@code MessageProperties.PERSISTENT_TEXT_PLAIN},
     * through the default exchange until input is exhausted.
     *
     * @param args unused
     * @throws Exception if obtaining the channel, declaring the queue,
     *                   publishing, or closing the connection fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        try {
            // Queue durability: second argument true = durable queue.
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            Scanner scanner = new Scanner(System.in);
            System.out.println("请输入:");
            while (scanner.hasNext()) {
                String message = scanner.next();
                // Message persistence: publish with MessageProperties.PERSISTENT_TEXT_PLAIN.
                channel.basicPublish("", TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println("消息发送完成------" + message);
            }
        } finally {
            // Close the underlying connection (which also closes the channel) so the
            // broker connection is not leaked once input ends or an error occurs.
            if (channel.getConnection().isOpen()) {
                channel.getConnection().close();
            }
        }
    }
}