RabbitMQ 消费者

  RabbitMQ的消费模式分两种:推模式和拉模式,推模式采用Basic.Consume进行消费,拉模式则是调用Basic.Get进行消费。
  消费者通过订阅队列从RabbitMQ中获取消息进行消费,为避免消息丢失可采用消费确认机制

消费者

  • 拉模式
    • 拉模式的实现
  • 推模式
  • 消费确认与拒绝
    • 消息确认的实现
    • 消息拒绝的实现
    • basicRecover
  • basicQos 限制消费
  • 总结

拉模式

  顾名思义,拉模式就是消费者主动的从RabbitMQ中获取数据,通过拉模式每次获取数据只能获取一条。拉模式的时序图如下图所示。
在这里插入图片描述
  RabbitMQ每次接收到Get请求后会将队列中即将被消费的消息发送给消费者,消费者接收处理消息后向RabbitMQ发送消费应答,然后该消息将从队列中移除。
  需要注意的是拉模式普遍仅适用用从RabbitMQ中获取一条数据的场景,如果以循环的方式获取批量数据将影响RabbitMQ的性能。

拉模式的实现

  拉模式通过以下方法实现:

/**
* queue 队列名称
* autoAck 是否开启自动应答
*/
GetResponse basicGet(String queue,boolean autoAck)

  如上述代码所示channel.basicGet方法返回的是一个GetResponse,在GetResponse对象中包含了一条消息内容,消费者可以获取该消息并进行处理。

推模式

  推模式是指RabbitMQ将消息主动推送给订阅监听队列的消费者。在RabbitMQ推送消息的过程中其并不关心该消费者是否完成上一条消息的消费,只要队列中存在消息则向消费者推送,当然推送消息的个数会受Basic.Qos的限制。Basic.Qos指定了某个消费者可以保持的未应答的消息数量。

    /**
     * Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}
     * method.
     * Provide access to <code>basic.deliver(Broker推送消息)</code>, <code>basic.cancel</code>
     * and shutdown signal callbacks (which is sufficient
     * for most cases). See methods with a {@link Consumer} argument
     * to have access to all the application callbacks.
     * @param queue 队列名称
     * @param autoAck 是否自动确认
     * @param consumerTag 消费者标签,消费者的唯一标识符
     * @param noLocal 是否可以接收同Connection中生产者的消息(true不能接收)
     * @param exclusive 是否设置排他
     * @param arguments 其他参数
     * @param deliverCallback 消息接收回调
     * @param cancelCallback 消费取消回调
     * @param shutdownSignalCallback 连接或者信道关闭回调
     * @return the consumerTag associated with the new consumer
     */
    String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;

    String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;

  可以通过上述两种方法(设置参数最多的)实现声明消费者。其中Consumer的定义如下:


public interface Consumer {
    /**
     * 消费者通过basicConsume被注册后调用
     */
    void handleConsumeOk(String consumerTag);

    /**
     * 消费者通过basicCancel取消时调用
     */
    void handleCancelOk(String consumerTag);

    /**
     * 消费者不通过basicCancel取消时调用
     */
    void handleCancel(String consumerTag) throws IOException;

    /**
     * 通道或者连接关闭时调用
     */
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

    /**
     * 接收重新发送的未被确认的消息时调用
     */
    void handleRecoverOk(String consumerTag);

    /**
     * 接收消息时调用
     * @param consumerTag 消费者标签
     * @param envelope 打包消息的数据
     * @param properties 消息的内容标头数据
     * @param body 消息内容
     */
    void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body)
        throws IOException;
}

消费确认与拒绝

  为了保障消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数,当autoAck为true时RabbitMQ会自动的把发送出去的消息设置为确认,然后从队列中删除;当autoACK为false时RabbitMQ会等待消费者显式回复确认信号后才从内存中移去消息(先标记再删除)。
  autoAck参数意为自动应答,但是如果该参数为true时则rabbitMQ将自动将发送的消息标记确认,无需消费者进行应答。

  当autoAck参数为false时,对于RabbitMQ服务器而言,队列中的消息分成两部分:一部分时等待投递给消费者的消息;一部分时已经投递给消费者,但是还未收到消费者确认消息的消息
  RabbitMQ不会为未确认的消息设置过期时间,如果一个消息一直未被消费者确认,那么这个消息再RabbitMQ中将一直保存为投递未确认状态,指导消费者确认或者消费者断开连接,如果消费者断开连接,则该消费者接收但未确认的消息将重新入队。

消息确认的实现

  消息的显式确认需要消费者再声明的过程中设置autoAck=false。然后该消费者消费的消息可以显式的进行确认应答。确认应答方法如下:

	 /**
     * @param 消息的标签,可通过Delivery.getEnvelope().getDeliveryTag()获取
     * @param 如果为true则将发送给该消费者的该消息之前的所有未应答的消息进行应答,如果为false则仅应答一条消息
     */
    void basicAck(long deliveryTag, boolean multiple) throws IOException;

  当进行消息的批量确认时,将所有发送给该消费者未确认的消息进行确认,而针对监听同一队列的其他消费者的未确认消息并不进行处理。

消息拒绝的实现

  RabbitMQ提供了两种消息拒绝的方法:Basic.Reject和Basic.Nack命令;其两者的区别时Nack可以进行批量拒绝。

    /**
     * @param deliveryTag 消息标签
     * @param requeue 为true时被拒绝的消息重新入队,否则将成为死信
     * @throws java.io.IOException if an error is encountered
     */
    void basicReject(long deliveryTag, boolean requeue) throws IOException;

    /**
     * @param deliveryTag 消息标签
     * @param multiple 如果为true则批量拒绝自该消息之前所有未确认的发送给该消费者的消息
     * @param requeue 为true时被拒绝的消息重新入队,否则将成为死信
     * @throws java.io.IOException if an error is encountered
     */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue)
            throws IOException;

basicRecover

该方法可以将某个消费者未应答(确认或者拒绝)的消息重新入队,该方法会导致:

  • 投递而未被应答的消息可以重新发送给消费者进行处理
  • 消费者的消息队列被清空,可以重新接收到其他消息
    /**
     * <p>
     *  Ask the broker to resend unacknowledged messages.  In 0-8
     * basic.recover is asynchronous; in 0-9-1 it is synchronous, and
     * the new, deprecated method basic.recover_async is asynchronous.
     * </p>
     * Equivalent to calling <code>basicRecover(true)</code>, messages
     * will be requeued and possibly delivered to a different consumer.
     * @see #basicRecover(boolean)
     */
     Basic.RecoverOk basicRecover() throws IOException;

    /**
     * Ask the broker to resend unacknowledged messages.  In 0-8
     * basic.recover is asynchronous; in 0-9-1 it is synchronous, and
     * the new, deprecated method basic.recover_async is asynchronous.
     * @param requeue If true, messages will be requeued and possibly
     * delivered to a different consumer. If false, messages will be
     * redelivered to the same consumer.
     */
    Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

basicQos 限制消费

  默认情况下,消费者对于接收的消息数量并未限制,也就是说,一旦RabbitMQ中接收到消息并且存在消费者,则RabbitMQ将把消息发送到相关的消费者中,并不关心消费者是否消息完信息。
  轮询的默认消息分发机制会导致消费者资源不能合理利用、消费者消息积压导致内存溢出等问题。为解决上述问题可以使用basicQos方法实现限制信道上消费者所能保持的最大未确认消息数量。该方法如下:

    /**
     * @param prefetchSize 消息大小
     * @param prefetchCount 消息数量
     * @param global 是否全局
     * @throws java.io.IOException if an error is encountered
     */
    void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

针对global参数需要注意一下内容:

  • 当global=true时信道上所有的消费者都需要遵从消息数量限定值(某个信道上所有消费者未确认消息数量<=prefetchCount)
  • 等global=false时新的消费者需要遵从消息数量的限定值。
  • 可以调用两次basicQos方法,并使用不同的global参数,这种情况下两次配置都可以生效。

总结

  消费者就是针对某个队列进行消息监听和消息消费的。消费者消费消息存在拉模式和推模式,推模式的是使用场景相对比较多。
  为确保消息被合法的消费,RabbitMQ提供了消费确认机制,投递的消息并不能被理解完成了消费,仅消费者确认消费该消息才会被移除队列。
  默认的消息投递机制时轮询,轮询的消息分发并会关系消费者的性能以及消息积压的问题,因此需要限制每个消费者所能保持的最大未确认的消息数量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/94766.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

用Kubernetes(k8s)的ingress部署https应用

用Kubernetes的ingress部署https应用 环境准备Ingress安装域名证书准备 部署应用通过ingress暴露应用根据ssl证书生成对应的secret创建ingress暴露部署的应用确认自己安装了ingress创建ingress 访问你暴露的应用 环境准备 Ingress安装 我之前有一片文章写的是用ingress暴露应…

在 Python 中构建卷积神经网络; 从 0 到 9 的手绘数字的灰度图像预测数字

一、说明 为了预测从0到9的数字&#xff0c;我选择了一个基于著名的Kaggle的MNIST数据集的数据集。数据集包含从 <0> 到 <9> 的手绘图数字的灰度图像。在本文中&#xff0c;我将根据像素数据&#xff08;即数值数据&#xff09;和卷积神经网络预测数字。 二、 卷积…

基于微服务、Java、Springcloud、Vue、MySQL开发的智慧工地管理系统源码

智慧工地聚焦施工现场岗位一线&#xff0c;围绕“人、机、料、法、环”五大要素&#xff0c;数字化工地平台与现场多个子系统的互联实现了工地业务间的互联互通和协同共享。数字化工地管理平台能够盘活工地各大项目之间孤立的信息系统&#xff0c;实现数据的统一接入、处理与维…

❤ windows 安装后台java开发环境JDK 、MySQL 、Redis

❤ windows 安装后台java开发环境 1、windows 安装 JDK. 下载地址&#xff1a; http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html 1、下载安装 官网点击下载安装 网盘 jdk安装包 链接&#xff1a;https://pan.baidu.com/s/1sdxA6B…

李宏毅 2022机器学习 HW2 上分路线

strong baseline上分路线 baseline增加concat_nframes &#xff08;提升明显&#xff09;增加batchnormalization 和 dropout增加hidden layer宽度至512 &#xff08;提升明显&#xff09; 提交文件命名规则为 prediction_{concat_nframes}[{n_hidden_layers}{dropout}_bn].c…

如何DIY制作干洗店洗护小程序

洗护行业正逐渐迎来线上化的浪潮&#xff0c;传统的干洗店也开始尝试将业务线上化&#xff0c;以提供更便捷的服务给消费者。而制作一款洗护小程序&#xff0c;成为了干洗店实现线上化的重要一环。今天&#xff0c;我们就来分享一下如何使用第三方制作平台制作洗护小程序的教程…

禅道后台命令执行漏洞 (二)

漏洞简介 禅道是第一款国产的开源项目管理软件。它集产品管理、项目管理、质量管理、文档管理、 组织管理和事务管理于一体&#xff0c;是一款专业的研发项目管理软件&#xff0c;完整地覆盖了项目管理的核心流程。禅道管理思想注重实效&#xff0c;功能完备丰富&#xff0c;操…

纯 CSS 开关切换按钮

<!DOCTYPE html> <html lang="en"> <head><meta charset="UTF-8"><title>纯 CSS 开关切换按钮</title><style>html {font-size: 62.5%;}body {background-color: #1848a0;}.wrapper {position: absolute;left: …

激光三角测距原理

激光三角测距原理 1. 简介2. 直射式激光三角测距法3. 斜射式激光三角测距法 1. 简介 激光三角测量法是利用光线空间传播过程中的光学反射规律和相似三角形原理&#xff0c;在接收透镜的物空间与像空间构成相似三角形关系&#xff0c;同时利用边角关系计算出待测位移。根据入射…

『赠书活动 | 第十八期』《深入浅出SSD:固态存储核心技术、原理与实战》

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; 『赠书活动 &#xff5c; 第十八期』 本期书籍&#xff1a;《深入浅出SSD&#xff1a;固态存储核心技术、原理与实战》 赠书规则&#xff1a;评论区&#xff1a;点赞&…

Android中的APK打包与安全

aapt2命令行实现apk打包 apk文件结构 classes.dex&#xff1a;Dex&#xff0c;即Android Dalvik执行文件 AndroidManifest.xml&#xff1a;工程中AndroidManifest.xml编译后得到的二进制xml文件 META-INF&#xff1a;主要保存各个资源文件的SHA1 hash值&#xff0c;用于校验…

神经网络的工作原理

目录 神经网络的介绍 神经网络的组成 神经网络的工作原理 Numpy 实现神经元 Numpy 实现前向传播 Numpy 实现一个可学习的神经网络 神经网络的介绍 神经网络受人类大脑启发的算法。简单来说&#xff0c;当你睁开眼睛时&#xff0c;你看到的物体叫做数据&#xff0c;再由你…

项目 - 后端技术栈转型方案

前言 某开发项目的后端技术栈比较老了&#xff0c;现在想换到新的技术栈上。使用更好的模式、设计思想、更合理的架构等&#xff0c;为未来的需求迭代做铺垫。怎么办呢&#xff1f;假设系统目前在线上运行着的&#xff0c;直接整体换的话耗时太久&#xff0c;且中间还有新的需…

GaussDB技术解读系列:高级压缩之OLTP表压缩

8月16日&#xff0c;第14届中国数据库技术大会&#xff08;DTCC2023&#xff09;在北京国际会议中心顺利举行。在GaussDB“五高两易”核心技术&#xff0c;给世界一个更优选择的专场&#xff0c;华为云数据库GaussDB首席架构师冯柯对华为云GaussDB数据库的高级压缩技术进行了详…

《Flink学习笔记》——第六章 Flink的时间和窗口

6.1 时间语义 6.1.1 Flink中的时间语义 对于一台机器而言&#xff0c;时间就是系统时间。但是Flink是一个分布式处理系统&#xff0c;多台机器“各自为政”&#xff0c;没有统一的时钟&#xff0c;各自有各自的系统时间。而对于并行的子任务来说&#xff0c;在不同的节点&…

开发一款AR导览导航小程序多少钱?ar地图微信小程序 ar导航 源码

随着科技的不断发展&#xff0c;增强现实&#xff08;AR&#xff09;技术在不同领域展现出了巨大的潜力。AR导览小程序作为其中的一种应用形式&#xff0c;为用户提供了全新的观赏和学习体验。然而&#xff0c;开发一款高质量的AR导览小程序需要投入大量的时间、人力和技术资源…

WebGPU加载Wavefront .OBJ模型文件

在开发布料模拟之前&#xff0c;我想使用 WebGPU 开发强大的代码基础。 这就是为什么我想从 Wavefront .OBJ 文件加载器开始渲染 3D 模型。 这样&#xff0c;我们可以快速渲染 3D 模型&#xff0c;并构建一个简单而强大的渲染引擎来完成此任务。 一旦我们有了扎实的基础&#x…

为C# Console应用化个妆

说到Windows的cmd&#xff0c;刻板印象就是黑底白字的命令行界面。跟Linux花花绿绿的界面比&#xff0c;似乎单调了许多。但其实C#开发的Console应用也可以摆脱单调非黑即白的UI。 最近遇到个需求&#xff0c;要在一堆纯文本文件里找指定的关键字&#xff08;后续还要人肉判断…

m4s格式转换mp4

先安装 ffmpeg&#xff0c;具体从官网可以查到&#xff0c;https://ffmpeg.org&#xff0c;按流程走。 转换代码如下&#xff0c;可以任意选择格式导出 import subprocess import osdef merge_audio_video(input_audio_path, input_video_path, output_mp4_path):# 构建 FFmpe…

bootloader串口更新程序[瑕疵学习板]

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、储备知识二、程序步骤2.程序展示1.bootloader2.然后是主运行函数总结前言 很久没有更新文章了。最近工作太忙,没有学习很多的知识,然后这两天不忙了,就学习了一下bootloader的程序升级…