如何保证消息不丢失?——使用rabbitmq的死信队列!

如何保证消息不丢失?——使用rabbitmq的死信队列!

1、什么是死信

在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。
死信就是消息在特定场景下的一种表现形式,这些场景包括:

    1. 消息被拒绝访问,即 RabbitMQ返回 basicNack 的信号时 或者拒绝basicReject
    1. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack
    1. 消息的Expiration 过期时长或队列TTL过期时间。
    1. 消息队列达到最大容量

上述场景经常产生死信,即消息在这些场景中时,被称为死信。

2、什么是死信队列

死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。
死信队列在 RabbitMQ 中并不会单独存在,往往死信队列都会绑定这一个普通的业务消息队列,当所绑定的消息队列中,有消息变成死信了,那么这个消息就会重新被交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。 人工干预
在这里插入图片描述

3、那么,我们到底如何来使用死信队列呢?

死信队列基本使用,只需要在声明业务队列的时候,绑定指定的死信交换机和RoutingKey即可。

生产者

/*
 * Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.
 *
 */
package com.fpl.provider;

import com.fpl.model.OrderingOk;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * <p>Project: spring-rabbitmq - DeadProvider</p>
 * <p>Powered by fpl1116 On 2024-04-09 11:35:12</p>
 * <p>描述:<p>
 *
 * @author penglei
 * @version 1.0
 * @since 1.8
 */
@Service
public class DeadProvider {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(OrderingOk orderingOk) {
        rabbitTemplate.convertAndSend("Direct_E01", "RK01", orderingOk,new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                int id  = orderingOk.getId();
                int expiration = 0;
                if(id == 1){
                    expiration = 50*1000;
                }else if(id == 2){
                    expiration = 40*1000;
                }else if(id ==3){
                    expiration = 30*1000;
                }else if(id ==4){
                    expiration = 20*1000;
                }else if(id ==5){
                    expiration = 10*1000;
                }
                //为每个消息设置过期时长,但是有可能造成最前面的一个消息未过期一直阻塞后面的消息不能被消费
                message.getMessageProperties().setExpiration(String.valueOf(expiration));
                return message;
            }
        });
    }
}

消费者

/*
 * Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.
 *
 */
package com.fpl.consumers;

import com.fpl.model.OrderingOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;

/**
 * <p>Project: spring-rabbitmq - DeadConsumer</p>
 * <p>Powered by fpl1116 On 2024-04-09 11:32:59</p>
 * <p>描述:<p>
 *
 * @author penglei
 * @version 1.0
 * @since 1.8
 */
//@Configuration
@Slf4j
public class DeadConsumer {
    //死信交换机
    @Bean
    public DirectExchange deadExchange(){
        return  ExchangeBuilder.directExchange("Dead_E01").build();
    }
    //死信队列
    @Bean
    public Queue deadQueue1(){
        return   QueueBuilder.durable("Dead_Q01").build();
    }
    //死信交换机与死信队列的绑定
    @Bean
    public Binding deadBinding1(Queue deadQueue1,DirectExchange deadExchange){
        return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");
    }
    //业务队列
    @Bean
    public Queue queue1(){
        return   QueueBuilder
                .durable("Direct_Q01")
                .deadLetterExchange("Dead_E01")
                .deadLetterRoutingKey("RK_DEAD")
                //.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
                //.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列
                .build();
    }

    //业务交换机
    @Bean
    public DirectExchange exchange(){
        return  ExchangeBuilder.directExchange("Direct_E01").build();
    }
    //业务交换机与队列的绑定
    @Bean
    public Binding binding1(Queue queue1,DirectExchange exchange){
        return BindingBuilder.bind(queue1).to(exchange).with("RK01");
    }


//    @RabbitListener(queues = "Direct_Q01")
//    public void receiveMessage(OrderingOk msg,Message message, Channel channel) throws IOException {
//
//        long deliveryTag = message.getMessageProperties().getDeliveryTag();
//
//        System.out.println("消费者1 收到消息:"+ msg +" tag:"+deliveryTag);
//
//        channel.basicReject(deliveryTag, false);
//        try {
//            // 处理消息...
//            int  i= 5/0;
//            // 如果处理成功,手动发送ack确认 ,Yes
//            channel.basicAck(deliveryTag, false);
//        } catch (Exception e) {
//            // 处理失败,可以选择重试或拒绝消息(basicNack或basicReject)  NO
//            channel.basicNack(deliveryTag, false, false); // 并重新入队
//
//        }
}


//}

测试

@Test
    void test4() throws IOException {

        for (int i = 1; i <=5;i++){
            OrderingOk orderingOk = OrderingOk.builder().id(i).name("fpl " + i).build();
            deadProvider.send(orderingOk);
            System.out.println("发送成功:"+i);
        }
        System.in.read();
    }

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/532487.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

c/c++函数: strtok() ,strtok_s()

概述 函数的原型&#xff1a; char* strtok : strtok (char* _String, char const* _Delimiter); char* strtok_s: strtok_s( char* _String, char const* _Delimiter, char** _Context);函数的参数: _String : 传入一个字符串 _Delimiter: 传入一个字符字…

【canvas】canvas基础使用(四):线型的设置

简言 学习如何使用canvas来设置线形。 线型的方法和属性 使用canvas经常会和线段打交道&#xff0c;下面是设置线型的常用属性和方法。 lineWidth 线宽 CanvasRenderingContext2D.lineWidth 是 Canvas 2D API 设置线段厚度的属性&#xff08;即线段的宽度&#xff09;。 线…

VR紧急情况模拟|V R体验中心加盟|元宇宙文旅

通过VR技术实现紧急情况模拟&#xff0c;提升安全应急能力&#xff01; 简介&#xff1a;面对突发紧急情况&#xff0c;如火灾、地震、交通事故等&#xff0c;正确的反应和应对能够有效减少伤害和损失。为了提高人们在紧急情况下的应急能力&#xff0c;我们借助先进的虚拟现实…

全国水科技大会 免费征集《水环境治理减污降碳协同增效示范案例》

申报时间截止到2024年4月15日&#xff0c;请各单位抓紧申报&#xff0c;申报条件及申报表请联系&#xff1a;13718793867 围绕水环境治理减污降碳协同增效领域&#xff0c;以资源化、生态化和可持续化为导向&#xff0c;面向生态、流城、城市、农村、工业园区、电力、石化、钢…

寄快递的省钱小妙招,看看你能知道多少

首先就是从包裹的重量上和体积上&#xff0c;我们都知道快递员上门取件都是需要称重的&#xff0c;我们能做的就是尽量压缩包裹的体积来减少快递的运费价格。然后是使用自己的包装袋来打包行李&#xff0c;快递员的袋子也是需要另外花费的。对于一些不容易损坏的货物来说&#…

scan纯享代码 java

scan纯享代码 java 1 scan用法2 next3 nextLine 1 scan用法 在录入中间有回车的字符串的时候&#xff0c;不要使用next&#xff08;&#xff09;和nextLine&#xff08;&#xff09;的配合&#xff01;&#xff01; scan用法 Scanner scannernew Scanner(System.in); String…

了解单链表

27. 移除元素 - 力扣&#xff08;LeetCode&#xff09; 思路一&#xff1a; 创建新的数组&#xff0c;遍历原数组&#xff0c;将不为val的值放到新数组当中。空间复杂度不为O(1) 思路二&#xff1a;双指针法 我们设置两个指针src&#xff08;源数据&#xff09;和dst&#xf…

STM32G030F6P6 HSE时钟不能使用无源晶振,只能使用有源晶振!

STM32G030F6P6 HSE时钟不能使用无源晶振&#xff0c;只能使用有源晶振。 参见STM32CubeMX配置 使能RCC中 BYPASS CLOCK SOURCE后只有一个 PC14引脚。 查手册中 5.2.1 HSE clock章节 部分引脚少的封装&#xff0c;HSE时钟只有 OSC-IN&#xff0c;因此只能使用有源晶振 查Data…

mybatis-动态sql

动态sql 1、if标签2、where标签3、trim标签4、set标签5、choose when otherwise6 、foreach6.1 用in来删除6.2 用or来删除6.3 批量添加 7、 sql标签与include标签 1、if标签 需求&#xff1a;多条件查询。 可能的条件包括&#xff1a;品牌&#xff08;brand&#xff09;、指导…

SQL注入sqli_labs靶场第二题

解题思路与第一题相同 ?id1 and 11 和?id1 and 12进行测试如果11页面显示正常和原页面一样&#xff0c;并且12页面报错或者页面部分数据显示不正常&#xff0c;那么可以确定此处为数字型注入。 联合查询&#xff1a; 猜解列名数量&#xff1a;3 ?id1 order by 4 判断回显…

20240410解决OK3588-C的核心板刷机之后无法启动的问题

20240410解决OK3588-C的核心板刷机之后无法启动的问题 2024/4/10 19:38 1、编译OK3588的LINUX/Buildroot&#xff1f;forlinxubuntu: ~/3588/OK3588_Linux_fs$ sudo ./build.sh BoardConfig-linuxfs-ok3588.mk 2、进行全编译 forlinxubuntu: ~/3588/OK3588_Linux_fs$ sudo ./bu…

ArrayList中多线程的不安全问题

ArrayList中的不安全问题 正常的输出 List<String> list Arrays.asList("1","2","3"); list.forEach(System.out::println);为什么可以这样输出&#xff0c;是一种函数是接口&#xff0c;我们先过个耳熟 Arrys.asList是返回一个ArrayL…

ruoyi-nbcio-plus基于vue3的flowable的自定义业务提交申请组件的升级修改

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a;RuoYi-Nbcio后台管理系统 http://122.227.135.243:9666/ 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码&#xff1a…

nandgame中的Tokenize(标记化)

题目说明&#xff1a; "Tokenize" "标记化"标记器预先配置为识别数字和符号 。请配置标记器以额外识别符号减号 - 和括号 ( 和 )。您可以编辑源代码区域中的代码以测试它的标记化。level help 我们将构建一种高级编程语言。 高级语言具有更加人性化和灵…

Android 输入法框架

输入法属于输入系统的一部分&#xff0c;区别于输入系统只能向系统产生时间&#xff0c;输入法能向系统输入具体的内容&#xff0c;下面来认识输入法的大体框架&#xff0c;以下内容参考清华大学出版社出版的《Android图形显示系统》。 输入法框架包含3个组件&#xff0c;各组件…

AI智能滤镜解决方案,全新的视觉创作体验

一张精美的图片&#xff0c;一段引人入胜的视频&#xff0c;往往能够瞬间抓住观众的眼球&#xff0c;为企业带来不可估量的商业价值。然而&#xff0c;如何快速、高效地制作出高质量的视觉内容&#xff0c;一直是困扰众多企业的难题。美摄科技凭借其领先的AI智能滤镜解决方案&a…

处理慢查询时使用explain一般看哪些字段

explain之后会出现这些&#xff0c;一般就只看下面这几个字段 select_type就是查询类型&#xff0c;在我司的业务里基本上用的都是简单查询&#xff0c;在内存中处理逻辑&#xff0c;复杂查询的话排查问题比较麻烦&#xff0c;引起慢查询还会拖累数据库&#xff0c;数据库里还…

MySQL学习笔记(数据类型, DDL, DML, DQL, DCL)

Learning note 1、前言2、数据类型2.1、数值类型2.2、字符串类型2.3、日期类型 3、DDL总览数据库/表切换数据库查看表内容创建数据库/表删除数据库/表添加字段删除字段表的重命名修改字段名&#xff08;以及对应的数据类型&#xff09; 4、DML往字段里写入具体内容修改字段内容…

Celery使用异步、定时任务使用

一、什么是Celery 1.1、celery是什么 Celery是一个简单、灵活且可靠的&#xff0c;处理大量消息的分布式系统&#xff0c;专注于实时处理的异步任务队列&#xff0c;同时也支持任务调度。 Celery的架构由三部分组成&#xff0c;消息中间件&#xff08;message broker&#xf…

谈谈功率IC巨头—士兰微

大家好&#xff0c;我是砖一。 今天给大家分享一下士兰微电子公司&#xff0c;&#xff0c;有做功率元器件&开关电源和IC的朋友可以了解一下&#xff0c;希望对你有用~ 1 公司介绍 士兰微电子成立于1997年&#xff0c;于2003年上市&#xff0c;总部位于杭州&#xff0c;…