RabbitMQ之消费者并发消费

为什么要引入消费者的并发消费?
当生产者的推送速度是远远超过消费者的能力的,可以提高消费者的消费速度。比如在java中我们可以启动多个 JVM 进程,实现多进程的并发消费,从而加速消费的速度,在mq中也可以通过设置配置。
@RabbitListener 注解中,有 concurrency 属性,它可以指定并发消费的线程数。
下面演示消费者并发消费实现
配置文件yml

spring:
  rabbitmq:
  	#host为一般模式 若集群模式 将key换成addresses的形式
    host: 192.168.9.104
    port: 5672
    #账号密码自行替换
    username: admin
    password: admin
    listener:
      # 选择的 ListenerContainer 的类型。默认为 simple 类型
      type: simple
      simple:
        # 每个 @ListenerContainer 的并发消费的线程数
        concurrency: 2
        # 每个 @ListenerContainer 允许的并发消费的线程数
        max-concurrency: 10 

rabbitmq.listener.type 的枚举值可以参考 ContainerType

SIMPLE 对应 SimpleMessageListenerContainer 消息监听器容器。它一共有两类线程:

Consumer 线程,负责从 RabbitMQ Broker 获取 Queue 中的消息,存储到内存中的 BlockingQueue 阻塞队列中。
Listener 线程,负责从内存中的 BlockingQueue 获取消息,进行消费逻辑

==================================》配置类代码
@Configuration
public class DirectExchangeConfiguration {
    /**
     * 创建一个 Queue
     *
     * @return Queue
     */
    @Bean
    public Queue queue08() {
        // Queue:名字 | durable: 是否持久化 | exclusive: 是否排它 | autoDelete: 是否自动删除
        return new Queue(
                Message08.QUEUE,
                true,
                false,
                false);
    }

    /**
     * 创建 Direct Exchange
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange exchange08() {
        // name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它
        return new DirectExchange(Message08.EXCHANGE,
                true,
                false);
    }

    /**
     * 创建 Binding
     * Exchange:Message08.EXCHANGE
     * Routing key:Message08.ROUTING_KEY
     * Queue:Message08.QUEUE
     *
     * @return Binding
     */
    @Bean
    public Binding binding08() {
        return BindingBuilder
                .bind(queue08()).to(exchange08())
                .with(Message08.ROUTING_KEY);
    }
========================》生产者代码
@Component
public class Producer08 {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void syncSend(String id, String routingKey) {
        // 创建 Message08 消息
        Message08 message = new Message08();
        message.setId(id);
        // 同步发送消息
        rabbitTemplate.convertAndSend(Message08.EXCHANGE, routingKey, message);
    }
}
===========================》消费者代码
@Component
// 开启并发消费
@RabbitListener(queues = Message08.QUEUE, concurrency = "2")
@Slf4j
public class Consumer08 {

    @RabbitHandler
    public void onMessage(Message08 message) throws InterruptedException {
        log.info("[{}][Consumer08 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);
        // 模拟消费耗时,为了让并发消费效果更好的展示
        TimeUnit.SECONDS.sleep(1);
    }
}
@Resource
    Producer08 producer08;

    @Test
    void mock() throws InterruptedException {
        TimeUnit.SECONDS.sleep(20);
    }

    @SneakyThrows
    @Test
    void syncSend() {
        // 循环发送十个,观察消费者情况
        for (int i = 0; i < 10; i++) {
            String id = UUID.randomUUID().toString();
            producer08.syncSend(id, Message08.ROUTING_KEY);
        }
        log.info("[{}][test producer08 syncSend] 发送成功", LocalDateTime.now());
        // 这里多睡一会,确保消息全部消费完成
        TimeUnit.SECONDS.sleep(10);
    }

以上的是消费者并发消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/600973.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Momentum靶机系列Momentum2

先进行arp扫描&#xff1a; 获得渗透靶机的IP&#xff1a;192.168.13.142 扫描一下靶机的使用的端口&#xff1a; 具有tcp端口和http服务的80端口 可以扫描一下80端口的http服务&#xff1a; 可以发现一个网站&#xff1a;http://192.168.13.142 打开该网址&#xff1a; 查看…

error code [1449]; The user specified as a definer (‘root‘@‘%‘) does not exist

其实就是说我的root用户权限不够&#xff0c;那就要加上权限&#xff0c;网上其他地方也有好多处理办法&#xff0c;但是要注意数据库版本。我用的是MySQL8.0.32版本&#xff0c;我是这样处理的&#xff0c;简单可行&#xff1a; GRANT ALL ON *.* TO root% ;FLUSH PRIVILEGES…

当AI遇见现实:数智化时代的人类社会新图景

文章目录 一、数智化时代的机遇二、数智化时代的挑战三、如何适应数智化时代《图解数据智能》内容简介作者简介精彩书评目录精彩书摘强化学习什么是强化学习强化学习与监督学习的区别强化学习与无监督学习的区别 前言/序言 随着科技的日新月异&#xff0c;我们步入了一个前所未…

爬虫学习:XPath匹配网页数据

目录 一、安装XPath 二、XPath的基础语法 1.选取节点 三、使用XPath匹配数据 1.浏览器审查元素 2.具体实例 四、总结 一、安装XPath 控制台输入指令&#xff1a;pip install lxml 二、XPath的基础语法 XPath是一种在XML文档中查找信息的语言&#xff0c;可以使用它在HTM…

B端系统菜单栏中使用阿里图标

B端系统菜单栏中使用阿里图标 1.需求说明 由于组件库自带的图标数量和内容有限&#xff0c;采用丰富多样的阿里图标是不错的选择 2.阿里图标使用 2.1官网 iconfont-阿里巴巴矢量图标库 2.2使用 2.2.1.先根据关键词搜索并选择对应的图标 注意&#xff1a;若只是少量的sv…

自动驾驶学习1-超声波雷达

1、简介 超声波雷达&#xff1a;利用超声波测算距离的雷达传感器装置&#xff0c;通过发射、接收 40kHz、48kHz或 58kHz 频率的超声波&#xff0c;根据时间差测算出障碍物距离&#xff0c;当距离过近时触发报警装置发出警报声以提醒司机。 超声波&#xff1a;人耳所不能听到的…

FMEA助力智能电网升级:构建安全、高效、可靠的电力网络

随着科技的不断进步&#xff0c;智能电网已成为现代电力行业的重要发展方向。而在这个过程中&#xff0c;FMEA&#xff08;失效模式和影响分析&#xff09;作为一种重要的质量管理工具&#xff0c;正日益发挥着其在智能电网建设中的赋能作用。本文将从FMEA的基本概念出发&#…

Study--Oracle-02-单实例部署Oracle19C

一、CentOS 7 环境准备 1、软件准备 操作系统&#xff1a;CentOS 7 数据库版本: Oracle19C 2、操作系统环境配置 关闭selinux &#xff0c;编辑 /etc/selinux/config文件&#xff0c;设置SELINUX enforcing 为SELINUXdisabled [rootoracle ~]# grep SELINUX /etc/seli…

顺序表的实现(迈入数据结构的大门)

什么是数据结构 数据结构是由&#xff1a;“数据”与“结构”两部分组成 数据与结构 数据&#xff1a;如我们所看见的广告、图片、视频等&#xff0c;常见的数值&#xff0c;教务系统里的&#xff08;姓名、性别、学号、学历等等&#xff09;&#xff1b; 结构&#xff1a;当…

python网络爬虫学习——编写一个网络爬虫

参考资料&#xff1a;用Python写网络爬虫&#xff08;第2版&#xff09; 1、编写一个函数 &#xff08;1&#xff09;用于下载网页&#xff0c;且当下载网页发生错误时能及时报错。 # 导入库 import urllib.request from urllib.error import URLError,HTTPError,ContentTooS…

Golang 开发实战day12 - Pointer

&#x1f3c6;个人专栏 &#x1f93a; leetcode &#x1f9d7; Leetcode Prime &#x1f3c7; Golang20天教程 &#x1f6b4;‍♂️ Java问题收集园地 &#x1f334; 成长感悟 欢迎大家观看&#xff0c;不执着于追求顶峰&#xff0c;只享受探索过程 Golang 开发实战day12 - 指针…

Hive读写文件机制

Hive读写文件机制 1.SerDe是什么&#xff1f; SerDe是Hive中的一个概念&#xff0c;代表着“序列化/反序列化” &#xff08;Serializer/Deserializer&#xff09;。 SerDe在Hive中是用来处理数据如何在Hive与底层存储系统&#xff08;例如HDFS&#xff09;之间进行转换的机制…

Xinstall广告效果监测,助力广告主优化投放策略

在移动互联网时代&#xff0c;APP推广已成为企业营销的重要手段。然而&#xff0c;如何衡量推广效果&#xff0c;了解用户来源&#xff0c;优化投放策略&#xff0c;一直是广告主和开发者面临的难题。这时&#xff0c;Xinstall作为国内专业的App全渠道统计服务商&#xff0c;以…

SpringBoot项目部署到阿里云服务器

部署步骤 步骤分以下&#xff1a; 将SpringBoot项目打包Linux上准备好Java环境、可用的MySql数据库项目上传到服务器启动项目停止项目 1.SpringBoot项目打包 数据库的链接&#xff0c;账户和密码需要和Linux上一致。 如上图打包即可。 2.Linux上准备好Java环境以及Mysql环境…

微生物群落构建(community assembly)

Introduction Zhou, J. & Ning, D. Stochastic Community Assembly: Does It Matter in Microbial Ecology? Microbiol Mol Biol Rev 81, e00002-17 (2017). This review is very comprehensive (1)&#xff01; 周集中老师实验室的长期研究兴趣集中在从基因组到生态系统…

ZIP压缩输出流(将ZIP文件解压)

文章目录 前言一、ZIP压缩输出流是什么&#xff1f;二、使用介绍 1.使用方法2.实操展示总结 前言 该篇文章相对应的介绍如何使用java代码将各种文件&#xff08;文件夹&#xff09;从ZIP压缩文件中取出到指定的文件夹中。解压流将ZIP文件中的文件以条目的形式逐一读取&#xff…

WMS仓储管理系统库存分类的详细讲解

在当今日益复杂和快速变化的商业环境中&#xff0c;仓库管理成为了一个企业不可或缺的关键环节。WMS仓储管理系统解决方案凭借其自动化和信息化的优势&#xff0c;为企业带来了革命性的改变&#xff0c;特别是在库存分类方面。接下来&#xff0c;我们将深入探讨WMS仓储管理系统…

LLMs之GPT4ALL:GPT4ALL的简介、安装和使用方法、案例应用之详细攻略

LLMs之GPT4ALL&#xff1a;GPT4ALL的简介、安装和使用方法、案例应用之详细攻略 目录 GPT4ALL的简介 0、新功能 1、特点 2、功能 3、技术报告 GPT4ALL的安装和使用方法 1、安装 2、使用方法 GPT4ALL的案例应用 LLMs之LLaMA3&#xff1a;基于GPT4ALL框架对LLaMA-3实现…

【笔记】Anaconda命令提示符(Anaconda Prompt)操作

通过anaconda配置python环境有时需要conda安装一些包或者文件&#xff0c;这里作为一个笔记记录如何打开Anaconda命令提示符&#xff08;Anaconda Prompt&#xff09;&#xff0c;并用conda操作 1.打开Anaconda命令提示符&#xff08;Anaconda Prompt&#xff09; 可直接在搜…

如何获得一个Oracle 23ai数据库(RPM安装)

准确的说&#xff0c;是Oracle 23ai Free Developer版&#xff0c;因为企业版目前只在云上&#xff08;OCI和Azure&#xff09;和ECC上提供。 方法包括3种&#xff0c;本文介绍第2种&#xff1a; Virtual ApplianceRPM安装Docker RPM安装支持Linux 8和Linux 9。由于官方的Vi…