RabbitMQ消息可靠性等机制详解(精细版三)

目录

七 RabbitMQ的其他操作

7.1 消息的可靠性(发送可靠)

7.1.1 confim机制(保证发送可靠)

7.1.2 Return机制(保证发送可靠)

7.1.3 编写配置文件

7.1.4 开启Confirm和Return

7.2 手动Ack(保证接收可靠)

7.2.1 添加配置文件

7.2.2 手动ack

7.3 避免消息重复消费

7.3.1 导入依赖

7.3.2 编写配置文件

7.3.3 修改生产者

7.3.4 修改消费者


 官方文档  RabbitMQ Documentation | RabbitMQ

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。

RabbitMQ是一个Erlang开发的AMQP(高级消息排队 协议)(英文全称:Advanced Message Queuing Protocol )的开源实现。-------------接上章 

七 RabbitMQ的其他操作

7.1 消息的可靠性(发送可靠)

7.1.1 confim机制(保证发送可靠)

RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。

RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。

消息传递可靠性

7.1.2 Return机制(保证发送可靠)

Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。

而且exchange是不能持久化消息的,queue是可以持久化消息。

采用Return机制来监听消息是否从exchange送到了指定的queue中

消息传递可靠性

在消息发送方项目上加入下面内容:

7.1.3 编写配置文件
spring:
  rabbitmq:
    host: 你的地址
    port: 5672
    virtual-host: /tingyi
    username: test
    password: test
    publisher-confirms: true
    publisher-returns: true

7.1.4 开启Confirm和Return
package com.tingyi.rabbitmq.config;
​
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
import javax.annotation.PostConstruct;
​
/**
 * @author 听忆
 */
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback{
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    @PostConstruct  // init-method
    public void initMethod(){
        //指定 ConfirmCallback
        rabbitTemplate.setConfirmCallback(this);
​
        //指定 ReturnCallback
        rabbitTemplate.setReturnCallback(this);
    }
​
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            System.out.println("消息已经送达到Exchange");
        }else{
            System.out.println("消息没有送达到Exchange");
        }
    }
​
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息没有送达到Queue");
    }
}

7.2 手动Ack(保证接收可靠)

7.2.1 添加配置文件
  • 在消费方application.yml文件添加下面配置, 改为手动应答机制.

spring:
  rabbitmq:
    host: 你的地址
    port: 5672
    virtual-host: /tingyi
    username: test
    password: test
    listener:
      simple:
        acknowledge-mode: manual

7.2.2 手动ack
package com.tingyi.rabbitmq.topic;
​
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
​
/**
 * @author 听忆
 */
@Component
public class Consumer {
​
    @RabbitListener(queues = "boot-queue")
    public void getMessage(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到消息:" + msg);
        try {
            int i = 1 / 0;
            /**
             * 消费者发起成功通知
             * 第一个参数: DeliveryTag,消息的唯一标识  channel+消息编号
             * 第二个参数:是否开启批量处理 false:不开启批量
             * 举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,
             *          当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            e.printStackTrace();
            /**
             * 返回失败通知
             * 第一个参数: DeliveryTag,消息的唯一标识  channel+消息编号
             * 第二个boolean true所有消费者都会拒绝这个消息,false代表只有当前消费者拒绝
             * 第三个boolean true消息接收失败重新回到原有队列中
             */
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
        }
​
    }
}

7.3 避免消息重复消费

重复消费消息,会对非幂等行操作造成问题

重复消费消息的原因是,消费者没有给RabbitMQ一个ack

重复消费

  1. 为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,

  2. id-0(正在执行业务)

  3. id-1(执行业务成功)

  4. 然后使用ack给RabbitMQ返回消息

  5. 如果RabbitMQack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。

  6. 极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。

备注: java中的方法叫做setIfAbsent, redis中的命令叫做setnx

       作用:
            如果为空就set值,并返回1, true

​ 如果存在(不为空)不进行操作,并返回0, false​

7.3.1 导入依赖

生产者和消费者都加入下面依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.4.5</version>
</dependency>

7.3.2 编写配置文件
spring:
  redis:
    host: 你的地址
    port: 6379

7.3.3 修改生产者
@Test
public void contextLoads() throws IOException {
    CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
    //第四个参数: 设置消息唯一id
    rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","你看听忆哇",messageId);
    System.in.read();
}

7.3.4 修改消费者
package com.tingyi.rabbitmq.topic;
​
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
import java.util.concurrent.TimeUnit;
​
/**
 * @author 听忆
 */
/**
         * java中的方法叫做setIfAbsent, redis中的命令叫做setnx
         * 作用:
         *      如果为空就set值,并返回1, true
         *      如果存在(不为空)不进行操作,并返回0, false
         */
@Component
public class Consumer {
​
    @Autowired
    private StringRedisTemplate redisTemplate;
​
    @RabbitListener(queues = "boot-queue")
    public void getMessage(String msg, Channel channel, Message message) throws IOException {
        //0. 获取MessageId, 消息唯一id
        String messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
        //1. 设置key到Redis
        if(redisTemplate.opsForValue().setIfAbsent(messageId,"0", 10, TimeUnit.SECONDS)) {
​
            //2. 消费消息
            System.out.println("接收到消息:" + msg);
​
            //3. 设置key的value为1
            redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
​
            //4.  手动ack
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
​
        }else {
​
            //5. 获取Redis中的value即可 如果是1,手动ack
            if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }
        }
​
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/762186.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构】计数排序等排序

&#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01; &#x1f4e2;本文由 JohnKi 原创&#xff0c;首发于 CSDN&#x1f649; &#x1f4e2;未来很长&#…

企业软文投放为什么要选择包收录媒体?

如今这个信息时代企业想要有效的将品牌推广出去&#xff0c;那选择推广方式至关重要。软文投放作为一种常见的品牌推广方式&#xff0c;其效果往往取决于投放的媒体质量。而在众多媒体中&#xff0c;包收录媒体凭借其独特的优势&#xff0c;成为了企业软文投放的明智之选。 一…

Nuxt3 的生命周期和钩子函数(七)

title: Nuxt3 的生命周期和钩子函数&#xff08;七&#xff09; date: 2024/6/30 updated: 2024/6/30 author: cmdragon excerpt: 摘要&#xff1a;文章阐述了Nuxt3中Nitro生命周期钩子的使用&#xff0c;如nitro:config自定义配置、nitro:init注册构建钩子、nitro:build:be…

为什么企业应用开发,c++干不过java?

在开始前刚好我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「c的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#xff01; C/C这种东西&#xff0c;根本…

15_软件程序设计基础

目录 嵌入式软件开发原理 宿主机和目标机 交叉编译 交叉调试 嵌入式软件开发特点和挑战 开发工具 程序设计语言基本概念 解释和编译 常见程序设计语言 程序设计语言的基本成分 编译程序基本原理 嵌入式软件开发原理 宿主机和目标机 嵌入式软件开发不同于传统软件开…

QT5:在窗口右上角显示图标

目录 一、环境与目标 二、实现逻辑&#xff08;纯代码&#xff09;与效果 三、参考代码 四、总结 一、环境与目标 qt版本&#xff1a;5.12.7 windows 11 下的 Qt Designer &#xff08;已搭建&#xff09; 目标&#xff1a;使用嵌套布局的方式将两个按钮显示在窗口右上角…

C++专业面试真题(1)学习

进程有多少种状态&#xff0c;如何转换 创建&#xff1a;一个进程启动&#xff0c;首先进入创建状态&#xff0c;需要获取系统资源创建进程管理科PCB完成资源分配。就绪态&#xff1a;在创建完成后&#xff0c;进程已经准备好&#xff0c;处于就绪状态&#xff0c;但是还未获得…

深度势能生成器(DP-GEN)入门讲解

文章目录 1.原子间相互作用1.为什么研究原子间相互作用2.研究原子间相互作用的传统方法 2.深度学习研究原子间相互作用1.深度势能平滑模型(DeepPot-se)2.Deep Potential 模型训练3.同步学习→充足采样&筛选样本 3.DP-GEN操作及运行1.DP-GEN主流程2.DP-GEN基本命令3.生成初始…

Sui创始团队在竞速环节中的快问快答

在Sui Basecamp活动期间&#xff0c;Sui区块链的最初贡献者在Oracle红牛赛车模拟器上展示了他们的技术能力&#xff0c;在驾驶圈时回答了有关Sui的问题。 Evan Cheng&#xff08;又名Revvin’ Evan&#xff09;在解释Mysticeti创下区块链最终性记录的同时保持着他的驾驶线路。…

【深度好文】LLMOps揭秘:AI工作流程的高效管理之道!

可以关注我的公众号&#xff1a;Halo咯咯 01。 概述 将大型语言模型&#xff08;LLMs&#xff09;的强大能力与机器学习运维&#xff08;MLOps&#xff09;的有序结构相结合&#xff0c;团队能够以更高效的方式工作&#xff0c;而非仅仅增加劳动强度。团队的焦点可以专注于开…

Redis分布式集群部署

目录 一. 原理简述 二. 集群配置​​​​​​​ 2.1 环境准备 2.2 编译安装一个redis 2.3 创建集群 2.4 写入数据测试 实验一&#xff1a; 实验二&#xff1a; 实验三&#xff1a; 实验四&#xff1a; 添加节点 自动分配槽位 提升节点为master&#xff1a; 实验…

Spring Security 认证流程

Spring Scurity是spring生态下用于认证和授权的框架&#xff0c;具有高度的灵活性和可扩展行&#xff0c;本节主要对Spring Security的认证过程中进行概括性的介绍&#xff0c;主要介绍在该过程中&#xff0c;会涉及到哪些组件以及每个组件所承担的职责&#xff0c;希望大家可以…

Java [ 基础 ] 方法引用 ✨

✨探索Java基础✨ Java基础&#xff1a;方法引用 方法引用是Java 8中引入的一种新特性&#xff0c;它使得代码更加简洁和易读。方法引用提供了一种可以直接引用已有方法作为Lambda表达式的替代方案。本文将深入介绍方法引用的基本概念、使用方法、具体实例及其在实际开发中的…

Open3D 点云的旋转与平移

目录 一、概述 1.1旋转 1.2平移 二、代码实现 2.1实现旋转 2.2实现平移 2.3组合变换 三、实现效果 3.1原始点云 3.2变换后点云 一、概述 在Open3D中&#xff0c;点云的旋转和平移是通过几何变换来实现的。几何变换可以应用于点云对象&#xff0c;使其在空间中移动或旋…

MobPush iOS端海外推送最佳实现

推送注册 在AppDelegate里进行SDK初始化&#xff08;也可以在Info.plist文件中进行AppKey&#xff0c;AppSecret的配置&#xff09;并对通知功能进行注册以及设置推送的环境和切换海外服务器等&#xff0c;参考如下步骤代码&#xff1a; <span style"background-colo…

叮!云原生虚拟数仓 PieCloudDB Database 动态包裹已送达

第一部分 PieCloudDB Database 最新动态 支持动态配置查询簇 PieCloudDB 最新内核版本 v2.14.0 新增动态配置查询簇功能。PieCloudDB 动态配置查询簇功能实现可伸缩的并行化查询&#xff0c;可提升单个查询并行使用底层资源的能力&#xff0c;同时加快查询响应速度。 动态配…

redis,memcached,nginx网络组件

课程目标&#xff1a; 1.网络模块要处理哪些事情 2.reactor是怎么处理这些事情的 3.reactor怎么封装 4.网络模块与业务逻辑的关系 5.怎么优化reactor? io函数 函数调用 都有两个作用&#xff1a;io检测 是否就绪 io操作 1. int clientfd accept(listenfd, &addr, &l…

navicat Lite 版

navicat Lite 版&#xff1a; Navicat 出了一个 Navicat Premium 的Lite版。 官方现在链接&#xff1a;https://www.navicat.com.cn/download/navicat-premium-lite#windows 从官网可以看到现在能够下载最新版本 17&#xff0c;支持各种平台

大型语言模型能否生成可信的事实核查解释?——通过多智能体辩论实现可信可解释的事实核查

Can LLMs Produce Faithful Explanations For Fact-checking? Towards Faithful Explainable Fact-Checking via Multi-Agent Debate 论文地址:https://arxiv.org/abs/2402.07401https://arxiv.org/abs/2402.07401 1.概述 在数字化时代,对于迅速传播的错误信息,其核实与明…

模拟电子学基本概念+Keil5安装指南!!

2024-7-1&#xff0c;星期一&#xff0c;16:56&#xff0c;天气&#xff1a;阴转小雨&#xff0c;心情&#xff1a;晴。大家好啊&#xff0c;今天换了一个新的主题&#xff0c;为什么嘞&#xff0c;是因为截止到昨天&#xff0c;电路基础部分的内容已经暂时告一段落啦&#xff…