RabbitMQ-死信队列(golang)

1、概念

      死信(Dead Letter),字面上可以理解为未被消费者成功消费的信息,正常来说,生产者将消息放入到队列中,消费者从队列获取消息,并进行处理,但是由于某种原因,队列中的消息未被消费者拿到,这样的消息就会成为死信,存放死信消息的队列,也就被称为死信队列(Dead Letter Queue,简称DLQ)。

2、死信产生的原因

文心一言的回答如下:

  1. 消息被拒绝:当消费者使用basic.reject或basic.nack方法拒绝消息,并且requeue参数被设置为false时,消息会被视为死信。这意味着消费者明确表示无法或不愿意处理该消息,并且不希望该消息重新进入队列等待其他消费者处理。
  2. 消息处理失败:消费者由于代码错误、消息格式不正确、业务规则冲突等原因无法成功处理消息时,该消息也可以被标记为死信。这种情况下,尽管消费者尝试处理消息,但由于某些无法克服的错误,消息无法被成功消费。
  3. 消息过期:如果消息设置了生存时间(TTL,Time To Live),并且在这个时间内没有被消费,那么消息会过期并被视为死信。TTL是RabbitMQ中用于指定消息在队列中存活时间的参数,超过该时间的消息将被视为过期并丢弃或转发到死信队列。
  4. 队列长度限制:当队列中的消息数量超过了设置的最大长度时,新到达的消息无法进入队列,这些消息也会被视为死信。队列长度限制是RabbitMQ中用于控制队列大小的一种机制,当队列达到最大容量时,新到达的消息将无法被接收并可能被丢弃或转发到死信队列。

总结来说,主要原因就三个,消息被拒绝、消息过期、队列满

在一些重要的场景,比如支付场景,提交的订单超时未支付的,可以设计为进入死信队列。

3、死信队列使用实践

3.1 消息过期

设置正常队列ttl过期时间为5s,如果5s内消息没有被消费,则会自动放入死信队列中。

关键点:设置正常队列属性,ttl5s过期:

// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列
	args := amqp.Table{
		"x-message-ttl":             int64(5000), // 5秒TTL
		"x-dead-letter-exchange":    "",
		"x-dead-letter-routing-key": dlx.Name,
	}
	// 声明正常队列
	q, err := ch.QueueDeclare(
		"normal_queue", // name
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		args,           // arguments
	)

全部代码如下: 

package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@xxxx.xx.xx.xxx:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Failed to open a channel")
		return
	}
	// 声明死信队列
	dlx, err := ch.QueueDeclare(
		"dead_letter_queue", // name
		true,                // durable
		false,               // delete when unused
		false,               // exclusive
		false,               // no-wait
		nil,                 // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())
		return
	}
	err = ch.ExchangeDeclare(
		"my_exchange", // name
		"direct",      // type
		true,          // durable
		false,         // auto-deleted
		false,         // internal
		false,         // no-wait
		nil,           // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())
		return
	}
	// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列
	args := amqp.Table{
		"x-message-ttl":             int64(5000), // 5秒TTL
		"x-dead-letter-exchange":    "",
		"x-dead-letter-routing-key": dlx.Name,
	}
	// 声明正常队列,注意,必须在声明队列时就要设置死信队列信息
	q, err := ch.QueueDeclare(
		"normal_queue", // name
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		args,           // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())
		return
	}
	// 将正常队列绑定到交换机,并设置死信交换机和路由键
	err = ch.QueueBind(
		q.Name,        // queue name
		q.Name,        // routing key
		"my_exchange", // exchange
		false,
		nil,
	)
	if err != nil {
		fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())
		return
	}
	err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})
	if err != nil {
		fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())
		return
	}
}

队列信息包括绑定的死信队列信息、ttl等信息如下:

运行上方代码,会向队列发送一条信息,我们先不创建消费者,5s后,消息会被自动放入死信队列。

3.2 队列满

当mq队列由于消息量过多导致队列打满时,这个时候过来的消息,将会被自动放入到死信队列中。

设置队列长度属性代码如下:

args := amqp.Table{
		// "x-message-ttl":             int64(5000), // 5秒TTL
		"x-max-length":              2,
		"x-dead-letter-exchange":    "",
		"x-dead-letter-routing-key": dlx.Name,
	}
	// 声明正常队列
	q, err := ch.QueueDeclare(
		"normal_queue", // name
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		args,           // arguments
	)

队列属性如下:

发送两条信息:

 继续发送第三个:

 测试代码:

package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Failed to open a channel")
		return
	}
	// 声明死信队列
	dlx, err := ch.QueueDeclare(
		"dead_letter_queue", // name
		true,                // durable
		false,               // delete when unused
		false,               // exclusive
		false,               // no-wait
		nil,                 // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())
		return
	}
	err = ch.ExchangeDeclare(
		"my_exchange", // name
		"direct",      // type
		true,          // durable
		false,         // auto-deleted
		false,         // internal
		false,         // no-wait
		nil,           // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())
		return
	}
	// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列
	args := amqp.Table{
		// "x-message-ttl":             int64(5000), // 5秒TTL
		"x-max-length":              2,
		"x-dead-letter-exchange":    "",
		"x-dead-letter-routing-key": dlx.Name,
	}
	// 声明正常队列
	q, err := ch.QueueDeclare(
		"normal_queue", // name
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		args,           // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())
		return
	}
	// 将正常队列绑定到交换机,并设置死信交换机和路由键
	err = ch.QueueBind(
		q.Name,        // queue name
		q.Name,        // routing key
		"my_exchange", // exchange
		false,
		nil,
	)
	if err != nil {
		fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())
		return
	}
	err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})
	if err != nil {
		fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())
		return
	}
}

3.3 消息被拒绝

       消息被拒绝的情况,当消费者无法处理某条信息时,客户端想rabbitmq服务器发送一个【负确认】应答,表示消费者未能成功处理此条消息,并且希望RabbitMQ根据配置重新发送这条消息(例如,将其重新排队)或者将其丢弃。

客户端函数:ch.Nack,函数原型:

func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error {
	ch.m.Lock()
	defer ch.m.Unlock()

	return ch.send(&basicNack{
		DeliveryTag: tag,
		Multiple:    multiple,
		Requeue:     requeue,
	})
}

入参含义如下: 

tag

这是一个唯一标识符,用于标识消费者之前接收到的特定消息。当消费者调用 ch.Ackch.Nack 或 ch.Reject 时,必须提供这个标识符,以便RabbitMQ知道是对哪条消息进行确认或拒绝。

multiple

这是一个布尔值(bool),用于指示是否应该同时确认(或拒绝)多条消息。如果设置为 true,则RabbitMQ将认为从上一个被确认的消息开始(包括该消息),直到当前消息为止的所有未确认消息都被拒绝。这通常用于批量处理消息确认,但在使用 ch.Nack 时,它的作用更多是关于是否应该重新排队当前消息之后的消息(取决于RabbitMQ的配置和消息的属性)。

requeue

这也是一个布尔值(bool),用于指示被拒绝的消息是否应该被重新放入队列的末尾以便稍后重试。如果设置为 true,则消息将被重新排队;如果设置为 false,则消息将被丢弃(或者根据RabbitMQ的配置可能被发送到死信队列,如果配置了的话)。

测试过程,首先使用3.1或者3.2的代码向mq中写入几条信息:

之后使用如下代码进行消费:

package main

import (
	"fmt"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Failed to open a channel")
		return
	}
	err = ch.ExchangeDeclare(
		"my_exchange", // name
		"direct",      // type
		true,          // durable
		false,         // auto-deleted
		false,         // internal
		false,         // no-wait
		nil,           // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())
		return
	}

	// 声明正常队列
	// q, err := ch.QueueDeclare(
	// 	"normal_queue", // name
	// 	true,           // durable
	// 	false,          // delete when unused
	// 	false,          // exclusive
	// 	false,          // no-wait
	// 	nil,            // arguments
	// )
	// if err != nil {
	// 	fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())
	// 	return
	// }
	// 将正常队列绑定到交换机,并设置死信交换机和路由键
	err = ch.QueueBind(
		"normal_queue", // queue name
		"normal_queue", // routing key
		"my_exchange",  // exchange
		false,
		nil,
	)
	if err != nil {
		fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())
		return
	}
	msgs, _ := ch.Consume(
		"normal_queue", // queue
		"",             // consumer
		false,          // auto-ack
		true,           // exclusive
		false,          // no-local
		false,          // no-wait
		nil,            // args
	)
	go func() {
		for d := range msgs {
			// 模拟处理失败,全部放入死信队列
			ch.Nack(d.DeliveryTag, false, false)
		}
	}()
	time.Sleep(10 * time.Second)
}

运行代码后,3条消息全部进入到死信队列中:

 4、总结

      RabbitMQ的死信队列(Dead Letter Queue,简称DLQ)是一种用于处理消息失败或无法路由的消息的机制,死信队列中的所有消息都是无法被正常消费的死信,这使得开发者可以集中对这些消息进行管理和分析。通过分析死信队列中的消息,开发者可以了解系统的运行状态、发现潜在的问题,并进行相应的优化和改进,以提升系统的稳定性和可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/916323.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Redisson的可重入锁

初始状态: 表示系统或资源在没有线程持有锁的情况下的状态,任何线程都可以尝试获取锁。 线程 1 获得锁: 线程 1 首次获取了锁并进入受保护的代码区域。 线程 1 再次请求锁: 在持有锁的情况下,线程 1 再次请求锁&a…

java程序打包及执行 jar命令及运行jar文件

java程序打包及执行 jar命令及运行jar文件 打包命令: 安装完成jdk之后采用 jar命令进行打包 jar -cvfe ddd.jar -C bin/ddd.java 打包 ddd.java 文件 jar -cvfe dddd.jar -C . 注意 -C 后面的点. 表示当前目录下所有 如图: 运行jar 文件 java -class…

视频孪生技术在金融银行网点场景中的应用价值

作为国民经济重要的基础行业,金融行业在高速发展的同时衍生出业务纠纷、安全防范、职能管理等诸多问题,对安全防范和监督管理提出了更高的要求。因此,如何能更好的利用视频监控系统价值,让管理人员更简便的浏览监控视频、更快速的…

SpringCloud OpenFeign负载均衡远程调用 跨服务调用 连接池优化

介绍 Spring Cloud OpenFeign 是 Spring Cloud 的一部分,提供了一种声明式的 HTTP 客户端方式来简化服务间的通信。通过 OpenFeign,开发者可以像调用本地方法一样,轻松地调用远程服务,而不需要手动处理 HTTP 请求、响应和连接等底…

Go语言实现用户登录Web应用

文章目录 1. Go语言Web框架1.1 框架比较1.2 安装Gin框架 2. 实现用户登录功能2.1 创建项目目录2.2 打开项目目录2.3 创建登录Go程序2.4 创建模板页面2.4.1 登录页面2.4.2 登录成功页面2.4.3 登录失败页面 3. 测试用户登录项目3.1 运行登录主程序3.2 访问登录页面3.3 演示登录成…

2024算法基础公选课练习三(DFS1)(1)

一、前言 dfs是初学者的重点,也是难点,这次的有些题目也不好写。题目有点多,因此分成(1)和(2) 二、题目总览 三、具体题目 3.1 问题 A: 贪心——排队接水 思路 贪心,把接水时间短…

数据库参数备份

MySQL #!/bin/bash # 获取当前日期和时间的时间戳 TIMESTAMP$(date "%Y%m%d-%H%M%S")# 0、创建目录 mkdir /tmp/parameter_$TIMESTAMP/# 1、获取所有命名空间 echo "1、获取所有命名空间" NAMESPACES$(kubectl get ns | grep qfusion- | grep -v qfusion-…

Kconfig 知道的!与不知道的?

1 Kconfig 的重要性 Kconfig 是 Linux 内核配置系统的重要工具,它通过定义和管理配置选项,使开发者能够灵活地调整内核模块。无论是精简内核以适配嵌入式系统,还是为桌面应用扩展功能,Kconfig 都在其中扮演着关键角色。本文将带领…

CCI3.0-HQ:用于预训练大型语言模型的高质量大规模中文数据集

摘要 我们介绍了 CCI3.0-HQ,它是中文语料库互联网 3.0(CCI3.0)的一个高质量500GB子集,采用新颖的两阶段混合过滤管道开发,显著提高了数据质量。为了评估其有效性,我们在不同数据集的100B tokens上从头开始…

rhcsa笔记二

普通文件的创建 touch命令的使用 touch 文件名 (文件路径) linux不是用后缀区分文件类型的,而是用ll出现的第一个字符区分文件类型的 -:普通文件 d:目录文件 [rootserver ~]# stat /etc/hostname 文件:/etc/hos…

微澜:用 OceanBase 搭建基于知识图谱的实时资讯流的应用实践

本文作者: 北京深鉴智源科技有限公司架构师 郑荣凯 本文整理自北京深鉴智源科技有限公司架构师郑荣凯,在《深入浅出 OceanBase 第四期》的分享。 知识图谱是一项综合性的系统工程,需要在在各种应用场景中向用户展示经过分页的一度关系。 微…

探索Python文档自动化的奥秘:`python-docx`库全解析

文章目录 探索Python文档自动化的奥秘:python-docx库全解析1. 背景:为何选择python-docx?2. python-docx是什么?3. 如何安装python-docx?4. 简单库函数使用方法创建文档添加段落添加标题添加表格插入图片 5. 应用场景自…

测试实项中的偶必现难测bug--一键登录失败

问题描述:安卓和ios有出现部分一键登录失败的场景,由于场景比较极端,衍生了很多不好评估的情况。 产生原因分析: 目前有解决过多次这种行为的问题,每次的产生原因都有所不同,这边根据我个人测试和收集复现的情况列举一些我碰到的: 1、由于我们调用的是友盟的一键登录的…

Vue的基础使用

一、为什么要学习Vue 1.前端必备技能 2.岗位多,绝大互联网公司都在使用Vue 3.提高开发效率 4.高薪必备技能(Vue2Vue3) 二、什么是Vue 概念:Vue (读音 /vjuː/,类似于 view) 是一套 构建用户界面 的 渐进式 框架…

vue3使用VueQuill插入自定义按钮

在 Vue 3 项目中使用 VueQuill 编辑器时,我们可以自定义内容来满足特定的需求。本文将介绍如何在 VueQuill 中插入自定义内容,比如插入特定的标签或样式元素。 Quill官方中文文档 1. 项目设置和依赖安装 如果你还没有创建 Vue 3 项目,可以…

【springboot使用sqlite数据库】Java后台同时使用mysql、sqlite

环境:根据业务的需要,老版程序使用的数据库是sqlite,版本升级成前后台分离模式,因此需要兼容mysql与sqlite数据库同时使用。 pom.xml设置: application.yml文件配置: mapper.java文件: service.…

多智能体深度确定性策略梯度(MADDPG)算法复现教程

复现软硬件: Ubunru20.04,Python 3.8.10, torch 2.4.1, gym 0.10.5,VScode 论文: http://arxiv.org/pdf/1706.02275 环境: GitHub - openai/multiagent-particle-envs: Code for a multi-agent particle environment used in the paper "Multi-Agent Actor-…

51c大模型~合集42

我自己的原文哦~ https://blog.51cto.com/whaosoft/11859244 #猎户座 「草莓」即将上线,OpenAI新旗舰大模型曝光,代号「猎户座」 ChatGPT 要进化了? 本月初,OpenAI 创始人、CEO 山姆・奥特曼突然在 X 上发了一张照片&#xff0…

探索Copier:Python项目模板的革命者

文章目录 **探索Copier:Python项目模板的革命者**1. 背景介绍:为何Copier成为新宠?2. Copier是什么?3. 如何安装Copier?4. 简单库函数使用方法4.1 创建模板4.2 从Git URL创建项目4.3 使用快捷方式4.4 动态替换文本4.5 …

系统掌握大语言模型提示词 - 从理论到实践

以下是我目前的一些主要个人标签: 6 年多头部大厂软件开发经验;1 年多 AI 业务应用经验,拥有丰富的业务提示词调优经验和模型微调经验。信仰 AGI,已经将 AI 通过自定义 Chatbot /搭建 Agent 融合到我的工作流中。头部大厂技术大学…