第十一章 RabbitMQ之消费者确认机制

目录

一、介绍

二、演示三种ACK方式效果

2.1. none: 不处理

2.1.1. 消费者配置代码

2.1.2. 生产者主要代码 

2.1.3. 消费者主要代码 

2.1.4. 运行效果 

2.2. manual:手动模式

2.3. auto:自动模式 


一、介绍

消费者确认机制(Consumer Acknowledgement)是为了确认消费者是否成功处理消息。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态:

ack:成功处理消息,RabbitMQ从队列中删除该消息

nack:消息处理失败,RabbitMQ需要再次投递消息

reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

none:不处理 消息投递给消费者后立刻ack 消息立刻从MQ删除(非常不安全不建议使用)

manual:手动模式 即手动ack或reject,需要在业务代码结束后,调用api发送ack,但是这种有代码入侵,不建议使用。

auto:自动模式 SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack。当业务出现异常时,根据异常判断返回不同结果:

1. 如果是业务异常,会自动返回nack

2. 如果是消息处理或校验异常,自动返回reject

Spring默认未我们设定的是auto 自动模式,符合我们实际项目的需求。 

二、演示三种ACK方式效果

2.1. none: 不处理

2.1.1. 消费者配置代码

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: Wangzhexiao
    password: Wangzhexiao
    virtual-host: /hangzhou
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack


2.1.2. 生产者主要代码 

package com.example.publisher;

import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

@Slf4j
@SpringBootTest
class PublisherApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    void test() {
        rabbitTemplate.convertAndSend("simple.queue", "只要学不死,就往死里学!");
    }
}

2.1.3. 消费者主要代码 

package com.example.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SimpleListener {

    @RabbitListener(queues = "simple.queue")
    public void listener1(String msg) throws Exception {
//        System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");
        throw new Exception();
    }
}

2.1.4. 运行效果 

 

我们可以看到,当生产者投递到MQ的那一刻,会立刻返回ACK,此刻消费者的业务逻辑未执行完。

2.2. manual:手动模式

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: Wangzhexiao
    password: Wangzhexiao
    virtual-host: /hangzhou
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: manual # none,关闭ack;manual,手动ack;auto:自动ack


我们定义了一个SimpleMessageListenerContainer,并为它设置了一个ChannelAwareMessageListener。在监听器内部,我们实现了消息的接收和处理,并在处理完成后使用channel.basicAck方法手动发送一个确认消息给RabbitMQ,表明消息已被消费。如果在处理消息时发生异常,我们可以使用channel.basicReject方法拒绝该消息,以便RabbitMQ可以将其重新排队或者进行其他配置的处理。 

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitMQConfig {
 
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("yourQueueName"); // 设置监听的队列名称
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                try {
                    // 消息处理逻辑
                    System.out.println("Received message: " + new String(message.getBody()));
 
                    // 确认消息已被成功处理
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception e) {
                    // 出现异常,拒绝该消息
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                }
            }
        });
        return container;
    }
}

2.3. auto:自动模式 

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: Wangzhexiao
    password: Wangzhexiao
    virtual-host: /hangzhou
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack


当生产者投递到MQ后消费者在消费过程中发生业务异常,MQ会将它标记为Unacked,后续会一直投递该消息,直到消费成功为止。

 

下图看到有两条消息,其中一条是第一次投递失败重新投递的消息: 

至此我们思考一下,实际项目中我们推荐采用Spring AMQP为我们实现的auto 自动模式确认机制,虽然看上去我们的系统设计简单了,但是对于如果我们业务代码出现异常,消息在消费过程中执行一直失败,那么RabbitMQ后续会一直投递该消息,这期间异常消息如果一直消费不了,循环投递就会给我们系统造成极大的压力负担,这该怎么解决?下一章将给大家讲解失败消息的处理策略!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/891067.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【计算机网络】详谈TCP协议确认应答机制捎带应答机制超时重传机制连接管理机制流量管理机制滑动窗口拥塞控制延迟应答

一、TCP 协议段格式 1.1、4位首部长度 4位首部长度的基本单位是4字节,也就是说如果4位首部长度填6,那报头长度就是24字节。报头长度的取值范围为[0,60]字节,也就是说选项的最大长度为40字节。 二、确认应答机制 发送数据和发送应答&#x…

CSS 入门

1. CSS 1.1 概念 CSS(Cascading Style Sheet),层叠样式表,用于控制页面的样式 CSS 能够对网页中元素位置的排版进行像素级精确控制,实现美化页面的效果,能够做到页面的样式和结构分离(类似于…

Redis总结(官方文档解读)

定义 Redis是一个开源的,基于内存的数据结构存储系统,可以用作数据库、缓存和消息中间件。 特征 高性能 支持丰富的数据类型 丰富的操作类型,操作是原子性的 支持周期性持久化 支持分布式 开源免费,社区活跃 数据类型 数据…

基础篇:带你打开Vue的大门(一)

学习目标: 理解Vue的基本概念:掌握Vue.js是什么,它的设计理念,以及它在现代Web开发中的应用。掌握Vue的基本语法:学习Vue的基础指令和语法,能够使用Vue构建简单的交互式界面。熟悉Vue组件化开发&#xff1…

vue3--通用 popover 气泡卡片组件实现

背景 在日常开发中,我们一般都是利用一些诸如:element-ui、element-plus、ant-design等组件库去做我们的页面或者系统 这些对于一些后台管理系统来说是最好的选择,因为后台管理系统其实都是大同小异的,包括功能、布局结构等 但是对于前台项目,比如官网、门户网站这些 …

【银行科技岗】相关考试知识点总结及部分考题

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 一、**网络与安全**二、**软件开发与设计**三、**数据库与数据管理**四、**编程与系统**五、**计算机硬件与性能**六、**大数据与人工智能**七、**系统与应用**相关…

人型机器人技术及前景详解

随着科技的飞速发展,人型机器人作为人工智能领域的一个重要分支,正逐步走进我们的生活和工作之中。它们不仅在外形上模拟人类,更在感知、决策、行为和交互能力上展现出强大潜力。本文将深入探讨人型机器人的技术原理、当前发展状况以及未来的…

数据结构-排序2

1.快速排序 快速排序是Hoare于1962年提出的一种二叉树结构的交换排序方法,其基本思想为: 任取待排序元素序列中 的某元素作为基准值,按照该排序码将待排序集合分割成两子序列,左子序列中所有元素均小于基准值,右 子…

interwirelessac9560感叹号,电脑无法连接wifi,无法搜索到wifi

interwirelessac9560感叹号 电脑无法连接wifi,无法搜索到wifi 原因 这可能是wifl模块出现了问题。 解决方案 1、winx 打开,选择【设备管理器】 2、选择网络适配器 右键打开wireless-AC,选择【卸载设备】。 3、关机2分钟后&#xff0c…

【CSS】纯css3螺旋状loading加载特效

效果图 <div class"ai-loader"><div class"dot"></div><div class"dot"></div><div class"dot"></div><div class"dot"></div><div class"dot">&…

C语言复习概要(六)

公主请阅 1. 深入理解数组与指针在C语言中的应用1.1 数组名的理解 2. 使用指针访问数组3. 一维数组传参的本质4. 冒泡排序的实现5. 二级指针6. 指针数组7. 指针数组模拟二维数组8.总结 1. 深入理解数组与指针在C语言中的应用 数组与指针是C语言的核心概念之一&#xff0c;理解…

【安装JDK和Android SDK】

安装JDK和Android SDK 1 前言2 下载2.1 下载途径2.2 JDK下载和安装2.2.1 下载2.2.2 安装并配置环境变量2.2.3 验证 2.3 SDK下载和安装2.3.1 下载2.3.2 安装2.3.3 环境变量配置2.3.4 验证 1 前言 在软件开发中&#xff0c;Android应用开发通常使用Android Studio&#xff0c;但…

使用Milvus和Llama-agents构建更强大的Agent系统

代理&#xff08;Agent&#xff09;系统能够帮助开发人员创建智能的自主系统&#xff0c;因此变得越来越流行。大语言模型&#xff08;LLM&#xff09;能够遵循各种指令&#xff0c;是管理 Agent 的理想选择&#xff0c;在许多场景中帮助我们尽可能减少人工干预、处理更多复杂任…

typescript使用webpack打包编译问题

解决方案&#xff1a;在webpack.config.js中的mdule.exports中设置mode。 再次运行npm run start即可。

【软考】设计模式之中介者模式

目录 1. 说明2. 应用场景3. 结构图4. 构成5. 适用性6. 优点 1. 说明 1.用一个中介对象来封装一系列的对象交互。2.中介者使各对象不需要显式地相互引用&#xff0c;从而使其耦合松散&#xff0c;而且可以独立地改变它们之间的交互。3.中介者模式&#xff08;Mediator Pattern&…

Copilot Coaching新功能铸就Word更强

Copilot 的意思是副驾驶。 现在&#xff0c;您的副驾驶教练来了&#xff1a;Copilot Coaching Copilot Coaching 是 Word 中的一项新 Copilot 功能&#xff0c;可在您查看内容时为您提供支持&#xff0c;以实现语法和拼写之外的改进 - 帮助您澄清想法&#xff0c;并为您提供有…

【优选算法】(第四十篇)

目录 岛屿数量&#xff08;medium&#xff09; 题目解析 讲解算法原理 编写代码 岛屿的最⼤⾯积&#xff08;medium&#xff09; 题目解析 讲解算法原理 编写代码 岛屿数量&#xff08;medium&#xff09; 题目解析 1.题目链接&#xff1a;. - 力扣&#xff08;LeetCo…

植物大战僵尸杂交版

最新版植物大战僵尸杂交版 最近本款游戏火爆 下载资源如下&#xff1a; win版本&#xff1a;2.3.7 链接&#xff1a;下载地址 提取码&#xff1a;9N3P Mac&#xff08;苹果版本&#xff09;&#xff1a;2.0.0 链接&#xff1a;下载地址 提取码&#xff1a;Bjaa 介绍&#xff…

【论文#码率控制】ADAPTIVE RATE CONTROL FOR H.264

目录 摘要1.前言2.基本知识2.1 蛋鸡悖论2.2 基本单元的定义2.3 线性MAD预测模型 3.GOP级码率控制3.1 总比特数3.2 初始化量化参数 4.帧级码率控制4.1 非存储图像的量化参数4.2 存储图像的目标比特 5.基本单元级码率控制6.实验结果7.结论 《ADAPTIVE RATE CONTROL FOR H.264》 A…

考研编程:10.11 回文数 水仙花 生成一定范围内的随机数 求二叉树宽度

回文数 #include <stdio.h>int main(){int a,b,c0,sum;scanf("%d",&a);ba;while(b!0){c b%10 c*10;b b/10;}if(ca){printf("yes");}return 0; } 水仙花 #include <stdio.h> #include <math.h> int main(){int a,b,c0,sum;scan…