RabbitMQ-工作模式(简单模式工作队列)

在这里插入图片描述

文章目录

  • 简单模式(simple)
  • 工作队列(work)
    • 准备工作
    • 轮询调度
    • 消息确认
    • 消息持久性
    • 公平分发
    • 代码示例
  • 本篇总结

更多相关内容可查看

简单模式(simple)

P
你好
C

通俗概括:生产者-队列-消费者

想详细了解Rabbit的基础或简单模式的可查看以下链接
RabbitMQ–Hello World(基础详解)

工作队列(work)

在这里插入图片描述

准备工作

现在我们将发送代表复杂任务的字符串。我们没有一个真实的任务,比如要调整大小的图像或要渲染的 PDF 文件,所以让我们假装我们很忙 - 通过使用 Thread.sleep() 函数来实现。我们将以字符串中的点数作为其复杂性;每个点将代表一秒钟的 “工作”。例如,由 Hello… 描述的虚假任务将需要三秒钟。

从命令行发送任意消息。这个程序将任务安排到我们的工作队列中,所以让我们把它命名为 NewTask.java:

String message = String.join(" ", argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

为消息体中的每个点模拟一秒钟的工作。它将处理传递的消息并执行任务,所以让我们将其命名为 Worker.java:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
  }
};
boolean autoAck = true; // 确认将在下面介绍
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

我们的虚假任务以模拟执行时间:

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}
  • NewTask.java:这个程序用于将任务发布到 RabbitMQ 的队列中。我们使用 channel.basicPublish()
    函数来发送消息到名为 “hello” 的队列中。消息的内容从命令行获取,并以字节数组的形式发送。
  • Worker.java:这个程序用于接收队列中的任务,并模拟处理这些任务。它使用 channel.basicConsume()函数来订阅队列,并在消息到达时执行回调函数 deliverCallback。在回调函数中,我们解析并处理消息,并通过 doWork()函数模拟任务执行。每个点代表一秒钟的模拟任务执行时间。

轮询调度

使用任务队列的一个优点是能够轻松地并行处理工作。如果我们正在积累工作任务,我们可以添加更多的工作者,从而轻松扩展。

首先,让我们尝试同时运行两个工作者实例。它们都会从队列中获取消息,但具体是如何进行的呢?

你需要打开三个控制台。其中两个将运行工作者程序。这些控制台将成为我们的两个消费者 - C1 和 C2。

# 控制台 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

# 控制台 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

在第三个控制台中,我们将发布新的任务。一旦启动了消费者,你就可以发布一些消息:

# 控制台 3
java -cp $CP NewTask First message.
# => [x] Sent 'First message.'
java -cp $CP NewTask Second message..
# => [x] Sent 'Second message..'
java -cp $CP NewTask Third message...
# => [x] Sent 'Third message...'
java -cp $CP NewTask Fourth message....
# => [x] Sent 'Fourth message....'
java -cp $CP NewTask Fifth message.....
# => [x] Sent 'Fifth message.....'

让我们看看传递给我们的工作者的消息:

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ会将每条消息按顺序发送给下一个消费者。平均而言,每个消费者将收到相同数量的消息。这种消息分发方式称为轮询调度。尝试使用三个或更多的工作者进行此操作。

消息确认

执行任务可能需要几秒钟,您可能会想知道如果消费者开始了一个长时间的任务,但在完成之前终止了会发生什么。通过我们当前的代码,一旦RabbitMQ将消息传递给消费者,它就会立即标记它以进行删除。在这种情况下,如果终止一个工作者,它正在处理的消息就会丢失。分派给这个特定工作者但尚未处理的消息也会丢失。

但我们不想丢失任何任务。如果一个工作者死了,我们希望任务被传递给另一个工作者。

为了确保消息永远不会丢失,RabbitMQ支持消息确认。确认由消费者发送回来,告诉RabbitMQ特定消息已被接收、处理,并且RabbitMQ可以安全删除它。

  • 如果一个消费者死了(其通道被关闭,连接被关闭,或TCP连接丢失)而没有发送确认,RabbitMQ将理解到消息没有被完全处理,将重新将其排队。如果此时有其他消费者在线,它将快速地将其重新传递给另一个消费者。这样,即使工作者偶尔死掉,您也可以确保没有消息会丢失。
  • 默认情况下对消费者的交付确认执行超时(默认为30分钟)。这有助于检测到永远不会确认交付的错误(卡住的)消费者。您可以按照交付确认超时中描述的方法增加此超时。

手动消息确认默认已打开。在以前的示例中,我们通过autoAck=true标志明确关闭了它们。现在是时候将此标志设置为false,并且一旦完成任务,就从工作者发送适当的确认。

channel.basicQos(1); // 一次只接受一个未确认的消息(见下文)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

使用此代码,即使您使用CTRL+C终止了正在处理消息的工作者,也可以确保不会丢失任何内容。在工作者终止后不久,所有未确认的消息都会被重新传递。

注:确认必须在接收到交付的同一通道上发送。尝试使用不同通道进行确认将导致通道级别的协议异常。请参阅确认文档指南以了解更多信息。

忘记确认
错过basicAck是一个常见的错误。这是一个容易犯的错误,但后果很严重。当您的客户端退出时(可能看起来像是随机的重新传递),消息将被重新传递,但是由于无法释放任何未确认的消息,RabbitMQ将会消耗越来越多的内存。

为了调试这种类型的错误,您可以使用rabbitmqctl打印messages_unacknowledged字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows上,去掉sudo:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

这个命令可以列出队列名称、消息准备就绪和未确认的消息数量,从而帮助您诊断是否存在遗忘确认的问题。

消息持久性

我们已经学会了如何确保即使消费者死亡, 任务不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。

当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。为了确保消息不会丢失,需要两个步骤:我们需要将队列和消息都标记为持久性。

首先,我们需要确保队列在RabbitMQ节点重新启动后仍然存在。为了做到这一点,我们需要将其声明为持久性:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

尽管这个命令本身是正确的,但在我们目前的设置中不起作用。这是因为我们已经定义了一个名为hello的非持久队列。RabbitMQ不允许您使用不同的参数重新定义现有队列,并且会向任何尝试这样做的程序返回错误。但是有一个快速的解决方法 - 让我们用不同的名称声明一个队列,例如task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

这个queueDeclare的变化需要应用到生产者和消费者代码中。

到这一步,我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将我们的消息标记为持久的,

通过将MessageProperties(实现了BasicProperties)设置为PERSISTENT_TEXT_PLAIN的值。

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

关于消息持久性的注意事项
将消息标记为持久性并不完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘,但在RabbitMQ接受消息但尚未保存它之间仍然存在一个短暂的时间窗口。此外,RabbitMQ并不会对每条消息执行fsync(2) - 它可能只是保存到缓存中,而不是真正写入 磁盘。持久性保证并不强,但绰绰有余 用于我们的简单任务队列。如果您需要更强的保证,则可以使用发布者确认。

公平分发

你可能已经注意到,调度仍然不完全符合我们的期望。例如,在有两个工作者的情况下,当所有奇数消息很重,而偶数消息很轻时,一个工作者将不断忙碌,而另一个工作者几乎不会做任何工作。嗯,RabbitMQ 对此一无所知,仍然会均匀地分发消息。

这是因为 RabbitMQ 只是在消息进入队列时进行调度。它不会查看消费者的未确认消息数量。它只是盲目地将每个第 n 个消息分发给第 n 个消费者。
在这里插入图片描述

为了解决这个问题,我们可以使用basicQos方法,并将 prefetchCount 设置为 1。这告诉 RabbitMQ 一次不要给工作者超过一个消息。换句话说,不要在工作者处理和确认前一个消息之前向其分发新消息。相反,它会将其分发给下一个尚未忙碌的工作者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

关于队列大小的注意事项
如果所有的工作者都在忙,您的队列可能会被填满。您将希望密切关注这一点,可能增加更多的工作者,或者采取其他策略。

代码示例

生产者示例

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
    	//消息通道
        Channel channel = connection.createChannel()) {
        //队列持久性
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = String.join(" ", argv);
		//消息持久性
        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }

}

消费者示例

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();
	//队列持久性
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
	//公平分发
    channel.basicQos(1);

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    };
    //消息确认
    channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }
}

本篇总结

使用消息确认和 prefetchCount 可以建立一个工作队列

通过使用消息确认(message acknowledgments)和 prefetchCount,您可以设置一个工作队列。消息确认确保在处理完消息后,消费者向 RabbitMQ 确认消息已经处理完成,从而使消息不会在队列中丢失或重复处理。而 prefetchCount 则可以控制每个消费者一次获取的消息数量,以确保任务的公平分发。

持久性选项让任务即使在 RabbitMQ 重新启动后也能够存活

持久性选项允许您在声明队列和发布消息时设置,以确保即使 RabbitMQ 服务重新启动,任务也能够存活并在恢复后继续进行处理。这对于确保任务不会在系统故障或服务重启时丢失非常重要

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/686559.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ESD防护SP3232E真+3.0V至+5.5V RS-232收发器

特征 采用3.0V至5.5V电源,符合真正的EIA/TIA-232-F标准 满载时最低 120Kbps 数据速率 1μA 低功耗关断,接收器处于活动状态 (SP3222E) 可与低至 2.7V 电源的 RS-232 互操作 增强的ESD规格: 15kV人体模型 15kV IEC1000…

软件杯 题目:基于深度学习卷积神经网络的花卉识别 - 深度学习 机器视觉

文章目录 0 前言1 项目背景2 花卉识别的基本原理3 算法实现3.1 预处理3.2 特征提取和选择3.3 分类器设计和决策3.4 卷积神经网络基本原理 4 算法实现4.1 花卉图像数据4.2 模块组成 5 项目执行结果6 最后 0 前言 🔥 优质竞赛项目系列,今天要分享的是 基…

计算机毕业设计Spark+Flink+Hive地铁客流量预测 交通大数据 地铁客流量大数据 交通可视化 大数据毕业设计 深度学习 机器学习

项目说明​ ​ 1该项目主要分析通刷卡数据,通过大数据技术来研究地铁客运能力及探索优化服务的方向​ 2主要讲解Flink流处理实时分析部分,离线部分较简单,暂时略过​ ​ 技术架构​ ​项目流程:​ 采用python请求深圳地铁数…

70 Realistic Mountain Environment Textures Cliff(70+张真实的山地环境纹理)

大量适合山区和其他岩石环境的纹理--悬崖、岩石、砾石等等 每个纹理都是可贴的/无缝的,并且完全兼容各种不同的场景--标准Unity地形、Unity标准着色器、URP、HDRP等等都兼容。 所有的纹理都是4096x4096,并包括一个HDRP掩码,以完全支持HDRP。 特点。 70种质地 70种材料 70个地…

基于springboot实现农产品直卖平台系统项目【项目源码+论文说明】

基于springboot实现农产品直卖平台系统的设计演示 摘要 计算机网络发展到现在已经好几十年了,在理论上面已经有了很丰富的基础,并且在现实生活中也到处都在使用,可以说,经过几十年的发展,互联网技术已经把地域信息的隔…

内网快速传输工具

常见的有LANDrop,支持多种设备,如电脑、pad、手机等等之间互传。但本文介绍的这款是很小的电脑间互传工具。 特点是非常的快速,文件很小,不用安装解压就可用。

transformers peft加载lora模型;TextStreamer流式输出,kv cache使用

1、transformers peft加载lora模型 https://github.com/hiyouga/LLaMA-Factory/blob/cae47379079ff811aa385c297481a27020a8da6b/scripts/loftq_init.py#L13 代码: from peft import AutoPeftModelForCausalLM, PeftModel from transformers import AutoTokenizer…

《手把手教你》系列练习篇之13-python+ selenium自动化测试 -压轴篇(详细教程)

1. 简介 “压轴”原本是戏曲名词,指一场折子戏演出的倒数第二个剧目。在现代社会中有很多应用,比如“压轴戏”,但压轴也是人们知识的一个盲区。“压轴”本意是指倒数第二个节目,而不是人们常说的倒数第一个,倒数第一个…

Incredibuild for Mac 来了!

Mac 开发者在寻找适合自己需求的工具时可能会遇到一些困难,因为 Mac 操作系统相对封闭,不像其他系统那样开放和灵活。尽管如此,Mac 开发者在开发应用程序时的需求(比如功能、效率等)和使用其他操作系统的开发者是类似的…

C++ - 查找算法 和 其他 算法

目录 一. 查找算法: 1.顺序查找: 2.二分查找: 二. 其他算法: 1.遍历算法: 2.求和、求平均值等聚合算法。 a.求和算法: b.求平均值算法: 一. 查找算法: 1.顺序查找&#xff1…

如何访问内网数据库?

现如今,随着信息化的不断发展,数据库已经成为了企业管理和数据存储的重要组成部分。由于安全等原因,很多公司和组织将自己的数据库部署在内网中,限制了外部的访问。有些情况下,我们仍然需要在外部网络环境中访问内网的…

C++开发基础之初探CUDA计算环境搭建

一、前言 项目中有使用到CUDA计算的相关内容。但是在早期CUDA计算环境搭建的过程中,并不是非常顺利,编写此篇文章记录下。对于刚刚开始研究的你可能会有一定的帮助。 二、环境搭建 搭建 CUDA 计算环境涉及到几个关键步骤,包括安装适当的 C…

:长亭雷池社区版动态防护体验测评

序 长亭雷池在最近发布了动态防护功能,据说可以动态加密保护网页前端代码和阻止爬虫行为、阻止漏洞扫描行为等。今天就来体验测试一下 WAF 是什么 WAF 是 Web Application Firewall 的缩写,也被称为 Web 应用防火墙。区别于传统防火墙,WAF …

Error:..\FreeRTOS\portable\RVDS\ARM_CM7\r0p1\port.c,265

移植完FreeRTOS后,使用Keil进行编译,编译未报错,串口打印助手打印了错误报告。 串口打印的错误报告: Error:..\FreeRTOS\portable\RVDS\ARM_CM7\r0p1\port.c,265看一下265行 该行所在函数为prvTaskExitError函数,功能…

阅读笔记:Multi-threaded Rasterization in the Chromium Compositor

Multi-threaded Rasterization in the Chromium Compositor PPT 原始链接: https://docs.google.com/presentation/d/1nPEC4YRz-V1m_TsGB0pK3mZMRMVvHD1JXsHGr8I3Hvc/edit?uspsharing PPT主要介绍了Chromium浏览器中使用多线程光栅化(Impl-side painting)的机制&a…

目标检测——FGVC-Aircraft数据集

引言 亲爱的读者们,您是否在寻找某个特定的数据集,用于研究或项目实践?欢迎您在评论区留言,或者通过公众号私信告诉我,您想要的数据集的类型主题。小编会竭尽全力为您寻找,并在找到后第一时间与您分享。 …

【Vue】Vue路由-重定向

问题 网页打开时, url 默认是 / 路径,未匹配到组件时,会出现空白 解决方案 重定向 → 匹配 / 后, 强制跳转 /home 路径 语法 { path: 匹配路径, redirect: 重定向到的路径 }, 比如: { path:/ ,redirect:/home }代码示例 const…

docker构建jdk17镜像

资料参考 参考自黑马教程:10.Docker基础-自定义镜像_哔哩哔哩_bilibili 更多详细语法声明,请参考官网文档:https://docs.docker.com/engine/reference/builder 初步准备 1、下载jdk17包(linux版),我这边版…

微信小程序多端框架打包后发布到APP Store

IPA 上架 App Store 生成 iOS 证书和 Provisioning Profile iOS 开发者账号缴/续费的发票查看和获取 个人开发者把小程序发布到 App Store 5个步骤(保姆级教程) 一、参数的设置、证书的生成、生成profile文件 微信小程序多端应用Donut IOS相关的参数…

佳能5DMARK IV mov视频覆盖的恢复方法

5DMARK IV算是佳能比较经典的一款摄像机,是佳能早期使用MOV的摄像机之一,MOV是当初佳能高端机的象征,当然现在佳能已经不在通过MOV和MP4来区分硬件级别了。下边这个案例是文件拍摄时断电,结果变成0字节,然后覆盖了部分…