python rabbitmq实现简单/持久/广播/组播/topic/rpc消息异步发送可配置Django

windows首先安装rabbitmq 点击参考安装

1、环境介绍

Python 3.10.16
其他通过pip安装的版本(Django、pika、celery这几个必须要有最好版本一致)
amqp              5.3.1
asgiref           3.8.1
async-timeout     5.0.1
billiard          4.2.1
celery            5.4.0
click             8.1.7
click-didyoumean  0.3.1
click-plugins     1.1.1
click-repl        0.3.0
colorama          0.4.6
Django            4.2
dnspython         2.7.0
eventlet          0.38.2
greenlet          3.1.1
kombu             5.4.2
pika              1.3.2
pip               24.2
prompt_toolkit    3.0.48
python-dateutil   2.9.0.post0
redis             5.2.1
setuptools        75.1.0
six               1.17.0
sqlparse          0.5.3
typing_extensions 4.12.2
tzdata            2024.2
vine              5.1.0
wcwidth           0.2.13
wheel             0.44.0

2、创建Django 项目

django-admin startproject django_rabbitmq

3、在setting最下边写上

# settings.py    guest:guest 表示的是你安装好的rabbitmq的登录账号和密码
BROKER_URL = 'amqp://guest:guest@localhost:15672/'
CELERY_RESULT_BACKEND = 'rpc://'

4.1 简单模式

4.1.1 在和setting同级的目录下创建一个叫consumer.py的消费者文件,其内容如下:

import pika


def callback(ch, method, properties, body):
    print(f"[x] Received {body.decode()}")


def start_consuming():
    # 创建与RabbitMQ的连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # 声明一个队列
    channel.queue_declare(queue='hello')

    # 指定回调函数
    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print('[*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()


if __name__ == "__main__":
    start_consuming()

4.1.2 在和setting同级的目录下创建一个叫producer.py的生产者文件,其内容如下:

import pika


def publish_message():
    # message = request.GET.get('msg')
    # 创建与RabbitMQ的连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # 声明一个队列
    channel.queue_declare(queue='hello')

    # 发布消息
    message = "Hello World!"
    channel.basic_publish(exchange='', routing_key='hello', body=message)
    print(f"[x] Sent '{message}'")

    # 关闭连接
    connection.close()


if __name__ == "__main__":
    publish_message()

4.1.3 先运行消费者代码(consumer.py)再运行生产者代码(producer.py)

先:python consumer.py
再: python producer.py

4.1.4 运行结果如下:

在这里插入图片描述
在这里插入图片描述

4.2 消息持久化模式

4.2.1 在和setting同级的目录下创建一个叫recv_msg_safe.py的消费者文件,其内容如下:

import time
import pika


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(20)
    print(" [x] Done")
    # 下边这个就是标记消费完成了,下次在启动接受消息就不用从头开始了,即
    # 手动确认消息消费完成 和auto_ack=False 搭配使用
    ch.basic_ack(delivery_tag=method.delivery_tag)  # method.delivery_tag就是一个标识符,方便找对人


def start_consuming():
    # 创建与RabbitMQ的连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # 声明一个队列
    channel.queue_declare(queue='hello2', durable=True)  # 若声明过,则换一个名字

    # 指定回调函数
    channel.basic_consume(queue='hello2',
                          on_message_callback=callback,
                          # auto_ack=True  # 为true则不能持久话消息,即消费者关闭后下次收不到之前未收取的消息
                          auto_ack=False  # 为False则下次依然从头开始收取消息,直到callback函数调用完成
                          )

    print('[*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()


if __name__ == "__main__":
    start_consuming()

4.2.2 在和setting同级的目录下创建一个叫send_msg_safe.py的生产者文件,其内容如下:

import pika


def publish_message():
    # 创建与RabbitMQ的连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # 声明一个队列  durable=True队列持久化
    channel.queue_declare(queue='hello2', durable=True)

    channel.basic_publish(exchange='',
                          routing_key='hello2',
                          body='Hello World!',
                          # 消息持久话用,主要用作宕机的时候,估计是写入本地硬盘了
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          )
                      )

    # 关闭连接
    connection.close()


if __name__ == "__main__":
    publish_message()

4.2.3 先运行消费者代码(recv_msg_safe.py)再运行生产者代码(send_msg_safe.py) 执行结果如下:

在这里插入图片描述

4.3 广播模式

4.3.1 在和setting同级的目录下创建一个叫fanout_receive.py的消费者文件,其内容如下:

# 广播模式
import pika

# credentials = pika.PlainCredentials('guest', 'guest')
# connection = pika.BlockingConnection(pika.ConnectionParameters(
#     host='localhost', credentials=credentials))
# 在setting中如果不配置BROKER_URL和CELERY_RESULT_BACKEND的情况下请使用上边的代码
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')  # 指定发送类型
# 必须能过queue来收消息
result = channel.queue_declare("", exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)  # 随机生成的Q,绑定到exchange上面。
print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(on_message_callback=callback, queue=queue_name, auto_ack=True)
channel.start_consuming()

4.3.2 在和setting同级的目录下创建一个叫fanout_send.py的生产者文件,其内容如下:

# 通过广播发消息
import pika
import sys

# credentials = pika.PlainCredentials('guest', 'guest')
# connection = pika.BlockingConnection(pika.ConnectionParameters(
#     host='localhost', credentials=credentials))
# 在setting中如果不配置BROKER_URL和CELERY_RESULT_BACKEND的情况下请使用上边的代码
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout') #发送消息类型为fanout,就是给所有人发消息

# 如果等于空,就输出hello world!
message = ' '.join(sys.argv[1:]) or "info: Hello World!"


channel.basic_publish(exchange='logs',
                      routing_key='',  # routing_key 转发到那个队列,因为是广播所以不用写了
                      body=message)

print(" [x] Sent %r" % message)
connection.close()

4.3.3 先运行消费者代码(fanout_receive.py)再运行生产者代码(fanout_send.py) 执行结果如下:

在这里插入图片描述

4.4 组播模式

4.4.1 在和setting同级的目录下创建一个叫direct_recv.py的消费者文件,其内容如下:

import pika
import sys

credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', credentials=credentials))

channel = connection.channel() 

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]   # 接收那些消息(指info,还是空),没写就报错
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])  # 定义了三种接收消息方式info,warning,error
    sys.exit(1)

for severity in severities:  # [error  info  warning],循环severities
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)  # 循环绑定关键字
print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(on_message_callback=callback, queue=queue_name,)
channel.start_consuming()

4.4.2 在和setting同级的目录下创建一个叫direct_send.py的生产者文件,其内容如下:

# 组播
import pika
import sys

credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', credentials=credentials))

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',exchange_type='direct') #指定类型

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'  #严重程序,级别;判定条件到底是info,还是空,后面接消息

message = ' '.join(sys.argv[2:]) or 'Hello World!'  #消息

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity, #绑定的是:error  指定关键字(哪些队列绑定了,这个级别,那些队列就可以收到这个消息)
                      body=message)

print(" [x] Sent %r:%r" % (severity, message))
connection.close()

4.4.3 先运行消费者代码(direct_recv.py)再运行生产者代码(direct_send.py) 执行结果如下:

在这里插入图片描述

4.5 更细致的topic模式

4.5.1 在和setting同级的目录下创建一个叫topic_recv.py的消费者文件,其内容如下:

import pika
import sys

credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', credentials=credentials))

channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',exchange_type='topic')

result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    print("sys.argv[0]", sys.argv[0])
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(on_message_callback=callback,queue=queue_name)

channel.start_consuming()

4.5.2 在和setting同级的目录下创建一个叫topic_send.py的生产者文件,其内容如下:

import pika
import sys

credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', credentials=credentials))

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',exchange_type='topic') #指定类型

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'

message = ' '.join(sys.argv[2:]) or 'Hello World!'  #消息

channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

4.5.3 先运行消费者代码(topic_recv.py)再运行生产者代码(topic_send.py) 执行结果如下:

在这里插入图片描述

4.6 Remote procedure call (RPC) 双向模式

4.6.1 在和setting同级的目录下创建一个叫rpc_client.py的消费者文件,其内容如下:

import pika
import uuid
import time


# 斐波那契数列 前两个数相加依次排列
class FibonacciRpcClient(object):
    def __init__(self):
        # 赋值变量,一个循环值
        self.response = None
        # 链接远程
        # self.connection = pika.BlockingConnection(pika.ConnectionParameters(
        #         host='localhost'))
        credentials = pika.PlainCredentials('guest', 'guest')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost', credentials=credentials))

        self.channel = self.connection.channel()

        # 生成随机queue
        result = self.channel.queue_declare("", exclusive=True)
        # 随机取queue名字,发给消费端
        self.callback_queue = result.method.queue

        # self.on_response 回调函数:只要收到消息就调用这个函数。
        # 声明收到消息后就 收queue=self.callback_queue内的消息  准备接受命令结果
        self.channel.basic_consume(queue=self.callback_queue,
                                   auto_ack=True, on_message_callback=self.on_response)

    # 收到消息就调用
    # ch 管道内存对象地址
    # method 消息发给哪个queue
    # body数据对象
    def on_response(self, ch, method, props, body):
        # 判断本机生成的ID 与 生产端发过来的ID是否相等
        if self.corr_id == props.correlation_id:
            # 将body值 赋值给self.response
            self.response = body

    def call(self, n):

        # 随机一次唯一的字符串
        self.corr_id = str(uuid.uuid4())

        # routing_key='rpc_queue' 发一个消息到rpc_queue内
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         # 执行命令之后结果返回给self.callaback_queue这个队列中
                                         reply_to=self.callback_queue,
                                         # 生成UUID 发送给消费端
                                         correlation_id=self.corr_id,),
                                   # 发的消息,必须传入字符串,不能传数字
                                   body=str(n))
        # 没有数据就循环收
        while self.response is None:
            # 非阻塞版的start_consuming()
            # 没有消息不阻塞  检查队列里有没有新消息,但不会阻塞
            self.connection.process_data_events()
            print("no msg...")
            time.sleep(0.5)
        return int(self.response)


# 实例化
fibonacci_rpc = FibonacciRpcClient()


response = fibonacci_rpc.call(5)
print(" [.] Got %r" % response)

4.6.2 在和setting同级的目录下创建一个叫rpc_server.py的生产者文件,其内容如下:

#_*_coding:utf-8_*_
import pika
import time
# 链接socket
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', credentials=credentials))

channel = connection.channel()

# 生成rpc queue  在这里声明的所以先启动这个
channel.queue_declare(queue='rpc_queue')

# 斐波那契数列
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)


# 收到消息就调用
# ch 管道内存对象地址
# method 消息发给哪个queue
# props 返回给消费的返回参数
# body数据对象
def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    # 调用斐波那契函数 传入结果
    response = fib(n)

    ch.basic_publish(exchange='',
                     # 生产端随机生成的queue
                     routing_key=props.reply_to,
                     # 获取UUID唯一 字符串数值
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     # 消息返回给生产端
                     body=str(response))
    # 确保任务完成
    # ch.basic_ack(delivery_tag = method.delivery_tag)


# 每次只处理一个任务
# channel.basic_qos(prefetch_count=1)
# rpc_queue收到消息:调用on_request回调函数
# queue='rpc_queue'从rpc内收
channel.basic_consume(queue="rpc_queue",
                      auto_ack=True,
                      on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()

4.6.3 先运行消费者代码(rpc_server.py)再运行生产者代码(rpc_client.py) 执行结果如下:

在这里插入图片描述

参考实现1

参考实现2

参考实现3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/940433.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Verilog】期末复习

数字逻辑电路分为哪两类?它们各自的特点是什么? 组合逻辑电路:任意时刻的输出仅仅取决于该时刻的输入,而与电路原来的状态无关 没有记忆功能,只有从输入到输出的通路,没有从输出到输入的回路 时序逻辑电路&…

光伏电站无人机巡检都有哪些功能?

焱图慧云光伏智能巡检系统主要依托于先进的无人机技术、传感器技术、图像处理技术和智能分析技术。 一、无人机自主飞行与航迹控制 全自主飞行:无人机能够按照预设的飞行路线自主飞行,完成指定的巡检任务,无需人工干预,大大提高了…

图书馆管理系统(三)基于jquery、ajax

任务3.4 借书还书页面 任务描述 这部分主要是制作借书还书的界面,这里我分别制作了两个网页分别用来借书和还书。此页面,也是通过获取books.txt内容然后添加到表格中,但是借还的操作没有添加到后端中去,只是一个简单的前端操作。…

如何使用 WebAssembly 扩展后端应用

1. WebAssembly 简介 随着互联网的发展,越来越多的应用借助 Javascript 转到了 Web 端,但人们也发现,随着移动互联网的兴起,需要把大量的应用迁移到手机端,随着手端的应用逻辑越来越复杂,Javascript 的解析…

《鸿蒙HarmonyOS应用开发从入门到精通(第2版)》简介

《鸿蒙HarmonyOS应用开发从入门到精通(第2版)》已于近日上市,该书由北京大学出版社出版。距离第1版上市已经过去二年半多。本文希望与读者朋友们分享下这本书里面的大致内容。 封面部分 首先是介绍封面部分。 《鸿蒙HarmonyOS应用开发从入门…

Linux -- 线程控制相关的函数

目录 pthread_create -- 创建线程 参数 返回值 代码 -- 不传 args: 编译时带 -lpthread 运行结果 为什么输出混杂? 如何证明两个线程属于同一个进程? 如何证明是两个执行流? 什么是LWP? 代码 -- 传 args&a…

VTK知识学习(26)- 图像基本操作(一)

1、前言 图像处理离不开一些基本的图像数据操作,例如获取和修改图像的基本信息、访问和修改图像像素值、图像显示、图像类型转换等。熟练掌握这些基本操作有助于使用 VTK进行图像处理应用程序的快速开发。 2、图像信息的访问与修改 1)利用vtkIamgeData…

【WPF】把DockPanel的内容生成图像

要在WPF中将一个 DockPanel 的内容生成为图像并保存,可以按照与之前类似的步骤进行,但这次我们将专注于 DockPanel 控件而不是整个窗口。 DockPanel的使用 WPF(Windows Presentation Foundation)中的 DockPanel 是一种布局控件&…

【Linux】处理用户输入

一、基本介绍 1、如何传递参数 向shell脚本传递数据的最基本方法就是通过命令行参数。如下,这条命令会向test.sh脚本传递10和20这两个参数。 ./test.sh 10 20 2、如何读取参数 bash shell会将所有的命令行参数都指派给称作位置参数(positional parame…

SpringBoot+Vue3实现阿里云视频点播 实现教育网站 在上面上传对应的视频,用户开会员以后才能查看视频

要使用阿里云视频点播(VOD)实现一个教育网站,其中用户需要成为会员后才能查看视频,这个过程包括上传视频、设置权限控制、构建前端播放页面以及确保只有付费会员可以访问视频内容。 1. 视频上传与管理 创建阿里云账号&#xff…

POI-TL插件开发-表格分组插件

POI-TL版本:1.12.2 改造于:LoopRowTableRenderPolicy 模板设计: 分组之前: 分组之后: 代码实现: public class LoopRowGroupTableRenderPolicy implements RenderPolicy {private String prefix;privat…

发送webhook到飞书机器人

发送webhook到飞书机器人 参考链接 自定义机器人使用指南 创建自定义机器人 邀请自定义机器人进群。 进入目标群组,在群组右上角点击更多按钮,并点击 设置。 在右侧 设置 界面,点击 群机器人。 在 群机器人 界面点击 添加机器人。 在 添…

36. Three.js案例-创建带光照和阴影的球体与平面

36. Three.js案例-创建带光照和阴影的球体与平面 实现效果 知识点 Three.js基础 WebGLRenderer WebGLRenderer 是Three.js中最常用的渲染器,用于将场景渲染到网页上。 构造器 new THREE.WebGLRenderer(parameters)参数类型描述parametersobject可选参数&#…

Mybatis分页插件的使用问题记录

项目中配置的分页插件依赖为 <dependency><groupId>com.github.pagehelper</groupId><artifactId>pagehelper</artifactId><version>5.1.7</version></dependency>之前的项目代码编写分页的方式为&#xff0c;通过传入的条件…

RIP---路由信息协议

动态路由 自治系统 ---AS 由单一的机构或组织所管理的一系列 IP 网络设备的集合 。 AS 编号&#xff1a; ASN----1-65535----IANA &#xff08;互联网数字分配机构&#xff09; AS 的通讯方式 AS 内部 ---- 运行相同的路由协议 ---- 内部网关协议&#xff08; IGP &#x…

NLP 分词技术浅析

一、NLP 分词技术概述 &#xff08;一&#xff09;定义 自然语言处理&#xff08;NLP&#xff09;中的分词技术是将连续的文本序列按照一定的规则切分成有意义的词语的过程。例如&#xff0c;将句子 “我爱自然语言处理” 切分为 “我”、“爱”、“自然语言处理” 或者 “我…

排序算法:冒泡排序

每一次顺序便遍历&#xff0c;比较相邻的两个元素&#xff0c;交换。 void bubbleSort(vector<int>&v) { int n v.size();//元素个数 //外层j控制的是待排序区间的长度 for (int j n;j > 1;j--) { bool flag 0;//提高效率&#xff0c;判断比较好了就结束 /…

抽象之诗:C++模板的灵魂与边界

引言 在计算机科学的浩瀚长河中&#xff0c;C模板如同一颗璀璨的星辰&#xff0c;以其独特的泛型编程方式为程序设计注入了灵魂。它是抽象的艺术&#xff0c;是类型的舞蹈&#xff0c;是效率与灵活性的交响乐。模板不仅是一种技术工具&#xff0c;更是一种哲学思考&#xff0c…

Linux通信System V:消息队列 信号量

Linux通信System V&#xff1a;消息队列 & 信号量 一、信号量概念二、信号量意义三、操作系统如何管理ipc资源&#xff08;2.36版本&#xff09;四、如何对信号量资源进行管理 一、信号量概念 信号量本质上就是计数器&#xff0c;用来保护共享资源。多个进程在进行通信时&a…

day4:tomcat—maven-jdk

一&#xff0c;java项目部署过程 编译&#xff1a;使用javac命令将.java源文件编译成.class宇节码文件打包&#xff1a;使用工具如maven或Gradle将项目的依赖、资源和编译后的字节码打包成一个分发格式&#xff0c;如.jar文件&#xff0c;或者.war文件(用于web应用&#xff09…