如何使用RabbitMQ和Python实现广播消息

使用 RabbitMQ 和 Python 实现广播消息的过程涉及设置一个消息队列和多个消费者,以便接收相同的消息。RabbitMQ 的 “fanout” 交换机允许你将消息广播到所有绑定的队列。以下是如何实现这一过程的详细步骤。

在这里插入图片描述

1、问题背景

在将系统从Morbid迁移到RabbitMQ时,发现RabbitMQ无法提供Morbid默认提供的广播行为。在广播模式下,当一个消息被添加到队列时,所有的消费者都会收到它。然而,在RabbitMQ中,消息会以轮询的方式分发给各个监听器。

代码例子如下:

# 消费者
import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")
headers = {}

conn.subscribe(destination='/topic/demoqueue', ack='auto')

while True:
    pass
conn.disconnect()

# 发送者
import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")
headers = {}

conn.subscribe(destination='/topic/demotopic', ack='auto')

while True:
    pass
conn.disconnect()

通过上述代码,将会出现问题,导致无法实现广播消息。

2、解决方案

  1. 使用交换机和队列来实现广播消息。具体方法如下:

(1)使用amqplib库来创建交换机和队列。在发送消息时,将消息发送到交换机,而不是队列。在接收消息时,将队列绑定到交换机,这样就可以收到交换机上所有消息。代码修改如下:

# 发送者
import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")
headers = {}

# 发送消息到交换机
exchange = 'my_exchange'
conn.send(str(i), exchange=exchange, destination='')

# 接收者
import stomp
import sys
from amqplib import client_0_8 as amqp
#read in the exchange name so I can set up multiple recievers for different exchanges to tset
exchange = sys.argv[1]
conn = amqp.Connection(host="localhost:5672", userid="username", password="password",
 virtual_host="/", insist=False)

chan = conn.channel()

chan.access_request('/', active=True, write=True, read=True)

#declare my exchange
chan.exchange_declare(exchange, 'topic')
#not passing a queue name means I get a new unique one back
qname,_,_ = chan.queue_declare()
#bind the queue to the exchange
chan.queue_bind(qname, exchange=exchange)

class MyListener(object):
    def on_error(self, headers, message):
        print 'recieved an error %s' % message

    def on_message(self, headers, message):
        print 'recieved a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'browser', 'browser')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="username", password="password")
headers = {}

#subscribe to the queue
conn.subscribe(destination=qname, ack='auto')

while True:
    pass
conn.disconnect()

(2)使用StompJS 库来实现广播消息。具体方法如下:

// 消费者
var stompClient = Stomp.client('ws://localhost:61613');

stompClient.connect({}, function(frame) {
  stompClient.subscribe('/topic/demoqueue', function(message) {
    console.log('Received message: ' + message.body);
  });
});

// 发送者
var stompClient = Stomp.client('ws://localhost:61613');

stompClient.connect({}, function(frame) {
  stompClient.send('/topic/demoqueue', {}, 'Hello, world!');
});

通过以上步骤,你可以实现 RabbitMQ 的消息广播功能。多个消费者可以同时接收来自同一个生产者的消息,这是构建分布式系统时非常常见的场景。如果需要更复杂的消息处理,可以在此基础上进行扩展。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/911117.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【RabbitMQ】04-发送者可靠性

1. 生产者重试机制 spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 initial-interval…

java的类加载机制的学习

一、类加载的过程 一个类被加载到虚拟机内存中开始,到卸载出虚拟机内存为止,整个生命周期分为七个阶段,分别是加载、验证、准备、解析、初始化、使用和卸载。其中验证、准备和解析这三个阶段统称为连接。 除去使用和卸载,就是Ja…

uni-app跨域set-cookie

set-cookie的值是作为一个权限控制的 首先,无论什么接口都会返回一个set-cookie,但未登录时,set-cookie是没有任何权限的 其次,登录接口请求时会修改set-cookie,并且在后续其他接口发起请求时,会在请求头…

让智能体—“正念365”陪你一起“养心”

佛学的“八正道”中,笔者个人观点,“正念”是最适合当代人低门槛练习的一个,因为不需要阅读大量的知识来理解概念,只需要保持对当下的觉察,发现分心了,就不带评价的把注意力拉回到当前的事情上就好。就是佛…

浅析Android Handler机制实现原理

0. 背景描述 Android系统出于对简单、高效的考虑,在设计UI体系时采用了单线程模型,即不会在多个线程中对同一个UI界面执行操作。简单是指单线程模型可以不用考虑和处理在多线程环境下操作UI带来的线程安全问题,高效是指单线程模型下无需通过…

vue 3:监听器

目录 1. 基本概念 2. 侦听数据源类型 1. 监听getter函数 2. 监听 ref 或 reactive 的引用 3. 多个来源组成的数组 4. 避免直接传递值!!! 3. 深层侦听器 4. 立即回调的侦听器 5. 一次性侦听器 6. watchEffect() 7. 暂停、恢复和停止…

沉浸式学习新体验:3D虚拟展厅如何重塑教育格局!

3D虚拟展厅对于教育行业产生了深远的影响,主要体现在以下几个方面: 一、创新教学方式 3D虚拟展厅利用三维技术构建的虚拟展示空间,为教育行业带来了一种全新的教学方式。传统的教学方式往往局限于书本和课堂,而3D虚拟展厅则能够…

【Kafka】Windows+KRaft部署指南

【Kafka】WindowsKRaft部署指南 摘要本地环境说明官网快速开始修改config/kraft/server.properties初始化数据存储目录启动 测试创建topic创建生产者创建消费者 FAQ输入行太长。命令语法不正确。问题描述解决方案 参考资料 摘要 Kafka是一种高吞吐量的分布式发布订阅消息系统&…

面相小白的php反序列化漏洞原理剖析

前言 欢迎来到我的博客 个人主页:北岭敲键盘的荒漠猫-CSDN博客 本文整理反序列化漏洞的一些成因原理 建议学习反序列化之前 先对php基础语法与面向对象有个大体的了解 (我觉得我整理的比较细致,了解这俩是个啥就行) 漏洞实战情况 这个漏洞黑盒几乎不会被发现&am…

景联文科技专业数据标注公司:高质量数据标注推动AI产业发展

在当今数据驱动的时代,高质量的数据标注对于机器学习、自然语言处理(NLP)和计算机视觉等技术领域的发展起着至关重要的作用。 数据标注是指对原始数据进行处理,标记对象的特征,生成满足机器学习训练要求的可读数据编码…

yelp数据集上识别潜在的热门商家

yelp数据集是研究B2C业态的一个很好的数据集,要识别潜在的热门商家是一个多维度的分析过程,涉及用户行为、商家特征和社区结构等多个因素。从yelp数据集里我们可以挖掘到下面信息有助于识别热门商家 用户评分和评论分析 评分均值: 商家的平均评分是反映其…

YOLO11改进 | 融合改进 | C3k2融合ContextGuided 【独家改进, 两种方式】

秋招面试专栏推荐 :深度学习算法工程师面试问题总结【百面算法工程师】——点击即可跳转 💡💡💡本专栏所有程序均经过测试,可成功执行💡💡💡 本文给大家带来的教程是将YOLO11的C3k2替…

【harbor】离线安装2.9.0-arm64架构服务制作和升级部署

harbor官网地址:Harbor 参考文档可以看这里:部署 harbor 2.10.1 arm64 - 简书。 前提环境准备: 安装docker 和 docker-compose 先拉arm64架构的harbor相关镜像 docker pull --platformlinux/arm64 ghcr.io/octohelm/harbor/harbor-regist…

InfluxDB 2 关闭pprof

背景: Go 语言的 net/http/pprgf包如未配置正确暴露在公网容易引起敏感信息泄漏问题,导致源码等信息泄漏。 influxdb 2 默认是开启pprof的 使用 localhost:8086/debug/pprof/goroutine?debug1 可以看到接口暴露的信息 如何关闭pprof 官方文档&…

CJ/T188-2004 详细介绍

REDISANT 提供互联网与物联网开发测试套件 # 互联网与中间件: Redis AssistantZooKeeper AssistantKafka AssistantRocketMQ AssistantRabbitMQ AssistantPulsar AssistantHBase AssistantNoSql AssistantEtcd AssistantGarnet Assistant 工业与物联网&#xff1…

阿里云k8s-master部署CNI网络插件遇到的问题

问题 按照网络上的部署方法 cd /opt/k8s # 下载 calico-kube-controllers配置文件,可能会网络超时 curl https://docs.projectcalico.org/manifests/calico.yaml -O kubectl apply -f calico.yaml 试了很多次都不行,k8s-master都是Not ready的状态 ca…

Netty篇(学习前言)

目录 一、为什么使用Netty 1. Netty编程相比NIO编程的优势 2. Netty 相比其它网络应用框架的优势 二、让我们走进Netty 1. 简介 2. 设计目标 3. 主要特点 4. Netty的作者 5. Netty 的地位 6. Netty 的优势 五、Netty版本说明 六、Netty架构设计 1. 线程模型基本介绍…

C/C++使用AddressSanitizer检测内存错误

AddressSanitizer 是一种内存错误检测工具,编译时添加 -fsanitizeaddress 选项可以在运行时检测出非法内存访问,当发生段错误时,AddressSanitizer 会输出详细的错误报告,包括出错位置的代码行号和调用栈,有助于快速定位…

JavaScript基础语法部分-黑马跟课笔记

一、Javascript介绍 1.JavaScript是什么? 1.是什么? 是一种运行在客户端(浏览器)的编程语言,实现人机交互效果 2.作用(做什么?) 网页特效(监听用户的一些行为让网页做…

【MongoDB】MongoDB的Java API及Spring集成(Spring Data)

文章目录 Java APISpring 集成1. 添加依赖2. 配置 MongoDB3. 创建实体类4. 创建 Repository 接口5. 创建 Service 类6. 创建 Controller 类7. 启动 Spring Boot 应用8. 测试你的 API 更多相关内容可查看 Java API maven <dependency><groupId>org.mongodb</gr…