go消息队列RabbitMQ - 订阅模式-direct

1.发布订阅

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

2.绑定

绑定可以采用额外的routing_key参数。为了避免与Channel.Publish参数混淆,我们将其称为binding key。这是我们如何使用键创建绑定的方法:

err = ch.QueueBind(
  q.Name,    // queue name
  "black",   // routing key
  "logs",    // exchange
  false,
  nil)

 3.直连交换器

direct交换器背后的路由算法很简单——消息进入其binding key与消息的routing key完全匹配的队列。

direct-exchang

绑定了两个队列的direct交换器X。第一个队列绑定键为orange,第二个队列绑定为两个,一个绑定键为black,另一个为green

在这种设置中,使用orange路由键发布到交换器的消息将被路由到队列Q1。路由键为blackgreen的消息将转到Q2。所有其他消息将被丢弃。

4.多重绑定

direct-exchange-multiple

用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以使用绑定键blackXQ1之间添加绑定。在这种情况下,direct交换器的行为将类似fanout,并将消息广播到所有匹配的队列。带有black路由键的消息将同时传递给Q1Q2

5.发送日志

在日志系统中使用这个模型,发送消息到direct交换器。这样,接收脚本将能够选择其想要接收的日志级别。

先创建一个direct交换器:

err = ch.ExchangeDeclare(
  "logs_direct", // name
  "direct",      // type
  true,          // durable
  false,         // auto-deleted
  false,         // internal
  false,         // no-wait
  nil,           // arguments
)

指定routing key发送一条消息:

body := bodyFrom(os.Args)
err = ch.Publish(
  "logs_direct",         // exchange
  severityFrom(os.Args), // routing key
  false, // mandatory
  false, // immediate
  amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(body),
})

为了简化问题,我们假设“严重性”可以是“info”、“warning”、“error”之一。

6.订阅

为感兴趣的每种严重性(日志级别)创建一个新的绑定。绑定的routing key通过os.Args获取

q, err := ch.QueueDeclare(
  "",    // name
  false, // durable
  false, // delete when unused
  true,  // exclusive
  false, // no-wait
  nil,   // arguments
)
failOnError(err, "Failed to declare a queue")

if len(os.Args) < 2 {
  log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
  os.Exit(0)
}
// 建立多个绑定关系
for _, s := range os.Args[1:] {
  log.Printf("Binding queue %s to exchange %s with routing key %s",
     q.Name, "logs_direct", s)
  err = ch.QueueBind(
    q.Name,        // queue name
    s,             // routing key
    "logs_direct", // exchange
    false,
    nil)
  failOnError(err, "Failed to bind a queue")
}

7.完整示例

img

emit_log_direct.go脚本的代码:

package main

import (
        "log"
        "os"
        "strings"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs_direct", // name
                "direct",      // type
                true,          // durable
                false,         // auto-deleted
                false,         // internal
                false,         // no-wait
                nil,           // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        body := bodyFrom(os.Args)
        err = ch.Publish(
                "logs_direct",         // exchange
                severityFrom(os.Args), // routing key
                false, // mandatory
                false, // immediate
                amqp.Publishing{
                        ContentType: "text/plain",
                        Body:        []byte(body),
                })
        failOnError(err, "Failed to publish a message")

        log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 3) || os.Args[2] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[2:], " ")
        }
        return s
}

func severityFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "info"
        } else {
                s = os.Args[1]
        }
        return s
}

receive_logs_direct.go的代码:

package main

import (
        "log"
        "os"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs_direct", // name
                "direct",      // type
                true,          // durable
                false,         // auto-deleted
                false,         // internal
                false,         // no-wait
                nil,           // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        q, err := ch.QueueDeclare(
                "",    // name
                false, // durable
                false, // delete when unused
                true,  // exclusive
                false, // no-wait
                nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        if len(os.Args) < 2 {
                log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
                os.Exit(0)
        }
        for _, s := range os.Args[1:] {
                log.Printf("Binding queue %s to exchange %s with routing key %s",
                        q.Name, "logs_direct", s)
                err = ch.QueueBind(
                        q.Name,        // queue name
                        s,             // routing key
                        "logs_direct", // exchange
                        false,
                        nil)
                failOnError(err, "Failed to bind a queue")
        }

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                true,   // auto ack
                false,  // exclusive
                false,  // no local
                false,  // no wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
                for d := range msgs {
                        log.Printf(" [x] %s", d.Body)
                }
        }()

        log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
        <-forever
}

如果你只想将“warning”和“err”(而不是“info”)级别的日志消息保存到文件中,只需打开控制台并输入:

go run receive_logs_direct.go warning error > logs_from_rabbit.log

如果你想在屏幕上查看所有日志消息,请打开一个新终端并执行以下操作:

go run receive_logs_direct.go info warning error
# => [*] Waiting for logs. To exit press CTRL+C

例如,要发出error日志消息,只需输入:

go run emit_log_direct.go error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

(这里是(emit_log_direct.go)和(receive_logs_direct.go)的完整源码)

参考文章:

go rabbitmq Routing模式 - 范斯猫 (fansimao.com)

RabbitMQ Go语言客户端教程4——路由 - 范斯猫 (fansimao.com)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/371974.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ctfshow-web1~10-WP

web1 右键查看源码就能看到flag web2 打开网页提示无法查看源代码,右键也使用不了,那我们就在url前面加上view-source: view-source:http://83a83588-671e-4a94-9c6f-6857f9e20c2f.chall.ctf.show/ 访问后即可获得flag web3 右键源码也没看到信息,去查看一下请求头和响应…

2.05作业

1.请编程实现哈希表的创建存储数组{12,24,234,234,23,234,23}&#xff0c;输入key查找的值&#xff0c;实现查找功能。 #include<stdio.h> #include<string.h> #include<stdlib.h> #include<math.h> typedef int datatype; typedef struct Node {datat…

Leetcode24:两两交换链表中的节点

一、题目 给你一个链表&#xff0c;两两交换其中相邻的节点&#xff0c;并返回交换后链表的头节点。你必须在不修改节点内部的值的情况下完成本题&#xff08;即&#xff0c;只能进行节点交换&#xff09;。 示例&#xff1a; 输入&#xff1a;head [1,2,3,4] 输出&#xff…

88.网游逆向分析与插件开发-物品使用-物品使用策略管理UI的设计

内容参考于&#xff1a;易道云信息技术研究院VIP课 上一个内容&#xff1a;物品交换的逆向分析与C封装-CSDN博客 码云地址&#xff08;ui显示角色数据 分支&#xff09;&#xff1a;https://gitee.com/dye_your_fingers/sro_-ex.git 码云版本号&#xff1a;f1b9b1a69ac3e2c3…

Python爬虫requests库详解

使用 requests 上一节中&#xff0c;我们了解了 urllib 的基本用法&#xff0c;但是其中确实有不方便的地方&#xff0c;比如处理网页验证和 Cookies 时&#xff0c;需要写 Opener 和 Handler 来处理。为了更加方便地实现这些操作&#xff0c;就有了更为强大的库 requests&…

第97讲:MHA高可用集群模拟主库故障以及修复过程

文章目录 1.分析主库故障后哪一个从库会切换为主库2.模拟主库故障观察剩余从库的状态2.1.模拟主库故障2.3.当前主从架构 3.修复故障的主库3.1.修复主库3.2.当前主从架构3.3.恢复MHA 1.分析主库故障后哪一个从库会切换为主库 在模拟MHA高可用集群主库故障之前&#xff0c;我们先…

微信小程序 使用npm包

1. 微信小程序 使用npm包 1.1. npm初始化 如果你的小程序项目没有安装过npm包的话&#xff0c;你需要先初始化npm npm init1.2. 安装npm包 这里以vant-weapp(小程序UI组件库)为例&#xff1a; npm i vant-weapp -S --production1.3. npm包构建 1.3.1. 点击微信开发者工具右…

数据库管理-第145期 最强Oracle监控EMCC深入使用-02(20240205)

数据库管理145期 2024-02-05 数据库管理-第145期 最强Oracle监控EMCC深入使用-02&#xff08;20240205&#xff09;1 监控方式2 度量配置3 阻塞4 DG监控总结 数据库管理-第145期 最强Oracle监控EMCC深入使用-02&#xff08;20240205&#xff09; 作者&#xff1a;胖头鱼的鱼缸&…

C++学习Day03之类对象作为类成员

目录 一、程序及输出二、分析与总结 一、程序及输出 #include<iostream> using namespace std; #include <string>class Phone { public:Phone(string pName){cout << "phone 的有参构造调用" << endl;m_PhoneName pName;}~Phone(){cout &…

【已解决】Oracle 12541 TNS 无监听程序

目录 1、找到Oracle监听服务&#xff08;OracleOraDb10g_homeTNLListener&#xff09;&#xff0c;停止运行 2、首先查看监听文件是否超过4G 3、修改配置文件 连接oracle突然报错&#xff0c;提示Oracle 12541 TNS 无监听程序&#xff0c;可以按照以下步骤解决 1、找到Ora…

(2)(2.13) Rockblock Satellite Modem

文章目录 前言 1 支持的MAVLink命令信息 2 设置 3 使用方法 4 数据成本 5 参数 前言 &#xff01;Note 该功能仅适用于 ArduPilot 4.4 或更高版本&#xff0c;并且要求飞行控制器支持 LUA 脚本(LUA Scripts)。 RockBLOCK 卫星调制解调器可实现与 ArduPilot 飞行器的全球…

C++一维数组

个人主页&#xff1a;PingdiGuo_guo 收录专栏&#xff1a;C干货专栏 铁汁们大家好呀&#xff0c;我是PingdiGuo_guo&#xff0c;今天我们来学习一下数组&#xff08;一维&#xff09;。 文章目录 1.数组的概念与思想 2.为什么要使用数组 3.数组的特性 4.数组的操作 1.定义…

Spark SQL调优实战

1、新添参数说明 // Driver和Executor内存和CPU资源相关配置 --是否开启executor动态分配&#xff0c;开启时spark.executor.instances不生效 spark.dynamicAllocation.enabledfalse --配置Driver内存 spark.dirver.memory5g --driver最大结果大小&#xff0c;设置为0代…

python中的web框架介绍

目录 一&#xff1a;框架介绍 二&#xff1a;框架安装 Python中有许多流行的Web框架,以下是一些最受欢迎的框架&#xff1a; 一&#xff1a;框架介绍 1: Django Django是一个高级Python Web框架&#xff0c;它鼓励快速开发和干净的设计。Django的主要特点是其强大的ORM&am…

【C++历练之路】二叉搜索树的学习应用及其实现

W...Y的主页 &#x1f60a; 代码仓库分享&#x1f495; 前言&#x1f354;&#xff1a; 我们之前学过一些查找关键数据的办法&#xff0c;排序二分查找。但是这种方法的插入的时间复杂的太高&#xff0c;今天我们来学习一个更好的办法来应对数据查找——二叉搜索树。 目录…

【图论】基环树

基环树其实并不是树&#xff0c;是指有n个点n条边的图&#xff0c;我们知道n个点n-1条边的连通图是树&#xff0c;再加一条边就会形成一个环&#xff0c;所以基环树中一定有一个环&#xff0c;长下面这样&#xff1a; 由基环树可以引申出基环内向树和基环外向树 基环内向树如…

ADC详解

一、ADC 简介 ADC 即模拟数字转换器&#xff0c;英文详称 Analog-to-digital converter&#xff0c;可以将外部的模拟信号转换为数字信号。 STM32F4xx 系列芯片拥有 3 个 ADC&#xff0c;这些 ADC 可以独立使用&#xff0c;其中 ADC1 和 ADC2 还可以组成双重模式&…

HarmonyOS ArkTS修改App的默认加载的界面(二十)

前言&#xff1a;在Android开发中想要修改默认启动页&#xff0c;只需要在AndroidManifest.xml中设置即可 只需要在启动的activity种添加如下属性即可 <intent-filter><action android:name"android.intent.action.MAIN" /><category android:name&qu…

相同的树[简单]

优质博文&#xff1a;IT-BLOG-CN 一、题目 给你两棵二叉树的根节点p和q&#xff0c;编写一个函数来检验这两棵树是否相同。如果两个树在结构上相同&#xff0c;并且节点具有相同的值&#xff0c;则认为它们是相同的。 示例 1&#xff1a; 输入&#xff1a;p [1,2,3], q [1,…

Debezium发布历史109

原文地址&#xff1a; https://debezium.io/blog/2021/09/16/debezium-1-7-cr1-released/ 欢迎关注留言&#xff0c;我是收集整理小能手&#xff0c;工具翻译&#xff0c;仅供参考&#xff0c;笔芯笔芯. Debezium 1.7.0.CR1 Released September 16, 2021 by Gunnar Morling r…