rabbitmq交换机,死信队列的简单例子

       假设我们有一个场景,生产者有消息发到某个直连交换机,这个交换机上有两个队列分别存储两种类型的消息,但是与这两个队列相连的消费者太不争气了,处理消息有点慢,我们想5秒钟这个消息在队列中还没有被消费的话,就给它丢进死信队列里得了(我们平时听到的延时队列其实就可按此方法实现,故意让它过期然后延时处理),后续再处理,但是这俩队列明显存储的消息不一样,我们又不好意思将它都扔到同一个死信队列里去,如果我们想要俩死信队列分别装这两个消费者漏掉的消息,那我们怎么做呢?

        下面就是一个简单的例子,如果用spring boot之类的去做也是类似,原理差不多,感兴趣的可以自己改造。

        预处理:我们先创建一个工具类用来连接rabbitmq,注意你需要去创建对应的虚拟主机,以及对应的登录账号和密码。

工具类如下: 

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("localhost");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        //VirtualHost(虚拟主机)是一个逻辑上独立的RabbitMQ服务实例。每个VirtualHost都有自己的队列、交换机、绑定等对象,并且它们之间是相互隔离的,即exchange、queue、message不能互通。
        factory.setVirtualHost("myVirtualHost");
        factory.setUsername("mytest");
        factory.setPassword("mytest");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

        

现在我们有一个直连交换机test_exchange_direct(直连交换机即根据设置的固定键直接路由到对应的队列,注意与主题topic队列的区分),我们往这个交换机里每300毫秒分别发送键为good和bad的数据各30条。

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class SendToExchange {

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        
        //这里要注意,如果你没有响应的队列的话即交换机还没有绑定队列,发送消息到交换机这些消息会丢失。
        for (int i = 0; i < 30; i++) {
            // 消息内容
            String message = "good " + i;
            //会路由到good对应的队列上
            channel.basicPublish(EXCHANGE_NAME, "good", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            Thread.sleep(300);
        }

        for (int i = 0; i < 30; i++) {
            // 消息内容
            String message = "bad " + i;
            //会路由到bad对应的队列上
            channel.basicPublish(EXCHANGE_NAME, "bad", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            Thread.sleep(300);
        }

        channel.close();
        connection.close();
    }
}

        我们再创建一个直连死信交换机dead_exchange_direct,和连接到此私信交换机上的两个队列dead_queue,dead_queue1,对应的键分别为dead-good和dead-bad。

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class DeadExchange {

    private final static String EXCHANGE_NAME = "dead_exchange_direct";

    private final static String QUEUE_NAME = "dead_queue";

    private final static String QUEUE_NAME1 = "dead_queue1";

    public static void main(String[] argv) throws Exception {
        channel1();
        channel2();
    }

    public static void channel1() throws Exception{
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
        // 队列名称,是否持久化,是否排他,是否自动删除,自定义属性
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 绑定队列到交换机  死信路由键为dead
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "dead-good");


        channel.close();
        connection.close();
    }

    public static void channel2() throws Exception{
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
        // 队列名称,是否持久化,是否排他,是否自动删除,自定义属性
        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);

        // 绑定队列到交换机  死信路由键为dead
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "dead-bad");


        channel.close();
        connection.close();
    }

第一个不争气消费者RecvFromExchange,这个消费者对应的队列是good_queue队列,它800毫秒能处理一条消息,给他设置读取队列消息过期时间为5秒,绑定dead_exchange_direct死信交换机,死信队列路由键为dead-good

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

import java.util.HashMap;
import java.util.Map;

public class RecvFromExchange {

    private final static String QUEUE_NAME = "good_queue";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        args.put("x-dead-letter-exchange","dead_exchange_direct");
        args.put("x-dead-letter-routing-key","dead-good"); // 死信路由键dead 路由到键为dead的死信队列

        // 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
        // 队列名称,是否持久化,是否排他,是否自动删除,自定义属性
        channel.queueDeclare(QUEUE_NAME, true, false, false, args);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "good");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv] Received '" + message + "'");
            Thread.sleep(800);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}

第二个不争气消费者RecvFromExchange2,这个消费者对应的队列是bad_queue队列,它1秒能处理一条消息,它虽然慢一些但是我就是一视同仁给他设置读取队列消息过期时间也为5秒,绑定dead_exchange_direct死信交换机,死信队列路由键为dead-bad

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

import java.util.HashMap;
import java.util.Map;

public class RecvFromExchange2 {

    private final static String QUEUE_NAME = "bad_queue";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();


        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        args.put("x-dead-letter-exchange","dead_exchange_direct");
        args.put("x-dead-letter-routing-key","dead-bad"); // 死信路由键dead 路由到键为dead的死信队列

        // 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
        // 队列名称,是否持久化,是否排他,是否自动删除,自定义属性
        channel.queueDeclare(QUEUE_NAME, true, false, false, args);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "bad");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv] Received '" + message + "'");
            Thread.sleep(1000);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}

按顺序先启动DeadExchange,SendToExchange,RecvFromExchange,RecvFromExchange2。然后再次启动SendToExchange,重新发数据观察发现,这两个不争气的消费者漏掉的数据最后被死信队列接收了。

 

接下来我们对我们喜欢的绑定键dead-good的好队列给它兜底擦屁股。

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class FuckDeadQueue {

    private final static String EXCHANGE_NAME = "dead_exchange_direct";

    private final static String QUEUE_NAME = "dead_queue";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 绑定队列到交换机  死信路由键为dead
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "dead-good");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv] Received '" + message + "'");
            Thread.sleep(1000);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }

    }

}

 执行后发现,死信队列里的消息被我们消费掉了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/624969.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

idea上如何新建git分支

当前项目在dev分支&#xff0c;如果想在新分支上开发代码&#xff0c;如何新建一个分支呢&#xff1f;5秒搞定~ 1、工具类选择git&#xff0c;点击New Branch 或者右下角点击git分支&#xff0c;再点击New Branch 2、在弹出的Create New Branch弹窗中&#xff0c;输入你的新分支…

Open AI再次定义AI PC?

从传统的文字交互&#xff0c;到语音和图像交互——Open AI再次提升了人们对AI PC的想象空间。 这种更贴近人类间交互的模式&#xff0c;会多大程度改变目前PC的生态&#xff1f; 随着苹果M4芯片、高通骁龙X的发布&#xff0c;AI PC也逐渐成为了市场热议的产品。 从各家PC厂…

‍♂️垃圾收集算法必看!学习指数满天星!!!

&#x1f435;看完这篇文章&#xff0c;希望你有点收获&#x1f697; 注意&#xff1a;看之前你需要对JVM有点了解。。。 首先&#xff0c;垃圾回收算法主要分有三种: 标记-清除算法 见名知意&#xff0c;标记-清除&#xff08;Mark-Sweep&#xff09;算法分为两个阶段&#…

高清SDI串行数字接口采集卡与传输编码器

随着科技的快速发展&#xff0c;我们正处于一个数字化、信息化的时代&#xff0c;各式各样的设备正成为人们工作和生活中必不可少的伙伴。今天&#xff0c;我要向大家介绍的是一款具有革命性意义的视频采集卡——LCC262。这款由灵卡技术团队精心打造的产品&#xff0c;集合了多…

越来越真的Deepfake再次引起网安界的关注

当地时间5月6日&#xff0c;全球网络安全领域最受关注的年度盛会 RSAC 2024在美国旧金山隆重开幕。当天&#xff0c;被誉为“安全圈奥斯卡”的创新沙盒大赛也决出了冠军&#xff0c;Reality Defender凭借其创新性的深度伪造&#xff08;Deepfake&#xff09;检测平台摘得桂冠&a…

MySQL用SQL取三列中最大的数据值

1、有如下数据&#xff1a; ABC000097.0600330.72330.720069.650027.8827.85086.92086.92219.42219.4219.41 需要展示为如下形式&#xff1a; ABC结果列0000097.06097.060330.72330.72330.7200669.65009.6527.8827.85027.8886.92086.9286.92219.42219.4219.41219.42 解决办…

IP代理中的SOCKS5代理是什么?安全吗?

在互联网世界中&#xff0c;网络安全和个人隐私保护变得日益重要。SOCKS5代理作为一种安全高效的网络工具&#xff0c;不仅可以保护个人隐私安全&#xff0c;还可以提供更稳定、更快度的网络连接。本文将带大家深入了解SOCKS5代理在网络安全领域中的应用。 什么是SOCKS5代理 …

肺部营养“救星”,让每次呼吸更自由

​#肺科营养#朗格力#班古营养#复合营养素#肺部营养# 正常的健康人,每天自由幸福的呼吸。但是对于肺病患者来说,特别是慢阻肺人群,每一次呼吸都可能是一场挑战,每一口气都显得弥足珍贵。 肺病患者号称沉默的“呼吸杀手”,它虽然沉默,但不代表它没能力,除了引起肺功能下降,氧气…

智慧安防监控EasyCVR视频汇聚管理平台视频播放花屏的原因分析及处理

智慧安防监控EasyCVR视频管理平台能在复杂的网络环境中&#xff0c;将前端设备统一集中接入与汇聚管理。国标GB28181协议视频监控/视频汇聚EasyCVR平台可以提供实时远程视频监控、视频录像、录像回放与存储、告警、语音对讲、云台控制、平台级联、磁盘阵列存储、视频集中存储、…

Python多任务

进程 1. 进程的概念 一个正在运行的程序或者软件就是一个进程&#xff0c;它是操作系统进行资源分配的基本单位&#xff0c;也就是说每启动一个进程&#xff0c;操作系统都会给其分配一定的运行资源(内存资源)保证进程的运行。 比如:现实生活中的公司可以理解成是一个进程&a…

乡村振兴与乡村旅游深度融合:依托乡村自然和文化资源,发展乡村旅游产业,促进农民增收致富,打造特色美丽乡村

目录 一、引言 二、乡村振兴与乡村旅游的内在联系 三、依托乡村自然和文化资源发展乡村旅游产业 &#xff08;一&#xff09;挖掘乡村自然资源优势&#xff0c;打造特色旅游品牌 &#xff08;二&#xff09;挖掘乡村文化资源内涵&#xff0c;丰富旅游活动内容 四、促进农…

lerna实战(一)

前言 将大型代码仓库分割成多个独立版本化的 软件包&#xff08;package&#xff09;对于代码共享来说非常有用。但是&#xff0c;如果某些更改 跨越了多个代码仓库的话将变得很 麻烦 并且难以跟踪&#xff0c;并且&#xff0c; 跨越多个代码仓库的测试将迅速变得非常复杂。 …

怎么给视频加水印?2招轻松搞定

在数字媒体时代&#xff0c;视频水印作为一种有效的版权保护手段&#xff0c;被广泛应用于各种场景。给视频添加水印不仅可以防止内容被恶意盗用&#xff0c;还能增加视频的辨识度&#xff0c;提升品牌形象。本文将为您介绍2种简单易行的方法&#xff0c;教您怎么给视频加水印&…

24.5.12(23广东,19陕西)(字典树)

星期一&#xff1a; dp题单 区间dp第三题 二叉搜索树 cf传送门 思路&#xff1a;dp【i】【j】【0/1】表示区间 i到 j&#xff0c;以 i / j为根节点能否形成一棵二叉搜索树 因为题目要求组成二叉搜索树&#xff0c;若 i 到 j 的节点为一颗完整的子…

地平线X3开发板配置wifi调试

1. 系统镜像制作 系统镜像的制作依赖bsp与补丁包&#xff0c;bsp在天工开物全量包中&#xff1a;https://developer.horizon.ai/resource 补丁下载链接&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1YKcOWL0EpboGq-SnqwIGeQ 提取码&#xff1a;b6lf 补丁包中有详细…

AWS ECS On Fargate 监控可观测最佳实践

概述 Amazon ECS on Fargate 为用户提供了简单、高效且可靠的容器化解决方案&#xff0c;使用户能够专注于应用程序开发和运行&#xff0c;而无需担心基础设施管理的复杂性。与其同时&#xff0c;用户需要实时了解在该环境中应用程序运行的性能、可用性、健康状况和资源使用情…

在VMware安装Androidx86_64系统要点

上篇使用VirtualBox安装过Androidx86_64系统&#xff0c;尝试了没有蓝牙共享的好方法。本篇记录下使用Vmware虚机安装改系统&#xff0c;并使用蓝牙共享功能。 1.准备材料 本篇安装环境是安装Window10_64位系统。需要下载好Vmware安装包&#xff0c;VMWare版本&#xff1a;VMw…

antv x6实现ER图

前端&#xff1a;安装相关依赖 npm install antv/x6 antv/x6-plugin-history antv/x6-plugin-selection antv/x6-plugin-minimap antv/layout 代码参考来源learn-antv-x6: antv/X6学习 antv官网https://antv.antgroup.com/ 代码包&#xff1a;链接: https://pan.baidu.c…

C++笔试强训day22

目录 1.添加字符 2.数组变换 3.装箱问题 常规一维优化&#xff1a; 1.添加字符 链接 因为lenA < lenB < 50&#xff0c;因此可以无脑暴力解题&#xff1a; 遍历所有符合条件的匹配方法&#xff0c;找出最小的不同的数量&#xff0c;即最大的相同的数量 #include &…

react18【系列实用教程】useEffect —— 副作用操作 (2024最新版)

什么是副作用操作&#xff1f; useEffect 用于编写由渲染本身引起的对接组件外部的操作&#xff08;官方称呼为&#xff1a;副作用操作&#xff09; 以下情况会触发页面渲染 初次加载页面&#xff08;组的挂载&#xff09;响应式变量发生变化&#xff0c;触发页面根据新值重新…