微服务——服务异步通讯(MQ高级)

MQ的一些常见问题

消息可靠性

生产者消息确认

返回ack,怎么感觉这么像某个tcp的3次握手。

使用资料提供的案例工程.

在图形化界面创建一个simple.queue的队列,虚拟机要和配置文件里面的一样。

 SpringAMQP实现生产者确认

AMQP里面支持多种生产者确认的类型。

simple是同步等待模式,发了消息之后就一直等待结果,可能会导致代码阻塞。

correlated是异步回调模式,像前段的ajax请求的回调函数。

ApplicationContextAware是bean工厂通知。会在Spring容器创建完后来通知并传一个spring容器到下面的方法。然后从中取到rabbitTemplate的bean并设置ReturnCallback。 

ReturnCallback:消息到了交换机,路由时失败了没有到达消息队列

ConfirmCallback:消息连交换机都没到。

这个不像ReturnCallback只能配置一个,这个可以在每次发消息时设置。

这里在发送消息时多了一个correlationData,这是在配置开关选择的confirm类型为correlated。里面封装了消息的唯一id和callback.

callback里面的result是成功的回调函数,ex是失败的回调函数。这里的失败是指回调都没收到。

实现

先是在生产者的配置文件里要加上前面的配置j

编写returnCallback

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            //记录日志
            log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
                    replyCode,replyText,exchange,routingKey,message.toString());
            //如果有需要的话,可以重发消息
        });
    }
}

编写ConfirmCallback

这里先要在图形界面手动将交换机和消息队列做绑定 

    @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        //1.准备消息
        String message = "hello, spring amqp!";
        //2.准备correlationData
        //2.1消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //2.2准备ConfirmCallback
        correlationData.getFuture().addCallback(result -> {
            //判断结果
            if(result.isAck()){
                //ACK
                log.debug("消息成功投递到交换机!消息ID:{}",correlationData.getId());
            }else{
                //NACK
                log.error("消息投递到交换机失败!消息ID:{}",correlationData.getId());
            }
        }, ex -> {
            //记录日志
            log.error("消息发送失败!",ex);
            //重发消息
        });
        //3.发送消息
        rabbitTemplate.convertAndSend("camq.topic", "simple.test", message,correlationData);
    }

测试得到

成功的测试情况

 

失败的测试情况

投递交换机失败,交换机不存在

投递队列失败,队列不存在

 

消息持久化

这里通过重启rabbitmq容器发现消息都不见了可以确认,rabbitmq和redis一样都是内存运行的。

甚至我手动加上的消息队列和绑定关系都不见了。这里消息队列不见是因为前面创建队列时选择的是Transient,不持久化。系统默认的交换机都还在,是因为durable为true,持久化。

创建队列或交换机的时候可以设置Durability为Durable即可持久化。

在消费者代码中进行交换机和队列的创建,然后可以看见如下持久化的交换机和队列.

@Configuration
public class CommonConfig {
    @Bean
    public DirectExchange simpleExchange(){
        return new DirectExchange("simple.direct",true,false);
    }
    @Bean
    public Queue simpleQueue(){
        return QueueBuilder.durable("simple.queue").build();
    }
}

手动发送一条消息进行测试

重启之后消息还是消失了。

要想让消息持久化,需要在发送消息时指定。


    @Test
    public void testDurableMessage(){
        //1.准备消息
        Message message = MessageBuilder.withBody("hello,pop".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久的
                .build();
        //2.发送消息
        rabbitTemplate.convertAndSend("simple.queue",message);
    }

 重启之后消息就持久化了。

通常在springamqp中这些都是持久化的。

消费者消息确认

在none模式下,消费者拿到消息都就报异常了,然后消息也没了。

在auto模式下,消费者拿到消息后给mq报了个unack,然后消息会重新投递,消费者继续拿消息,tmd,死循环了。 但是这里消息就不会消失了。


    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
        System.out.println(1/0);
        log.info("消费者处理消息成功!");
    }

消费失败重试机制

重试次数耗尽之后会将消息丢弃。

消费者失败消息处理策略

 在消费者代码中

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue",true);
    }
    @Bean
    public Binding errorBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
    }
}

 重新发送消息进行测试,可以看见重试次数耗尽之后就送到了死信队列了。

在里面将异常的堆栈信息也包含了. 

 

死信交换机

初识死信交换机

区别在于,上一个是消费者失败之后寻找交换机路由到error队列,这个是退回到队列,再指定交换机,最后路由。

TTL

这个的应用场景比如说订单超时未支付然后自动取消。

实现  

          

准备 代码部分

    @RabbitListener(bindings = @QueueBinding(
            value=@Queue(name = "dl.queue",durable = "true"),
            exchange=@Exchange(name="dl.direct"),
            key = "dl"
    ))
    public void listenDlQueue(String msg){
        log.info("接收到 dl.queue的延迟消息:{}",msg);
    }

@Configuration
public class TTLMessageConfig {
    @Bean
    public DirectExchange ttlExchange(){
        return new DirectExchange("ttl.direct");
    }
    @Bean
    public Queue ttlQueue(){
        return QueueBuilder.durable("ttl.queue")
                .ttl(10000)
                .deadLetterExchange("dl.direct")
                .deadLetterRoutingKey("dl")
                .build();
    }
    @Bean
    public Binding simpleBinging(){
        return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
    }
}

 测试代码

    @Test
    public void testTTLMessage(){
        //1.准备消息
        Message message = MessageBuilder
                .withBody("hello,ttl".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久的
                .build();
        //2.发送消息
        rabbitTemplate.convertAndSend("ttl.direct","ttl",message);
        //3.记录日志
        log.info("消息成功发送!");
    }

10s之后在消费者那里就可以看见

 

 然后这里会以短的优先,5s后消费者就可以收到消息。

延迟队列

1.重装rabbitmq容器 

这个插件需要找到mq内部的插件文件夹,所以需要在创建容器的时候进行数据卷挂载。

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

 2.安装DelayExchange插件

官方的安装指南地址为:Scheduling Messages with RabbitMQ | RabbitMQ - Blog

上述文档是基于linux原生安装RabbitMQ,然后安装插件。

2.1.下载插件

RabbitMQ有一个官方的插件社区,地址为:Community Plugins — RabbitMQ

大家可以去对应的GitHub页面下载3.8.9版本的插件,地址为Release v3.8.9 · rabbitmq/rabbitmq-delayed-message-exchange · GitHub这个对应RabbitMQ的3.8.5以上版本。 

查看挂载的数据卷.

docker volume inspect mq-plugins

接下来的看着好麻烦,以后看文档吧.

还真的麻烦的一批,真不想再搞这玩意,文件搞来搞去。

不知道为什么,挂载数据卷时一直报错,不能用自己定义的文件夹来挂载。

 

 

在消费者中如下声明

    @RabbitListener(bindings = @QueueBinding(
            value=@Queue(name = "delay.queue",durable = "true"),
            exchange=@Exchange(name="delay.direct",delayed = "true"),
            key = "delay"
    ))
    public void listenDelayQueue(String msg){
        log.info("接收到 delay.queue的延迟消息:{}",msg);
    }

 在生产者中如下定义

    @Test
    public void testSendDelayMessage(){
        //1.准备消息
        Message message = MessageBuilder
                .withBody("hello,ttl".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久的
                .setHeader("x-delay",5000)
                .build();
        //2.准备correlationData
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //3.发送消息
        rabbitTemplate.convertAndSend("delay.direct", "delay", message,correlationData);
        log.info("发送消息成功");
    }

测试结果如下 成功实现延迟5秒。但是会被报错,理论上说交换机应该立即转发,不会延迟,但是这里的延迟交换机可以帮忙保存消息延迟发送,所以这里才会报错,not_router,消息没有到达队列

 为了解决这个报错,需要修改生产者代码

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            //判断是否是延迟消息
            if (message.getMessageProperties().getReceivedDelay()>0) {
                //是一个延迟消息,忽略错误提示
                return;
            }
            //记录日志
            log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
                    replyCode,replyText,exchange,routingKey,message.toString());
            //如果有需要的话,可以重发消息
        });
    }
}

惰性队列

消息堆积问题

问题解决

消费者中声明两个队列。 

@Configuration
public class LazyConfig {
    @Bean
    public Queue lazyQueue(){
        return QueueBuilder.durable("lazy.queue")
                .lazy()
                .build();
    }
    @Bean
    public Queue normalQueue(){
        return QueueBuilder.durable("normal.queue")
                .build();
    }
}

 测试,准备两个队列之后分别向两个队列发消息。

    @Test
    public void testLazyMessage(){
        for(int i=0;i<1000000;i++){
            //1.准备消息
            Message message = MessageBuilder
                    .withBody("hello,ttl".getBytes(StandardCharsets.UTF_8))
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) //持久的
                    .build();
            //3.发送消息
            rabbitTemplate.convertAndSend("lazy.queue", message);
        }
    }
    @Test
    public void testnormalMessage(){
        for(int i=0;i<1000000;i++){
            //1.准备消息
            Message message = MessageBuilder
                    .withBody("hello,ttl".getBytes(StandardCharsets.UTF_8))
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) //持久的
                    .build();
            //3.发送消息
            rabbitTemplate.convertAndSend("normal.queue", message);
        }
    }

可以看见惰性队列的消息全部到paged out 刷出磁盘了?????、,为什么非惰性队列的也是刷出磁盘了。

 

MQ集群

集群个屁,不搞了.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/255286.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MySQL job 定时任务

目录 介绍 优点&#xff1a; 缺点&#xff1a; 使用场景&#xff1a; 案例 创建表 -- 创建定时任务 每一分钟插入一条数据 执行结果 -- 查询定时任务 ENABLED--启用 DISABLED--禁用 -- 查询定时任务 -- 启用定时任务 ​-- 禁用定时任务 ​-- 删除定时任务 …

【mybatis】mapper.xml映射文件

目录 一.概述 二.了解mapper.xml文件 namespaceidresultType指定映射文件的路径 一.概述 mapper.xml 是一个 MyBatis 的映射文件&#xff0c;用于定义 SQL 语句和结果映射。它是一个 XML 文件&#xff0c;通常放置在项目的资源目录下。 随着mybatis框架的发展&#xff0c;myb…

Spring IOC 原理(二)

Spring IOC 原理 概念 Spring 通过一个配置文件描述 Bean 及 Bean 之间的依赖关系&#xff0c;利用 Java 语言的反射功能实例化Bean 并建立 Bean 之间的依赖关系。 Spring 的 IoC 容器在完成这些底层工作的基础上&#xff0c;还提供了 Bean 实例缓存、生命周期管理、 Bean 实…

数据结构与算法--贪心算法

贪心算法 应用场景 假设存在下面需要付费的广播台&#xff0c;以及广播台信号可以覆盖的地区。 如何选择最少的广播台&#xff0c;让所有的地区都可以接收到信号 介绍 贪婪算法(贪心算法)是指在对问题进行求解时&#xff0c;在每一步选择中都采取最好或者最优(即最有利)的选择…

第三周:Python能力复盘

资料&#xff1a; 《笨办法学Python》阅读地址&#xff1a;https://www.bookstack.cn/read/LearnPython3TheHardWay 《廖雪峰Python教程》阅读地址&#xff1a;http://t.cn/RK0qGu7 《机器学习numpy与pandas基础》&#xff1a;https://zhuanlan.zhihu.com/p/639733816 《matplo…

代码随想录算法训练营 | day55 动态规划 392.判断子序列,115.不同的子序列

刷题 392.判断子序列 题目链接 | 文章讲解 | 视频讲解 题目&#xff1a;给定字符串 s 和 t &#xff0c;判断 s 是否为 t 的子序列。 字符串的一个子序列是原始字符串删除一些&#xff08;也可以不删除&#xff09;字符而不改变剩余字符相对位置形成的新字符串。&#xff0…

SHT10温湿度传感器——STM32驱动

———————实验效果——————— &#x1f384;硬件外观 &#x1f384;接线 &#x1f388; 3.3V供电 &#x1f388; IIC通讯 &#x1f384; 代码获取 &#x1f388; 查看下方 ———————END———————

【深度学习初探】Day32 - 三维点云数据基础

【深度学习初探】Day32 - 三维点云数据基础 文章目录 【深度学习初探】Day32 - 三维点云数据基础一、点云的定义二、点云的获取三、点云的属性四、点云的存储格式4.1 pts4.2 LAS4.3 PCD4.4 .xyz4.5 .pcap 五、三维点云的表示方法5.1 二维投影5.2 三维体素5.3 原始点云5.4 图 六…

(已解决)如何使用matplotlib绘制小提琴图

网上很多人使用seaborn绘制小提琴图&#xff0c;本人暂时不想学新的东西&#xff0c;就是懒。本文介绍如何使用matplotlib绘制小提琴图&#xff0c;很多其他博客只是使用最简单的语法&#xff0c;默认小提琴颜色会是蓝色&#xff0c;根本改不了。本文使用了一点高级的用法&…

排序 | 冒泡 插入 希尔 选择 堆 快排 归并 非递归 计数 基数 排序

排序 | 冒泡 插入 希尔 选择 堆 快排 归并 非递归 计数 基数 排序 文章目录 排序 | 冒泡 插入 希尔 选择 堆 快排 归并 非递归 计数 基数 排序前言&#xff1a;冒泡排序插入排序希尔排序选择排序堆排序快速排序--交换排序三数取中快速排序hoare版本快速排序挖坑法快速排序前后指…

Linux---Ubuntu软件卸载

1. 软件卸载的介绍 Ubuntu软件卸载有两种方式: 离线安装包的卸载(deb 文件格式卸载&#xff09;在线安装包的卸载(apt-get 方式卸载) 2. deb 文件格式卸载 命令格式: sudo dpkg –r 安装包名 -r 选项表示安装的卸载 dpkg 卸载效果图: 3. apt-get 方式卸载 命令格式: …

JMeter如何从数据库中获取数据并作为变量使用?

前言 JMeter如何从数据库中获取数据并作为变量使用&#xff1f;这在我们使用JMeter做接口测试、压力测试时经常碰到&#xff0c;今天通过两个示例&#xff08;实现MySQL数据库的查询结果的单值引用和多值引用&#xff09;进行说明。这里虽然以MySQL数据库做说明&#xff0c;但…

Oracle VM VirtualBox使用——备赛笔记——2024全国职业院校技能大赛“大数据应用开发”赛项——任务2:离线数据处理

简述&#xff1a; Oracle VM VirtualBox是一款开源虚拟机软件&#xff0c;由德国Innotek公司开发&#xff0c;后被Sun Microsystems公司收购&#xff0c;并最终被甲骨文公司收购。它支持在Windows、Mac OS X、Linux、OpenBSD、Solaris、IBM OS2甚至Android等操作系统上创建虚拟…

06.deque 容器

6、deque 容器 功能&#xff1a; 双端数组&#xff0c;可以对头端进行插入删除操作 deque 与 vector 区别&#xff1a; vector 对于头部的插入删除效率低&#xff0c;数据量越大&#xff0c;效率越低deque 相对而言&#xff0c;对头部的插入删除速度会比 vector 快vector 访…

一种解决Qt5发布release文件引发的无法定位程序输入点错误的方法

目录 本地环境问题描述分析解决方案 本地环境 本文将不会解释如何利用Qt5编译生成release类型的可执行文件以及如何利用windeployqt生成可执行的依赖库&#xff0c;请自行百度。 环境值操作系统Windows 10 专业版&#xff08;22H2&#xff09;Qt版本Qt 5.15.2Qt Creator版本5.0…

好的软件测试人员简历是什么样子的?

简历是入职职场的一张名片&#xff0c;也是进入职场一块“敲门砖”。从某种角度说&#xff0c;简历也是一张专业人员的说明书。 软件测试人员作为IT行业具有技术含量的职业&#xff0c;一份优秀的简历包含的内容以及如何写好简历尤为重要。接下来从以下两方面来介绍这个话题&a…

Mysql数据库的基础知识和yum安装步骤

MySQL数据库介绍 什么是数据库DB&#xff1f; DB的全称是database&#xff0c;即数据库的意思。数据库实际上就是一个文件集合&#xff0c;是一个存储数据的仓库&#xff0c;数据库是按照特定的格式把数据存储起来&#xff0c;用户可以对存储的数据进行增删改查操作&#xff1…

代码随想Day41 | 343. 整数拆分、96.不同的二叉搜索树

343. 整数拆分 这道题的贪心思路是尽可能地拆分成3&#xff0c;&#xff08;结论&#xff09;&#xff0c;需要进行数学证明&#xff0c;详细代码如下&#xff1a; class Solution { public:int integerBreak(int n) {if(n2) return 1;if(n3) return 2;if(n4) return 4;int re…

【每日OJ—有效的括号(栈)】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言 1、有效的括号题目&#xff1a; 1.1方法讲解&#xff1a; 1.2代码实现&#xff1a; 总结 前言 世上有两种耀眼的光芒&#xff0c;一种是正在升起的太阳&#…

AUTOSAR StbM模块的配置以及代码实现

AUTOSAR StbM模块的配置以及代码实现 1、AUTOSAR配置 2、StbM_Init 初始化各个变量。 3、StbM_MainFunction StbM_Rb_IsSyncTimeBase 同步的TimeBase的id范围是0-15 StbM_Rb_IsOffsetTimeBase offset的TimeBase的id范围是16-31 StbM_Rb_IsPureLocalTimeBase pure的Time…