【消息中间件】Rabbitmq消息可靠性、持久化机制、各种消费

原文作者:我辈李想
版权声明:文章原创,转载时请务必加上原文超链接、作者信息和本声明。


文章目录

  • 前言
  • 一、常见用法
    • 1.消息可靠性
    • 2.持久化机制
    • 3.消息积压
      • 批量消费:增加 prefetch 的数量,提高单次连接的消息数
      • 并发消费:多部署几台消费者实例
    • 4.重复消费
  • 二、其他
    • 1.队列存在大量unacked数据
    • 2.重试连接:建立连接
    • 3.rabbitmq心跳连接
    • 4.重试连接:消费ack确认前连接异常断开时


前言


一、常见用法

1.消息可靠性

RabbitMQ 提供了多种机制来确保消息的可靠性,以防止消息丢失或被意外删除。以下是几种提高消息可靠性的方法:

  1. 持久化消息(Durable Message):在发布消息时,将消息的 deliveryMode 设置为 2,即可将消息设置为持久化消息。持久化消息会将消息写入磁盘,即使 RabbitMQ 服务器重启,消息也不会丢失。

  2. 持久化队列(Durable Queue):创建队列时,将队列的 durable 参数设置为 true,即可创建一个持久化队列。持久化队列会将队列的元数据和消息都存储在磁盘上,即使消息队列服务器重启,队列的元数据和消息仍然可以恢复。

  3. 确认模式(Publisher Confirms):使用确认模式可以确保消息被成功发送到 RabbitMQ 服务器,并得到确认。通过在信道上使用 channel.confirmSelect() 启用确认模式,然后通过 channel.waitForConfirms() 方法来等待服务器的确认。

  4. 事务模式(Transactions):使用事务模式可以保证消息的原子性,要么全部发送成功,要么全部失败。通过在信道上使用 channel.txSelect() 开启事务模式,在发送消息后使用 channel.txCommit() 提交事务,或使用 channel.txRollback() 进行回滚。

  5. 消费者应答(Consumer Acknowledgement):在消费者接收和处理消息后,必须发送确认应答给 RabbitMQ 服务器。通过使用 channel.basicAck() 方法发送确认应答,以告知服务器消息已经成功处理。

通过使用上述机制,可以在 RabbitMQ 中实现消息的可靠性传输和处理,以防止消息的丢失和重复传递。
这里有篇博客,大家可以看看。

2.持久化机制

在RabbitMQ中,消息持久化是一种机制,可以确保消息在服务器宕机或重启之后不丢失。默认情况下,RabbitMQ的消息是存储在内存中的,如果服务器宕机,则会导致消息的丢失。要实现消息的持久化,可以采取以下步骤:

  1. 创建一个持久化的交换机(Exchange):
    在定义交换机时,将其durable参数设置为true,例如:

    channel.exchangeDeclare("exchange_name", "direct", true);
    
  2. 创建一个持久化的队列(Queue):
    在定义队列时,将其durable参数设置为true,例如:

    channel.queueDeclare("queue_name", true, false, false, null);
    
  3. 将持久化的队列与交换机进行绑定:
    使用队列和交换机的bind方法进行绑定,例如:

    channel.queueBind("queue_name", "exchange_name", "routing_key");
    
  4. 发布持久化的消息:
    在发布消息时,将消息的deliveryMode属性设置为2,表示消息是持久化的,例如:

    String message = "Hello RabbitMQ!";
    channel.basicPublish("exchange_name", "routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    

通过以上步骤,就可以实现消息的持久化。当RabbitMQ服务器宕机或重启后,消息会被保存在磁盘中,并在服务器恢复后重新投递给消费者。需要注意的是,虽然消息被持久化了,但是在发送到队列之前,仍然有可能发生丢失,所以在实际的应用中,还需要考虑一些因素,比如网络故障、消费者的可靠性等。

3.消息积压

批量消费:增加 prefetch 的数量,提高单次连接的消息数

为了提高消费性能,可以将多个消息批量进行消费,减少消费者和消息队列的交互次数。通过设置合适的批量消费大小,可以在一次网络往返中消费多个消息,从而提高消费性能。
要实现RabbitMQ的批量消费,可以使用RabbitMQ的channel.basicQos方法来设置每次消费的消息数量。以下是一个示例代码,演示如何实现批量消费:

import pika

def callback(ch, method, properties, body):
    print("Received message: %s" % body)
    # 处理消息的逻辑

    # 发送确认给RabbitMQ
    ch.basic_ack(delivery_tag=method.delivery_tag)

def consume_messages():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 设置每个消费者一次性获取的消息数量
    channel.basic_qos(prefetch_count=10)

    # 注册消费者并开始消费消息
    channel.basic_consume(queue='my_queue', on_message_callback=callback)

    # 进入一个循环,一直等待消息的到来
    channel.start_consuming()

consume_messages()

在这里插入图片描述

在上面的代码中,我们通过channel.basic_qos(prefetch_count=10)设置每次处理的消息数量为10。这样,在消费者处理完10条消息之前,RabbitMQ将不会再向其发送更多消息。

这样,就实现了RabbitMQ的批量消费。你可以根据需求,在basic_qos方法中设置适合你的消息数量。

并发消费:多部署几台消费者实例

可以采用多线程或多进程的方式进行消息的并发消费,将多个消费者并行处理消息。通过增加并发消费者的数量,可以提高消息的处理速度,提高消费的性能。
使用进程池来消费RabbitMQ的消息可以更好地管理并发性能。通过使用进程池,可以在一个固定的池子中创建多个进程,并且复用它们来消费消息,从而减少进程创建和销毁的开销。

以下是一个使用进程池消费RabbitMQ消息的示例:

import multiprocessing
import os
import time
import pika

def consumer(queue_name):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name)

    def callback(ch, method, properties, body):
        print(f'Process {os.getpid()} received message: {body}')
        time.sleep(1)

    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    channel.start_consuming()

def main():
    # 创建进程池
    pool = multiprocessing.Pool(processes=5)

    # 在进程池中提交任务
    for _ in range(5):
        pool.apply_async(consumer, ('my_queue',))

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

在上述示例中,我们使用multiprocessing.Pool来创建一个包含5个进程的进程池。然后,我们使用apply_async方法向进程池中提交任务,每个任务都是调用consumer函数来消费"my_queue"队列中的消息。进程池会自动分配任务给闲置的进程来处理。通过closejoin方法,我们可以确保所有任务都被完成。

4.重复消费

  1. 消息确认:在消费者处理完一条消息后,通过调用basic_ack方法手动确认消息已经成功消费。这样,RabbitMQ就会将该消息标记为已经处理,不会再次发送给其他消费者。同时,还可以设置auto_ack参数为False,禁用自动消息确认机制,以确保消息被正确确认。

  2. 消息持久化:可以通过设置消息的delivery_mode属性为2来将消息标记为持久化消息。这样,即使消费者在处理消息时发生故障,消息也会被保存在磁盘上,待消费者恢复正常后会重新投递。

  3. 唯一消费者:可以通过设置队列的exclusive参数为True,创建一个排他队列。这样,只有一个消费者可以连接到该队列,并独占地消费其中的消息,避免重复消费。

  4. 消息去重:在消费者端可以维护一个已消费消息的记录,例如在数据库或缓存中记录已消费的消息的ID或唯一标识。每次消费消息时,先检查记录中是否已经存在该消息,如果存在则跳过,避免重复处理。

  5. 幂等操作:在消费者的处理逻辑中,要确保操作是幂等的,即多次执行同一个操作的效果和执行一次的效果是一样的。这样,即使消息被重复消费,也不会产生副作用。

二、其他

1.队列存在大量unacked数据

通过rabbitmq的后台管理,进入相应的队列,滑到最下边,找到purge。purge将清空这个队列的消息。
在这里插入图片描述

2.重试连接:建立连接

import pika
from retry import retry

	@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
    def consume(self, callback):
        """Start consuming AMQP messages in the current process"""
	    try:
	        self.start_consuming_message()
	    except ConnectionClosed as e:
	        self.clear()
	        self.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy=1)
	    except ChannelClosed as e:
	        self.clear()
	        self.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy=1)
	   finally:
	        self.start_consuming_message()

3.rabbitmq心跳连接

RabbitMQ 心跳是一种保持连接活跃的机制。当 RabbitMQ 与客户端建立连接后,它会定期发送心跳包来确认连接仍然有效。如果在一段时间内没有收到心跳回复,RabbitMQ 将会关闭连接。心跳属于ConnectionParameters参数heartbeat,我理解是应该用于生产者,确保能够成功发送消息,如果消费者中设置了heartbeat,一定要大于消费程序的处理时间,保证消费期间结束后,可以响应心跳。

parameters = pika.ConnectionParameters(host, int(port), '/', credentials=userx, heartbeat=int(heartbeat))

如果消费者使用心跳,还可以参考这个博客

4.重试连接:消费ack确认前连接异常断开时

这个需要开启链接断开的重试,属于ConnectionParameters的retry_delay和connection_attempts参数。rabbitmq重启,消费者中使用heartbeat时间不足以覆盖消费时间。

connectionParameters = pika.ConnectionParameters(
    host='localhost',
    virtual_host=5672,
    credentials=credentials,
    socket_timeout=10,
    heartbeat=0,
    retry_delay=10, # 连接尝试重连间隔
    connection_attempts=10, # 连接尝试次数
)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/279407.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Qt Designer中各个模块的详细介绍,小白一看就会!!第3部分——Item Views (Model-Based) 模块介绍

Item Views (Model-Based) 模块的详细介绍 在Qt Designer中,Item Views (Model-Based) 模块是一组基于模型/视图(Model/View)架构的控件,用于展示和操作数据。这些控件与数据模型紧密结合,使得数据展示变得更加灵活和…

关于Sql数据库中去掉字段的所有空格

这篇文章主要介绍了Sql数据库中去掉字段的所有空格小结篇,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下 − Sql数据库中去掉字段的所有空格 字符前的空格,用ltrim(string) 字符…

oracle-sga-shared_pool

shared pool 缓冲sql语句和执行计划 shared pool由三部分组成 free libray:缓存sql执行计划 row cathe :缓存数据字典 硬解析:1判断语法2判断对象是否存在3有没有权限4 从n个执行方案中选出最优解,生成执行计划,这一…

Collector收集器的高级用法

Collectors收集器的高级用法 场景1:获取关联的班级名称 原先如果需要通过关联字段拿到其他表的某个字段,只能遍历List匹配获取 for (Student student : studentList) {Long clazzId student.getClazzId();// 遍历班级列表,获取学生对应班级…

【VS】NETSDK1045 当前 .NET SDK 不支持将 .NET 6.0 设置为目标。

问题描述 报错 NETSDK1045 严重性代码说明项目文件行禁止显示状态错误NETSDK1045当前 .NET SDK 不支持将 .NET 6.0 设置为目标。请将 .NET 5.0 或更低版本设置为目标,或使用支持 .NET 6.0 的 .NET SDK 版本。RCSoftDrawMicrosoft.NET.TargetFrameworkInference.ta…

word中MathType公式编号

直接上效果图: 步骤如下: 安装MathTypeword中安装MathType选项卡。设置MathType选项卡添加分隔符插入公式,自动生成右编码 接下来介绍每一步。 文章目录 1. 安装MathType2. Word中安装MathType选项卡3. 配置MathType选项4. 添加分隔符5. 插…

vue连接本地服务器

vue 连接本地服务器做后端。 后端服务 使用springboot新建一个基于restful的接口,访问如下的地址,返回值。 vue构建 新建一个vue项目,安装访问服务器的插件。 npm install axios vue-axios --save 修改main.js使用axios,最终…

ssm基于vue框架的点餐系统的设计与实现+vue论文

基于vue框架的点餐系统的设计与实现 摘要 当下,正处于信息化的时代,许多行业顺应时代的变化,结合使用计算机技术向数字化、信息化建设迈进。传统的点餐信息管理模式,采用人工登记的方式保存相关数据,这种以人力为主的…

论文阅读——UniRepLKNet

UniRepLKNet: A Universal Perception Large-Kernel ConvNet for Audio, Video, Point Cloud, Time-Series and Image Recognition 当我们将一个33的conv添加到一个小卷积核ConvNet中时,我们预计它会同时产生三种效果——1)使感受野更大,2&am…

TSR勾画学习

1:勾画建议 文献:Scoring the tumor-stroma ratio in colon cancer: procedure and recommendations 主要讲述的是结肠癌(CRC)的勾画建议和流程。 1,切片选择建议: 原发肿瘤的载玻片最具侵袭性的部分(即常规病理学…

FileZilla工具的使用以及主动模式与被动模式

文章目录 前言:一、FileZilla的好处二、FileZilla的使用2.1下载地址:2.2在虚拟机上使用: 三、FileZilla的主动模式与被动模式 前言: FileZilla是一个功能强大、易于使用和安全的FTP解决方案,适用于个人用户和企业用户…

ssm基于Web的老年公寓信息管理系统论文

摘 要 互联网发展至今,无论是其理论还是技术都已经成熟,而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播,搭配信息管理工具可以很好地为人们提供服务。针对信息管理混乱,出错率高,信息安全性差&#x…

Nature Machine Intelligence 人形机器人的层次化生成建模

2023年11月2日,德国英特尔研究院,英国伦敦大学学院和美国VERSES研究实验室的研究人员在《Nature Machine Intelligence》杂志发表了一篇题为“Hierarchical generative modelling for autonomous robots”的论文。 研究内容 人类通过规划、执行和…

关于“Python”的核心知识点整理大全51

目录 17.2.2 添加自定义工具提示 bar_descriptions.py 17.2.3 根据数据绘图 python_repos.py 17.2.4 在图表中添加可单击的链接 python_repos.py 17.3 Hacker News API hn_submissions.py 17.4 小结 往期快速传送门👆(在文章最后)&a…

Pytorch安装—CPU版(极速版)

准备工作: 下载python下载安装anaconda 一、anaconda创建名为pytorch的虚拟环境 #创建一个名为pytorch的虚拟环境 conda create create -n pytorch #通过以下语句进行检查: conda env list二、下载pytorch&测试 (Pytorch官网现在pip包…

欧洲十大跨境电商平台,自养号测评下单的重要性及优势

在欧洲站,用户体量非常庞大,这与近几年人们的消费习惯密不可分,越来越多的人开始网购,据欧盟委员的最新调研显示,在欧盟,近一半(42%)的中小企业通过在线市场销售产品和服务。 所以,逸居海外给大…

计算机网络【DNS】

DNS 基本概述 与 HTTP、FTP 和 SMTP 一样,DNS 协议也是应用层的协议,DNS 使用客户-服务器模式运行在通信的端系统之间,在通信的端系统之间通过下面的端到端运输协议来传送 DNS 报文。但是 DNS 不是一个直接和用户打交道的应用。DNS 是为因特…

Halcon阈值处理的几种分割方法threshold/auto_threshold/binary_threshold/dyn_threshold

Halcon阈值处理的几种分割方法 文章目录 Halcon阈值处理的几种分割方法1. 全局阈值2. 基于直方图的自动阈值分割方法3. 自动全局阈值分割方法4. 局部阈值分割方法5. var_threshold算子6 . char_threshold 算子7. dual_threshold算子 在场景中选择物体或特征是图像测量或识别的重…

Linux之缓冲区的理解

目录 一、问题引入 二、缓冲区 1、什么是缓冲区 2、刷新策略 3、缓冲区由谁提供 4、重看问题 三、缓冲区的简单实现 一、问题引入 我们先来看看下面的代码:我们使用了C语言接口和系统调用接口来进行文件操作。在代码的最后,我们还使用fork函数创建…

C#中使用is关键字检查对象是否与给定类型兼容

目录 一、定义 二、示例 三、生成 在程序的开发过程中经常会使用类型转换,如果类型转换不成功则会出现异常,从抛出异常到捕获并处理异常,无形中增加了系统的开销,而且太过频繁地处理异常还会严重地影响系统的稳定性。is关键字可…