RabbitMQ消息模型之Routing-Direct

Routing Direct

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向 Exchange发送消息时,也必须指定消息的RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息

image-20191126220145375

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列。
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息。
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息。

创建生产者

public class MyProducer {

    @Test
    public void test() throws Exception {
        // 交换机
        String exchange = "logs_direct";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(exchange, "direct");
        for (int i = 0; i < 3; i++) {
            // 发布消息
            channel.basicPublish(exchange, "DEBUG", null, ("DEBUG LOG -> " + i).getBytes());
            channel.basicPublish(exchange, "INFO", null, ("INFO LOG -> " + i).getBytes());
            channel.basicPublish(exchange, "WARN", null, ("WARN LOG -> " + i).getBytes());
            channel.basicPublish(exchange, "ERROR", null, ("ERROR LOG -> " + i).getBytes());
        }
    }
}

创建消费者1

public class MyConsumer1 {

    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs_direct";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机
        channel.exchangeDeclare(exchange, "direct");
        // 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        // 将临时队列绑定exchange
        channel.queueBind(queue, exchange, "WARN");
        channel.queueBind(queue, exchange, "ERROR");
        // 处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: " + new String(body));
                // TODO 业务处理
            }
        });
    }
}

创建消费者2

public class MyConsumer2 {

    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs_direct";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机
        channel.exchangeDeclare(exchange, "direct");
        // 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        // 将临时队列绑定exchange
        channel.queueBind(queue, exchange, "DEBUG");
        channel.queueBind(queue, exchange, "INFO");
        // 处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2: " + new String(body));
                // TODO 业务处理
            }
        });
    }
}

image-20220526182028082

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/202838.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【广州华锐视点】3D宪法普法知识宣传展厅——线上法律知识学习新途径

随着科技的不断发展&#xff0c;人们的生活方式也在不断地改变。在这个信息爆炸的时代&#xff0c;传统的普法教育方式已经无法满足人们的需求。为了适应这一变化&#xff0c;越来越多的教育机构开始尝试利用现代科技手段进行普法教育。其中&#xff0c;3D宪法普法知识宣传展厅…

vuepress-----2、初体验

2、初体验 目标 创建GitHub账号创建Github项目初体验vuepress默认主体的首页 初体验 (opens new window) --- home: true heroImage: /hero.png heroText: Hero 标题 tagline: Hero 副标题 actionText: 快速上手 → actionLink: /zh/guide/ features: - title: 简洁至上deta…

5.如何利用ORBSLAM3生成可用于机器人/无人机导航的二维/三维栅格地图--以octomap为例

1 octomap的安装及官方文档 这里我们用ROS自带的安装方式即可&#xff1a; sudo apt install ros-melodic-octomap-msgs ros-melodic-octomap-ros ros-melodic- octomap-rviz-plugins ros-melodic-octomap-server 如上图就是安装成功了&#xff1a; 如果安装失败了&#xff0c;…

Win11录屏没有声音?深入解析潜在原因

录屏已经成为了许多人工作、娱乐和教育中的重要工具。然而&#xff0c;有时候在win11系统中录制屏幕时&#xff0c;可能会遇到没有声音的问题发生&#xff0c;这将会影响到您的工作和创作。在本文中&#xff0c;我们将探讨win11录屏没有声音怎么办&#xff0c;并提供详细的步骤…

【优选算法系列】【专题十四优先级队列】第一节.(1046. 最后一块石头的重量和703. 数据流中的第 K 大元素)

文章目录 前言一、最后一块石头的重量 1.1 题目描述 1.2 题目解析 1.2.1 算法原理 1.2.2 代码编写 1.2.3 题目总结二、数据流中的第 K 大元素 2.1 题目描述 2.2 题目解析 2.2.1 算法原理 2.2…

8款前端特效动画及源码分享

3D立体数字时钟滚动特效 基于Splitting制作的一款3D立体数字时钟滚动特效&#xff0c;创意感满满&#xff0c;可以下载使用。 预览获取 核心代码 <div class"clock"><span class"cog hours tens" data-splitting>0123456789</span>&l…

SpringBoot查询指定范围内的坐标点

使用Redis geo实现 redis geo是基于Sorted Set来实现的 Redis 3.2 版本新增了geo相关命令&#xff0c;用于存储和操作地理位置信息。提供的命令包括添加、计算位置之间距离、根据中心点坐标和距离范围来查询地理位置集合等&#xff0c;说明如下: geoadd&#xff1a;添加地理…

A stop job is running for Session c1 of user root (25s 1min 30s)问题

写在前面 今天在前端点击重启按钮&#xff0c;突然发现开发板的串口打印信息卡住了&#xff0c;时间比较长的有一处&#xff0c;比较短的有两处&#xff0c;大致为A stop job is running for Session c1 of user root (25s 1min 30s)&#xff0c;此处估计是在关机重启的时候&a…

vue3通过el-dropdown实现动态菜单切换页面

这是效果图 首先是主页index.vue <template><el-row><el-col :span"20"><!-- 顶部菜单 --><div v-if"showTop"><topmenu /></div><!-- 右侧下方区域动态切换的内容 --><div style"flex: 1;&quo…

重排链表,剑指offerII 26,力扣 120

目录 力扣题目地址&#xff1a; 题目&#xff1a; 那我们直接看题解吧&#xff1a; 解题方法&#xff1a; 难度分析&#xff1a; 审题目事例提示&#xff1a; 解题分析&#xff1a; 解题思路&#xff1a; 解题补充&#xff1a; 力扣题目地址&#xff1a; 143. 重排链表 - 力扣&…

[node] Node.js的Web 模块

[node] Node.js的Web 模块 什么是 Web 服务器&#xff1f;Web的应用架构http使用方式使用 Node 创建 Web 服务器使用 Node 创建 Web 客户端 什么是 Web 服务器&#xff1f; Web服务器一般指网站服务器&#xff0c;是指驻留于因特网上某种类型计算机的程序&#xff0c;Web服务器…

COGVLM论文解读(COGVLM:VISUAL EXPERT FOR LARGE LANGUAGE MODELS)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、摘要二、引言三、模型方法1、模型思路2、融合公式 四、训练方法总结 前言 2023年5月18日清华&智谱AI发布并开源VisualGLM-6B以来&#xff0c;清华KEG&…

从零开始构造一个Operator(保姆级教程)

文章目录 operator是什么operator如何开发CRD需求分析Operator1. 初始化项目2. 初始化CRD相关文件3. code-generator生成代码4. controller业务逻辑实现5. manager启动6. 本地安装调试7. 部署在集群上8. 卸载清除资源 项目地址&#xff1a;https://github.com/kosmos-io/simple…

java:springboot结合mybatis

背景 前我们在这篇 java&#xff1a;jpa、Hibernate、Spring Data JPA、ORM以及和mybatis的区别 文章中讲了 springbootjpa 的使用&#xff0c;今天来看一下 springboot mybatis 的组合&#xff0c;其实和 jpa 的使用很像&#xff0c;大家可以对照的来看。 Spring Boot Myba…

Day13 qt 高级控件,自定义控件,事件,绘图,定时器

高级控件 QListWidget 列表展示控件 效果 添加数据 ui->listWidget->addItem("A"); QStringList list; list << "B" << "C" << "D"; ui->listWidget->addItems(list); 设置item点击 void Widget::on_l…

Verilator 用法

Verilating … 威尔逊-斯奈德版权所有 2003-2023。 … SPDX 许可证标识符&#xff1a; 仅限 LGPL-3.0 或 Artistic-2.0 验证 Verilator 可通过五种主要方式使用&#xff1a; 使用 --cc 或 :vlopt:-sc 选项&#xff0c;Verilator 将分别把设计翻译成 C 或 SystemC 代码。 将设计…

AI Agents 闭门研讨会报名丨CAMEL、AutoAgents、Humanoid agents作者参与

青源Workshop丨No.27 AI Agents主题闭门研讨会 所谓AI智能体&#xff08;AI Agents&#xff09;&#xff0c;是一种能够感知环境、进行决策和执行动作的智能实体。它们拥有自主性和自适应性&#xff0c;可以依靠AI赋予的能力完成特定任务&#xff0c;并在此过程中不断对自我进行…

Elasticsearch 快照如何工作?

作者&#xff1a;Lutf ur Rehman Elastic 提供许多由讲师指导的面对面和虚拟现场培训以及点播培训。 我们的旗舰课程是 Elasticsearch 工程师、Kibana 数据分析和 Elastic 可观测性工程师。 所有这些课程都会获得认证。有关这些课程的详细介绍&#xff0c;请参考我之前的文章 “…

LeetCode | 二叉树的最大深度

LeetCode | 二叉树的最大深度 OJ链接 这里需要注意的一点是每次有返回值&#xff0c;需要定义变量来保存上一次的值最后取最高的一方加1 int maxDepth(struct TreeNode* root) {if(root NULL)return NULL;int left maxDepth(root->left);int right maxDepth(root->r…

Haskell和http-client库下载代码示例

haskell import Network.HTTP.Client 然后&#xff0c;我们需要定义一个函数来下载视频。这个函数将接收一个URL作为参数&#xff0c;并返回一个IO动作&#xff0c;该动作将下载视频文件到当前目录。 haskell downloadVideo :: String -> IO () downloadVideo url do --…