Python 简单使用 RabbitMQ

一、安装

pip install pika

二、推送消息到队列中

执行pythone方法

import pika
import time

# 用户名和密码
user_info = pika.PlainCredentials('admin','admin')

# 连接服务器上的rabbitMQ服务
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, '/', user_info))
# connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))


# 创建一个channel
channel = connection.channel()

# 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,官方推荐,每次使用时都可以加上这句
channel.queue_declare(queue='pythone.test')

# 推送消息到队列  
# exchange:当前是一个简单模式,所以这里设置为空字符串就可以了。   
# routing_key:指定消息要发送到哪个queue。 
# body:指定要发送的消息。
channel.basic_publish(exchange='',routing_key='pythone.test',body='{}'.format('test xxx'))

# 关闭连接
connection.close()

查看rabbitMQ网页后台

执行后我们进入rabbitMQ网页端后台查看pythone.test 队列已经被创建

并且我们执行了三次,此处产生3条数据未被消费,还被压在队列中。

查看队列内消息列表

我们改造一下,将推送消息 放到方法中。

三、封装成生产者、消费者方法

生产者product:

import pika
import time

# 用户名和密码
user_info = pika.PlainCredentials('admin','admin')

# 连接服务器上的rabbitMQ服务
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, '/', user_info))
# connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))


# 创建一个channel
channel = connection.channel()


# 生产者方法
def product():
    print("进入生产者方法!")
    # 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,官方推荐,每次使用时都可以加上这句
    channel.queue_declare(queue='pythone.test')

    # 推送消息到队列  
    # exchange:当前是一个简单模式,所以这里设置为空字符串就可以了。   
    # routing_key:指定消息要发送到哪个queue。 
    # body:指定要发送的消息。
    channel.basic_publish(exchange='',routing_key='pythone.test',body='{}'.format('test xxx'))

    # 关闭连接
    connection.close()

if __name__ == '__main__':
    start_time = time.time()    # 程序开始时间
    print("========start=========|"+str(start_time))

    product()
    
    end_time = time.time()    # 程序结束时间
    print("========end===========|"+str(end_time))

消费者consumer:

# 消费者方法
def consumer():
    print("进入消费者方法!")
    # 消费队列内的消息
    # queue:接收指定queue的消息
    # auto_ack:指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息
    # on_message_callback:设置收到消息的回调函数
    channel.basic_consume(queue='pythone.test', auto_ack=True, on_message_callback=mq_consumer_callback)

    # 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数
    channel.start_consuming()


# 消费者收到消息调用的回调函数
# channel: 包含channel的一切属性和方法
# method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
# properties: basic_publish 通过 properties 传入的参数
# body: basic_publish发送的消息
def mq_consumer_callback(ch, method, properties, body):
    print('消费者收到:{}'.format(body))

TestRabbitMQ.py

import pika
import time

# 用户名和密码
user_info = pika.PlainCredentials('admin','admin')

# 连接服务器上的rabbitMQ服务
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, '/', user_info))
# connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))


# 创建一个channel
channel = connection.channel()


# 生产者方法
def product():
    print("进入生产者方法!")
    # 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,官方推荐,每次使用时都可以加上这句
    channel.queue_declare(queue='pythone.test')

    # 推送消息到队列  
    # exchange:当前是一个简单模式,所以这里设置为空字符串就可以了。   
    # routing_key:指定消息要发送到哪个queue。 
    # body:指定要发送的消息。
    channel.basic_publish(exchange='',routing_key='pythone.test',body='{}'.format('test xxx'))

    # 关闭连接
    connection.close()

# 消费者方法
def consumer():
    print("进入消费者方法!")
    # 消费队列内的消息
    # queue:接收指定queue的消息
    # auto_ack:指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息
    # on_message_callback:设置收到消息的回调函数
    channel.basic_consume(queue='pythone.test', auto_ack=True, on_message_callback=mq_consumer_callback)

    # 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数
    channel.start_consuming()


# 消费者收到消息调用的回调函数
# channel: 包含channel的一切属性和方法
# method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
# properties: basic_publish 通过 properties 传入的参数
# body: basic_publish发送的消息
def mq_consumer_callback(ch, method, properties, body):
    print('消费者收到:{}'.format(body))




if __name__ == '__main__':
    start_time = time.time()    # 程序开始时间
    print("========start=========|"+str(start_time))

    # product()
    consumer()
    
    end_time = time.time()    # 程序结束时间
    print("========end===========|"+str(end_time))

四、测试验证

我们执行3次product方法,生产3条数据到 队列。

再执行consumer方法,对队列内数据进行消费。

可以看见控制台打印如下:

再查看rabbitMQ网页后台,发现消息已经被正常消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/511336.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python核心篇之网络通信

一. 发送请求 1. 发送get请求 2. 发送post请求 3. json数据与python数据的对应关系

隐私计算实训营第七讲-隐语SCQL的架构详细拆解

隐私计算实训营第七讲-隐语SCQL的架构详细拆解 文章目录 隐私计算实训营第七讲-隐语SCQL的架构详细拆解1.SCQL Overview1.1 多方数据分析场景1.2 多方数据分析技术路线1.2.1 TEE SQL方案1.2.2 MPC SQL方案 1.3 Secure Collaborative Query Language(SCQL)1.3.1 SCQL 系统组件1.…

css3之2D转换transform

2D转换transform 一.移动(translate)(中间用,隔开)二.旋转(rotate)(有单位deg)1.概念2.注意点3.转换中心点(transform-origin)(中间用空格)4.一些例子(css三角和旋转) 三…

SVM简介 详细推导 核函数 线性可分 非线性可分

注意:由于该文章由jupyter nbconvert导出,若单独执行代码可能出现变量找不到或者没有导入库的情况,正确的做法是将所有的代码片段按顺序放到一个.py文件里面或者按顺序放入一个.ipynb文件的多个代码块中。 SVM(Support Vector Machine) Vap…

第十五届蓝桥杯模拟考试I_物联网设计

反思: 本次模拟让我惊醒,写这个作品如同搭积木,在拼接的时候都要仔细检查这个积木是否出bug,确保没有问题再将其拼接到之前搭好的大模块之中,因为就是这样的题目我在处理过程中就遇到了BUG,原因竟出在输入模式要上拉&…

鸿蒙OS元服务开发:【(Stage模型)设置应用主窗口】

一、设置应用主窗口说明 在Stage模型下,应用主窗口由UIAbility创建并维护生命周期。在UIAbility的onWindowStageCreate回调中,通过WindowStage获取应用主窗口,即可对其进行属性设置等操作。还可以在应用配置文件中设置应用主窗口的属性&…

MegaSeg Pro for Mac v6.3.1 注册激活版 音视频DJ混音工具

MegaSeg Pro for Mac是一款专业的DJ和广播自动化软件,旨在为音乐专业人士提供强大的音乐播放和演播功能。这款软件具有多种功能,包括强大的音乐库管理,支持导入和组织大量音乐文件,可以轻松管理你的音乐收藏。它支持广泛的音频格式…

idea快速找到maven中冲突的依赖,解决依赖冲突

红色实线:冲突,红色虚线:依赖于同一个包的多版本 选择包,右键Excluede,排除 问题原因: 一个项目中需要jar包A和jar包B,而jar包A和jar包B都需要依赖jar包C,但A需要1.2.16版本的C,B需要1.2.17版本的C,这时候就可能会产…

vs2022断点找bug出错(打上100个断点)

初步分析:故障出自-具体功能模块 进一步分析:故障出自-该功能代码流程 进一步分析:从该功能起点-终点,一路打100个断点

电商技术揭秘五:电商平台的个性化营销与数据分析

文章目录 引言1. 个性化营销的概念与价值1.1 个性化营销的定义1.1.1 个性化营销的基本概念1.1.2 个性化营销在电商领域的重要性 1.2 个性化营销的核心价值1.2.1 提升用户体验1.2.2 增加转化率和客户忠诚度1.2.3 优化营销资源配置 2. 用户画像与行为分析2.1 用户画像的构建2.1.1…

【Linux】在生产环境中,Linux系统排查常用命令

问题排查 文章目录 问题排查top命令CPU:vmstatprocscpu内存:free硬盘:df硬盘IO:iostat网络IO:ifstat 生产环境服务器变慢,诊断思路和性能评估 top命令 查看整机系统新能 使用top命令的话,重点…

如何处理Flutter应用在iOS平台上的兼容性问题

本文探讨了使用Flutter开发的iOS应用能否上架,以及上架的具体流程。苹果提供了App Store作为正式上架渠道,同时也有TestFlight供开发者进行内测。合规并通过审核后,Flutter应用可以顺利上架。但上架过程可能存在一些挑战,因此可能…

使用TCP协议就一定零丢包了吗?

简述数据包发送流程 为了简化模型,我们把中间的服务器给省略掉,假设这是个端到端的通信。且为了保证消息的可靠性,它们之间用的是TCP协议进行通信。 为了发送数据包,两端首先会通过三次握手,建立TCP连接。 一个数据包&…

django-haystack,具有全文搜索功能的 Python 库!

目录 前言 安装与配置 全文搜索基础 搜索引擎配置 索引配置 搜索视图与模板 过滤器与排序 自定义搜索逻辑 应用场景 1. 电子商务网站的商品搜索 2. 新闻网站的文章搜索 3. 社交网站的用户搜索 4.企业内部系统的文档搜索 总结 前言 大家好,今天为大家分享…

零基础入门多媒体音频(7)-AAOS audio

概览 Android Automotive OS (AAOS) 是基于核心的 Android 音频堆栈打造,以支持用作车辆信息娱乐系统。AAOS 负责实现信息娱乐声音(即媒体、导航和通讯),但不直接负责具有严格可用性和时间要求的铃声和警告。 虽然 AAOS 提供了信号…

python爬虫+django新闻推荐系统可视化分析

1. 安装python3.7.0 2. 更新pip 控制台执行 python -m pip install -U pip 3. 安装依赖库 pip install -r requirements.txt 4. 更改mysql数据库配置 修改newsServer/settings.py中的数据库连接配置,比如修改下方PASSWORD密码为本机mysql密码&#xff1…

GPT 模型解析:ChatGPT 如何在语言处理领域引领潮流?

人工智能时代来临 我们正处于AI的iPhone时刻。——黄仁勋(英伟达CEO) ChatGPT 好得有点可怕了,我们距离危险的强人工智能不远了。——马斯克(Tesla/SpaceX/Twitter CEO) 以上的内容说明我们现在正处于一个技术大翻牌的…

Git分支提交时自动大写 fatal: the remote end hung up unexpectedly

先说结论: 进入 .git/refs/heads目录,会看到Feature文件夹,重命名为feature即可。 表现: 通过终端命令创建的分支 git checkout -b feature/name 使用git push后自动变成了Feature/name 并且有时候在本地创建feature/1234567…

视频基础学习四——视频编码基础一(冗余信息)

文章目录 前言一、编码压缩的原理1.空间冗余帧内预测 2.时间冗余帧间预测运动估计运动补偿 3.编码冗余4.视觉冗余 二、压缩编码的流程1.编码器2.编解码流程 总结 前言 上一篇文章介绍了视频帧率、码率、与分辨率。也介绍了为什么需要对视频进行压缩,因为720P、rgb2…

吴恩达深度学习笔记:深层神经网络(Deep Neural Networks)4.1-4.4

目录 第一门课:神经网络和深度学习 (Neural Networks and Deep Learning)第四周:深层神经网络(Deep Neural Networks)4.1 深层神经网络(Deep L-layer neural network) 第一门课:神经网络和深度学习 (Neural Networks a…