RabbitMQ-发布/订阅模式

1、发布/订阅模式介绍

在普通的生产者、消费者模式,rabbitmq会将消息依次传递给每一个消费者,一个worker一个,平均分配,这就是Round-robin调度方式,为了实现更加复杂的调度,我们就需要使用发布/订阅的方式。

2、交换机(exchange)

RabbitMQ中,消息模型的核心理念就是,生产者从来不能直接将消息发送到队列,甚至生产者都不知道消息要被发送到队列中。

相反,生产者只能将消息发送到交换机中,交换机一侧从生产者接收消息,一侧将消息发送到队列中,交换机需要知道如何处理接收到的消息,是发送给一个队列还是多个队列?这是由交换机的类型决定的。

交换机共分为四类:  directtopicheaders and fanout. 本章节以扇形交换机为例说明rabbitmq的使用。

3、fanout交换机的使用方式

扇形交换机,就像你猜测的那样,他可以将他接收到的全部消息广播到所有队列里。

3.1 声明交换机

首先声明一个扇形交换机,type参数设置为『fanout』

err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)

3.2 发送消息到交换机

交换机设定完成后,就可以往该交换机发送消息:

	body := "Hello World!"
	err = ch.Publish("logs", "", false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
	})

如果要在rabbitmq的页面上查看发送的消息,需要提前创建一个队列,并绑定到该交换机[logs]上,就可以查看发送的消息:

扇形交换机的特性,就是他会将收到的消息广播给所有绑定到该交换机的队列,我们可以创建多个队列,并绑定到该交换机上,我们发送一次消息,就会看到,所有绑定到该交换机的队列中都会有一条消息,先创建三个队列,并分别绑定到logs交换机:

之后运行脚本,发送两次消息:

 可以看到,三个队列当中都有两条消息。

3.2 扇形交换机发送消息代码

package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Failed to open a channel")
		return
	}
	err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
	if err != nil {
		fmt.Println("Failed to declare an exchange")
		return
	}
	body := "Hello World!"
	err = ch.Publish("logs", "", false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
	})
	if err != nil {
		fmt.Println("Failed to publish a message")
		return
	}
}

 3.2 声明队列,用于接收消息

	q, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)

声明队列时,没有指定队列名称,这时,系统会返回一个随机名称存储在q变量中。 

3.3 binding

队列声明完成后,需要将该队列绑定到交换机上,这样交换机才能把消息广播给该队列:

绑定代码: 

    err = ch.QueueBind(
		q.Name, // queue name
		"",     // routing key
		"logs", // exchange
		false,
		nil,
	)

消费者侧全部代码如下:

package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Failed to open a channel")
		return
	}
	err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
	if err != nil {
		fmt.Println("Failed to declare an exchange")
		return
	}
	q, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	err = ch.QueueBind(
		q.Name, // queue name
		"",     // routing key
		"logs", // exchange
		false,
		nil,
	)

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	var forever chan struct{}

	go func() {
		for d := range msgs {
			fmt.Printf(" [x] %s\n", d.Body)
		}
	}()

	fmt.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

程序启动后,控制台上会增加一个随机命名的队列。

 运行【3.2】的生产者程序,发送消息到扇形交换机,这个时候消费者就会同步消费到消息,并进行打印:

4、总结

关于扇形交换机,核心的一点需要我们记住,发送到扇形交换机的消息,他会将消息广播给所有绑定到该交换机的队列上,无脑广播,所有队列会同时接受到交换机上全部的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/664753.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【linux】开机调用python脚本

linux中&#xff0c;可以使用crontab 设置开机自动调用 crontab的安装在前面文章里写过了&#xff0c;不再重复 首先&#xff0c;还是进入crontab配置文件 crontab -e 进入之后&#xff0c;跟其他定时任务不同&#xff0c;只需要在时间配置那里用rebooot 这类之后的两个文件的…

qwen-moe

一、定义 qwen-moe 代码讲解&#xff0c; 代码qwen-moe与Mixtral-moe 一样&#xff0c; 专家模块qwen-moe 开源教程Mixture of Experts (MoE) 模型在Transformer结构中如何实现&#xff0c;Gate的实现一般采用什么函数&#xff1f; Sparse MoE的优势有哪些&#xff1f;MoE是如…

NFTScan 获 Google Cloud 战略支持!

近日&#xff0c;NFT 数据基础设施服务商 NFTScan 获得全球领先云计算服务提供商 Google Cloud 战略支持。未来&#xff0c;双方将在链上数据和区块链领域展开战略合作&#xff0c;高效联动&#xff0c;共同探索区块链技术的更多可能性&#xff0c;为用户和行业带来更多惊喜与成…

强烈推荐十款数据防泄密软件,高人气的数据防泄密软件

100G的文件不见了&#xff1f;客户的电话信息被拷贝走了&#xff1f;源代码被竞争对手搞到手了&#xff1f;这些都是严重的数据泄密事件&#xff0c;为此&#xff0c;我们需要数据防泄密软件来全方位保护数据安全。根据当前市场上的热门推荐和综合评价&#xff0c;以下几款数据…

基于Linux的文件操作(socket操作)

基于Linux的文件操作&#xff08;socket操作&#xff09; 1. 文件描述符基本概念文件描述符的定义&#xff1a;标准文件描述符&#xff1a;文件描述符的分配&#xff1a; 2. 文件描述符操作打开文件读取文件中的数据 在linux中&#xff0c;socket也被认为是文件的一种&#xff…

JS【详解】快速排序

快速排序的时间复杂度为 O(n2) 排序流程 1、首先设定一个分界值&#xff08;比如数组最中间的元素&#xff09;&#xff0c;通过该分界值将数组分成左右两部分。 2、将大于或等于分界值的数据集中到数组右边&#xff0c;小于分界值的数据集中到数组的左边。 3、对左侧和右侧的…

项目中父模块调用子模块出现 Invalid bound statement (not found)问题

背景 做某个saas项目的时候&#xff0c;我把用户、角色、菜单、字典等模块弄成了一个基础包&#xff0c;想着如果之后又类似的项目的时候可以偷个懒 直接引用基础包就可以了。 当我引用的时候出现了这个问题 Invalid bound statement (not found):xxx 分析思路 这个问题一般…

卧式饲料搅拌机:养殖场得力助手

卧式饲料搅拌机采用卧式结构&#xff0c;设计科学合理&#xff0c;操作简便。相比传统的立式搅拌机&#xff0c;卧式搅拌机具有更大的搅拌容量和更均匀的搅拌效果。它能够轻松应对不同种类、不同比例的饲料混合需求&#xff0c;确保饲料成分的均衡分布&#xff0c;从而提高饲料…

【强化学习】DPO(Direct Preference Optimization)算法学习笔记

【强化学习】DPO&#xff08;Direct Preference Optimization&#xff09;算法学习笔记 RLHF与DPO的关系KL散度Bradley-Terry模型DPO算法流程参考文献 RLHF与DPO的关系 DPO&#xff08;Direct Preference Optimization&#xff09;和RLHF&#xff08;Reinforcement Learning f…

KMPlayer v2024.4.25.13 官方版 (万能播放器)

前言 KMPlaye通过各种插件扩展KMP可以支持层出不穷的新格式。KMPlaye强大的插件功能&#xff0c;直接从Winamp继承的插件功能&#xff0c;能够直接使用Winamp的音频&#xff0c;输入&#xff0c;视觉效果插件&#xff0c;而通过独有的扩展能力&#xff0c;只要你喜欢&#xff…

【linux-imx6ull-设备树点灯】

目录 1. 设备树简介1.1 编译-引用1.2 设备树文件结构1.3 设备树节点介绍1.3.1 特殊节点chosen 1.4 节点内容追加 2. 设备树常用OF操作函数2.1 节点寻找类2.2 属性提取类2.3 其它常用类 4. 设备树下LED实验4.1 实验简介4.2 添加LED设备节点4.3 获取设备节点并提取属性4.3.1 获取…

国内类似ChatGPT的大模型应用有哪些?发展情况如何了

第一部分&#xff1a;几个容易混淆的概念 很多人&#xff0c;包括很多粉丝的科技博主&#xff0c;经常把ChatGPT和预训练大模型混为一谈&#xff0c;因此有必要先做一个澄清。预训练大语言模型属于预训练大模型的一类&#xff0c;而ChatGPT、文心一言又是预训练大语言模型的一个…

【Linux】Linux基本指令3

目录 1.date指令 2.cal指令 3.find指令&#xff1a;&#xff08;灰常重要&#xff09; -name 4.grep指令——行文本过滤工具 5.zip/unzip指令&#xff1a; 6.tar指令&#xff08;重要&#xff09;&#xff1a;打包/解包&#xff0c;不打开它&#xff0c;直接看内容 7.bc…

SpringBoot六种API请求参数读取方式

SpringBoot六种API请求参数读取方式 同步请求和异步请求 同步: 指单线程依次做几件事异步: 指多线程同时做几件事 同步请求: 指客户端浏览器只有一个主线程, 此线程负责页面的渲染和发出请求等操作, 如果此主线程发出请求的话则停止渲染而且会清空页面显示的内容 直到服务器响…

3d渲染的常用概念和技术,渲染100邀请码1a12

之前我们介绍了3D渲染的基本原理和流程&#xff0c;这次说下几个常用概念和技术。 3D渲染中涉及到很多专业的概念和技术&#xff0c;它们决定了渲染质量和效果&#xff0c;常用的有以下几个。1、光线追踪 光线追踪是一些专业渲染器&#xff08;如V-Ray和Corona等&#xff09;…

算法思想总结:哈希表

一、哈希表剖析 1、哈希表底层&#xff1a;通过对C的学习&#xff0c;我们知道STL中哈希表底层是用的链地址法封装的开散列。 2、哈希表作用&#xff1a;存储数据的容器&#xff0c;插入、删除、搜索的时间复杂度都是O&#xff08;1&#xff09;&#xff0c;无序。 3、什么时…

Android HIDL接口添加

一.HIDL介绍 HIDL的全称是HAL interface definition language&#xff08;硬件抽象层接口定义语言&#xff09;&#xff0c;是Android Framework 与Android HAL之间的接口。HIDL 旨在用于进程间通信 (IPC)&#xff0c;进程之间的通信 采用 Binder 机制。 二.HIDL 与AIDL 的对…

客户文章|难能可贵,非模式生物的功能研究与创新

菜豆&#xff08;Phaseolus vulgaris&#xff09;&#xff0c;又名四季豆、芸豆、油豆角&#xff0c;是全球第一大豆类蔬菜&#xff0c;我国是世界上最主要的菜豆生产国和销售国。在田间生产过程中&#xff0c;菜豆常面临着各种生物和非生物逆境的胁迫&#xff0c;对其产量品质…

FOC - BLDC六步换相驱动原理

文章目录 1 . 前言2 . 电机旋转原理3 . BLDC特点4 . BLDC反电动势投影位置5 . BLDC换相时刻6 . BLDC换相注意事项7 . 小结 【全文大纲】 : https://blog.csdn.net/Engineer_LU/article/details/135149485 1 . 前言 无刷直流电机在这里区分为两种&#xff0c;一是永磁无刷直流电…

【Linux-LCD 驱动】

Linux-LCD 驱动 ■ Framebuffer 简称 fb■ LCD 驱动程序编写■ 1、LCD 屏幕 IO 配置■ 2、LCD 屏幕参数节点信息修改■ 3、LCD 屏幕背光节点信息■ 4、使能 Linux logo 显示 ■ 设置 LCD 作为终端控制台■ 1、设置 uboot 中的 bootargs■ 2、修改/etc/inittab 文件 ■ LCD 背光…