如何保证消息不丢之MQ重试机制消息队列

1. 简介

死信队列,简称:DLXDead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另外一个交换机,这个交换机就是DLX

在这里插入图片描述

那么什么情况下会成为Dead message?

队列的长度达到阈值。
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false。
原队列存在消息过期设置,消息到达超时时间未被消费。

流程讲解,如图所示(以第三种情况为例):

Producer发送一条消息到Exchange并路由到设有过期时间(假设30分钟)的Queue中。
当消息的存活时间超过了30分钟后,Queue会将消息转发给DLX。
DLX接收到Dead message后,将Dead message路由到与其绑定的Queue中。
此时消费者监听此死信队列并消费此消息。

那么什么情况下会成为Dead message?

队列的长度达到阈值。
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false。
原队列存在消息过期设置,消息到达超时时间未被消费。

流程讲解,如图所示(以第三种情况为例):

Producer发送一条消息到Exchange并路由到设有过期时间(假设30分钟)的Queue中。
当消息的存活时间超过了30分钟后,Queue会将消息转发给DLX。
DLX接收到Dead message后,将Dead message路由到与其绑定的Queue中。
此时消费者监听此死信队列并消费此消息。

死信队列有什么用呢?

  1. 取消订单(比如下单30分钟后未付款,则取消订单,回滚库存),或者新用户注册,隔段时间进行短信问候等。
  2. 将消费者拒绝的消息发送到死信队列,然后将消息进行持久化,后续可以做业务分析或者处理。

2. TTL

因为要实现延迟消息,我们先得知道如何设置过期时间。这里指演示

TTL :Time To Live(存活时间/过期时间),当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。 例:现在有两条消息,第一条消息过期时间为30s,而第二条消息过期时间为15s,当过了15秒后,第二条消息不会立即过期,而是要等第一条消息被消费后,第二条消息被消费时,才会判断是否过期,所以当所有消息的过期时间一致时(比如30m后过期),最好给队列设置过期时间,而不是消息。但是有的情况确实每个消息的过期时间不一致,比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知,这就有问题了,不可能设置那么多的队列。
如果两者都进行了设置,以时间短的为准。

3. 利用死信队列的机制, 带有重试队列机制的消费队列的流程图:

在这里插入图片描述

动态绑定队列和重试队列:

/**
 * 遍历所有的 枚举队列 手动注册队列等相关bean到spring容器中
 */
@Component
public class QueueAutoRegisterAware implements BeanFactoryAware {

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;
        for (QueueDeclareEnum value : QueueDeclareEnum.values()) {
            // 正常消息 队列 交换器 路由键绑定
            Queue queue = new Queue(value.getQueueName());
            DirectExchange directExchange = new DirectExchange(value.getExchangeName());
            Binding binding = BindingBuilder.bind(queue).to(directExchange).with(value.getRoutingKey());
            // 注册bean
            defaultListableBeanFactory.registerSingleton(value.getQueueName(), queue);
            defaultListableBeanFactory.registerSingleton(value.getExchangeName(), directExchange);
            defaultListableBeanFactory.registerSingleton(value.getRoutingKey(), binding);


            // TODO 重试队列 交换器 路由键(暂时没有处理, 后续需要用到mq机制则可以进行加入AOP切面重试机制)
            if (StringUtil.isNotBlank(value.getRetryQueueName()) && StringUtil.isNotBlank(value.getRetryRoutingKey())) {
                Map<String, Object> dlqParamMap = new HashMap<>(2);
                dlqParamMap.put("x-dead-letter-exchange", value.getExchangeName());
                dlqParamMap.put("x-dead-letter-routing-key", value.getRoutingKey());
                Queue retryQueue = new Queue(value.getRetryQueueName(), true, false, false, dlqParamMap);
                Binding retryBindIng = BindingBuilder.bind(retryQueue).to(directExchange).with(value.getRetryRoutingKey());

                defaultListableBeanFactory.registerSingleton(value.getRetryQueueName(), retryQueue);
                defaultListableBeanFactory.registerSingleton(value.getRetryRoutingKey(), retryBindIng);
            }
        }
    }
}

Aop异常切面,拦截异常把设置消息的重试次数和ttl过期时间,发送重试队列中;

/**
 * RabbitMQ监听器 切面
 * 异常消息发送到重试队列
 */
@Aspect
@Component
@Slf4j
@AllArgsConstructor
public class RabbitListenerAspect {


	private final RabbitTemplate rabbitTemplate;

	private final IFailureService failureService;

	private final TmsProperties tmsProperties;

	private final IOrderRequestService orderRequestService;


	@Pointcut("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
	public void pointCut() {
	}


	@Around("pointCut()")
	public Object around(ProceedingJoinPoint joinPoint) {
		log.debug("MQ切面接收到消息");
		AtomicReference<Message> messageReference = new AtomicReference<>();
		Arrays.stream(joinPoint.getArgs()).forEach(arg -> {
			if (arg instanceof Message) {
				messageReference.set((Message) arg);
			}
		});
		Message message = messageReference.get();
		if (message == null) {
			log.debug("mq消息内容为空,不执行相关操作");
			return null;
		}
		Object proceed;
		// 消费出现异常时 会根据配置发送到重试队列
		try {
			//绑定租户
			String tenantId = message.getMessageProperties().getHeader(ForecastConstant.TENANT_ID);
			if (tmsProperties.isTenantApp()) {
				TmsTenantUtil.bindId(tenantId);
			}
			proceed = joinPoint.proceed();
			if (tmsProperties.isTenantApp()) {
				TmsTenantUtil.unbind();
			}
		} catch (ForecastException e) {
			log.info("预报失败,直接回传失败信息", e);
			this.forecastMqExceptionDeal(message, e);
			return null;
		} catch (Throwable throwable) {
			log.error("消费失败,异常信息:{}", throwable);
			this.sendToRetryQueue(message, throwable);
			return null;
		}
		return proceed;
	}

	/**
	 * 发送到重试队列
	 *
	 * @param message 消息内容
	 */
	private void sendToRetryQueue(Message message, Throwable throwable) {
		sendToRetryQueue(message, throwable, true);
	}


	/**
	 * 发送到重试队列
	 *
	 * @param message 消息内容
	 */
	private void sendToRetryQueue(Message message, Throwable throwable, Boolean addRetryCount) {
		String consumerQueue = message.getMessageProperties().getConsumerQueue();
		// 重试次数 默认0
		Optional<QueueDeclareEnum> first = Arrays.stream(QueueDeclareEnum.values()).filter(value ->
			value.getQueueName().equalsIgnoreCase(consumerQueue)).findFirst();
		if (!first.isPresent()) {
			return;
		}
		QueueDeclareEnum queueDeclareEnum = first.get();
		String retryQueueName = queueDeclareEnum.getRetryQueueName();
		String retryRoutingKey = queueDeclareEnum.getRetryRoutingKey();
		if (StringUtil.isEmpty(retryQueueName) || StringUtil.isEmpty(retryRoutingKey)) {
			log.info("当前队列没有配置重试队列 不进行重试,队列名:{}", queueDeclareEnum.getQueueName());
			return;
		}
		Integer retryCount = (Integer) Optional.ofNullable(message.getMessageProperties().
			getHeader(ForecastConstant.RETRY_COUNT)).orElse(0);
		//有效的操作 重试次数才+1
		if (addRetryCount) {
			retryCount++;
		}
		if (retryCount == 1) {
			try {
				failureService.deal(message, throwable);
			} catch (Exception e) {
				log.error("失败最大次数处理异常", e);
			}
		}
		if (retryCount > queueDeclareEnum.getMaxRetryCount()) {
			log.info("当前消息超过队列配置最大重试次数,不进行重试,队列名:{}", queueDeclareEnum.getQueueName());
			return;
		}
		//如果是手动重试的 则不进入重试队列
		if (message.getMessageProperties().getHeader(ForecastConstant.IS_HAND)) {
			return;
		}
		Message retryMessage = MessageBuilder.fromMessage(message).setHeader(ForecastConstant.RETRY_COUNT, retryCount).build();
		this.convertAndRetry(retryMessage, queueDeclareEnum);
	}

	/**
	 * 预报失败处理
	 */
	private void forecastMqExceptionDeal(Message message, ForecastException mqException) {
		switch (mqException.getErrorCode()) {
			case ERROR_CODE_1:
				OrderEntityBO orderEntityBO = (OrderEntityBO) mqException.getData();
				//1.回传失败
				failureService.deal(message, mqException);
				//2.塞到重试任务中去 第二天0时执行
				orderRequestService.saveEntity(new OrderTaskEntity(orderEntityBO.getOrder().getCode(),
					OrderRequestTypeEnum.RETRY_FORECAST, getOperateTime()));
				//3.记录到redis中
				redisDeal(orderEntityBO);
				break;
			case ERROR_CODE_2:
				//不增加重试次数
				sendToRetryQueue(message, mqException, false);
				break;
			case ERROR_CODE_3:
				Integer retryCount = (Integer) Optional.ofNullable(message.getMessageProperties().
					getHeader(ForecastConstant.RETRY_COUNT)).orElse(0);
				if (retryCount == 3) {
					WXCallUtil.call(mqException.getCallMessage());
				}
				this.sendToRetryQueue(message, mqException);
			default:
		}
	}

	private void redisDeal(OrderEntityBO orderEntityBO) {
		String channelCode = orderEntityBO.getOrder().getChannelCode();
		String country = orderEntityBO.getReceiver().getCountryCode();
		String ruleName = country + "_" + channelCode;
		List<String> list = TmsRedisUtil.get(ForecastRedisConstant.CHANNEL_LIMIT_DATA);

		if (ObjectUtil.isEmpty(list)) {
			list = new ArrayList<>();
		}
		if (list.contains(ruleName)) {
			return;
		}
		list.add(ruleName);
		TmsRedisUtil.set(ForecastRedisConstant.CHANNEL_LIMIT_DATA, list);
		TmsRedisUtil.expireAt(ForecastRedisConstant.CHANNEL_LIMIT_DATA, getSecondDayZero());
	}

	/**
	 * 获取第二天0时 date
	 *
	 * @return
	 */
	private Date getSecondDayZero() {
		Instant instant = new Date().toInstant();
		return DateUtil.beginOfDay(Date.from(instant.plus(Duration.ofDays(1))));
	}

	/**
	 * 避免服务器时间不同步 向后兼容5分钟
	 *
	 * @return
	 */
	private Date getOperateTime() {
		//如果当前时间小于0时5分 则下次重试时间设为当天零时
		Date date = DateUtil.beginOfDay(new Date());
		if (System.currentTimeMillis() - date.getTime() < 5 * 60 * 1000) {
			return date;
		}
		return getSecondDayZero();
	}


	/**
	 * 发送消息到重试队列
	 *
	 * @param message          重试消息
	 * @param queueDeclareEnum 队列枚举
	 */
	public void convertAndRetry(Message message, QueueDeclareEnum queueDeclareEnum) {
		String ttlTime = String.valueOf(queueDeclareEnum.getTtlTime() * (Integer) message.getMessageProperties().getHeader(ForecastConstant.RETRY_COUNT));
		message.getMessageProperties().setExpiration(ttlTime);
		rabbitTemplate.convertAndSend(queueDeclareEnum.getExchangeName(), queueDeclareEnum.getRetryRoutingKey(), message);
	}

	/**
	 * 发送消息到重试队列
	 *
	 * @param message          重试消息
	 * @param queueDeclareEnum 队列枚举
	 */
	public void convertAndRetry(Message message, QueueDeclareEnum queueDeclareEnum, String ttlTime) {
		message.getMessageProperties().setExpiration(ttlTime);
		rabbitTemplate.convertAndSend(queueDeclareEnum.getExchangeName(), queueDeclareEnum.getRetryRoutingKey(), message);
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/441499.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

全球科技创新领域大检阅“2024上海国际智能科技及创新展览会”

随着科技的飞速发展&#xff0c;创新成为了推动社会进步的核心动力。在这样的背景下&#xff0c;“2024上海国际科技及创新展览会”应运而生&#xff0c;旨在汇聚全球智能科技领域的精英&#xff0c;共同展示最新的科技成果&#xff0c;探讨未来的发展方向。 本次展会将于2024年…

Alveo 概念拓扑结构

在 Alveo 加速卡中,涉及到的概念拓扑结构主要包括 Alveo 卡上的各个关键组件以及与主机系统之间的通信结构。以下是对这些概念拓扑结构的简要介绍: 1.DDR 即双数据率内存(Double Data Rate memory),是一种常见的计算机内存类型,用于存储和提供处理器所需的数据和指令。…

HBase安装,配置,启动,检查

目录: 一、HBase安装&#xff0c;配置 1、下载HBase安装包 2、解压&#xff0c;配置环境变量并激活 3、hbase 配置 4、将hadoop和zookeeper的配置文件创建软连接放在hbase配置目录 5、配置 regionserver 二、HBase启动与关闭&#xff0c;安装检验 1、启动关闭hbase的命令 2、 检…

mac本地启动sentinel

启动Sentinel控制台 1&#xff09;下载sentinel控制台jar包 https://github.com/alibaba/Sentinel/releases/download/1.8.6/sentinel-dashboard-1.8.6.jar 2&#xff09;启动sentinel控制台 使用如下命令启动控制台&#xff1a; java -Dserver.port8080 -Dcsp.sentinel.d…

基于单片机的红外测距仪设计

目 录 摘 要 I Abstract II 引 言 1 1 控制系统设计 3 1.1 主控制器选择 3 1.2 项目总体设计 3 2 项目硬件设计 5 2.1 单片机控制模块 5 2.2 测距模块设计 9 2.3 液晶显示模块 10 2.4 报警模块 11 3 项目软件设计 12 3.1 软件开发环境 12 3.2 系统主程序设计 13 3.3 LCD显示程…

数智化时代的新潮流:企业如何利用数据飞轮驱动增长?_光点科技

随着数据中台理念的逐渐“降温”&#xff0c;企业数智化的探索并未停歇。反而&#xff0c;数据飞轮成为了新的焦点&#xff0c;它承诺为企业带来更紧密的业务与数据结合&#xff0c;从而推动持续的增长。本文将探讨企业如何利用数据飞轮的概念&#xff0c;赋能业务&#xff0c;…

Spark 核心API

核心 API spark core API 指的是 spark 预定义好的算子。无论是 spark streaming 或者 Spark SQL 都是基于这些最基础的 API 构建起来的。理解这些核心 API 也是写出高效 Spark 代码的基础。 Transformation 转化类的算子是最多的&#xff0c;学会使用这些算子就应付多数的数…

勒索软件事件手册:综合指南

近年来&#xff0c;勒索软件攻击的频率和复杂程度都急剧增加。这些攻击的影响可能是毁灭性的&#xff0c;从经济损失到严重的运营中断。 这就是为什么对于希望防范这种网络安全威胁的企业来说&#xff0c; 强大的勒索软件事件响应手册是不可谈判的。 本指南旨在深入了解勒索软…

【工作实践-07】uniapp关于单位rpx坑

问题&#xff1a;在浏览器页面退出登录按钮上“退出登录”字样消失&#xff0c;而在手机端页面正常;通过查看浏览器页面的HTML代码&#xff0c;发现有“退出登录”这几个字&#xff0c;只不过由于样式问题&#xff0c;这几个字被挤到看不见了。 样式代码中有一行为&#xff1a…

UI自动化测试使用场景及脚本录制

经常有人会问&#xff0c;什么样的项目才适合进行UI自动化测试呢&#xff1f;UI自动化测试相当于模拟手工测试&#xff0c;通过程序去操作页面上的控件。而在实际测试过程中&#xff0c;经常会遇到无法找到控件&#xff0c;或者因控件定义变更而带来的维护成本等问题。 哪些场…

设计高并发系统的关键策略

✨✨谢谢大家捧场&#xff0c;祝屏幕前的小伙伴们每天都有好运相伴左右&#xff0c;一定要天天开心哦&#xff01;✨✨ &#x1f388;&#x1f388;作者主页&#xff1a; 喔的嘛呀&#x1f388;&#x1f388; 目录 引言 一. 架构设计 1. 微服务架构 2. 分布式架构 3. 负…

VR全景技术在VR看房中有哪些应用,能带来哪些好处

引言&#xff1a; 随着科技的不断发展&#xff0c;虚拟现实&#xff08;VR&#xff09;技术在房地产行业中的应用也越来越广泛。其中&#xff0c;VR全景技术在VR看房中的运用尤为突出。今天&#xff0c;让我们一起深入探讨VR全景技术在VR看房中的应用及其带来的种种好处。 一、…

智慧灯杆-智慧城市照明现状分析(2)

作为城市照明的主体,城市道路照明伴随着我国城市建设的高速发展,获得了快速的增长。国家统计局数据显示,从2004年至2014年,我国城市道路照明灯数量由1053.15万盏增加到3000万盏以上,年均复合增长率超过11%,城市道路照明行业保持持续快速发展的趋势。 近几年,随着中国路灯…

钡铼技术R40工业路由器连接工业控制系统实现远程监控

钡铼技术的R40工业路由器是一款专为现代工业控制系统设计的高性能设备&#xff0c;它通过其先进的连接功能和丰富的接口&#xff0c;使得远程监控和管理成为可能。本文将从产品参数的角度出发&#xff0c;深入探讨R40工业路由器如何连接工业控制系统以实现远程监控。 1. R40工…

Windows 安装 Xinference

Windows 安装 Xinference 0. 引言1. 创建虚拟环境2. 安装 pytorch3. 安装 llama_cpp_python4. 安装 chatglm-cpp5. 安装 Xinference6. 设置 model 路径7. 启动 Xinference8. 查看 Cluster Information 0. 引言 Xorbits Inference&#xff08;Xinference&#xff09;是一个性能…

最新基于R语言lavaan结构方程模型(SEM)技术

原文链接&#xff1a;最新基于R语言lavaan结构方程模型&#xff08;SEM&#xff09;技术https://mp.weixin.qq.com/s?__bizMzUzNTczMDMxMg&mid2247596681&idx4&sn08753dd4d3e7bc492d750c0f06bba1b2&chksmfa823b6ecdf5b278ca0b94213391b5a222d1776743609cd3d14…

Leetcode3070. 元素和小于等于 k 的子矩阵的数目

Every day a Leetcode 题目来源&#xff1a;3070. 元素和小于等于 k 的子矩阵的数目 解法1&#xff1a;二维前缀和 二维前缀和的模板题。 代码&#xff1a; /** lc appleetcode.cn id3070 langcpp** [3070] 元素和小于等于 k 的子矩阵的数目*/// lc codestart// 二维前缀和…

Web3探索加密世界:什么是Web3钱包?

随着加密货币和区块链技术的发展&#xff0c;人们越来越多地开始探索Web3世界&#xff0c;这个世界以去中心化、安全和开放性为特征。在这个新兴的数字化领域中&#xff0c;Web3钱包成为了一个关键的概念和工具。但是&#xff0c;什么是Web3钱包&#xff1f;它有什么特点&#…

二、TensorFlow结构分析(3)

目录 1、张量 1.1 张量的类型 1.2 张量的阶 1.3 创建张量的指令 2、张量的变换 3、张量的数学运算 TF数据流图图与TensorBoard会话张量Tensor变量OP高级API 1、张量 1.1 张量的类型 1.2 张量的阶 def tensor_demo():# 张量的演示tensor1 tf.constant(4.0)tensor2 tf.co…

IPSEC VPN安全介绍以及相关实验

目录 一、IPSEC相关的安全服务 二、IPSEC的安全协议 三、实验 IPSEC一组协议集合&#xff0c;用于确保在IP网络上进行通信时的安全性和保密性。它提供了一种标准化的方法&#xff0c;用于对IP数据包进行加密、身份验证和完整性保护。IPSEC通常用于建立虚拟私人网络VPN连接&am…