typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存

Manjaro安装RabbitMQ

安装

sudo pacman -S rabbitmq rabbitmqadmin

启动管理模块

sudo rabbitmq-plugins enable rabbitmq_management

sudo rabbitmq-server

管理界面
http://127.0.0.1:15672/
默认用户名和密码都是guest。
要使用 rabbitmqctl 命令添加用户并分配权限,您可以按照以下步骤进行操作:

  1. 添加用户
rabbitmqctl add_user mingcai password

请将 password 替换为您想要设置的实际密码。

  1. 分配权限
rabbitmqctl set_permissions -p / mingcai ".*" ".*" ".*"

这个命令将用户 mingcai 授予对所有虚拟主机的所有资源的读、写和管理权限。如果您只想给予特定权限,请适当调整正则表达式 ".*",以授予适当的权限。例如,如果您只想给予读取权限,可以使用 "^amq\."

  1. 可选步骤:设置用户角色

您可以将用户分配给不同的角色,以便更好地管理权限。例如,您可以将用户添加到 administrator 角色以获取管理员权限:

rabbitmqctl set_user_tags mingcai administrator

这样,用户 mingcai 就被赋予了管理员权限。

请确保您具有适当的权限来执行这些操作,并确保替换示例中的用户名和密码为您自己的实际值。

死信队列

在这里插入图片描述

在这里插入图片描述
标题:利用RabbitMQ死信队列处理消息的三种情况

在消息队列的应用中,处理异常情况和消息的延迟成为了一项重要的任务。RabbitMQ作为一款流行的消息队列服务,提供了死信队列(Dead Letter Exchange)功能,能够有效地处理消息被拒绝、消息过期以及队列达到最大长度等情况。本文将介绍如何利用RabbitMQ的死信队列来处理这三种情况,并提供了TypeScript示例代码。

1. 消息被拒绝

当消费者无法处理某条消息时,可以选择将其标记为“被拒绝”。这种情况下,我们可以配置RabbitMQ,将被拒绝的消息发送到一个死信队列,以后再处理。

// 引入amqplib库
import * as amqp from 'amqplib';

// 连接到RabbitMQ服务器
const connection = await amqp.connect('amqp://localhost');

// 创建Channel
const channel = await connection.createChannel();

// 定义队列
const queueName = 'my_queue';
await channel.assertQueue(queueName, {
  // 设置死信交换机
  deadLetterExchange: 'my_dead_letter_exchange'
});

// 消费消息
channel.consume(queueName, (msg) => {
  // 处理消息
  if (msg) {
    // 处理失败,拒绝消息并将其重新放回队列
    // channel.reject(msg, true); // 第二个参数设为 true 表示将消息重新放回队列
    // 处理失败,拒绝消息
    channel.reject(msg, false); // 第二个参数设为 false 表示将消息投递到死信队列
     // or 处理失败,拒绝消息并将其重新放回死信队列
    channel.nack(msg, false, false); // 第二个参数设为 false 表示不将消息重新放回原队列,第三个参数设为 false 表示不拒绝当前和之前所有未确认的消息
  }
});

2. 消息过期

有时候我们希望消息在一定时间内被处理,如果超过了这个时间,就认为消息已经过期。RabbitMQ允许我们设置消息的过期时间,并在消息过期后将其发送到死信队列。

// 发布消息
await channel.sendToQueue(queueName, Buffer.from('Hello'), {
  expiration: '60000' // 设置过期时间为60秒
});

3. 队列达到最大长度

为了避免队列过载,我们可以限制队列的最大长度。当队列达到最大长度时,新的消息将被拒绝,并发送到死信队列。

// 定义队列
await channel.assertQueue(queueName, {
  maxLength: 100, // 设置最大队列长度为100
  deadLetterExchange: 'my_dead_letter_exchange'
});

通过以上配置,我们可以利用RabbitMQ的死信队列来处理消息被拒绝、消息过期以及队列达到最大长度等情况,保证消息系统的稳定性和可靠性。

以上是利用TypeScript示例代码演示了如何在RabbitMQ中使用死信队列。希望这篇文章对你有所帮助!

延时队列

什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
延时队列在项目中的应用还是比较多的,尤其像电商类平台:
1、订单成功后,在30分钟内没有支付,自动取消订单
2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。
3、如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
4、淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

npm install amqplib --save
npm install @types/amqplib --save-dev

在这里插入图片描述

总结

在这里插入图片描述

rabbitmqadmin 使用入门

rabbitmqadmin 是 RabbitMQ 的命令行管理工具,可以用于执行各种管理任务,如创建队列、交换机,查看队列状态等。以下是一些基本的用法示例:

export RABBITMQ_SERVER=127.0.0.1
export RABBITMQ_PORT=5672   
export RABBITMQ_USER=mingcai     
export RABBITMQ_PASSWORD=password

rabbitmqadmin list exchanges
  1. 查看 RabbitMQ 服务器信息
rabbitmqadmin status
  1. 列出所有交换机
rabbitmqadmin list exchanges
  1. 列出所有队列
rabbitmqadmin list queues
  1. 创建一个交换机
rabbitmqadmin declare exchange name=my_exchange type=direct
  1. 创建一个队列
rabbitmqadmin declare queue name=my_queue
  1. 绑定队列到交换机
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key
  1. 发送消息到指定交换机
rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello, RabbitMQ!"
  1. 获取队列消息
rabbitmqadmin get queue=my_queue

这些命令只是一些基本用法示例,rabbitmqadmin 工具支持更多功能和选项。你可以通过运行 rabbitmqadmin help 命令来获取更详细的帮助信息,或者查看官方文档以了解更多选项和使用方法。

延时3秒和8秒全部代码

// delayProducer.ts
import * as amqp from 'amqplib';

async function setupRouting() {
    const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
    const channel = await connection.createChannel();

    const exchange = 'routing_exchange';
    // 定义 dlx-exchange
    const dlxExchangeName = 'dlx-exchange';


    // 声明交换机
    await channel.assertExchange(exchange, 'direct', {durable: true});
    await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });//消息防止丢失
    const dlxqueueBindings= [
        {
            dlxQueueName: 'dlx-3_second_queue', routingKey: 'fast',
        },
        {
            dlxQueueName: 'dlx-8_second_queue', routingKey: 'slow'
        }
    ];
    for (const binding of dlxqueueBindings) {

        // 绑定延迟死信队列
        await channel.assertQueue(binding.dlxQueueName );
        //死信交换机和死信队列绑定 Routing key fast 的消息
        await channel.bindQueue(binding.dlxQueueName, dlxExchangeName, binding.routingKey); // 将 dlx-queue 绑定到死信交换机
    }


    // 定义队列和路由键的映射
    const queueBindings = [
        {
            queue: '3_second_queue', routingKey: 'fast', arguments: {
                'x-message-ttl': 3000, // TTL 设置为 3 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。
                'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称
            }
        },
        {
            queue: '8_second_queue', routingKey: 'slow', arguments: {
                'x-message-ttl': 8000, // TTL 设置为 8 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。
                'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称
            }
        }
    ];

    // 声明队列,并将队列绑定到交换机上
    for (const binding of queueBindings) {
        await channel.assertQueue(binding.queue, {durable: true, arguments: binding.arguments});
        await channel.bindQueue(binding.queue, exchange, binding.routingKey);
    }


    for (let i = 0; i < 10; i++) {
        await new Promise((resolve) => {
            setTimeout(() => {
                resolve(1)
            }, 1000)
        })
        const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });
        console.log('当前中国时间:', chinaTime);
        // 发送消息到交换机,并设置不同的路由键
        await sendMessage(channel, exchange, 'fast', `[${i}] ${chinaTime} Message for the fast queue`);
        await sendMessage(channel, exchange, 'slow', `[${i}] ${chinaTime} Message for the slow queue`);

    }

    // 关闭连接
    setTimeout(async () => {
        await channel.close();
        await connection.close();
    }, 10000); //10 秒后关闭连接
}


async function sendMessage(channel: amqp.Channel, exchange: string, routingKey: string, message: string) {
    channel.publish(exchange, routingKey, Buffer.from(message));
    console.log(`Sent message '${message}' with routing key '${routingKey}'`);
}

setupRouting().catch(console.error);

//消费者 dlx-3_second_queue.ts
import * as amqp from 'amqplib';

async function setupRouting() {
    const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
    const channel = await connection.createChannel();

    let queue = 'dlx-3_second_queue'
    // 定义队列和路由键的映射
    await channel.consume(queue, (msg) => {
        if (msg !== null) {
            const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });
            console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);
            channel.ack(msg); // 确认消息已被处理
        }
    });


}


setupRouting().catch(console.error);
//dlx-8_second_queue.ts
import * as amqp from 'amqplib';

async function setupRouting() {
    const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
    const channel = await connection.createChannel();

    let queue = 'dlx-8_second_queue'
    // 定义队列和路由键的映射
    await channel.consume(queue, (msg) => {
        if (msg !== null) {
            const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });
            console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);
            channel.ack(msg); // 确认消息已被处理
        }
    });


}


setupRouting().catch(console.error);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/498050.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

RTOS线程切换的过程和原理

0 前言 RTOS中最重要的一个概念就是线程&#xff0c;线程的按需切换能够满足RTOS的实时性要求&#xff0c;同时能将复杂的需求分解成一个个线程执行减轻我们开发负担。 本文从栈的角度出发&#xff0c;详细介绍RTOS线程切换的过程和原理。 注&#xff1a;本文参考的RTOS是RT-T…

Typora字数过多的时候造成卡顿现象如何解决?

Typora字数过多的时候造成卡顿现象如何解决&#xff1f; 点击 、切换、滚动、打字都有点卡顿&#xff0c;下面介绍三种方法&#xff0c;三种方法都可以尝试&#xff0c;建议先尝试方法一&#xff0c;效果不满意就用方法二&#xff0c;实在不行就最后一个取巧的办法。 方法1&a…

Unity TMP 使用教程

文章目录 1 导入资源包2 字体制作3 表情包制作4 TMP 控件4.1 属性4.2 富文本标签 1 导入资源包 “Window -> TextMeshPro -> Import TMP Essential Resources”&#xff0c;导入完成后会创建一个名为"TextMehs Pro"的文件夹&#xff0c;这里面包含所需要的资源…

Maya 2024 for Mac/Win:重塑三维创意世界的利器

在数字化浪潮汹涌的当下&#xff0c;三维图形软件早已成为创意产业不可或缺的重要工具。而在这其中&#xff0c;Maya 2024以其卓越的性能和丰富的功能&#xff0c;赢得了无数设计师的青睐。无论是Mac还是Win平台&#xff0c;Maya 2024都能为您的三维创作提供强大的支持。 Maya…

Docker部署MinIO对象存储服务

1. 拉取MinIO镜像 # 下载镜像 docker pull minio/minio#查看镜像 docker images2. 创建目录 # 文件存储目录 mkdir -p /opt/minio/data# 配置文件 mkdir -p /opt/minio/config# 日志文件 mkdir -p /opt/minio/logs3. 创建Minio容器并运行 docker run \ -p 9000:9000 \ -p 90…

ES学习日记(二)-------集群设置

上一节写了elasticsearch单节点安装和配置,现在说集群,简单地说就是在多台服务器上搭建单节点,在配置文件里面增加多个ip地址即可,过程同单节点部署,主要说集群配置 注意:不建议在之前单节点es上修改配置为集群,据说运行之后会生成很多文件,在单点基础上修改容易出现未知问题,…

zedboard+AD9361 运行 open WiFi

先到github上下载img&#xff0c;网页链接如下&#xff1a; https://github.com/open-sdr/openwifi?tabreadme-ov-file 打开网页后下载 openwifi img 用win32 Disk lmager 把文件写入到SD卡中&#xff0c;这一步操作会把SD卡重新清空&#xff0c;注意保存数据。这个软件我会…

最小可行产品需要最小可行架构——可持续架构(三)

前言 最小可行产品&#xff08;MVP&#xff09;的概念可以帮助团队专注于尽快交付他们认为对客户最有价值的东西&#xff0c;以便在投入大量时间和资源之前迅速、廉价地评估产品的市场规模。MVP不仅需要考虑产品的市场可行性&#xff0c;还需要考虑其技术可行性&#xff0c;以…

【JavaWeb】Day24.Web入门——HTTP协议(一)

HTTP协议——概述 1.介绍 HTTP&#xff1a;Hyper Text Transfer Protocol(超文本传输协议)&#xff0c;规定了浏览器与服务器之间数据传输的规则。 http是互联网上应用最为广泛的一种网络协议http协议要求&#xff1a;浏览器在向服务器发送请求数据时&#xff0c;或是服务器在…

Oracle存数字精度问题number、binary_double、binary_float类型

--表1 score是number(10,5)类型 create table TEST1 (score number(10,5) ); --表2 score是binary_double类型 create table TEST2 (score binary_double ); --表3 score是binary_float类型 create table TEST3 (score binary_float );实验一&#xff1a;分别往三张表插入 小数…

Redis开源协议变更!Garnet:微软开源代替方案?

Garnet&#xff1a;微软开源的高性能替代方案&#xff0c;秉承兼容 RESP 协议的同时&#xff0c;以卓越性能和无缝迁移能力重新定义分布式缓存存储&#xff01; - 精选真开源&#xff0c;释放新价值。 概览 最近&#xff0c;Redis修改了开源协议&#xff0c;从BSD变成了 SSPLv…

青龙脚本 猫猫看看

话不多说开图 https://raw.githubusercontent.com/Huansheng1/my-qinglong-js/main/%E7%8C%AB%E7%8C%AB%E7%9C%8B%E7%9C%8B.py

探索Python人工智能在气象监测中的创新应用

Python是功能强大、免费、开源&#xff0c;实现面向对象的编程语言&#xff0c;在数据处理、科学计算、数学建模、数据挖掘和数据可视化方面具备优异的性能&#xff0c;这些优势使得Python在气象、海洋、地理、气候、水文和生态等地学领域的科研和工程项目中得到广泛应用。可以…

Jupyter安装教程(Windows 版)

这几年AI人工智能这么火&#xff0c;陆陆续续诞生了很多新的产品&#xff0c;新的商业模式&#xff0c;随着Open-sora 1.0开源之后&#xff0c;让我更加地相信GPT5也即将要到来了&#xff0c;看来不学机器学习和深度学习&#xff0c;恐怕是要跟不上时代了。于是就想着今年开始接…

【管理咨询宝藏59】某大型汽车物流战略咨询报告

本报告首发于公号“管理咨询宝藏”&#xff0c;如需阅读完整版报告内容&#xff0c;请查阅公号“管理咨询宝藏”。 【管理咨询宝藏59】某大型汽车物流战略咨询报告 【格式】PDF 【关键词】HR调研、商业分析、管理咨询 【核心观点】 - 重新评估和调整商业模式&#xff0c;开拓…

智能设备配网保姆级教程

设备配网 简单来说&#xff0c;配网就是将物联网&#xff08;IoT&#xff09;设备连接并注册到云端&#xff0c;使其拥有与云端远程通信的能力。配网后&#xff0c;智能设备才能被手机应用或者项目管理后台控制&#xff0c;依托于智能场景创造价值。本文介绍了配网的相关知识&…

【分享】CMMI V3.0版本做了哪些改变?哪些企业适合申请CMMI3.0

​ CMM是由美国卡内基梅隆大学软件工程研究所1987年开发成功的&#xff0c;它基于过去所有软件工程过程改进的成果&#xff0c;吸取了以往软件工程的经验教训&#xff0c;提供了一个基于过程改进的框架&#xff1b;CMMI(Capability Maturity Model Integration能力成熟度模型集…

代码随想录算法训练营第三十六天|435. 无重叠区间,763. 划分字母区间

435. 无重叠区间 题目 给定一个区间的集合 intervals &#xff0c;其中 intervals[i] [starti, endi] 。返回 需要移除区间的最小数量&#xff0c;使剩余区间互不重叠 。 示例 1: 输入: intervals [[1,2],[2,3],[3,4],[1,3]] 输出: 1 解释: 移除 [1,3] 后&#xff0c;剩下…

文献学习(自备)

收官大作&#xff0c;多组学融合的新套路发NC&#xff01;&#xff01; - 知乎 (zhihu.com) Hofbauer cell function in the term placenta associates with adult cardiovascular and depressive outcomes | Nature Communications 病理性胎盘炎症会增加几种成人疾病的风险&a…

Linux——信号的保存与处理

目录 前言 一、信号的常见概念 1.信号递达 2.信号未决 3.信号阻塞 二、Linux中的递达未决阻塞 三、信号集 四、信号集的处理 1.sig相关函数 2.sigprocmask()函数 3.sigpending()函数 五、信号的处理时机 六、信号处理函数 前言 在之前&#xff0c;我们学习了信号…