【RabbitMQ的监听器容器Simple和Direct】 实现和场景区别

在Spring Boot中,RabbitMQ的两种监听器容器(SimpleMessageListenerContainerDirectMessageListenerContainer)在实现机制和使用场景上有显著差异。以下是它们的核心区别、配置方式及最佳实践:


在这里插入图片描述
Simple类型
在这里插入图片描述
Direct类型
在这里插入图片描述

一、核心区别

特性SimpleMessageListenerContainerDirectMessageListenerContainer
线程模型单线程管理所有消费者(线程池复用)每个消费者独立线程(更轻量级)
并发控制动态调整消费者线程池concurrentConsumers固定每个队列的消费者数量(consumersPerQueue
消息预取(Prefetch)高预取可能导致消息堆积低预取(默认1)更公平的消息分配
资源消耗高(长连接、线程池)低(按需创建线程)
适用场景长耗时任务、需动态扩缩容消费者、负载均衡场景高吞吐、低延迟、短任务 ;固定消费者数量、严格顺序处理场景
版本支持旧版默认(Spring AMQP 1.x)新版默认(Spring Boot 2.0+)

二、Spring Boot配置示例

1. 全局配置(application.yml)
spring:
  rabbitmq:
    listener:
      type: direct  # 可选 simple 或 direct
      simple:
        concurrency: 5  # 初始消费者数
        max-concurrency: 10  # 最大动态扩展数
        prefetch: 50  # 每次预取消息数
     type: direct
     direct:
       consumersPerQueue: 1          # 保持默认,确保顺序性
       missingQueuesFatal: true      # 生产环境建议开启,避免消息丢失
       acknowledge-mode: manual      # 推荐手动确认模式(更可控)
       retry:
         enabled: true
         max-attempts: 3             # 总尝试次数 = 初始消费 + 2次重试
         initial-interval: 2000ms    # 首次重试间隔
         multiplier: 2               # 指数退避策略
         max-interval: 10000ms       # 最大间隔保护
         stateless: false            # 必须设为 false(确保事务性操作)
         
重试过程示范
假设一个订单处理场景,消息内容为 {"orderId": 1001}:
首次消费尝试
消费者线程开始处理消息。
若业务逻辑抛出异常(如数据库连接失败),触发重试机制。
消息进入 retry 状态,等待 2秒。

第二次重试
间隔时间 = initial-interval * multiplier = 2s * 2 = 4s。
若仍失败,继续等待 4秒。

第三次重试
间隔时间 = 4s * 2 = 8s
(但不超过 max-interval 的 10s,即再有下一次,那么4s * 3 = 12s,超过了最大间隔10s,仍按最大间隔10s执行)。
最终失败后,根据配置执行以下操作之一:
手动确认模式:调用 basicNack(requeue=false),消息进入死信队列。
自动确认模式:抛出 AmqpRejectAndDontRequeueException 拒绝消息。

监控与告警建议
监控指标:
重试次数 (rabbitmq_listener_retry_count)
死信队列堆积量 (rabbitmq_queue_messages_dlx)

日志记录:
使用 MDC 记录消息 ID 和重试次数。
在最后一次重试失败时发送告警通知(如钉钉、Slack)。

2. 注解式监听器
@RabbitListener(queues = "myQueue", containerFactory = "simpleContainerFactory")
public void handleMessage(String payload) {
    // 业务逻辑
}
3. 自定义容器工厂
@Configuration
public class RabbitConfig {
    
    // Simple 容器工厂
    @Bean(name = "simpleContainerFactory")
    public SimpleRabbitListenerContainerFactory simpleFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(5);
        factory.setMaxConcurrentConsumers(10);
        factory.setPrefetchCount(50);
        return factory;
    }

    // Direct 容器工厂
    @Bean(name = "directContainerFactory")
    public DirectRabbitListenerContainerFactory directFactory(ConnectionFactory connectionFactory) {
        DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConsumersPerQueue(2);
        factory.setPrefetchCount(1);
        return factory;
    }
}

三、使用场景与最佳实践

1. 选择 Simple 容器的场景
  • 长耗时任务:如生成PDF报表、视频转码,需控制并发避免资源耗尽。
  • 复杂错误处理:需自定义重试策略(如RetryTemplate)和死信队列(DLQ)配置。
  • 动态负载均衡:通过调整concurrencymax-concurrency,自动扩展消费者线程应对流量高峰。
  • 高吞吐场景:结合prefetch批量拉取消息,减少网络开销(例如日志处理、批量任务)。
2. 选择 Direct 容器的场景
  • 高吞吐低延迟:如订单创建、秒杀系统,要求快速响应。
  • 资源敏感型应用:容器轻量,适合云环境或容器化部署(如K8s)。
  • 公平消息分发:低预取(prefetch=1)确保消息均匀分配给消费者。
  • 固定资源分配:需严格控制每个队列的消费者数量(如支付回调等关键业务)。
  • 顺序性要求:单个队列绑定固定消费者,保证消息顺序处理(如库存扣减)。
  • 精细化重试控制:通过retry配置实现自定义重试逻辑(如短信发送失败重试)。
3. 最佳实践
  1. 根据业务选择类型

    • 若需弹性伸缩,选择Simple监听器
    • 若需资源隔离或顺序保证,选择Direct监听器
  2. 预取值(Prefetch)调优

    • 高吞吐场景:增大prefetch减少网络交互(但可能增加内存压力)。
    • 低延迟场景:减小prefetch以快速响应新消息。
  3. 消息确认与重试

    • 使用manual确认模式,并在异常时调用channel.basicNack()触发重试或死信队列。
  4. 错误处理

    • 始终配置Dead Letter Exchange(DLX)和重试机制。
  5. 监控与线程管理

    • 监控消费者线程状态,避免Simple模式下线程数过高导致资源耗尽。
    • Direct模式下需评估队列数量与消费者配比,避免队列闲置。
    • 通过RabbitMQ Management控制台监控队列堆积情况,调整prefetch和并发数。
  6. 版本适配

    • Spring Boot 2.x+默认使用Direct,如需切换回Simple需显式配置。

四、常见问题

Q1: 消息堆积时如何选择容器?
  • 若消息处理快,用Direct并增加consumers-per-queue
  • 若处理慢,用Simple并逐步提升max-concurrency,同时优化业务逻辑。
Q2: 如何避免消息重复消费?
  • 确保业务逻辑幂等(如数据库唯一约束)。
  • 启用手动确认模式(acknowledge-mode: manual),在业务完成后手动ACK。
Q3: Direct容器为何有时效率低?
  • 检查prefetch是否过小(如默认1),适当增加以平衡吞吐和公平性。

listener的类型,其中simple和direct有不同的配置参数。比如,simple监听器可以设置并发消费者数量(concurrency和max-concurrency),而direct监听器则设置每个队列的消费者数量(consumers-per-queue)。这说明两者的并发处理方式不同,simple可能更适合动态调整消费者数量,而direct则固定每个队列的消费者数量。

单模式适合负载均衡,通过多个消费者处理同一队列的消息,而direct监听器可能更适合需要严格顺序或固定消费者的场景。

simple监听器提供更多的动态并发配置,适合需要横向扩展消费者的场景,而direct监听器则提供更固定的消费者数量配置,适合需要精确控制的场景。配置时需要注意各自的参数,如并发数、预取值、确认模式等。

通过合理选择容器类型和调优参数,可以显著提升RabbitMQ在Spring Boot中的性能和可靠性。建议结合压力测试和实际业务场景进行验证。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/968024.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

yolo11自训练极简教程 - 训练侦测

去年我处理过的最后一个版本是yolov10.新年再次着手处理视频识别的工作。发现自清华的v10之后,去年下半年v11再次发布了。国内访问github有些问题,但是yolo网站是可以方便访问的: Train - Ultralytics YOLO Docs 国外的知识库做的很棒&…

大数据学习之SparkStreaming、PB级百战出行网约车项目一

一.SparkStreaming 163.SparkStreaming概述 Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Spark Streaming 是核心 Spark API 的扩展,支持实时数据…

day5QT套接字通信

Widget.cpp #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);objtimer new QTimer (this);//连接定时器的timeout信号到启动的槽函数//connect(objtimer,&…

【免费】2007-2020年各省医疗卫生支出数据

2007-2020年各省医疗卫生支出数据 1、时间:2007-2020年 2、来源:国家统计局、统计年鉴 3、指标:行政区划代码、地区名称、年份、医疗卫生支出 4、范围:31省 5、指标说明:地方财政医疗卫生支出是指地方ZF从其财政预…

本地基于GGUF部署的DeepSeek实现轻量级调优之二:检索增强生成(RAG)

前文,我们在本地windows电脑基于GGUF文件,部署了DeepSeek R1 1.5B模型,如果想在离线模式下加载本地的DeepSeek模型自行对进行训练时,是不能直接使用GGUF文件进行训练的,但是可以对模型进行微调,以下说的是第…

16vue3实战-----动态路由

16vue3实战-----动态路由 1.思路2.实现2.1创建所有的vue组件2.2创建所有的路由对象文件(与上述中的vue文件一一对应)2.3动态加载所有的路由对象文件2.4根据菜单动态映射正确的路由2.5解决main页面刷新的问题2.6解决main的第一个页面匹配显示的问题2.7根据path匹配menu 1.思路 …

WPS如何接入DeepSeek(通过JS宏调用)

WPS如何接入DeepSeek 一、文本扩写二、校对三、翻译 本文介绍如何通过 WPS JS宏调用 DeepSeek 大模型,实现自动化文本扩写、校对和翻译等功能。 一、文本扩写 1、随便打开一个word文档,点击工具栏“工具”。 2、点击“开发工具”。 3、点击“查看代码”…

前端快速生成接口方法

大家好,我是苏麟,今天聊一下OpenApi。 官网 : umijs/openapi - npm 安装命令 npm i --save-dev umijs/openapi 在根目录(项目目录下)创建文件 openapi.config.js import { generateService } from umijs/openapi// 自…

云消息队列 ApsaraMQ Serverless 演进:高弹性低成本、更稳定更安全、智能化免运维

如今,消息队列已成为分布式架构中不可或缺的关键服务,为电商、物联网、游戏和教育等行业,提供了异步解耦、集成、高性能和高可靠的核心价值。 过去一年,我们发布了云消息队列 ApsaraMQ 全系列产品 Serverless 化,面向…

Spring依赖注入方式

写在前面:大家好!我是晴空๓。如果博客中有不足或者的错误的地方欢迎在评论区或者私信我指正,感谢大家的不吝赐教。我的唯一博客更新地址是:https://ac-fun.blog.csdn.net/。非常感谢大家的支持。一起加油,冲鸭&#x…

Mysql索引失效的场景

对索引列使用函数或表达式,或参与计算(优化方法:将计算移到条件右侧:)例 优化 对索引列进行隐式类型转换,条件中的数据类型与索引列的数据类型不匹配,会进行隐式类型转换 以like 通配符开头索…

CTFHub-RCE系列wp

目录标题 引言什么是RCE漏洞 eval执行文件包含文件包含php://input读取源代码远程包含 命令注入无过滤过滤cat过滤空格过滤目录分隔符过滤运算符综合过滤练习 引言 题目共有如下类型 什么是RCE漏洞 RCE漏洞,全称是Remote Code Execution漏洞,翻译成中文…

算法学习笔记之并查集

简介 问题描述:将编号为1-N的N个对象划分为不相交集合,在每个集合中,选择其中的某个元素代表所在集合。 常见两种操作: 1.合并两个集合 2.查找某元素属于哪个集合 实现方法1 用编号最小的元素标记所在集合; 定义…

渗透利器工具:Burp Suite 联动 XRAY 图形化工具.(主动扫描+被动扫描)

Burp Suite 联动 XRAY 图形化工具.(主动扫描被动扫描) Burp Suite 和 Xray 联合使用,能够将 Burp 的强大流量拦截与修改功能,与 Xray 的高效漏洞检测能力相结合,实现更全面、高效的网络安全测试,同时提升漏…

菌贝:云南鸡枞菌走向世界的第一品牌

云南,这片神奇的土地,孕育了无数珍稀的野生菌,而鸡枞菌无疑是其中的佼佼者。它以其独特的口感和丰富的营养价值,被誉为“菌中之王”。在云南鸡枞菌的品牌化进程中,菌贝以其卓越的品质和广泛的影响力,成为云…

如何恢复使用 Command+Option+Delete 删除的文件:完整指南

在日常使用 Mac 时,我们经常会使用 CommandOptionDelete 组合键来快速删除文件。这种删除方式会将文件直接移出废纸篓,而不会经过废纸篓的中间步骤,因此文件看似被永久删除。然而,即使文件被这样删除,仍然有几种方法可…

windows生成SSL的PFX格式证书

生成crt证书: 安装openssl winget install -e --id FireDaemon.OpenSSL 生成cert openssl req -x509 -newkey rsa:2048 -keyout private.key -out certificate.crt -days 365 -nodes -subj "/CN=localhost" 转换pfx openssl pkcs12 -export -out certificate.pfx…

用户认证综合实验

实验需求 需求一:根据下表,完成相关配置 需求二:配置DHCP协议,具体要求如下 需求三:防火墙安全区域配置 需求四:防火墙地址组信息 需求五:管理员 为 FW 配置一个配置管理员。要求管理员可以通…

5.7.2 进度管理

文章目录 进度管理Gantt图PERT图 进度管理 进度安排:通过将项目分解成多个活动,分析活动间的依赖关系,估算工作量,分配资源,制定活动时序。 Gantt图 Gantt图横坐标表示时间,纵坐标表示不同任务。使用一条条…

MQTT(Message Queuing Telemetry Transport)协议(二)

下面为你详细介绍如何基于 TCP 协议对 MQTT 进行封装&#xff0c;包括实现思路、代码示例及代码解释。 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/socket.h>…