RabbitMQ python第三方库pika应用入门实践

1. RabbitMQ简介

RabbitMQ是一个可靠、高效的开源消息代理服务器,基于AMQP协议。它具备以下特点:

  • 可以支持多种消息协议,如AMQP、STOMP和MQTT等。
  • 提供了持久化、可靠性和灵活的路由等功能。
  • 支持消息的发布和订阅模式。
  • 具备高可用性和可扩展性。

RabbiMQ的核心概念包括生产者、消费者、队列、交换机和绑定。生产者将消息发送到交换机,交换机根据其类型和绑定规则将消息路由到队列,然后消费者从队列中获取消息进行处理。

RabbitMQ相关概念

  • Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker。
  • Virtual host:出于多租户和安全因素的设计,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念,当多个不同的用户使用同一个RabbitMQ Server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。
  • Connection:publisher/consumer和broker之间的TCP连接。
  • Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销都将是巨大的,效率也是非常低的。Channel是在Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread会创建单独的Channel进行通信,AMQP的method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection,极大减少了操作系统建立TCP连接的开销。

相关术语

  • producer:生产者,向队列中发送消息的程序。(在图表中通常使用P表示)
  • queue:队列,用于存储消息,定义在RabbitMQ内部,queue本质上是一个消息缓存buffer,生产者可以往里发送消息,消费者也可以从里面获取消息。(在图表中通常使用Q表示)
  • consumer:消费者,等待并从消息队列中获取消息的程序。(在图表中通常使用C表示)
  • exchange:交换机,用于将producer发送来的消息发送到queue,事实上,producer是不能直接将message发送到queue,必须先发送到exchange,再由exchange发送到queue。

注:生产者和消费者可能在不同的程序或主机中,当然也有可能一个程序有可能既是生产者,也是消费者。

2. pika简介

在Python中,pika是一个用于处理RabbitMQ消息队列的第三方库,它允许开发者在Python应用程序中发送和接收消息,实现应用程序之间的异步通信。

主要功能

  • 连接管理:pika提供了与RabbitMQ服务器建立连接的功能。
  • 通道管理:通过连接,可以创建多个通道(channel),每个通道代表一个独立的通信流。
  • 消息发送与接收:开发者可以使用pika发送消息到指定的队列(queue),并从队列中接收消息。
  • 交换机与队列声明:支持声明交换机(exchange)、队列,以及它们之间的绑定(binding)关系。
  • 消息确认:支持消息的自动确认(auto-ack)或手动确认(manual ack),以确保消息的可靠传递。

使用流程

  • 创建连接:使用pika.BlockingConnection或pika.SelectConnection等类创建与RabbitMQ服务器的连接。
  • 创建通道:通过连接对象的channel()方法创建通道。
  • 声明交换机与队列:使用通道对象的exchange_declare()和queue_declare()方法声明交换机和队列。
  • 绑定交换机与队列:使用通道对象的queue_bind()方法将队列绑定到交换机。
  • 发送消息:使用通道对象的basic_publish()方法发送消息到指定的交换机和路由键(routing key)。
  • 接收消息:使用通道对象的basic_consume()方法开始消费队列中的消息,并通过回调函数处理接收到的消息。
  • 关闭连接:在不再需要时,使用连接对象的close()方法关闭连接。

安装pika

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pika

3. pika应用入门

3.1. 生产者

import pika
 
# 1.连接rabbit
credentials = pika.PlainCredentials('rabbit', '*****')  # rabbit用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.17.61',port = 5671,virtual_host = '/typc-fpd-dev',credentials = credentials))
channel = connection.channel()

# 2.创建持久化队列
# 注意:非持久化队列不能变持久化队列,反之也是这样的,所有创建队列中不能创建和非持久化队列重名的队列
channel.queue_declare(queue='hello_world', durable=True)
 
# 3.向指定队列插入数据
poiid = 'xxxxxx'
channel.basic_publish(exchange='',  # 简单模式
                      routing_key='hello_world',  # 指定队列
                      body=poiid,  # 向队列中添加的数据
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print(" [x] Sent 'Hello World!'") 

在这里插入图片描述
查看虚拟主机virtual-host: /typc-fpd-dev下队列hello_world。
在这里插入图片描述

3.2. 侦听消费者

import pika
 
# 1.连接rabbit
credentials = pika.PlainCredentials('rabbit', '*****')  # rabbit用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.17.61',port = 5671,virtual_host = '/typc-fpd-dev',credentials = credentials))
 
# 2.创建持久化队列
# 注意:非持久化队列不能变持久化队列,反之也是这样的,所有创建队列中不能创建和非持久化队列重名的队列
# 注意:这一步不是必须的,但是如果消费者先启动而不是生成者先启动时,这时队列中还没有hello_world队列,这时就会报错
channel.queue_declare(queue='hello_world', durable=True)

# 3.确定回调函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # 手动应答
    poiid = body.decode('utf-8')  # 将 bytes 转换为字符串 
    Core.setPIO(poiid)            # 输入数据
    Core.task_process()           # 处理数据
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 4.确定监听队列参数
channel.basic_consume(queue='hello_world',  # 指定队列
                      auto_ack=False,  # 手动应答方式
                      on_message_callback=callback)
 
print(' [*] Waiting for messages. To exit press CTRL+C')

    
# 5.正式监听
channel.start_consuming()

3.3. 主动处理消费消息

在Pika中,basic_get方法确实可以用于从队列中直接获取消息,但通常不推荐在生产环境中使用,因为它不是高效的消息处理方式。不过,如果你确实需要这种方法,以下是如何使用basic_get的示例:

# 1.连接rabbit
credentials = pika.PlainCredentials('rabbit', '*****')  # rabbit用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.17.61',port = 5671,virtual_host = '/typc-fpd-dev',credentials = credentials))
channel = connection.channel()

time.sleep(1)
     
# 2.创建持久化队列
# 注意:非持久化队列不能变持久化队列,反之也是这样的,所有创建队列中不能创建和非持久化队列重名的队列
# 注意:这一步不是必须的,但是如果消费者先启动而不是生成者先启动时,这时队列中还没有hello2队列,这时就会报错
channel.queue_declare(queue='hello_world', durable=True)
count = 5
for i in range(count):  
    print('取消息开始时间')  
    method_frame, header_frame, body = channel.basic_get(queue='hello_world', auto_ack=False)  
    if method_frame:  
        # 处理消息体 
        print('body:',body)  
        poiid = body.decode('utf-8')  # 将 bytes 转换为字符串 
        Core.setPIO(poiid)
        Core.task_process()
        time.sleep(2)        
    
        # 如果你设置了auto_ack=False,则需要手动确认消息  
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)  
    else:  
        print("没有消息可以获取,", str(i))
    time.sleep(1)
print('取消息完成时间')
connection.close()#来关闭连接 

参考:

三只松鼠. python 操作RabbitMq详解. 博客园. 2019.03

卫玠_juncheng. Python三方库:Pika(RabbitMQ基础使用). CSDN博客. 2024.03

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/692101.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

天才程序员周弈帆 | Stable Diffusion 解读(一):回顾早期工作

本文来源公众号“天才程序员周弈帆”,仅用于学术分享,侵权删,干货满满。 原文链接:Stable Diffusion 解读(一):回顾早期工作 在2022年的这波AI绘画浪潮中,Stable Diffusion无疑是最…

pdf怎么编辑修改内容?3个实用软件!

在当今数字化时代,PDF文件因其跨平台、格式固定的特性,成为我们日常工作和生活中不可或缺的一部分。然而,PDF文件的修改和编辑往往成为许多人的难题。本文将为您详细介绍如何编辑修改PDF文件的内容,并推荐几款实用的编辑软件&…

Java——数组排序和查找

一、排序介绍 1、排序的概念 排序是将多个数据按照指定的顺序进行排列的过程。 2、排序的种类 排序可以分为两大类:内部排序和外部排序。 3、内部排序和外部排序 1)内部排序 内部排序是指数据在内存中进行排序,适用于数据量较小的情况…

【CS.SE】使用 docker pull confluentinc/cp-kafka 的全面指南

文章目录 1 引言2 准备工作2.1 安装 Docker2.1.1 在 Linux 上安装 Docker2.1.2 在 macOS 上安装 Docker2.1.3 在 Windows 上安装 Docker 2.2 验证 Docker 安装 3 拉取 confluentinc/cp-kafka Docker 镜像3.1 拉取镜像3.2 验证镜像 4 运行 Kafka 容器4.1 启动 ZooKeeper4.2 启动…

【启明智显彩屏应用】Model3A 7寸触摸彩屏AGV小车应用方案

一、AGV小车概述 (一)介绍 自动导向车(Automated Guided Vehicle,简称AGV),也称为自动导向搬运车、自动引导搬运车。AGV广泛应用在自动化的生产当中,大大节约劳动力和提高生产效率。 (二)现状…

调试环境搭建(Redis 6.X 版本)

今儿,我们来搭建一个 Redis 调试环境,目标是: 启动 Redis Server ,成功断点调试 Server 的启动过程。使用 redis-cli 启动一个 Client 连接上 Server,并使用 get key 指令,发起一次 key 的读取。 视频可见…

170.二叉树:平衡二叉树(力扣)

代码解决 /*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode() : val(0), left(nullptr), right(nullptr) {}* TreeNode(int x) : val(x), left(nullptr, right(nullptr) {}* Tree…

pytorch之猫狗识别项目

1. 导入资源包 资源包: import torchvision:PyTorch 提供的视觉库,包含了常用的计算机视觉模型架构、数据集以及图像转换工具。 from torchvision import datasets, models:导入 torchvision 中的 datasets 和 models 模块&#…

【NPS】微软NPS配置802.1x,验证域账号,动态分配VLAN(有线网络续篇)

继上一篇文章中成功实施了有线802.1x验证域账号并动态分配VLAN的策略之后,我们迎来了一个新的目标:在用户验证失败时,自动分配一个Guest VLAN,以确保用户至少能够访问基本的网络服务。这一改进将显著提升网络的灵活性和用户的上网…

东航携手抖音生活服务开启机票首播,推出国内、国际超值机票次卡

在民航暑运旺季到来之际,越来越多的用户选择提前做好旅行规划,囤下高性价比的出游商品。6月6日18点,中国东方航空(以下简称“东航”)将在抖音开启首次机票直播,推荐多款超值机票次卡及空中Wi-Fi等特色产品&…

【Python机器学习】PCA——特征提取(1)

PCA的一个重要应用是特征提取。特征提取背后的思想是,可以找到一种数据表示,比给定的原始表示更适合于分析。特征提取很有用,它的一个很好的应用实例就是图像。图像由像素组成,通常存储于红绿蓝强度。图像中的对象通常由上千个像素…

Postman 连接数据库 利用node+xmysql

1、准备nodejs环境 如果没有安装,在网上找教程,安装好后,在控制台输入命令查看版本,如下就成功了 2、安装xmysql 在控制台输入 npm install -g xmysql 3、连接目标数据库 帮助如下: 示例: 目标数据库…

【稳定检索/投稿优惠】2024年智慧金融与财务管理国际会议(SFFM 2024)

2024 International Conference on Smart Finance and Financial Management 2024年智慧金融与财务管理国际会议 【会议信息】 会议简称:SFFM 2024 截稿时间:以官网为准 大会地点:中国广州 会议官网:www.iacsffm.com 会议邮箱&am…

Linux 35.5 + JetPack v5.1.3@FUEL编译安装

Linux 35.5 JetPack v5.1.3FUEL编译安装 1. 源由2. 编译&安装Step 1:依赖库安装Step 2:建立工程Step 3:编译工程Step 4:安装工程 3. 问题汇总3.1 fuel_planner/exploration_manager - dw3.2 fuel_planner/plan_env - OpenCV库…

Anaconda软件:安装、管理python相关包

Anaconda的作用 一个python环境中需要有一个解释器, 和一个包集合. 解释器: 根据python的版本大概分为2和3. python2和3之间无法互相兼容, 也就是说用python2语法写出来的脚本不一定能在python3的解释器中运行. 包集合:包含了自带的包和第三方包, 第三…

RPA影刀 | 变量的使用

1.什么是变量 2.变量的作用 作用1:方便后续流程调用 这里在后续流程“点击元素”中,就可以选中这个变量 作用2:区分相同属性的变量 如果要打开两个网页,总不能都叫web_page吧。 所以这里一个叫百度web_page,一个叫…

Docker:认识镜像仓库及其命令

文章目录 Docker Registry什么是Docker Registry 镜像仓库工作机制使用流程实际使用方法仓库的拉取机制 常用的镜像仓库---DockerHub什么是DockerHub私有仓库 镜像仓库命令docker logindocker pulldocker pushdocker searchdocker logout Docker Registry 什么是Docker Regist…

Java Web学习笔记25——Vue组件库Element

什么是Element? Element: 是饿了么团队研发的,一套为开发者、设计师和产品经理准备的基于Vue2.0的桌面端组件库。 组件:组成网页的部件,例如:超链接、按钮、图片、表格、表单、分页条等等。 官网:https:…

C++基础编程100题-007 OpenJudge-1.3-05 计算分数的浮点数值

更多资源请关注纽扣编程微信公众号 http://noi.openjudge.cn/ch0103/05/ 描述 两个整数a和b分别作为分子和分母,既分数 a/b ,求它的浮点数值(双精度浮点数,保留小数点后9位) 输入 输入仅一行,包括两个…

PVE安装CENTOS9提示“Fatal glibc error: CPU does not support x86-64-v2”

问题描述:PVE安装CENTOS9提示“Fatal glibc error: CPU does not support x86-64-v2” RHEL 9要求x86_64的CPU支持x86-64-v2,x86-64-v2需要处理器支持 CMPXCHG16B、LAHF-SAHF、POPCNT、SSE3、SSE4.1、SSE4.2、SSSE3 等现代指令集 解决方法:…