RabbitMQ高级特性2 、TTL、死信队列和延迟队列

MQ高级特性

1.削峰

设置 消费者

测试 添加多条消息

拉取消息 每隔20秒拉取一次 一次拉取五条 然后在20秒内一条一条消费

TTL

Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

可以在管理台新建队列、交换机,绑定

1.图形化操作

添加队列

添加交换机

将交换机和对应的队列进行绑定

时间结束 , 消息失效

2.代码实现

配置 生产者

@Configuration
public class TopicMqTtlConfig {


    @Value("${mq.exchange.name}")
    private String EXCHANGENAME;
    @Value("${mq.queue.name1}")
    private String QUEUENAME1;
    @Value("${mq.queue.name2}")
    private String QUEUENAME2;
    // 1
    // . 交换机
    @Bean("ex1")
    public Exchange getExchange(){
        Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();
        return exchange;
    }
    // 2。 队列
    @Bean("queue1")
    public Queue getQueue1(){
        Queue queue = QueueBuilder.nonDurable(QUEUENAME1)
                .withArgument("x-message-ttl",30000)//过期时间30秒
                .withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃
                .build();
        return queue;
    }
    @Bean("queue2")
    public Queue getQueue2(){
        Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2)
                .withArgument("x-message-ttl",300000000)//过期时间30秒
                .build();
        return queue2;
    }
    // 3. 交换机和队列进行绑定
    @Bean("binding1")
    public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){
        Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("ttl1.*").noargs();
        return binding1;
    }
    @Bean("binding2")
    public Binding bindQueue2ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){
        Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("ttl2.#").noargs();
        return binding2;
    }
}

测试

添加成功 ttl1只接收10条

时间过期

死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机,因为其他MQ产品中没有交换机的概念),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

比如消息队列的消息过期,如果绑定了死信交换器,那么该消息将发送给死信交换机

消息在什么情况下会成为死信?(面试会问)

1.队列消息长度到最大的限制

最大的长度设置为10当第11条消息进来的时候就会成为死信

2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false(不重新回到队列中)

设置消费者为手动签收的状态

3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定交换机的方式是什么?

给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

// 1.  交换机  :正常的交换机   死信交换机

// 2.队列  :正常的  死信

//3.绑定   正常ex - 正常的que

正常的que和死信交换机

死信ex-死信queue

2.代码实现

@Configuration
public class TopicMqDeadConfig {
    @Value("${mq1.exchange.name1}")
    private String EXCHANGENAME;
    @Value("${mq1.exchange.name2}")
    private String DEADEXCHANGE;
    @Value("${mq1.queue.name1}")
    private String QUEUENAME1;
    @Value("${mq1.queue.name2}")
    private String QUEUENAME2;
    // 声明正常交换机
    @Bean("ex1")
    public Exchange getExchange(){
        Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();
        return exchange;
    }
    //  正常队列
    @Bean("queue1")
    public Queue getQueue1(){
        Queue queue = QueueBuilder.nonDurable(QUEUENAME1)
                .withArgument("x-message-ttl",30000)//过期时间30秒
                .withArgument("x-dead-letter-exchange",DEADEXCHANGE)
                .withArgument("x-dead-letter-routing-key","dead.test")//将正常队列与死信交换机,死信队列绑定
                //.withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃
                .build();
        return queue;
    }
    // 交换机和队列进行绑定
    @Bean("binding1")
    public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){
        Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("normal.*").noargs();
        return binding1;
    }


    // 声明死信交换机
    @Bean("ex2")
    public Exchange getDeadExchange(){
        Exchange exchange = ExchangeBuilder.topicExchange(DEADEXCHANGE).durable(false).build();
        return exchange;
    }
    //死信队列
    @Bean("queue2")
    public Queue getQueue2(){
        Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2)
                .build();
        return queue2;
    }
    // 死信交换机和死信队列进行绑定
    @Bean("binding2")
    public Binding bindQueue2ToExchange(@Qualifier("ex2") Exchange exchange,@Qualifier("queue2") Queue queue){
        Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("dead.*").noargs();
        return binding2;
    }


}

测试

如果程序出现错误 拒绝签收

监听正常队列

发送消息 启动测试

总结:

1. 死信交换机和死信队列和普通的没有区别

2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

3. 消息成为死信的三种情况:

        1. 队列消息长度到达限制;

        2. 消费者拒接消费消息,并且不重回队列;

        3. 原队列存在消息过期设置,消息到达超时时间未被消费;

 延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

  • 1. 下单后,30分钟未支付,取消订单,回滚库存
  • 2. 新用户注册成功7天后,发送短信问候。

实现方式:

1. 定时器

2. 死信队列

在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

1.配置

添加依赖

  <!--2. rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>




        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <!--nacos 配置中心-->
        <!--配置中心-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>


        <!-- application  bootstrap  -->


        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bootstrap</artifactId>
        </dependency>


        <!-- nacos-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>




        <dependency>
            <groupId>com.example</groupId>
            <artifactId>sys-comm</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

 

修改配置

2.代码实现

创建实体类

发送消息 测试

过期后放入死信队列

添加依赖

 <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.16</version>
        </dependency>

将json数据转化为对象

获取成功

3.连接数据库

创建表

创建测试类
@RestController
@RequestMapping("order")
public class OrderController {
    @Value("${mq1.exchange.name1}")
    private String EXCHANGENAME;
    //
    @Resource
    private RabbitTemplate rabbitTemplate;
    @GetMapping
    public Result aaa(TabOrder order){
     //1. 消息 存放到mq里面
        String s = JSONUtil.toJsonStr(order);
        // openfeign  --      数据添加到数据库里面
        rabbitTemplate.convertAndSend(EXCHANGENAME, "normal.test", s);
        return Result.success(s);
    }
}

监听normal
import javax.annotation.Resource;
@Component
public class XiaoFeng implements ChannelAwareMessageListener {
    @Resource
    private TabOrderMapper orderMapper;
    @Override
    @RabbitListener(queues = "test_queue_normal")
    public void onMessage(Message message, Channel channel) throws Exception {
        //Thread.sleep(2000);// 20s
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println(s);
        // 将字符串转化为 对象


        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try{
            TabOrder order = JSONUtil.toBean(s, TabOrder.class);
            // 将订单的信息 报讯到数据库里面
            int insert = orderMapper.insert(order);
            channel.basicAck(deliveryTag,true); //
        }catch(Exception e){
            //long deliveryTag, boolean multiple, boolean requeue
            System.out.println("拒绝签收消息");
            channel.basicNack(deliveryTag,true,false);// 死信消息
        }
    }
}

监听dead
@Component
public class YanChi implements ChannelAwareMessageListener {


    @Resource
    private TabOrderMapper orderMapper;
    @Override
    @RabbitListener(queues = "test_queue_dead")
    public void onMessage(Message message, Channel channel) throws Exception {
        //Thread.sleep(2000);// 20s
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println(s);
        // 将字符串转化为 对象
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try{
            TabOrder order = JSONUtil.toBean(s, TabOrder.class);
            //  order 的状态
            TabOrder tabOrder = orderMapper.selectById(order.getId());
            if(tabOrder.getStatus()==1){
                // 取消
                tabOrder.setStatus(3);
            }
            orderMapper.updateById(tabOrder);
            channel.basicAck(deliveryTag,true); //
        }catch(Exception e){
            //long deliveryTag, boolean multiple, boolean requeue
            System.out.println("拒绝签收消息");
            channel.basicNack(deliveryTag,true,false);// 死信消息
        }
    }
}

测试

成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/198676.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

服务器主机安全如何保障

随着互联网的快速发展&#xff0c;服务器主机安全问题日益凸显。服务器主机是网络世界中的核心&#xff0c;其安全性关乎着整个网络系统的稳定性和可靠性。 当前&#xff0c;服务器主机面临着多种安全威胁。其中&#xff0c;网络攻击是最为常见的一种。网络攻击者利用各种手段…

ELK+Filebeat

Filebeat概述 1.Filebeat简介 Filebeat是一款轻量级的日志收集工具&#xff0c;可以在非JAVA环境下运行。 因此&#xff0c;Filebeat常被用在非JAVAf的服务器上用于替代Logstash&#xff0c;收集日志信息。实际上&#xff0c;Filebeat几乎可以起到与Logstash相同的作用&…

Linux—进程状态

目录 一.前言 1.1.通过系统调用获取进程标示符 1.2.通过系统调用创建进程 二.进程状态 三.Z(zombie)-僵尸进程 四.僵尸进程危害 一.前言 学习进程的状态&#xff0c;我们首先了解一下进程的基本数据 1.1.通过系统调用获取进程标示符 由getpid&#xff08;&#xff09…

多个nginx共享值、缓存问题

背景 目前我在集成登录认证功能&#xff08;cas&#xff09;&#xff0c;使用的架构是nginxlua&#xff0c;由于我们有多个系统&#xff08;全是前端项目&#xff09;&#xff0c;每套系统都采用nginxlua的方式进行部署&#xff08;即每个系统都是一个nginx&#xff09;&#…

pytorch读取tiny-imagenet-200的验证集(val)

ori_train torchvision.datasets.ImageFolder(root args.datadir /tiny-imagenet-200/train/, transformtransform)#可以获取class_idx的映射class_idx ori_train.class_to_idx val_annotations.txt中存储着每个图片对应的类别 获取验证集的标签 test_target []#读取val_…

java:jpa、Hibernate、Spring Data JPA、ORM以及和mybatis的区别

文章目录 Java连接数据库几种方式JPAHibernate和Spring Data JPAORM框架jpa和mybatis区别Spring Boot JPA使用例子1、创建库和表2、添加依赖3、配置数据源和Hibernate属性4、配置实体类5、创建一个继承JpaRepository的接口&#xff1a;6、创建一个控制器&#xff08;Controller…

汽车电子 -- 车载ADAS之FCTA/FCTB(前方横向来车碰撞预警/制动)

参看&#xff1a;功能定义-前方交通穿行提示&制动 1、前方横向来车碰撞预警/制动 FCTA/FCTB&#xff08; Front Cross Traffic Alert /Brake&#xff09; 前方横向来车碰撞预警FCTA&#xff08; Front Cross Traffic Alert &#xff09; 其功能表现为在车辆低速前进时&am…

解锁 ElasticJob 云原生实践的难题

发生了什么 最近在逛 ElasticJob 官方社区时发现很多小伙伴都在头疼这个 ElasticJob 上云的问题&#xff0c;ElasticJob 本就号称分布式弹性任务调度框架&#xff0c;怎么在云原生环境就有了问题了呢&#xff0c;这就要从 Kubenertes 和 ElasticJob 的一些状态化说起。 有意思的…

Java SE

目录 编程编的其实就是启动之后的内存⭐配置环境Java环境Windows配置Java环境变量Linux配置Java环境变量前言&#xff1a;常见Linux系统 Java基础类型八大基本数据类型数值型非数值型 void引用数据类型 运算符位运算符其他 基本结构表达式方法类实例&#xff08;对象&#xff0…

战略制定|竞争战略管理分析六大常用工具

企业战略可从多个角度理解&#xff0c;体现为著名的5P模型。首先&#xff0c;从未来发展视角看&#xff0c;战略是一种计划(Plan)&#xff0c;指导企业朝向既定目标前进。而从过去的发展历程看&#xff0c;它呈现为一种模式(Pattern)&#xff0c;反映了企业的历史行为趋势。在产…

【Apifox】测试工具自动编写接口文档

在开发过程中&#xff0c;我们总是避免不了进行接口的测试&#xff0c; 而相比手动敲测试代码&#xff0c;使用测试工具进行测试更为便捷&#xff0c;高效 今天发现了一个非常好用的接口测试工具Apifox 相比于Postman&#xff0c;他还拥有一个非常nb的功能&#xff0c; 在接…

解决git action发布失败报错:Error: Resource not accessible by integration

现象&#xff1a; 网上说的解决方法都是什么到github个人中心setting里面的action设置里面去找。 可这玩意根本就没有&#xff01; 正确解决办法&#xff1a; 在你的仓库页面&#xff0c;注意是仓库页面的setting里面&#xff1a; Actions> General>Workflow permisss…

dart多线程双向通信的案例----【小学4年级课程】

下面是运行后的打印顺序 I/flutter (20170): 上班 I/flutter (20170): 这里是校长室:main I/flutter (20170): 这里是饭堂:fantang1 I/flutter (20170): 这里是收发室--检查小孩发回去给他妈妈的信息是&#xff1a;我是秘书的儿子&#xff0c;我来到在校长室了。校长今晚想吃羊…

Docker 概述与安装

文章目录 1. Docker简介2. 传统虚拟机和容器3. Docker运行速度快的原因4. Docker软件4.1 Docker镜像4.2 Docker容器4.3 Docker仓库 5. Docker架构6. CentOS安装Docker6.1 卸载旧版本6.2 配置yum资源库6.3 安装Docker引擎6.4 启动docker引擎6.5 设置开机自启 7. 卸载Docker8. 运…

DM8误删除操作恢复方案

达梦数据库三种在误删除操作后的回退方案 一、闪回表 当用户操作不慎导致错误的删改数据时&#xff0c;闪回方式可以恢复数据。闪回技术&#xff0c;就是为了用户可以迅速处理这种 数据逻辑损坏的情况而产生的。 闪回技术主要是通过回滚段存储的 UNDO 记录来完成历史记录的还原…

Java大型电商项目——品优购(一)

视频教程&#xff1a;【黑马程序员】Java大型电商项目—品优购【配套源码笔记】_哔哩哔哩_bilibili源码下载&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1fECz5In_XCB-aW6ed6ZTbA 提取码&#xff1a;27xa 技术选型&#xff1a; 后端框架&#xff1a;SpringSprin…

使用 watch+$nextTick 解决Vue引入组件无法使用问题

问题描述&#xff1a; 很多时候我们都需要使用第三方组件库&#xff0c;比如Element-UI&#xff0c;Swiper 等等。 如果我们想要在这些结构中传入自己从服务器请求中获取的数据就会出现无法显示的问题。 比如我们在下面的Swiper例子中&#xff0c;我们需要new Swiper 才能让…

福州大学《嵌入式系统综合设计》 实验八:FFMPEG视频编码

一、实验目的 掌握使用算能平台进行视频编码的流程&#xff0c;包括开发主机环境与云平台的配置&#xff0c;视频编码程序的编写与理解&#xff0c;代码的编译、运行以及学习使用码流分析工具分析视频压缩码流等。 二、实验内容 搭建实验开发环境&#xff0c;编译并运行编码…

软著项目推荐 深度学习动物识别 - 卷积神经网络 机器视觉 图像识别

文章目录 0 前言1 背景2 算法原理2.1 动物识别方法概况2.2 常用的网络模型2.2.1 B-CNN2.2.2 SSD 3 SSD动物目标检测流程4 实现效果5 部分相关代码5.1 数据预处理5.2 构建卷积神经网络5.3 tensorflow计算图可视化5.4 网络模型训练5.5 对猫狗图像进行2分类 6 最后 0 前言 &#…

图数据库HugeGraph:HugeGraph-Hubble基于Web的可视化图管理初体验

原创/朱季谦 一、HugeGraph-Hubble简介 关于HugeGraph&#xff0c;官方资料是这样介绍的&#xff0c;它是一款易用、高效、通用的开源图数据库系统&#xff08;Graph Database&#xff09;&#xff0c; 实现了 Apache TinkerPop3 框架及完全兼容 Gremlin 查询语言&#xff0c…