RabbitMQ-工作模式(Publish模式Routing模式)

在这里插入图片描述

文章目录

  • 发布/订阅(Publish/Subscribe)
    • 交换机
    • 临时队列
    • 绑定
    • 总体代码示例
  • 路由(Routing)
    • 绑定
    • 直连交换机
    • 多重绑定
    • 发送日志
    • 订阅
    • 总体代码示例

更多相关内容可查看

发布/订阅(Publish/Subscribe)

构建一个简单的日志系统

  • 我们将通过构建一个简单的日志系统来说明这个模式。它将包含两个程序 – 第一个程序将发出日志消息,第二个程序将接收并打印它们。
  • 在我们的日志系统中,每个运行中的接收程序都将收到这些消息。这样,我们就能够运行一个接收程序并将日志定向到磁盘;同时,我们也能够运行另一个接收程序,并在屏幕上看到日志。

基本上,发布的日志消息将被广播到所有的接收程序。

交换机

Rabbit中完整的消息模型:

  • 生产者是发送消息的用户应用程序。
  • 队列是存储消息的缓冲区。
  • 消费者是接收消息的用户应用程序。

在RabbitMQ中消息模型的核心思想是,生产者从不直接发送任何消息到队列。实际上,很多时候生产者甚至不知道消息是否会被发送到任何队列。

相反,生产者只能将消息发送到一个交换机。交换机是一个非常简单的东西。它一边从生产者接收消息,另一边将它们推送到队列。交换机必须确切地知道如何处理收到的消息。它应该被附加到特定的队列吗?它应该被附加到多个队列吗?还是应该被丢弃。这些规则由交换机类型定义。
在这里插入图片描述

有几种可用的交换机类型:direct、topic、headers和fanout。我们将专注于最后一种类型 – fanout。让我们创建一个这种类型的交换机,并称其为logs:

channel.exchangeDeclare("logs", "fanout");

fanout交换机非常简单。正如你可能从名称中猜到的那样,它只是将接收到的所有消息广播到它所知道的所有队列。这正是我们的日志记录器所需要的。

列出交换机
要列出服务器上的交换机,您可以运行非常有用的rabbitmqctl命令:

sudo rabbitmqctl list_exchanges

在这个列表中会有一些amq.*交换机和默认(未命名)交换机。这些是默认创建的,无需考虑

之前对交换机一无所知,但仍然能够将消息发送到队列。这是因为我们使用的是默认交换,通过空字符串("")来标识它。

回想一下之前发布消息的方式:

channel.basicPublish("", "hello", null, message.getBytes());

第一个参数是交换机的名称空字符串表示默认或无名交换机:如果存在指定 routingKey 的队列,则消息会被路由到该队列。

现在,我们可以将消息发布到我们命名的交换机:

channel.basicPublish("logs", "", null, message.getBytes());

这样,我们将消息发布到名为 logs 的交换机中,而不是默认的无名交换机。

临时队列

在之前,我们使用的队列都有特定的名称。能够给队列命名对我们来说非常重要 - 我们需要将工作者指向同一个队列。在想要在生产者和消费者之间共享队列时,给队列命名非常重要。

但是对于我们的日志记录器来说情况并非如此。我们希望收到所有日志消息,而不仅仅是其中的一部分。我们也只对当前正在流动的消息感兴趣,而不是旧消息。为了解决这个问题,我们需要两件事情。

  • 首先,每当我们连接到 Rabbit 时,我们都需要一个全新的空队列。为此,我们可以创建一个具有随机名称的队列,或者更好的是让服务器为我们选择一个随机的队列名称。
  • 其次,一旦我们断开消费者连接,队列应该自动删除。

在 Java 中,当我们向 queueDeclare() 方法提供没有参数时,我们创建一个非持久化、独占、自动删除的队列,并且由服务器生成一个名称:

String queueName = channel.queueDeclare().getQueue();

在这一点上,queueName 包含一个随机的队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg

绑定

在这里插入图片描述

我们已经创建了一个fanout 交换机和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的这种关系称为绑定

channel.queueBind(queueName, "logs", "");

从现在开始,logs 交换机将会将消息追加到我们的队列中。

列出绑定
您可以使用以下命令列出现有的绑定:

rabbitmqctl list_bindings

总体代码示例

在这里插入图片描述
发出日志消息的 producer 程序:logsroutingKeyfanoutEmitLog.java

public class EmitLog {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
         //指定交换机类型-fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = argv.length < 1 ? "info: Hello World!" :
                            String.join(" ", argv);
		//绑定交换机 发送消息到交换机EXCHANGE_NAME中
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }
}

如果还没有队列绑定到交换机,则消息将丢失, 但这对我们来说没关系,如果还没有消费者在获取,我们可以安全地丢弃该消息。

代码为:ReceiveLogs.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
	//指定交换机类型-fanout
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    //绑定交换机跟队列
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

如果你想将日志保存到文件中,只需打开控制台并输入以下命令:

java -cp $CP ReceiveLogs > logs_from_rabbit.log

如果你希望在屏幕上看到日志,开启一个新的终端并运行:

java -cp $CP ReceiveLogs

当然,要发出日志,只需输入:

java -cp $CP EmitLog

使用 rabbitmqctl list_bindings 命令,你可以验证代码实际上是否按我们想要的方式创建了绑定和队列。如果有两个 ReceiveLogs.java 程序正在运行,你应该会看到类似如下的输出:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

结果的解释很简单:来自 exchange logs 的数据发送到两个带有服务器分配名称的队列中。这正是我们想要的。

路由(Routing)

绑定

在前面的示例中,我们已经创建了绑定。你可能还记得 代码如下:

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定是交换机和队列之间的关系。这可以简单地理解为:队列对来自该交换机的消息感兴趣。

绑定可以接受一个额外的routingKey 参数。为了避免与basic_publish参数混淆,我们将其称为绑定键。以下是我们如何使用键创建绑定的示例:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

绑定键的含义取决于交换机类型。我们先前使用的fanout 交换机简单地忽略了它的值。

直连交换机

在我们之前的教程中,我们的日志系统将所有消息广播给所有消费者。我们希望扩展其功能,以允许根据消息的严重性进行过滤。例如,我们可能希望一个将日志消息写入磁盘的程序只接收关键错误,而不浪费磁盘空间来记录警告或信息日志消息。

我们之前使用的是fanout 交换机,它并没有提供太多的灵活性 - 它只能进行无脑广播。

相反,我们将使用直连交换机。直连交换机背后的路由算法很简单 - 消息将被发送到绑定键与消息的路由键完全匹配的队列中。

为了说明这一点,考虑以下设置:

橙黑绿P直接问₁Q₂C₁

在这个设置中,我们可以看到直连交换机 X 与两个绑定到它的队列。第一个队列绑定的绑定键是 orange,而第二个队列有两个绑定,一个绑定键为 black,另一个为 green

在这样的设置中,使用路由键 orange发布到交换机的消息将被路由到队列 Q1。具有路由键 black 或 green的消息将发送到 Q2。所有其他消息将被丢弃。

多重绑定

黑黑P直接问₁Q₂C₁C₂

将多个队列与相同的绑定键绑定是完全合法的。在我们的示例中,我们可以添加一个在交换机 X 和队列 Q1 之间的绑定,绑定键为black。在这种情况下,直连交换机将像fanout 交换机一样行为,将消息广播到所有匹配的队列。具有路由键 black 的消息将被传递到 Q1 和 Q2。

发送日志

我们将使用这个模型来构建我们的日志系统。与使用fanout 交换机不同,我们将消息发送到一个直连交换机。我们将日志严重程度作为路由键提供。这样,接收程序就能够选择它想要接收的严重程度。让我们先专注于发送日志。

和往常一样,我们首先需要创建一个交换机:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

接下来,我们准备发送一条消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

我们假设 ‘severity’ 可以是 'info'、'warning' 或 'error' 中的一个。

订阅

接收消息的方式与之前的教程类似,只有一个例外 - 我们将为每个我们感兴趣的严重程度创建一个新的绑定。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

总体代码示例

错误信息警告错误P直接amq.gen-S9b...amq.gen-Ag1...C₁C₂

生产者类的代码:EmitLogDirect.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
         //指定交换机类型
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		//指定日志类型
        String severity = getSeverity(argv);
        String message = getMessage(argv);
		//发送日志
        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    }
  }
  //..
}

消费者代码为:ReceiveLogsDirect.java

import com.rabbitmq.client.*;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
	//指定交换机类型
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    //获取交换机随机名字
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
        System.exit(1);
    }

    for (String severity : argv) {
    	//指定日志类型
        channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" +
        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

如果你只想将 ‘warning’ 和 ‘error’(而不是 ‘info’)日志消息保存到文件中,只需打开控制台并输入以下命令:

java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

如果你想在屏幕上看到所有的日志消息,打开一个新的终端并执行以下命令:

java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for logs. To exit press CTRL+C

例如,要发出一个错误日志消息,只需输入以下命令:

java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/692417.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

达梦8 探寻达梦排序原理:新排序机制(SORT_FLAG=1)

测试版本&#xff1a;--03134283938-20221019-172201-20018 达梦的排序机制由四个dm.ini参数控制&#xff1a; #maximum sort buffer size in Megabytes &#xff0c;有效值范围&#xff08;1~2048&#xff09; SORT_BUF_SIZE 100 #ma…

FinalShell导出服务器配置信息密码password是加密的,如何解密?

本章教程,主要实现了一个小的功能,对FinalShell导出的配置信息,进行解密。 FinalShell导出之后,会产生一个json文件,例如下面这种json格式,里面记录了服务器的IP地址,端口和密码,里面的密码是经过加密处理的,本文主要利用java代码实现对这个password进行解密还原。 {&…

【设计模式】行为型设计模式之 策略模式学习实践

介绍 策略模式&#xff08;Strategy&#xff09;&#xff0c;就是⼀个问题有多种解决⽅案&#xff0c;选择其中的⼀种使⽤&#xff0c;这种情况下我们 使⽤策略模式来实现灵活地选择&#xff0c;也能够⽅便地增加新的解决⽅案。⽐如做数学题&#xff0c;⼀个问题的 解法可能有…

五、身份与访问管理—身份管理和访问控制管理(CISSP)

目录 1.身份管理 1.1 目录技术 1.2 单点登录 1.2.1 Kerberos认证 1.2.2 SESAME认证 1.2.3 KryptoKnight认证 1.3 联合身份管理 1.3.1 SAML安全断言标记语言 1.3.2 标记语言 1.3.3 OpenID 1.3.4 OAuth 1.3.5 OIDC(OpenID Connect) 2.身份即服务(IDaaS) 2.1 AA…

如何提高网站收录?

GSI服务就是专门干这个的&#xff0c;这个服务用的是光算科技自己研发的GPC爬虫池系统。这个系统通过建立一个庞大的站群和复杂的链接结构&#xff0c;来吸引谷歌的爬虫。这样一来&#xff0c;你的网站就能更频繁地被谷歌的爬虫访问&#xff0c;从而提高被收录的机会。 说到效…

【漏洞复现】Apache OFBiz 路径遍历导致RCE漏洞(CVE-2024-36104)

0x01 产品简介 Apache OFBiz是一个电子商务平台&#xff0c;用于构建大中型企业级、跨平台、跨数据库、跨应用服务器的多层、分布式电子商务类应用系统。是美国阿帕奇(Apache)基金会的一套企业资源计划(ERP)系统。该系统提供了一整套基于Java的Web应用程序组件和工具。 0x02 …

【Nacos 2.3.3支持Postgre SQL数据源配置】

Nacos 2.3.3支持Postgre SQL数据源配置 1、Nacos下载2、 插件下载&#xff1a;3、SQL脚本获取、nacos数据库创建、插件编译4、Nacos 集群搭建方式&#xff1a; 1、Nacos下载 下载地址&#xff1a; https://download.nacos.io/nacos-server/nacos-server-2.3.2.zip 或者自行在官…

OrangePi AIpro Ubuntu 22.04 aarch64 安装MySql 8.0

查看MySQL安装包 接下来可以使用以下命令安装MySQL服务器&#xff1a; 安装MySQL 8.0 # 安装最新版本 sudo apt install -y mysql-server # 安装指定版本 sudo apt install -y mysql-server-8.0初始化配置信息 sudo mysql_secure_installationVALIDATE PASSWORD COMPONENT ca…

pc之间的相互通信详解

如图&#xff0c;实现两台pc之间的相互通信 1.pc1和pc2之间如何进行通讯。 2.pc有mac和ip&#xff0c;首先pc1需要向sw1发送广播&#xff0c;sw1查询mac地址表&#xff0c;向router发送广播&#xff0c;router不接受广播&#xff0c;router的每个接口都有ip和mac&#xff0c;…

【Java笔记】第10章:接口

前言1. 接口的概念与定义2. 接口的声明与语法3. 接口的实现4. 接口的继承5. 接口的默认方法6. 接口的静态方法7. 接口的私有方法8. 接口的作用9. 接口与抽象类的区别10. 接口在Java集合中的应用结语 上期回顾:【Java笔记】第9章&#xff1a;三个修饰符 个人主页&#xff1a;C_G…

Java | Leetcode Java题解之第139题单词拆分

题目&#xff1a; 题解&#xff1a; public class Solution {public boolean wordBreak(String s, List<String> wordDict) {Set<String> wordDictSet new HashSet(wordDict);boolean[] dp new boolean[s.length() 1];dp[0] true;for (int i 1; i < s.len…

力扣每日一题85:最大矩形

题目 困难 相关标签 相关企业 给定一个仅包含 0 和 1 、大小为 rows x cols 的二维二进制矩阵&#xff0c;找出只包含 1 的最大矩形&#xff0c;并返回其面积。 示例 1&#xff1a; 输入&#xff1a;matrix [["1","0","1","0",&q…

毫米波SDK使用1

本文档是AM273x等毫米波雷达处理器SDK的配置和使用&#xff0c;主要参考TI的官方文档《mmwave mcuplus sdk user guide》。这里仅摘取其中重要的部分&#xff0c;其余枝节可参考原文。 2 系统概览 mmWave SDK分为两个主要组件:mmWave套件和mmWave演示。 2.1. mmWave套件 mmWa…

react 基础样式的控制(行内和className)

import ./index.cssconst style{color:red,font-size:150px }function App() {return (<div className"App"><h1>行内样式控制</h1><h1 style{{color:red,font-size:150px}} >asd </h1><span style{style} >asd </span>&l…

MATLAB算法实战应用案例精讲-【数模应用】数据孤岛(概念篇)

目录 前言 算法原理 什么是数据孤岛 数据孤岛产生的原因 数据孤岛的问题 什么时候数据孤岛不是坏事&#xff1f; 为什么很难摆脱数据孤岛 数据孤岛对企业造成的负面效应 数据孤岛的影响 数据孤岛的危害 如何解决数据孤岛问题 如何摆脱数据孤岛&#xff1f; 前言 数…

Java学习 - Maven - 常用命令(学习精选)

前言 在上一篇文章中&#xff0c;我们对 Maven 有了初步的了解&#xff0c;包括它的定义、安装步骤以及一些基本的配置方法。Maven 是一个强大的项目管理工具&#xff0c;它可以帮助开发者自动化构建过程&#xff0c;并且管理项目的依赖关系。 今天&#xff0c;我们将深入探讨…

高光谱图像聚类的像素-超像素对比学习与伪标签校正

Pixel-Superpixel Contrastive Learning and Pseudo-Label Correction for Hyperspectral Image Clustering 文章目录 Pixel-Superpixel Contrastive Learning and Pseudo-Label Correction for Hyperspectral Image Clustering摘要引言相关方法对比学习 方法超像素对比学习像素…

攻防世界---misc---Excaliflag

1、题目描述&#xff0c;下载附件是一张图片 2、用winhex分析&#xff0c;没有发现奇怪的地方 3、在kali中使用binwalk -e 命令&#xff0c;虽然分离出来了一些东西&#xff0c;但是不是有用的 4、最后用stegsolve分析&#xff0c;切换图片&#xff0c;发现有字符串&#xff0c…

番外篇 | 利用华为2023最新Gold-YOLO中的Gatherand-Distribute对特征融合模块进行改进

前言:Hello大家好,我是小哥谈。论文提出一种改进的信息融合机制Gather-and-Distribute (GD) ,通过全局融合多层特征并将全局信息注入高层,以提高YOLO系列模型的信息融合能力和检测性能。通过引入MAE-style预训练方法,进一步提高模型的准确性。🌈 目录 🚀1.论文解…

MyBatisPlus总结二

MybatisPlus总结一在这&#xff1a; MybatisPlus总结1/2-CSDN博客 六、分页查询&#xff1a; 6.1.介绍&#xff1a; MybatisPlus内置了分页插件&#xff0c;所以我们只需要配置一个分页拦截器就可以了&#xff0c;由于不同的数据库的分页的方式不一样&#xff0c;例如mysql和…