RabbitMQ-核心特性

已经不需要为RabbitMQ交换机的离去而感到伤心了,接下来登场的是RabbitMQ-核心特性!!!

文章目录

  • 核心特性
    • 消息过期机制
    • 消息确认机制
    • 死信队列

核心特性

消息过期机制

官方文档:https://www.rabbitmq.com/ttl.html
可以给每条消息指定一个有效期,一段时间内未被消费者处理,就过期了
适用场景:清理过期数据

1)给队列中的所有消息指定过期时间

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 5000);
// args 指定参数
channel.queueDeclare(QUEUE_NAME, false, false, false, args);

如果在过期时间内,还没有消费者取消息,消息才会过期
注意,如果消息已经接收到,但是没确认,是不会过期的
消费者中给队列中所有消息设置过期时间:

public class TtlConsumer {

    private final static String QUEUE_NAME = "ttl_queue";

    public static void main(String[] argv) throws Exception {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 创建队列,指定消息过期参数
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-message-ttl", 5000);
        // args 指定参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, args);

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 定义了如何处理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        // 消费消息,会持续阻塞
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}

2)给某条消息指定过期时间

// 给消息指定过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .expiration("1000")
        .build();
channel.basicPublish("my-exchange", "routing-key", properties, message.getBytes(StandardCharsets.UTF_8));

生产者给某条消息指定过期时间

public class TtlProducer {

    private final static String QUEUE_NAME = "ttl_queue";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
//        factory.setUsername();
//        factory.setPassword();
//        factory.setPort();

        // 建立连接、创建频道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 发送消息
            String message = "Hello World!";

            // 给消息指定过期时间
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration("1000")
                    .build();
            channel.basicPublish("my-exchange", "routing-key", properties, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消息确认机制

官方文档:https://www.rabbitmq.com/confirms.html
为保证消息成功被消费,rabbitmq提供了消息确认机制,当消费者收到消息后要给一个成功反馈:
●ack:消费成功
●nack:消费失败
●reject:拒绝
如果告诉 rabbitmq 服务器消费成功,服务器才会放心地移除消息。
支持配置 autoack,会自动执行 ack 命令,接收到消息立刻就成功了。
image.png

        channel.basicConsume(queueName, true, xiaoyuDeliverCallback, consumerTag -> {
        });

一般情况,建议 autoack 改为 false,根据实际情况,去手动确认。
指定确认某条消息:

image.png

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

第二个参数 multiple 批量确认:是指是否要一次性确认所有的历史消息直到当前这条消息
指定拒绝某条消息:
image.png

channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);

第 3 个参数表示是否重新入队,可用于重试

死信队列

官方文档:https://www.rabbitmq.com/dlx.html
为了保证消息的可靠性,比如每条消息都成功消费,需要提供一个容错机制,即:失败的消息怎么处理?
死信:过期的消息,拒收的消息,消息队列满了,处理失败的消息的统称
死信队列:专门处理死信的队列
死信交换机:专门给死信队列转发消息的交换机
示例场景:
image.png
实现:
1)创建死信交换机和死信队列,并且绑定关系
2)给失败之后需要容错处理的队列绑定死信交换机
3)可以给要容错的队列指定死信之后的转发规则,死信应该再转发到哪个死信队列
4)可以通过程序来读取死信队列中的消息,从而进行处理
生产者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.util.Scanner;

public class DlxDirectProducer {
    //死信交换机
    private static final String DEAD_EXCHANGE_NAME = "dlx-direct-exchange";
    //工作交换机
    private static final String WORK_EXCHANGE_NAME = "direct2-exchange";


    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明死信交换机
            channel.exchangeDeclare(DEAD_EXCHANGE_NAME, "direct");

            // 创建laoban死信队列
            String queueName = "laoban_dlx_queue";
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, DEAD_EXCHANGE_NAME, "laoban");
            //创建waibao死信队列
            String queueName2 = "waibao_dlx_queue";
            channel.queueDeclare(queueName2, true, false, false, null);
            channel.queueBind(queueName2, DEAD_EXCHANGE_NAME, "waibao");

            DeliverCallback laobanDeliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                // 拒绝消息
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
                System.out.println(" [laoban] Received '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };

            DeliverCallback waibaoDeliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                // 拒绝消息
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
                System.out.println(" [waibao] Received '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };

            channel.basicConsume(queueName, false, laobanDeliverCallback, consumerTag -> {
            });
            channel.basicConsume(queueName2, false, waibaoDeliverCallback, consumerTag -> {
            });


            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String userInput = scanner.nextLine();
                String[] strings = userInput.split(" ");
                if (strings.length < 1) {
                    continue;
                }
                String message = strings[0];
                String routingKey = strings[1];

                channel.basicPublish(WORK_EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + " with routing:" + routingKey + "'");
            }

        }
    }
}

消费者代码:

在这里插入代码片package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

public class DlxDirectConsumer {

    private static final String DEAD_EXCHANGE_NAME = "dlx-direct-exchange";

    private static final String WORK_EXCHANGE_NAME = "direct2-exchange";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(WORK_EXCHANGE_NAME, "direct");

        //小狗的死信要转发到waibao这个死信队列
        // 指定死信队列参数
        Map<String, Object> args = new HashMap<>();
        // 要绑定到哪个交换机
        args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        // 指定死信要转发到哪个死信队列
        args.put("x-dead-letter-routing-key", "waibao");

        // 创建队列,随机分配一个队列名称
        String queueName = "xiaodog_queue";
        channel.queueDeclare(queueName, true, false, false, args);
        channel.queueBind(queueName, WORK_EXCHANGE_NAME, "xiaodog");
        //小猫的死信要转发到laoban这个死信队列
        Map<String, Object> args2 = new HashMap<>();
        args2.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        args2.put("x-dead-letter-routing-key", "laoban");

        // 创建队列,随机分配一个队列名称
        String queueName2 = "xiaocat_queue";
        channel.queueDeclare(queueName2, true, false, false, args2);
        channel.queueBind(queueName2, WORK_EXCHANGE_NAME, "xiaocat");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback xiaoyuDeliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            // 拒绝消息,并且不要重新将消息放回队列,只拒绝当前消息
            channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
            System.out.println(" [xiaodog] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        DeliverCallback xiaopiDeliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            // 拒绝消息
            channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
            System.out.println(" [xiaocat] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        channel.basicConsume(queueName, false, xiaoyuDeliverCallback, consumerTag -> {
        });
        channel.basicConsume(queueName2, false, xiaopiDeliverCallback, consumerTag -> {
        });
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/552533.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

git 分支-变基

在git中&#xff0c;将一个分支的更改集成到另一个分支有两种主要方式&#xff1a;合并&#xff08;merge&#xff09;和变基&#xff08;rebase&#xff09;。在本节中&#xff0c;将学习什么是变基&#xff0c;如何执行变基操作&#xff0c;为什么它是一个非常强大的工具&…

C++ 一个有关类模板、构造函数、析构函数、拷贝构造、重载等的数组案例分析

文章目录 概要根据代码和输出进行分析&#xff08;看注释和图示&#xff09;个人小结 概要 案例描述: 实现一个通用的数组类&#xff0c;要求如下&#xff1a; 可以对内置数据类型以及自定义数据类型的数据进行存储&#xff1b;将数组中的数据存储到堆区&#xff1b;构造函数…

QT-编译报库错误(LF/CRLF)

QT-安装后环境问题记录 版本和环境问题 版本和环境 QT5.15.2 Windows10 QT Creator 问题 在QT夸端开发的项目中 &#xff0c;使用QTCreator打开项目pro文件&#xff0c;编译报出很多系统库 及本地文件中的一些问题&#xff0c;具体如图&#xff1a; 后续&#xff0c;我以为…

【b站李同学的Lee】Part 2 模块化开发 NodeJS+Gulp基础入门+实战

课程地址&#xff1a;【NodeJSGulp基础入门实战】 https://www.bilibili.com/video/BV1aE411n737/?share_sourcecopy_web&vd_sourceb1cb921b73fe3808550eaf2224d1c155 目录 4 Node.js模块化开发 4.1 JavaScript开发弊端 4.1.1 文件依赖 4.1.2 命名冲突 4.2 生活中的…

Midjourney常见玩法及prompt关键词技巧

今天系统给大家讲讲Midjourney的常见玩法和prompt关键词的一些注意事项&#xff0c;带大家入门&#xff5e;&#xff08;多图预警&#xff0c;建议收藏&#xff5e;&#xff09; 一、入门及常见玩法 1、注册并添加服务器&#xff08;会的童鞋可跳过&#xff5e;&#xff09; …

DC-9渗透测试复现

DC-9渗透测试复现 目的&#xff1a; 获取最高权限以及flag 过程&#xff1a; 信息打点--sql注入- 文件包含漏洞-Knockd开门开启ssh连接-hyjra爆破-sudo提权(文件追加) 环境&#xff1a; 攻击机&#xff1a;kali(192.168.85.137) 靶机&#xff1a;DC_3(192.168.85.141) …

GPT状态和原理 - 解密OpenAI模型训练

目录 1 如何训练 GPT 助手 1.1 第一阶段 Pretraining 预训练 1.2 第二阶段&#xff1a;Supervised Finetuning有监督微调 1.3 第三阶段 Reward Modeling 奖励建模 1.4 第四阶段 Reinforcement Learning 强化学习 1.5 总结 2 第二部分&#xff1a;如何有效的应用在您的应…

【Linux】Linux信号

目录 信号的概念 生活中的信号 Linux中的信号 kill命令 kill 命令的使用 常见的信号 命令行代码示例 注意事项 信号的处理方式 产生信号 信号的捕捉 信号捕捉示意图 内核如何实现信号捕捉 信号的捕捉与处理 小结 阻塞信号 信号在内核中的表示图 信号集操作函数…

如何学习嵌入式Linux?

如何去学习嵌入式 Linux 呢&#xff1f;嵌入式底层开发毫无疑问是一项极为关键重要的技术&#xff0c;其被广泛地应用于形形色色的嵌入式系统之中。伴随科技的迅猛飞速发展&#xff0c;嵌入式系统已然成为了我们生活中不可或缺的一个组成部分&#xff0c;这也极为凸显出了嵌入式…

基于 Bazel 的 iOS Monorepo 工程实践

在之前很长一段时间里&#xff0c;哔哩哔哩 iOS 工程是使用 Polyrepo&#xff08;或者说 Multirepo&#xff0c;即多仓库&#xff09;的传统模式进行开发。但是随着业务的发展&#xff0c;我们的代码仓库的数量也随之膨胀&#xff0c;我们慢慢发现 Polyrepo 模式并不一定是适合…

DDoS攻击愈演愈烈,谈如何做好DDoS防御

DDoS攻击是目前最常见的网络攻击方式之一&#xff0c;各种规模的企业包括组织机构都在受其影响。对于未受保护的企业来讲&#xff0c;每次DDoS攻击的平均成本为20万美元。可见&#xff0c;我们显然需要开展更多的DDoS防御工作。除考虑如何规避已发生的攻击外&#xff0c;更重要…

手机副业赚钱秘籍:让你的手机变成赚钱利器

当今社会&#xff0c;智能手机已然成为我们生活不可或缺的一部分。随着技术的飞速进步&#xff0c;手机不再仅仅是通讯工具&#xff0c;而是化身为生活伴侣与工作助手。在这个信息爆炸的时代&#xff0c;我们时常会被一种焦虑感所困扰&#xff1a;如何能让手机超越消磨时光的定…

关于Git的一些基础用法

关于Git的一些基础用法 1. 前言2. 使用GitHub/gitee创建项目2.1 创建账号2.2 创建项目2.3 下载仓库到本地2.4 提交代码到远端仓库2.5 查看日志2.6 同步远端仓库和本地仓库 1. 前言 首先说一个冷知识&#xff08;好像也不是很冷&#xff09;&#xff0c;Linux和git的创始人是同…

CC254X 8051芯片手册介绍

1 8051CPU 8051是一种8位元的单芯片微控制器&#xff0c;属于MCS-51单芯片的一种&#xff0c;由英特尔(Intel)公司于1981年制造。Intel公司将MCS51的核心技术授权给了很多其它公司&#xff0c;所以有很多公司在做以8051为核心的单片机&#xff0c;如Atmel、飞利浦、深联华等公…

C++:类型转换

目录 1、C语言中的类型转换 2、C的四种类型转换 2.1 static_cast 2.2 reinterpret_cast 2.3 const_cast 2.4 dynamic_cast 3 RTTI 1、C语言中的类型转换 如果 赋值运算符左右两侧类型不同&#xff0c;或者形参与实参类型不匹配&#xff0c;或者返回值类型与 接收返回值…

TexStudio + MikTex 手动安装宏包

遇到上面这个 “宏包安装” 提示窗口后&#xff0c;设置来源为本地&#xff0c;随后在这个网址 https://mirrors.ustc.edu.cn/CTAN/systems/win32/miktex/tm/packages/ 下载所需的宏包&#xff0c;放到本地仓库里&#xff0c;即可 有三个宏包是必须要有的&#xff0c;它们是索…

上下文输入无限制,谷歌发布Infini-Transformer

去年&#xff0c;百川智能发布号称全球最长的上下文窗口大模型Baichuan2-192K&#xff0c;一次性可输入35万字&#xff0c;超越GPT-4。 今年3月&#xff0c;Kimi智能助手宣布在上下文窗口技术上突破200万字。 紧追其后&#xff0c;国内各大互联网巨头纷纷布局升级自家大模型产…

JAVA基础08- 继承,重写,super以及this

目录 继承&#xff08;extends&#xff09; 定义 说明 作用 方法的重写 定义 重写关键点 方法重写与重载的区别 练习 练习1&#xff08;方法继承与重写的简单练习&#xff09; 练习2&#xff08;方法继承与重写的进阶练习&#xff09; This的使用 定义 作用以及注…

Postman之版本信息查看

Postman之版本信息查看 一、为何需要查看版本信息&#xff1f;二、查看Postman的版本信息的步骤 一、为何需要查看版本信息&#xff1f; 不同的版本之间可能存在功能和界面的差异。 二、查看Postman的版本信息的步骤 1、打开 Postman 2、打开设置项 点击页面右上角的 “Set…

MyBatis 源码分析 - SQL 的执行过程

MyBatis 源码分析 - SQL 的执行过程 * 本文速览 本篇文章较为详细的介绍了 MyBatis 执行 SQL 的过程。该过程本身比较复杂&#xff0c;牵涉到的技术点比较多。包括但不限于 Mapper 接口代理类的生成、接口方法的解析、SQL 语句的解析、运行时参数的绑定、查询结果自动映射、延…