SpringBoot集成RabbitMq,RabbitMq消费与生产,消费失败重发机制,发送签收确认机制

RabbitMq消费与生产,消费失败重发机制,发送确认机制,消息发送结果回执

  • 1. RabbitMq集成spring boot
    • RabbitMq集成依赖
    • RabbitMq配置
    • RabbitMq生产者,队列,交换通道配置,消费者示例
  • 2. RabbitMq消息确认机制
    • 消息确认机制分自动确认,和手动确认
  • 3. 消息重发机制
    • 消息重发配置
    • 消息重发如何触发
  • 4. 延时消息队列
  • 5. 接收返回结果队列
    • 尚未研究后续用到补充
  • 6. 遇到的报错
    • 启动报错 Channel shutdown: channel error; protocol method:

1. RabbitMq集成spring boot

  • RabbitMq集成依赖

           这里spring-boot依赖版本为2.3.7版本,RabbitMq集成amqp包,版本在spring-boot中有涵盖,不单独指明版本了。

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.7.RELEASE</version>
    </parent>
    
    <dependencyManagement>
      	<dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.3.7.RELEASE</version>
                <type>pom</type>
            </dependency>
        </dependencies>
    </dependencyManagement>	
    
    <dependencies>
       <!-- rabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
    
    
  • RabbitMq配置

    spring:
      rabbitmq:
        # 基础项
        host:  ip地址
        port: 端口
        username:  用户名
        password: 密码
        # virtualhost需要提前在MQ的Web管理界面里手动创建,或者配置默认host"/"
        virtual-host: /
        # 生产者
        #确认消息已发送到交换机(Exchange)
        publisher-confirm-type: correlated
        #开启消息发送确认机制,默认为false
        #如果没有本条配置信息,当消费者收到生产者发送的消息后,生产者无法收到确认成功的回调信息
        publisher-confirms: true
        #支持消息发送失败返回队列,默认为false
        publisher-returns: true
        # 消费者
        listener:
          type: simple
          simple:
            #自动签收auto  手动 manual
            acknowledge-mode: auto
            #个字段一定要设置成 false 不然无法消费的数据不会进入死信队列的
            default-requeue-rejected: false
            prefetch: 1 #限制每次发送一条数据
            max-concurrency: 1 #启动消费者最大数量
            concurrency: 1 #同一个队列启动几个消费者
            retry:
              enabled: true #是否支持重试
              max-attempts: 3         # 最大重试次数,默认为3
              initial-interval: 30s # 重试间隔时间,默认1000(单位毫秒)
              max-interval: 120s # 重试最大间隔
              # 时间间隔的乘子,下一次间隔的时间=间隔时间 × 乘子,但最大不超过重试最大间隔
              multiplier: 1	
    
    	```
    
    
  • RabbitMq生产者,队列,交换通道配置,消费者示例

    Exchange 交换机配置

    @Component
    public class DnfxExchangeConfig {
    
        @Autowired
        RabbitMqConfig rabbitMqConfig;
        /**
         * topic交换机起名
         * 如果rabbitmq设置的类型是topic 就用topic类型的Exchange
         *
         * @return
         */
        @Bean
        TopicExchange dnfxOrderExchange() {
            return new TopicExchange(rabbitMqConfig.getFxexchange());
        }
    }
    

    队列queue配置

    @Component
    public class DnfxQueueConfig {
        @Autowired
        RabbitMqConfig rabbitMqConfig;
    
        /**
         * 队列起名
         *
         * @return
         */
        @Bean
        public Queue dnfxOrderQueue() {
            Map<String, Object> argsMap = new HashMap<String, Object>();
            //队列优先级  argsMap.put("x-max-priority", 5);
             //true 是否持久 
            return new Queue(rabbitMqConfig.getFxqueue(), true, false, false, argsMap);
        }
    }
    

    将队列和交换机绑定, 并设置用于匹配键

     @Component
    public class DnfxRoutingConfig {
        @Autowired
        RabbitMqConfig rabbitMqConfig;
    
        @Autowired
        DnfxQueueConfig queueConfig;
    
        @Autowired
        DnfxExchangeConfig exchangeConfig;
    
        /**
         * 绑定:将队列和交换机绑定, 并设置用于匹配键 myDirectRouting
         *
         * @return
         */
        @Bean
        Binding bindingOrderRouting() {
            return BindingBuilder.bind(queueConfig.dnfxOrderQueue()).to(exchangeConfig.dnfxOrderExchange()).with(rabbitMqConfig.getFxrouting());
        }
    }
    

    配置加载

    @Configuration
    @ConfigurationProperties(prefix = "xx.mq")
    @Data
    public class RabbitMqConfig {
        private String fxqueue;
    
        private String fxexchange;
    
        private String fxrouting;	
    }	
    

    RabbitTemplate

    @Configuration
    public class DnfxRabbitMqConfig {
        @Autowired
        RabbitMqConfig rabbitMqConfig;
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            return new RabbitTemplate(connectionFactory);
        }
    }
    

    生产者

    @Component
    public class DemoTestProduce {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        RabbitMqConfig rabbitMqConfig;
    
        public void sendDemoMsg() {
            String message = "测试消息发送";
            rabbitTemplate.convertAndSend(rabbitMqConfig.getFxexchange(), rabbitMqConfig.getFxrouting(), message);
        }
    }
    

    消费者

    @Component
    public class DnfxAliBbMessageListener {
    
        private final static Logger logger = LoggerFactory.getLogger(DnfxAliBbMessageListener.class);
    
        @RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "${xx.mq.fxqueue}")
        public void listenSimpleQueueMessage(String msg) throws IOException {
            logger.info("接收到的1688回执消息:{}", msg);
        }
    }
    

2. RabbitMq消息确认机制

  • 消息确认机制分自动确认,和手动确认

    消息确认签收配置

      # 消费者
    listener:
      type: simple
      simple:
        #自动签收 auto  手动 manual
        acknowledge-mode: auto
    

           消息确认签收机制不过多赘述,网上有大把说明,这里简单描述一下,以及记录一下个人使用心得。
           acknowledge-mode: auto 配置为自动签收时候,消息送达至消费者手上后,Mq自动签收,并移除消息出消息队列。
           acknowledge-mode: false 配置为手动签收时候,消息送达至消费者后,消费者需要手动触发签收动作,如果消费者没有发送ACK消息,RabbitMQ服务器就会认为该消息还没有被消费,会将该消息重新发送给其他消费者。例如下图,手动签收模式,没有主动向MQ发送签收讯息,那么当前消费的这条消息会被标记为Unacked
    在这里插入图片描述
    关于签收 确认机制可以参考 https://blog.csdn.net/qq_42331185/article/details/131696949 ,这里贴部分这个博主的结论

    	@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "${xx.mq.fxqueue}")
        public void listenSimpleQueueMessage(Message message, Channel channel) throws IOException {
            String msgBody = new String(message.getBody());
            logger.info("接收到的1688回执消息:{}", msgBody);
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicReject(deliveryTag, false);
    

    deliveryTag:消息传递标签,格式为序列号,必须使用这个标签,不然信道会关闭,详情下面会说到
    multiple:为true则表示序号deliverTag之前的消息均被确认或拒绝(basicNack),false表示当前消息。为true的时候就可以做到批量确认
    requeue:为true表示,失败的消息将会重新排队,不会丢弃或者死信,为false则表示丢弃

     1、消息成功签收  basicAck(deliveryTag,multiple)
     channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
     
     2、失败确认 basicNack(deliveryTag,multiple,requeue)
     channel.basicNack(message.getEnvelope().getDeliveryTag(),false, true);
     
     3、失败确认:basicReject(deliveryTag,requeue)
     channel.basicReject(message.getEnvelope().getDeliveryTag(), true);
    

    注:关于以上手动确认multiple属性为true时,批量确认这个元素个人未进行验证,失败确认requeue为true时,当前消息会重新丟至MQ队列中,等待下次消费(已验证)
           关于消息确认机制,自动确认可能导致消息丢失,如果单条消息发送至消费者后,消费者处理报错,最多触发消息重发机制,重发达到重发上限后,便会抛弃此消息,造成消息丢失。
           手动确认签收,千万不要在cath中或者final中进行失败重发签收,即basicNack basicReject 失败签收时requeue 为true,否则当前消息若真为异常消息,此消息会一直消费,失败签收,重新排队,进行循环,导致消息积压或者资源浪费

3. 消息重发机制

  • 消息重发配置

           注意: 如果遗漏 max-interval multiplier两个属性,消息重发机制仍会生效,但是重发间隔时间为默认10秒重发, initial-interval 重发间隔时间将不会生效。此处已验证,尚未确认是bug或者本身就是联动配置

     # 消费者
    listener:
      type: simple
      simple:
        retry:
          enabled: true #是否支持重试
          max-attempts: 3         # 最大重试次数,默认为3
          initial-interval: 30s # 重试间隔时间,默认1000(单位毫秒)
          max-interval: 120s # 重试最大间隔
          # 时间间隔的乘子,下一次间隔的时间=间隔时间 × 乘子,但最大不超过重试最大间隔
          multiplier: 1
    
  • 消息重发如何触发

            消息重发机制,与消息确认签收机制是两种不同的机制,这个概念不要弄混了,消息确认签收机制亦可以将消息重新放入队列进行二次消费
           消息重发机制,在消费者进行消费时,如果rabbitmq开启了消息重发机制,当消费者处理消息时候抛出了异常,即触发消息重发机制,注意,处理消息逻辑不要用try-catch捕捉异常,异常被捕捉后,会抛出异常信息,但不会影响代码正常执行,amqp aop会视为正常消费,不会触发重发机制。

    	@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "${zcwl.mq.fxqueue}")
        public void listenSimpleQueueMessage(Message message, Channel channel) throws IOException {
            String msgBody = new String(message.getBody());
            logger.info("接收到的1688回执消息:{}", msgBody);
    
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            //此处会抛出异常
            int a = 1 / 0;
            //确认签收机制为手动签收,一定要进行签收,否则触发重发机制后,此条消息仍会被标记为unacked
            channel.basicReject(deliveryTag, false);
    

4. 延时消息队列

       延时消息队列需要配合RabbitMq延时消息队列插件使用,安装延时消息队列插件此处不赘述,网上搜一大把
       延时消息队列创建队列以及绑定key时没什么特殊的,在创建exchange交换机时,需要注意选项,如下图所示即可。
       x-delayed-type = redirect 如果不能创建,报错时,那么=topic也是可以的
在这里插入图片描述
注册exchange交换机时候,注意给入 x-delayed-type 参数,队列注册,以及队列交换机绑定与普通队列一样即可

	@Bean
    CustomExchange dnfxOrderDelayExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "topic");
        return new CustomExchange(rabbitMqConfig.getFxOrderDelayExchange(), "x-delayed-message", true, false, args);
    }

测试发送延时消息方法,队列监听与普通消息一样即可

 public void sendDelayMsg() {
        System.out.println(LocalDateTime.now() + ":发送延时消息");
        String message = "这里是测试延时发送消息";
        this.rabbitTemplate.convertAndSend(rabbitMqConfig.getFxOrderDelayExchange(), rabbitMqConfig.getFxOrderDelayRouting(), message, message1 -> {
        	//delay的单位是毫秒
            message1.getMessageProperties().setDelay(1000 * 60);
            return message1;
        });
    }

5. 接收返回结果队列

尚未研究后续用到补充

6. 遇到的报错

  • 启动报错 Channel shutdown: channel error; protocol method:

    报错详情:

    Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'fx-bb-msg-exchange' in vhost '/': received 'direct' but current is 'topic', class-id=40, method-id=10)
    

           此错误为注册交换机时候抛出的错误,错误信息为注册交换机的属性,与RabbitMq已经创建好的交换机属性不一致,程序试图修改属性报错。
           错误示范:
           当前exchange交换机创建时候,创建的类型Type为topic类型,在注册exchange交换机时,返回的却是DirectExchange,那么系统便会尝试修改属性,从而引发报错
    在这里插入图片描述
    在这里插入图片描述
           修复方式:
           创建时,返回TopicExchange即可,与 创建的交换机类型保持一致
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/316204.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PHP项目添加分布式锁,这里是ThinkPHP8框架实现分布式锁

背景&#xff1a;公司旧项目&#xff0c;最初访问量不多&#xff0c;单机部署的。后来&#xff0c;访问量上来了&#xff0c;有阵子很卡&#xff0c;公司决定横向扩展&#xff0c;后端代码部署了三台服务器。部署调整后&#xff0c;有用户反馈&#xff0c;一个订单支付了三次。…

C/S架构,集成三维影像后处理功能,自主版权的一套医院PACS系统源码

一、PACS简介 PACS&#xff08;PictureArchivingandCommunicationsSystem&#xff09;即图像存储与传输系统&#xff0c;是应用于医院的数字医疗设备如CT、MR&#xff08;磁共振&#xff09;、US&#xff08;超声成像&#xff09;、X光机、DSA&#xff08;数字减影&#xff09…

基于卡尔曼滤波的声源跟踪方法研究

基于卡尔曼滤波的声源跟踪方法研究 摘 要一、研究意义二、研究内容三、算法介绍3.1基于到达时间差的定位算法3.1.1算法原理介绍3.1.2仿真实验设计与分析 3.2扩展卡尔曼滤波算法3.2.1算法的基本原理3.2.2仿真实验及分析 3.3无迹卡尔曼滤波算法3.3.1算法的基本原理3.3.2仿真实验及…

VGAN实现视网膜图像血管分割(基于pytorch)

背景介绍 VGAN&#xff08;Retinal Vessel Segmentation in Fundoscopic Images with Generative Adversarial Networks&#xff09;出自2018年的一篇论文&#xff0c;尝试使用生成性对抗网络实现视网膜血管分割的任务,原论文地址&#xff1a;https://arxiv.org/abs/1706.0931…

用通俗易懂的方式讲解:十分钟读懂 Stable Diffusion 运行原理

AIGC 热潮正猛烈地席卷开来&#xff0c;可以说 Stable Diffusion 开源发布把 AI 图像生成提高了全新高度&#xff0c;特别是 ControlNet 和 T2I-Adapter 控制模块的提出进一步提高生成可控性&#xff0c;也在逐渐改变一部分行业的生产模式。惊艳其出色表现&#xff0c;也不禁好…

大语言模型下载,huggingface和modelscope加速

huggingface 下载模型 如果服务器翻墙了&#xff0c;不用租机器 如果服务器没翻墙&#xff0c;可以建议使用下面的方式 可以租一台**autodl**不用显卡的机器&#xff0c;一小时只有1毛钱&#xff0c;启动学术加速&#xff0c;然后下载&#xff0c;下载完之后&#xff0c;用scp…

Java重修第五天—面向对象2

通过学习本篇文章可以掌握如下知识 static&#xff1b;设计单例&#xff1b;继承。 之前文章我们已经对面向对象进行了入门学习&#xff0c;这篇文章我们就开始深入了解面向对象设计。 static 我们定义了一个 Student类&#xff0c;增加姓名属性&#xff1a;name &#xff1…

Spring Task 任务调度工具

大家好我是苏麟 , 今天聊聊Spring Task 任务调度工具 Spring Task Spring Task 是Spring框架提供的任务调度工具&#xff0c;可以按照约定的时间自动执行某个代码逻辑。 定位&#xff1a;定时任务框架 作用&#xff1a;定时自动执行某段Java代码 什么是定时任务 ? 通过时…

009-Zynq基操之如何去玩转PL向PS的中断(对新手友好,走过路过千万不要错过)

文章目录 前言一、PL-PS的中断是啥&#xff1f;二、PL-PS端中断详细步骤1.ZYNQ核配置2.PS端中断函数配置3.需要拓展多个中断函数 总结 前言 本设计跟我的ZYNQ实战合集专栏中的脉冲触发电路有关系&#xff0c;也正好趁这个机会讲述一下PL-PS的中断系统&#xff0c;如何去触发中…

为什么不直接public,多此一举用get、set,一文给你说明白

文章目录 1. 封装性&#xff08;Encapsulation&#xff09;2. 验证与逻辑处理3. 计算属性&#xff08;Computed Properties&#xff09;4. **跟踪变化&#xff08;Change Tracking&#xff09;5. 懒加载与延迟初始化&#xff08;Lazy Initialization&#xff09;6. 兼容性与未来…

【Leetcode】2182. 构造限制重复的字符串

文章目录 题目思路代码 题目 2182. 构造限制重复的字符串 问题&#xff1a;给你一个字符串 s 和一个整数 repeatLimit &#xff0c;用 s 中的字符构造一个新字符串 repeatLimitedString &#xff0c;使任何字母 连续 出现的次数都不超过 repeatLimit 次。你不必使用 s 中的全…

高效便捷的远程管理利器——Royal TSX for Mac软件介绍

Royal TSX for Mac是一款功能强大、操作便捷的远程管理软件。无论是远程桌面、SSH、VNC、Telnet还是FTP&#xff0c;用户都可以通过Royal TSX轻松地远程连接和管理各种服务器、计算机和网络设备。 Royal TSX for Mac提供了直观的界面和丰富的功能&#xff0c;让用户能够快速便…

新版云进销存ERP销售库存仓库员工管理系统源码

新版云进销存ERP销售库存仓库员工管理系统源码 系统介绍&#xff1a;2022版本,带合同报价单打印&#xff0c;修复子账号不显示新加客户的BUG&#xff0c;还有其他方面的优化。 简单方便。 功能强大&#xff0c;系统采用phpMYSQL开发&#xff0c;B/S架构&#xff0c;方便随地使用…

红队打靶练习:HOLYNIX: V1

目录 信息收集 1、arp 2、netdiscover 3、nmap 4、nikto whatweb 目录探测 1、gobuster 2、dirsearch 3、dirb 4、feroxbuster WEB sqlmap 1、爆库 2、爆表 3、爆列 4、爆字段 后台登录 1、文件上传 2、文件包含 3、越权漏洞 反弹shell 提权 总结 信息…

SpringSecurity入门demo(二)表单认证

上一篇博客集成 Spring Security&#xff0c;使用其默认生效的 HTTP 基本认证保护 URL 资源&#xff0c;下面使用表单认证来保护 URL 资源。 一、默认表单认证&#xff1a; 代码改动&#xff1a;自定义WebSecurityConfig配置类 package com.security.demo.config; import or…

最新AI绘画Midjourney绘画提示词Prompt大全

一、Midjourney绘画工具 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭…

国内开源环境漫谈

我国开源软件产业相较于欧美发达国家而言起步相对较晚&#xff0c;开源项目很少超过五年&#xff0c;开发者较年轻。国外很多开源项目都是10年以上的规划与投入。在开源社区发展初期、发展期、协作期、结晶期与流行期的五个阶段中&#xff0c;中国的开源社区平台大多处于前三个…

imgaug库指南(14):从入门到精通的【图像增强】之旅

引言 在深度学习和计算机视觉的世界里&#xff0c;数据是模型训练的基石&#xff0c;其质量与数量直接影响着模型的性能。然而&#xff0c;获取大量高质量的标注数据往往需要耗费大量的时间和资源。正因如此&#xff0c;数据增强技术应运而生&#xff0c;成为了解决这一问题的…

【服务器数据恢复】服务器硬盘磁头损坏的数据恢复案例

服务器硬盘故障&#xff1a; 一台服务器上raid阵列上有两块硬盘出现故障&#xff0c;用户方已经将故障硬盘送到其他机构检测过&#xff0c;其中一块硬盘已经开盘&#xff0c;检测结果是盘片损伤严重&#xff1b;另一块硬盘尚未开盘&#xff0c;初步判断也存在硬件故障&#xff…

代码随想录 Leetcode160. 相交链表

题目&#xff1a; 代码(首刷看解析 2024年1月13日&#xff09;&#xff1a; class Solution { public:ListNode *getIntersectionNode(ListNode *headA, ListNode *headB) {ListNode *A headA, *B headB;while (A ! B) {A A ! nullptr ? A->next : headB;B B ! nullpt…