文章目录
- 简单模式(simple)
- 工作队列(work)
- 准备工作
- 轮询调度
- 消息确认
- 消息持久性
- 公平分发
- 代码示例
- 本篇总结
更多相关内容可查看
简单模式(simple)
通俗概括:生产者-队列-消费者
想详细了解Rabbit的基础或简单模式的可查看以下链接
RabbitMQ–Hello World(基础详解)
工作队列(work)
准备工作
现在我们将发送代表复杂任务的字符串。我们没有一个真实的任务,比如要调整大小的图像或要渲染的 PDF 文件,所以让我们假装我们很忙 - 通过使用 Thread.sleep()
函数来实现。我们将以字符串中的点数作为其复杂性;每个点将代表一秒钟的 “工作”。例如,由 Hello… 描述的虚假任务将需要三秒钟。
从命令行发送任意消息。这个程序将任务安排到我们的工作队列中,所以让我们把它命名为 NewTask.java:
String message = String.join(" ", argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
为消息体中的每个点模拟一秒钟的工作。它将处理传递的消息并执行任务,所以让我们将其命名为 Worker.java:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
}
};
boolean autoAck = true; // 确认将在下面介绍
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
我们的虚假任务以模拟执行时间:
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
- NewTask.java:这个程序用于将任务发布到 RabbitMQ 的队列中。我们使用 channel.basicPublish()
函数来发送消息到名为 “hello” 的队列中。消息的内容从命令行获取,并以字节数组的形式发送。 - Worker.java:这个程序用于接收队列中的任务,并模拟处理这些任务。它使用 channel.basicConsume()函数来订阅队列,并在消息到达时执行回调函数 deliverCallback。在回调函数中,我们解析并处理消息,并通过 doWork()函数模拟任务执行。每个点代表一秒钟的模拟任务执行时间。
轮询调度
使用任务队列的一个优点是能够轻松地并行处理工作。如果我们正在积累工作任务,我们可以添加更多的工作者,从而轻松扩展。
首先,让我们尝试同时运行两个工作者实例。它们都会从队列中获取消息,但具体是如何进行的呢?
你需要打开三个控制台。其中两个将运行工作者程序。这些控制台将成为我们的两个消费者 - C1 和 C2。
# 控制台 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# 控制台 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
在第三个控制台中,我们将发布新的任务。一旦启动了消费者,你就可以发布一些消息:
# 控制台 3
java -cp $CP NewTask First message.
# => [x] Sent 'First message.'
java -cp $CP NewTask Second message..
# => [x] Sent 'Second message..'
java -cp $CP NewTask Third message...
# => [x] Sent 'Third message...'
java -cp $CP NewTask Fourth message....
# => [x] Sent 'Fourth message....'
java -cp $CP NewTask Fifth message.....
# => [x] Sent 'Fifth message.....'
让我们看看传递给我们的工作者的消息:
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ会将每条消息按顺序发送给下一个消费者。平均而言,每个消费者将收到相同数量的消息。这种消息分发方式称为轮询调度。尝试使用三个或更多的工作者进行此操作。
消息确认
执行任务可能需要几秒钟,您可能会想知道如果消费者开始了一个长时间的任务,但在完成之前终止了会发生什么。通过我们当前的代码,一旦RabbitMQ将消息传递给消费者,它就会立即标记它以进行删除。在这种情况下,如果终止一个工作者,它正在处理的消息就会丢失。分派给这个特定工作者但尚未处理的消息也会丢失。
但我们不想丢失任何任务。如果一个工作者死了,我们希望任务被传递给另一个工作者。
为了确保消息永远不会丢失,RabbitMQ支持消息确认。确认由消费者发送回来,告诉RabbitMQ特定消息已被接收、处理,并且RabbitMQ可以安全删除它。
- 如果一个消费者死了(其通道被关闭,连接被关闭,或TCP连接丢失)而没有发送确认,RabbitMQ将理解到消息没有被完全处理,将重新将其排队。如果此时有其他消费者在线,它将快速地将其重新传递给另一个消费者。这样,即使工作者偶尔死掉,您也可以确保没有消息会丢失。
- 默认情况下对消费者的交付确认执行超时(默认为30分钟)。这有助于检测到永远不会确认交付的错误(卡住的)消费者。您可以按照交付确认超时中描述的方法增加此超时。
手动消息确认默认已打开。在以前的示例中,我们通过autoAck=
true标志明确关闭了它们。现在是时候将此标志设置为false,并且一旦完成任务,就从工作者发送适当的确认。
channel.basicQos(1); // 一次只接受一个未确认的消息(见下文)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
使用此代码,即使您使用CTRL+C
终止了正在处理消息的工作者,也可以确保不会丢失任何内容。在工作者终止后不久,所有未确认的消息都会被重新传递。
注:确认必须在接收到交付的同一通道上发送。尝试使用不同通道进行确认将导致通道级别的协议异常。请参阅确认文档指南以了解更多信息。
忘记确认
错过basicAck是一个常见的错误。这是一个容易犯的错误,但后果很严重。当您的客户端退出时(可能看起来像是随机的重新传递),消息将被重新传递,但是由于无法释放任何未确认的消息,RabbitMQ将会消耗越来越多的内存。
为了调试这种类型的错误,您可以使用rabbitmqctl
打印messages_unacknowledged
字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows上,去掉sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
这个命令可以列出队列名称、消息准备就绪和未确认的消息数量,从而帮助您诊断是否存在遗忘确认的问题。
消息持久性
我们已经学会了如何确保即使消费者死亡, 任务不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。为了确保消息不会丢失,需要两个步骤:我们需要将队列和消息都标记为持久性。
首先,我们需要确保队列在RabbitMQ节点重新启动后仍然存在。为了做到这一点,我们需要将其声明为持久性:
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
尽管这个命令本身是正确的,但在我们目前的设置中不起作用。这是因为我们已经定义了一个名为hello的非持久队列。RabbitMQ不允许您使用不同的参数重新定义现有队列,并且会向任何尝试这样做的程序返回错误。但是有一个快速的解决方法 - 让我们用不同的名称声明一个队列,例如task_queue:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
这个queueDeclare的变化需要应用到生产者和消费者代码中。
到这一步,我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将我们的消息标记为持久的,
通过将MessageProperties(实现了BasicProperties)设置为PERSISTENT_TEXT_PLAIN的值。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
关于消息持久性的注意事项
将消息标记为持久性并不完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘,但在RabbitMQ接受消息但尚未保存它之间仍然存在一个短暂的时间窗口。此外,RabbitMQ并不会对每条消息执行fsync(2) - 它可能只是保存到缓存中,而不是真正写入 磁盘。持久性保证并不强,但绰绰有余 用于我们的简单任务队列。如果您需要更强的保证,则可以使用发布者确认。
公平分发
你可能已经注意到,调度仍然不完全符合我们的期望。例如,在有两个工作者的情况下,当所有奇数消息很重,而偶数消息很轻时,一个工作者将不断忙碌,而另一个工作者几乎不会做任何工作。嗯,RabbitMQ 对此一无所知,仍然会均匀地分发消息。
这是因为 RabbitMQ 只是在消息进入队列时进行调度。它不会查看消费者的未确认消息数量。它只是盲目地将每个第 n 个消息分发给第 n 个消费者。
为了解决这个问题,我们可以使用basicQos
方法,并将 prefetchCount 设置为 1。这告诉 RabbitMQ 一次不要给工作者超过一个消息。换句话说,不要在工作者处理和确认前一个消息之前向其分发新消息。相反,它会将其分发给下一个尚未忙碌的工作者。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
关于队列大小的注意事项
如果所有的工作者都在忙,您的队列可能会被填满。您将希望密切关注这一点,可能增加更多的工作者,或者采取其他策略。
代码示例
生产者示例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
//消息通道
Channel channel = connection.createChannel()) {
//队列持久性
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = String.join(" ", argv);
//消息持久性
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消费者示例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
//队列持久性
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//公平分发
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
//消息确认
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
本篇总结
使用消息确认和 prefetchCount 可以建立一个工作队列
通过使用消息确认(message acknowledgments)和 prefetchCount,您可以设置一个工作队列。消息确认确保在处理完消息后,消费者向 RabbitMQ 确认消息已经处理完成,从而使消息不会在队列中丢失或重复处理。而 prefetchCount 则可以控制每个消费者一次获取的消息数量,以确保任务的公平分发。
持久性选项让任务即使在 RabbitMQ 重新启动后也能够存活
持久性选项允许您在声明队列和发布消息时设置,以确保即使 RabbitMQ 服务重新启动,任务也能够存活并在恢复后继续进行处理。这对于确保任务不会在系统故障或服务重启时丢失非常重要