微服务day07

MQ高级

发送者可靠性,MQ的可靠性,消费者可靠性。

发送者可靠性

发送者重连

连接重试的配置文件:

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

关闭MQ。

运行测试用例结果,进行了三次重连。

11-12 10:15:41:975  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]
11-12 10:15:43:997  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]
11-12 10:15:46:002  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]

发送者确认

总结如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功

  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。

默认两种机制都是关闭状态,需要通过配置文件来开启

开启发送者确认:

在publisher模块的application.yaml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制

  • simple:同步阻塞等待MQ的回执

  • correlated:MQ异步回调返回回执

一般我们推荐使用correlated,回调机制。

定义ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

package com.itheima.publisher.config;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Configuration
@Slf4j
@RequiredArgsConstructor
public class MQconfig {
    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆

  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback

    @Test
    public void testObgect1() throws InterruptedException {
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                //failure出现异常时的处理逻辑,基本不会触发。
                log.error("发送失败",ex);
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                //判断是否成功发送到服务器
                if (result.isAck()){
                    log.debug("发送消息成功,收到 ack!");
                }else{ // result.getReason(),String类型,返回nack时的异常描述
                    log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
                }
            }
        });
        //准备Map数据
        Map map = new HashMap();
        map.put("name","jack");
        map.put("age",21);
        rabbitTemplate.convertAndSend("obgect.queue1",map,cd);
        //用来接受返回的值
        Thread.sleep(2000);
    }

可以看到,由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。

当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。

而如果连交换机都是错误的,则只会收到nack。

注意

开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致

  • 交换机名称错误:同样是编程错误导致

  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

MQ的可靠性

数据持久化

默认情况下springamqp发送的消息就是持久化的,不需要做特殊处理。

    @Test
    public void testTopic3(){
        //自定义消息为非持久化
        Message build = MessageBuilder.withBody("这是一个大新闻啊".getBytes())
                .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
        //修改交换机的名字为hm.topic
        String name = "hm.topic";
        //修改路由键为blue
        rabbitTemplate.convertAndSend(name,"china.goods",build);
    }

注意:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。

不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

Lazy Queue

MQ可靠性小结

消费者可靠性

消费者确认机制

消费者失败重试策略

相关文档:

哈哈哈

业务幂等性

 修改发送者代码,修改消息转换器:

    @Bean
    public MessageConverter messageConverter(){
        // 1.定义消息转换器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
        jackson2JsonMessageConverter.setCreateMessageIds(true);
        return jackson2JsonMessageConverter;
    }

修改消费者代码来获取消息id:

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(Message message) throws InterruptedException {
        log.info("spring 消费者接收到消息:【" + message.getBody().toString() + "】");
        log.info("spring 消费者接收到消息id为:【" + message.getMessageProperties().getMessageId().toString() + "】");
        if (true) {
            throw new RuntimeException("故意的");
        }
        log.info("消息处理完成");
    }

 运行结果: 由于前面的步骤设置了报错,因此重发了3次。

11-12 16:20:12:977  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:12:977  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:13:978  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:13:978  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:14:979  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:14:979  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:14:985  WARN 27400 --- [ntContainer#1-1] o.s.a.r.retry.RepublishMessageRecoverer  : Republishing failed message to exchange 'error.direct' with routing key error

只需要修改消费者的处理逻辑部分即可:

package com.hmall.trade.listener;

import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class Orderlisten {
    private final IOrderService orderService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "trade.pay.success.queue", durable = "true"),
            exchange = @Exchange(name = "pay.topic"),
            key = "pay.success"
    ))
    public void listenPaySuccess(Long orderId){
        //1、根据id获取订单信息
        Order order = orderService.getById(orderId);
        //判断订单状态,只有待支付才修改
        if (order == null || order.getStatus() != 1){
            //条件错误直接结束
            return;
        }
        orderService.markOrderPaySuccess(orderId);
    }
}

小结,小小面试题

延迟消息

死信交换机

消息的消费者代码:

创建普通交换机和信息队列,将消息队列的死信绑定死信交换机

dleConfigtasion.class


package com.itheima.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class dleConfigrasion {
    //交给Bean注解来进行处理
    //创建交换机
    @Bean
    public DirectExchange ttlExchange(){
        //参数:交换机名称,是否持久化,是否自动删除,持久化默认为开启(持久化就是是否保存到磁盘)
        return ExchangeBuilder.directExchange("ttl.direct").build();
    }

    //创建消息队列
    @Bean
    public Queue ttltQueue(){
//        return new Queue("fanout.queue1");
        //使用build来创建消息队列
        //指定该队列的死信交换机
        return QueueBuilder.durable("ttl.queue").deadLetterExchange("dlt.direct").build();
    }

    // 绑定队列和交换机
    @Bean
    public Binding bindingfanoutQueue1red(Queue ttltQueue, DirectExchange ttlExchange){
        return BindingBuilder.bind(ttltQueue).to(ttlExchange).with("hi");
    }


}

死信交换机和消息队列:

leatinMq.class   

 @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dlt.queue"),
            exchange = @Exchange(name = "dlt.direct",type = ExchangeTypes.DIRECT),
            key = {"hi"}
    ))
    public void Directlisten1dlt(String msg){
        System.err.println("消费者接收到队列dlt.direct的消息:"+msg+"_"+ LocalTime.now());
    }

消息发送方:需要指定消息的存活时间:

    @Test
    public void testTopic(){
        //修改路由键为blue
        rabbitTemplate.convertAndSend("ttl.direct", "hi", "这是一个大新闻啊",
                new MessagePostProcessor() {
            //可以对生成的message进行处理
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //设置过期时间为10秒钟
                message.getMessageProperties().setExpiration("1000");
                return message;
            }
        });
    }

代码结束。

延时插件:延时插件下载地址

插件官方文档:延时插件官方文档

详细操作文档: 黑马文档

取消超时订单

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/916623.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

艾体宝干货丨微突发流量检测与分析:IOTA让网络监控更精准

网络流量中的微突发问题常常难以察觉&#xff0c;但它们可能对网络性能产生显著影响。这篇文章深入探讨了如何利用IOTA来捕捉和分析微突发&#xff0c;帮助您快速有效地解决网络中的突发流量问题。 什么是微突发&#xff08;Microburst&#xff09;流量&#xff1f; 微突发是…

SQL 审核在 CloudQuery 的四大场景应用

数据库作为数据的核心载体&#xff0c;其安全性和稳定性对业务的影响至关重要。而在我们日常业务中&#xff0c;SQL 编写不当是引起数据库故障的一个重要原因&#xff0c;轻则影响数据库性能&#xff0c;重则可能直接导致「雪崩」。因此&#xff0c;SQL 审核作为 SQL 代码投入生…

uniapp: 微信小程序包体积超过2M的优化方法

一、问题描述 在使用uniapp进行微信小程序开发时&#xff0c;经常会遇到包体积超过2M而无法上传&#xff1a; 二、解决方案 目前关于微信小程序分包大小有以下限制&#xff1a; 整个小程序所有分包大小不超过 30M&#xff08;服务商代开发的小程序不超过 20M&#xff09; 单个…

Node.Js+Knex+MySQL增删改查的简单示例(Typescript)

数据库: CREATE DATABASE MyDB; CREATE TABLE t_users (user_id int(11) NOT NULL,user_name varchar(10) NOT NULL ) ENGINEInnoDB DEFAULT CHARSETutf8; 项目结构: package.json如下&#xff0c;拷贝并替换你们本地的package.json后运行 npm install 命令安装所需要的依赖。…

fastadmin多个表crud连表操作步骤

1、crud命令 php think crud -t xq_user_credential -u 1 -c credential -i voucher_type,nickname,user_id,voucher_url,status,time --forcetrue2、修改控制器controller文件 <?phpnamespace app\admin\controller;use app\common\controller\Backend;/*** 凭证信息…

【论文阅读】利用SEM二维图像表征黏土矿物三维结构

导言 在油气储层研究中&#xff0c;黏土矿物对流体流动的影响需要在微观尺度上理解&#xff0c;但传统的二维SEM图像难以完整地表征三维孔隙结构。常规的三维成像技术如FIB-SEM&#xff08;聚焦离子束扫描电子显微镜&#xff09;虽然可以获取高精度的3D图像&#xff0c;但成本…

JavaScript 中的 undefined 、null 与 NaN :概念解析与对比

文章目录 &#x1f4af;前言&#x1f4af;undefined1. 什么是 undefined2. undefined 的使用场景3. undefined 的特性 &#x1f4af;null1. 什么是 null2. null 的使用场景3. null 的特性 &#x1f4af;NaN1. 什么是 NaN2. NaN 的使用场景3. NaN 的特性 &#x1f4af;三者的区别…

C++编程技巧与规范-类和对象

类和对象 1. 静态对象的探讨与全局对象的构造顺序 静态对象的探讨 类中的静态成员变量(类类型静态成员) 类中静态变量的声明与定义&#xff08;类中声明类外定义&#xff09; #include<iostream> using namespace std;namespace _nmspl {class A{public:A():m_i(5){…

python遇到问题

1&#xff0c;BeautifulSoup lxml 解析器安装 问 1&#xff0c;BeautifulSoup lxml 解析器安装2&#xff0c;BeautifulSoup 如何引入第三方库 BeautifulSoup lxml&#xff0c;默认是导入的是python内置的解析器答1 1. 安装 Python 和 pip 确保你已经安装了 Python 和 pip。你…

async 和 await的使用

一、需求 点击按钮处理重复提交&#xff0c;想要通过disabled的方式实现。 但是点击按钮调用的方法里有ajax、跳转、弹窗等一系列逻辑操作&#xff0c;需要等方法里流程都走完&#xff0c;再把disabled设为false&#xff0c;这样下次点击按钮时就可以继续走方法里的ajax等操作…

MacOS下,如何在Safari浏览器中打开或关闭页面中的图片文字翻译功能

MacOS下&#xff0c;如何在Safari浏览器中打开或关闭页面中的图片文字翻译功能 在Mac上的Safari浏览器中&#xff0c;可以通过实况文本功能来实现图片中的文本翻译。关闭步骤具体步骤如下&#xff1a; 在浏览器地址栏&#xff0c;鼠标右击翻译按钮&#xff0c;然后点击“首选…

31.2 DOD压缩和相关的prometheus源码解读

本节重点介绍 : 时序数据时间的特点DOD压缩原理讲解dod压缩过程讲解dod压缩 prometheus源码解读 时序数据时间的特点 持续采集采集间隔固定&#xff0c;如prometheus配置job中的scrape_interval参数每隔15秒采集一次 - job_name: node_exporterhonor_timestamps: truescrape…

推荐一款好用的ios传输设备管理工具:AnyTrans for iOS

AnyTrans for iOS是一款好用的ios传输设备管理工具&#xff0c;可以方便用户对iphone、ipad、ipod中的文件进行管理操作&#xff0c;可以方便用户在电脑上进行各类文件的管理操作&#xff0c;支持联系人、视频、音频、短信、图片等文件的导入&#xff0c;软件支持双向传输和浏览…

快速利用c语言实现线性表(lineList)

线性表是数据结构中最基本和简单的一个&#xff0c;它是n的相同类型数据的有序序列&#xff0c;我们也可以用c语言中的数组来理解线性表。 一、线性表声明 我们定义一个线性表的结构体&#xff0c;内部有三个元素&#xff1a;其中elem是一个指针&#xff0c;指向线性表的头&am…

计算机毕业设计Python+CNN卷积神经网络股票预测系统 股票推荐系统 股票可视化 股票数据分析 量化交易系统 股票爬虫 股票K线图 大数据毕业设计 AI

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

QT QLineEdit失去焦点事件问题与解决

本文介绍如何获得QLineEdit的失去焦点事件和获得焦点的输入框也会触发失去焦点事件的问题&#xff01; 目录 一、QLineEdit获得失去焦点事件 1.自定义类继承自QLineEdit 2.重写 focusOutEvent 3.使用 二、失去焦点事件问题 1.问题描述 2.问题解决 三、源码分享 lineed…

vscode执行npm install报错

npm install一直提示报错 以管理员身份运行vscode&#xff0c;如果每次觉得很麻烦可以做如下修改&#xff1a;

【算法】树状数组

前言 众所周知&#xff0c;通过前缀和&#xff0c;我们可以很快的在一个很大的数组中求出区间和&#xff0c;但是如果想要去修改数组中的一个数的值&#xff0c;前缀和就无法实现。所以来学习一个新的数据结构&#xff1a;树状数组 &#xff08;文章中关于树状数组的截图来自于…

Java项目实战II基于微信小程序的私家车位共享系统(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 在城市化进…

ZeroSSL HTTPS SSL证书ACMESSL申请3个月证书

目录 一、引言 二、准备工作 三、申请 SSL 证书 四、证书选型 五、ssl重要性 一、引言 目前免费 Lets Encrypt、ZeroSSL、BuyPass、Google Public CA SSL 证书&#xff0c;一般免费3-6个月。从申请难易程度分析&#xff0c;zerossl申请相对快速和简单&#xff0c;亲测速度非…