RabbitMQ 发布订阅

 RabbitMQ 发布订阅视频学习地址:

简单模式下RabbitMQ 发布者发布消息 消费者消费消息

Publist/Subscribe 发布订阅

RabbitMQ 中,发布订阅模式是一种消息传递方式,其中发送者(发布者)不会将消息直接发送到特 定的接收者(订阅者)。而是将消息发送到一个交换机,交换机将消息转发到绑定到该交换机的每个队 ,每个绑定交换机的队列都将接收到消息。消费者(订阅者)监听自己的队列 并进行消费 。

 

场景 : 开放平台 开发者订阅了某个开放平台的 api 之后,数据有变化就会自动获取到最新的

 

 

 

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

 

P :生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 X (交换机)
C :消费者,消息的接收者,会一直等待消息到来
Queue :消息队列,接收消息、缓存消息
Exchange :交换机( X )。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递 交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange 的类型。
Exchange 有常见以下 3 种类型:
Fanout :广播,将消息交给所有绑定到交换机的队列
Direct :定向,把消息交给符合指定 routing key 的队列
Topic :通配符,把消息交给符合 routing pattern (路由模式) 的队列
Exchange (交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失

 

RabbitMQ 发布订阅模式的一些应用场景:  

 

1. 数据提供商与应用商 :例如中国气象局向多个门户网站提供气象数据。
2. 新闻机构 :将独家新闻发布给多个订阅者,但可能需要根据新闻类型进行更精细的路由。
3. 商城系统 :新添加商品后,同时更新缓存和数据库。
4. 用户通知 :用户充值或转账成功后,通过多种方式(如短信、邮件)通知用户。
5. 消息广播 :将消息广播到多个消费者,例如系统公告、活动通知等。
6. 降低耦合 :生产者和消费者通过 RabbitMQ 进行解耦,不需要直接连接,提高系统的灵活性和可
扩展性。
7. 异步处理 :生产者发送消息后,消费者可以异步处理,提高系统的响应速度和并发处理能力。

 

生产者
emit_log.go

 

package main
import (
"context"
"log"
"os"
"strings"
"github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Printf("%s: %s", msg, err)
}
}
func bodyForm(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
func main() {
// 连接到RabbitMQ服务器
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
//声明一个交换机
err = ch.ExchangeDeclare(
"logs", //name 交换机名称
"fanout", //交换机类型 Fanout 广播
true, //durable 持久化
false, //autoDelete 是否自动删除
false, //internal 是否内部使用 设置为 false 时,表示无论如何这个交换器都不是
内置的
false, //noWait 是否等待服务器响应 参数通常默认为False,意味着操作会同步进
行并等待服务器的响应
nil, // 其他属性
)
failOnError(err, "Failed to declare an exchange")
//发送消息
body := bodyForm(os.Args)
// 发布消息到交换机,并指定路由键
err = ch.PublishWithContext(
context.Background(),
"logs", // 交换器的名称
"", // 队列名
false, // mandatory 必须发送到队列 ,false表示如果交换器无法根据自身的类型和路
由键找到一个符合条件的队列丢弃
false, //immediate 参数设置为 false 时,表示消息不需要立即被消费者接收
amqp091.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent: %s", body)
}

 

消费者
receive_log.go

 

package main
import (
"log"
"github.com/rabbitmq/amqp091-go"
)
func failOnError2(err error, msg string) {
if err != nil {
log.Printf("%s: %s", msg, err)
}
}
func main() {
//建立连接
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
failOnError2(err, "Failed to connect to RabbitMQ")
defer conn.Close()
//创建一个Channel
ch, err := conn.Channel()
failOnError2(err, "Failed to open a channel")
defer ch.Close()
//声明一个交换机
err = ch.ExchangeDeclare(
"logs", // 交换机名称
"fanout", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否内部使用
false, // 是否等待服务器响应
nil, // 其他属性
)
failOnError2(err, "Failed to declare an exchange")
// 声明一个临时队列
q, err := ch.QueueDeclare(
"", // 队列名称,留空表示由RabbitMQ自动生成
false, // 是否持久化
false, // 是否自动删除(当没有任何消费者连接时)
true, // 是否排他队列(仅限于当前连接)
false, // 是否等待服务器响应
nil, // 其他属性
)
failOnError2(err, "Failed to declare a queue")
// 将队列绑定到交换机上
err = ch.QueueBind(
q.Name, // 队列名称
"", // 路由键,留空表示接收交换机的所有消息
"logs", // 交换机名称
false, // 是否等待服务器响应
nil, // 其他属性
)
failOnError2(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标识符,留空表示由RabbitMQ自动生成
true, // 是否自动应答
false, // 是否独占模式(仅限于当前连接)
false, // 是否等待服务器响应
false, // noLocal
nil, // 其他属性
)
// msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
failOnError2(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [x] Waiting for logs. To exit press CTRL+C")
<-forever
}

 运行

# 如果你想保存日志文件
go run receive_log.go > logs_from_rabbit.log
# 如果你想再终端看到日志
go run receive_log.go
# shell2
go run emit_log.go

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/643300.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux文本处理三剑客(详解)

一、文本三剑客是什么&#xff1f; 1. 对于接触过Linux操作系统的人来说&#xff0c;应该都听过说Linux中的文本三剑客吧&#xff0c;即awk、grep、sed&#xff0c;也是必须要掌握的Linux命令之一&#xff0c;三者都是用来处理文本的&#xff0c;但侧重点各不相同&#xff0c;a…

Docker-镜像迁移的三种方式=>备份恢复公有仓库私有仓库

制作好的镜像要被别人使用&#xff0c;有三种方式&#xff1a; 1.先备份镜像&#xff0c;别人通过u盘或者其它方式拷贝后&#xff0c;再恢复镜像&#xff0c;这种方式比较麻烦 2.将制作的镜像上传到公共镜像仓库&#xff0c;被别人拉取后使用&#xff0c;但可能存在网络不通畅或…

嵩山为什么称为三水之源

三水指黄河、淮河、济河&#xff0c;这三条河流环绕在嵩山周边。 黄河横亘在嵩山北部&#xff0c;其支流伊洛河从西南方环绕嵩山&#xff0c;然后汇入黄河。济河&#xff0c;古称济水&#xff0c;源自济源王屋山&#xff0c;自身河道在东晋时代被黄河夺占&#xff0c;从此消失。…

【Spring MVC】_SpringMVC项目返回数据

目录 1. 注解使用示例 1.1 使用Controller注解 1.2 使用RestController注解 1.3 使用Controller与ResponseBody注解 2. 关于ResponseBody注解 前文已经介绍过使用Controller注解向前端返回一个HTML页面&#xff0c;接下来将介绍向前端返回数据。 关于Controller和RestCon…

算法金 | Dask,一个超强的 python 库

本文来源公众号“算法金”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;Dask&#xff0c;一个超强的 python 库 1 Dask 概览 在数据科学和大数据处理的领域&#xff0c;高效处理海量数据一直是一项挑战。 为了应对这一挑战&am…

初学者都能掌握的操作符(中)

&#xff08;1&#xff09;位操作符&#xff08;& | ^&#xff09; &&#xff1a;&#xff08;按二进制位“与”&#xff09; 也就是两个数的每一位二进制数按照 “与” 的算法&#xff0c;如下&#xff1a; int a 3 ,b 5 ; c a & b; 我们首先写出a和b的二进…

Java面试八股之Synchronized和ReentrantLock的区别

Synchronized和ReentrantLock的区别 实现级别&#xff1a; synchronized是Java的一个关键字&#xff0c;属于JVM层面的原生支持&#xff0c;它通过监视器锁&#xff08;Monitor&#xff09;来实现同步控制&#xff0c;无需手动获取和释放锁。 ReentrantLock是java.util.conc…

【Linux网络编程】传输层中的TCP和UDP(TCP篇)

【Linux网络编程】传输层中的TCP和UDP&#xff08;TCP篇&#xff09; 目录 【Linux网络编程】传输层中的TCP和UDP&#xff08;TCP篇&#xff09;TCP协议TCP协议段格式确认应答&#xff08;ACK&#xff09;机制&#xff08;保证可靠性&#xff09;超时重传机制连接管理机制理解T…

aws msk加密方式和问控制连接方式

msk加密方式 msk提供了两种加密方式 静态加密传输中加密 创建集群时可以指定加密方式&#xff0c;参数如下 aws kafka create-cluster --cluster-name "ExampleClusterName" --broker-node-group-info file://brokernodegroupinfo.json --encryption-info file:/…

ASP+ACCESS公司门户网站建设

【摘 要】随着计算机科学的发展&#xff0c;数据库技术在Internet中的应用越来越广泛&#xff0c;为广大网络用户提供了更加周到和人性化的服务。本文讲解了一个公司的网站的建设&#xff0c;它基于数据关联规则的公司个性化页面及动态数据生成案例&#xff0c;在网页方面&…

Kubeadm安装部署k8s集群、踩坑日常

背景 ​ Docker是一个非常流行的容器化平台&#xff0c;它可以让我们方便构建、打包、发布和运行容器化应用程序。但是&#xff0c;在生产环境中&#xff0c;我们可能需要处理成百上千个容器&#xff0c;需要更好的管理这些容器&#xff0c;这就是Kubernetes(K8S)的用武之地。…

利用大语言模型增强网络抓取:一种现代化的方法

本文将探讨大语言模型(LLMs)与网络抓取的集成&#xff0c;以及如何利用LLMs高效地将复杂的HTML转换为结构化的JSON。 作为一名数据工程师&#xff0c;我的职业生涯可以追溯到2016年。那时&#xff0c;我的主要职责是利用自动化工具从不同网站上获取海量数据&#xff0c;这个过…

网络模型-策略路由配置

在实际网络应用中&#xff0c;策略路由也是一种重要的技术手段。尽管在考试并不注重策略路由&#xff0c;但是实际上应用较多建议考生除了掌握基本的静态路由协议IP route-static&#xff0c;动态路由协议RIP、还要掌握如何配置策略路由。策略路由的基本原理:根据ACL定义的不同…

云界洞见:移动云服务开启技术创新与问题解决的新篇章

一、什么是移动云 移动云以“央企保障、安全智慧、算网一体、属地服务”为品牌支撑&#xff0c;聚焦智能算力建设&#xff0c;打造一朵智能、智慧、安全可信可控的云&#xff0c;提供更优质的算力服务&#xff0c;引领云计算产业发展。 那么下面博主带领大家了解移动云的优势所…

Golang单元测试

文章目录 传统测试方法基本介绍主要缺点 单元测试基本介绍测试函数基准测试示例函数 传统测试方法 基本介绍 基本介绍 代码测试是软件开发中的一项重要实践&#xff0c;用于验证代码的正确性、可靠性和预期行为。通过代码测试&#xff0c;开发者可以发现和修复潜在的错误、确保…

【vue-cli搭建vue项目的过程2.x】

vue-cli搭建vue项目 vue-cli搭建vue项目安装node安装vue-cli脚手架并创建项目安装 Ant Design Vue或element-ui(笔者使用Ant-design-vue组件&#xff0c;并全局引入)开发安装三方库包1、Package.json文件---引入如下package.json文件执行npm i或npm install命令即可下载如下依赖…

Cloudflare Worker 部署bingai

Cloudflare Worker 部署 1. 注册 Cloudflare 账号 2. 一键部署 登录账户后, 点击下面链接 https://deploy.workers.cloudflare.com/?urlhttps://github.com/Harry-zklcdc/go-proxy-bingai 点击「Authorize Workers」, 登录 Github 账号授权 Cloudflare 点击「I have a ac…

输入输出(3)——C++的标准输入流

目录 一、cin 流 二、成员函数 get 获取一个字符 (一)无参数的get函数。 (二)有一个参数的get函数。 (三&#xff09;有3个参数的get函数 (四&#xff09;用成员函数 getline 函数读取一行字符 (五&#xff09;用成员函数 read 读取一串字符 (六&#xff09;istream 类…

【Basic】Upload-Labs-Linux

文章目录 前言Pass-01Pass-02Pass-03Pass-04Pass-05Pass-06Pass-07Pass-08Pass-09Pass-10Pass-11Pass-12Pass-13Pass-14Pass-15Pass-16解题感悟 前言 美好的一天从刷题开始 Pass-01 我淦20道题&#xff1f;&#xff1f;&#xff1f;一道一道来吧 先看第一道题 先在home里搞一…

蜂窝物联四情监测:助力农业升级,科技赋能打造丰收新篇章!

农业四情指的是田间的虫情、作物的苗情、气候的灾情和土壤墒情。“四情”监测预警系统的组成包括管式土壤墒情监测站、虫情测报灯、气象站、农情监测摄像机&#xff0c;可实时监测基地状况,可以提高监测的效率和准确性&#xff0c;为农业生产提供及时、科学的数据支持&#xff…