multiprocessing多进程计算及与rabbitmq消息通讯实践

1. 需求与设计

我所设计的计算服务旨在满足多个客户对复杂计算任务的需求。由于这些计算任务通常耗时较长且资源消耗较大,为了优化客户体验并减少等待时间,我采取了并行计算的策略来显著提升计算效率。

为实现这一目标,我计划利用Python的multiprocessing库来构建一个高效的多进程计算服务。同时,为了与外部的RabbitMQ消息队列系统无缝集成,我选择了pika库作为与RabbitMQ通信的桥梁。

在计算服务的架构中,我设计了生产者和消费者两个核心角色。生产者负责监听RabbitMQ队列中的消息,每当接收到新的消息时,它将从消息中提取出待处理的数据,并将其放入multiprocessing.Queue中等待处理。这个过程充分利用了multiprocessing.Queue提供的进程间通信机制,确保数据的可靠传递和同步。

与此同时,消费者进程则负责从multiprocessing.Queue中取出待处理的数据,进行实际的计算工作。这些计算任务可能需要大约一个小时的时间来完成,取决于数据的复杂性和计算资源的分配情况。一旦计算完成,消费者进程将把结果重新封装成消息,并通过pika库发送回RabbitMQ的指定队列中,以便客户或其他系统能够获取到这些结果。

通过这种设计,我的计算服务能够同时处理多个客户的计算请求,并通过并行计算的方式提高整体的处理效率。同时,RabbitMQ的引入使得服务的可扩展性和可靠性得到了极大的提升,即使在面对高并发或系统故障的情况下,也能保证数据的完整性和服务的稳定性。

在这里插入图片描述

2. multiprocessing多进行模块

multiprocessing模块是Python标准库的一部分,提供了一种基于多进程的并行计算方法。通过使用multiprocessing模块,可以利用多个CPU核心来并行处理任务,从而提高程序的执行效率。与多线程相比,多进程避免了全局解释器锁(GIL)的限制,因此在CPU密集型任务中表现更优。

2.1. multiprocessing模块的核心概念和功能

  • 进程(Process):通过multiprocessing.Process类可以创建和管理独立的进程。每个进程都有自己的内存空间,互不干扰。

  • 进程间通信(IPC)multiprocessing提供了多种进程间通信的机制,如QueuePipeValueArray。在代码中,我们使用multiprocessing.Queue来在主进程和工作进程之间传递数据。

  • 同步(Synchronization)multiprocessing模块还提供了同步原语,如LockSemaphoreEventConditionBarrier,用于协调进程之间的操作。

2.2. 代码中的multiprocessing使用说明

以下是代码中使用multiprocessing的主要部分的解释:

创建和管理进程

# 创建工作进程
processes = []
for _ in range(3):
    p = multiprocessing.Process(target=worker, args=(data_queue, rabbitmq_params, target_queue))
    p.start()
    processes.append(p)
    logger.info(f'Process started with PID: {p.pid}')
  • multiprocessing.Process类用于创建一个新的进程。
  • target参数指定了该进程将运行的目标函数,这里是worker函数。
  • args参数提供了传递给目标函数的参数。
  • p.start()方法启动了进程。

进程间通信

# 创建一个multiprocessing.Queue用于进程间通信
data_queue = multiprocessing.Queue()
  • multiprocessing.Queue用于在主进程和工作进程之间传递数据。它是线程和进程安全的队列,提供了先进先出(FIFO)的数据传递方式。

工作进程函数

def worker(data_queue, rabbitmq_params, target_queue):
    while True:
        try:
            data = data_queue.get()
            if data is None:
                break
            result = compute_result(data)
		    try:
		        logger.info('Worker started')
		        connection = pika.BlockingConnection(rabbitmq_params)
		        channel = connection.channel()
		    except pika.exceptions.AMQPError as e:
		        logger.error(f"Error connecting to RabbitMQ in worker: {e}")
		        return            
            send_result_to_rabbitmq(channel, target_queue, result)
        except Exception as e:
            logger.error(f"An error occurred in worker: {e}")

    try:
        channel.close()
        connection.close()
    except pika.exceptions.AMQPError as e:
        logger.error(f"Error closing RabbitMQ connection in worker: {e}")

    logger.info('Worker finished')
  • worker函数从data_queue中获取数据进行处理。如果队列为空,data_queue.get()将阻塞直到有数据可用。
  • 处理完成后,结果通过send_result_to_rabbitmq函数发送到RabbitMQ。

终止工作进程

# 等待所有工作进程结束
for p in processes:
    data_queue.put(None)  # 发送None信号以终止工作进程
for p in processes:
    p.join()
  • data_queue发送None信号,通知工作进程终止。
  • 使用p.join()方法等待每个进程完成。这是一个阻塞调用,直到对应的进程终止。

2.3. 小结

  • multiprocessing模块通过创建独立的进程,利用多核CPU的能力并行处理任务。
  • multiprocessing.Queue用于进程间通信,确保数据安全地在进程间传递。
  • 通过错误处理和日志记录,提高了程序的健壮性和可维护性。

这使得程序能够在多核环境中高效运行,并能够处理各种异常情况,确保程序的稳定性和可靠性。

3. 实践代码

import pika  
import multiprocessing  
import time 
from loguru import logger
  
# 假设这是你的计算函数  
def compute_result(data):  
    # 模拟计算过程  
    time.sleep(3 * 60)  # 假设需要3分钟,换成随机  
    return f"Result for {data}"  
  
# 发送结果到RabbitMQ的函数  
def send_result_to_rabbitmq(channel, queue_name, result):  
    try:
        channel.queue_declare(queue=queue_name, durable=True)
        channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=result,
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            )
        )
        logger.info(f"发送结果消息:{result} 到RabbitMQ")
    except pika.exceptions.AMQPError as e:
        logger.error(f"Error sending result to RabbitMQ: {e}")
        raise 

# 从RabbitMQ接收数据并放入队列的函数(生产者)  
def consume_from_rabbitmq_and_enqueue(rabbitmq_connection, rabbitmq_queue, data_queue):  
    try:
        channel = rabbitmq_connection.channel()
        channel.queue_declare(queue=rabbitmq_queue, durable=True)

        def callback(ch, method, properties, body):
            try:
                data_queue.put(body.decode('utf-8'))
                logger.info(f"接收到消息:{body.decode('utf-8')}")
            except Exception as e:
                logger.error(f"Error putting message into data_queue: {e}")

        channel.basic_consume(queue=rabbitmq_queue, on_message_callback=callback, auto_ack=True)
        channel.start_consuming()
    except pika.exceptions.AMQPError as e:
        logger.error(f"Error consuming from RabbitMQ: {e}")
        raise
  
# 工作进程函数  
def worker(data_queue,rabbitmq_params, target_queue):  
    # 由于计算时间长,连接很容易被断开
    #try:
    #    logger.info('Worker started')
    #    workerconnection = pika.BlockingConnection(rabbitmq_params)

    #    channel = workerconnection.channel()
    #    channel.queue_declare(queue = target_queue, durable=True)
    #except pika.exceptions.AMQPError as e:
    #    logger.error(f"Error connecting to RabbitMQ in worker: {e}")
    #    return
    
    while True:  
        try:  
            # 从Queue中获取数据,如果队列为空,则阻塞等待  
            data = data_queue.get() 
            print(f'data_queue.get() is {data}') 
            if data is None:  
                # 收到None信号,表示应该退出  
                break  
              
            # 计算结果  
            result = compute_result(data)  
            try:
                logger.info('Worker started')
                workerconnection = pika.BlockingConnection(rabbitmq_params)
        
                channel = workerconnection.channel()
                channel.queue_declare(queue = target_queue, durable=True)
            except pika.exceptions.AMQPError as e:
                logger.error(f"Error connecting to RabbitMQ in worker: {e}")
                return                          
            # 发送结果到RabbitMQ  
            send_result_to_rabbitmq(channel, target_queue, result)  
        except Exception as e:  
            print(f"An error occurred: {e}")  

    try:
        channel.close()
        connection.close()
    except pika.exceptions.AMQPError as e:
        logger.error(f"Error closing RabbitMQ connection in worker: {e}")

    logger.info('Worker finished')
  
# 主程序  
if __name__ == "__main__":  
    rabbitmq_queue = 'hello_world'
    target_queue = 'target_station_response_queue'
    # 设置RabbitMQ连接和队列  
    credentials = pika.PlainCredentials('rabbit', '****')  # mq用户名和密码
    rabbitmq_params = pika.ConnectionParameters('192.168.*.*',port = 5671,virtual_host = '/dev',credentials = credentials)
    # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
    try:
        connection = pika.BlockingConnection(rabbitmq_params)
    except pika.exceptions.AMQPError as e:
        logger.error(f"Error connecting to RabbitMQ in main process: {e}")
        exit(1)

    print(' [*] Waiting for messages. To exit press CTRL+C')      
    
    # 创建一个multiprocessing.Queue用于进程间通信  
    data_queue = multiprocessing.Queue()  
  
    # 创建工作进程  
    processes = []  
    for _ in range(3):  
        p = multiprocessing.Process(target=worker, args=(data_queue, rabbitmq_params, target_queue))  
        p.start()  
        processes.append(p) 

        print(f'process id = {p.pid}') 
  
    # 假设这里是生产数据到Queue的代码(这里用模拟数据代替) 
    channel = connection.channel()
      
    for i in range(10):  # 发送10个模拟数据  
        send_result_to_rabbitmq(channel, rabbitmq_queue, str(i))
        time.sleep(2)  # 模拟生产数据的间隔  
    '''
    # 当所有数据都生产完毕后,发送None信号给所有工作进程以结束它们  
    for _ in range(3):  
        data_queue.put(None)  
    '''
    try:
        consume_from_rabbitmq_and_enqueue(connection, rabbitmq_queue, data_queue)
    except Exception as e:
        logger.error(f"Error in consuming from RabbitMQ and enqueuing: {e}")      
    # 等待所有工作进程结束
    for p in processes:
        data_queue.put(None)  # 发送None信号以终止工作进程
    for p in processes:
        p.join()

    try:
        connection.close()
    except pika.exceptions.AMQPError as e:
        logger.error(f"Error closing RabbitMQ connection in main process: {e}")

    logger.info("All processes have finished.")

4. 编者按

在当今日益复杂的数据处理场景中,高效、可靠的计算服务对于企业和个人用户而言都至关重要。特别是在需要处理大量计算任务,且这些任务耗时较长、资源消耗较大的情况下,如何优化计算流程、减少用户等待时间,成为了我们必须面对的挑战。

本博客详细探讨了如何利用Python的multiprocessing库和pika库构建一个高效、可扩展的多进程计算服务,并通过RabbitMQ实现与外部系统的消息通讯。这一实践不仅解决了计算资源瓶颈问题,还显著提升了服务的整体性能和可靠性。

在整个实践过程中,我们特别感谢“文言一心”提供的指导和帮助。文言一心能够根据我们的需求快速编写出程序框架,大大提高了我们的工作效率。同时,其丰富的实践经验和专业知识也为我们提供了宝贵的参考和借鉴。同时,也感谢ChatGPT验证上述方案和专业指导。

通过本博客的分享,我们希望能够为读者提供一种高效、可靠的多进程计算服务实现方案,并希望能够帮助读者更好地理解和应用Python的multiprocessing库和pika库。在未来的数据处理和计算服务中,我们相信这一实践将发挥越来越重要的作用。

欢迎反馈。

参考:

[1]. ithicker. 多进程(多核运算)Multiprocessing. CSDN博客. 2021.02
[2]. 擒贼先擒王. Python 多进程:multiprocessing、aiomultiprocess(异步多进程). CSDN博客. 2024.05

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/732698.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

微信小程序毕业设计-“黄师日报”平安系统项目开发实战(附源码+论文)

大家好!我是程序猿老A,感谢您阅读本文,欢迎一键三连哦。 💞当前专栏:微信小程序毕业设计 精彩专栏推荐👇🏻👇🏻👇🏻 🎀 Python毕业设计…

奔驰EQS SUV升级原厂主动式氛围灯效果展示

以下是一篇关于奔驰 EQs 升级原厂主动氛围灯案例的宣传文案: 在汽车科技不断演进的今天,我们自豪地为您呈现奔驰 EQs 升级原厂主动氛围灯的精彩案例。 奔驰 EQs,作为豪华电动汽车的典范,其卓越品质与高端性能有目共睹。而此次升…

定义多个类对象,分别输入和输出各对象中的时间(时:分:秒)

在前面的文章中,类中只有公用数据而无成员函数,而且只有1个对象。可以直接在主函数中进行输入和输出。若有多个对象,需要分别引用多个对象中的数据成员,可以写出如下程序: (1)编写程序&#xff…

流程控制相关

1.break语句 只能用在循环体内,用来结束当前循环 语法: while 循环条件表达式: 语句块 if 条件表达式: break 语句块 for i in 迭代对象: 语句块 if 条件表达式: break 语句块 小练: 求一…

WPS相同字体但是部分文字样式不一样解决办法

如下图,在使用wps编辑文档的时候发现有些电脑的文字字体很奇怪,但是把鼠标移到这个文字的位置,发现它和其他正常文字的字体是一样的,都是仿宋_GB2312 正常电脑的文字如下图所示 打开C:\Windows找到Fonts这个文件夹 把仿宋_GB2312这…

MySQL 死锁查询和解决死锁

来了来了来了!客户现场又要骂街了,你们这是什么破系统怎么这么慢啊?!?! 今天遇到了mysql死锁,直接导致服务器CPU被PUA直接GUA了! 别的先别管,先看哪里死锁,或…

使用Mixamo极简绑骨,导入unity中使用

如果你只想专注于角色建模,对于动画设计没有过多精力;如果你想白嫖别人的角色动画,用到自己的模型上;那么,这个网站很适合你:https://www.mixamo.com/ 操作步骤: 首先将自己的模型上传到这个网…

如何混淆 net core 8 架构 C# 编译程序

如何混淆 net core 8 架构 C# 编译程序 一、使用混淆工具 .NET Reactor V6.9二、net core 8 架构 C# 编译程序(发布的单文件)1、通过发布的单文件程序,可以直接在 .NET Reactor 拖入或打开 ,勾选自己需要的保护功能。2、勾选自己需…

SHA256 安全散列算法加速器实验

1、SHA256 介绍 SHA256 加速器是用来计算 SHA-256 的计算单元, SHA256 是 SHA-2 下细分出的一种算法。 SHA-2 名称来自于安全散列算法 2 (英语: Secure Hash Algorithm 2 )的缩写,一种密码散列函 数算法标准…

群辉NAS中文件下载的三种方案

目录 一、迅雷套件 1、添加套件来源 2、安装套件 3、手机安装迅雷 二、qBittorrent套件 1、添加套件来源 2、改手工安装 3、更新后的问题 4、最后放弃DSM6 (1)上传文件手工安装 (2)添加套件来源 5、解决登陆报错 6、添加tracker 7、修改下载默认位置 8、手机…

经验总结--开关MOS管发热的一般原因/电源开发经验总结

开关MOS管发热的一般原因 做电源设计,或者做驱动方面的电路,难免要用到场效应管,也就是人们常说的MOS管。MOS管有很多种类,也有很多作用。做电源或者驱动的使用,当然就是用它的开关作用。 无论N型或者P型MOS管,其工作原理本质是一样的。MOS管是由加在输入端栅极的电压来控…

C#委托:事件驱动编程的基石

目录 了解委托 委托使用的基本步骤 声明委托(定义一个函数的原型:返回值 参数类型和个数) 根据委托定义的函数原型编写需要的方法 创建委托对象,关联“具体方法” 通过委托调用方法,而不是直接使用方法 委托对象所关联的方…

Electron快速入门(三):在(二)的基础上修改了一个文件夹做了个备忘录

Lingering Memories 诗绪萦怀 修改index.html <!--index.html--> <!DOCTYPE html> <html lang="zh-CN"><head><meta charset="UTF-8"><!-- https://developer.mozilla.org/en-US/docs/Web/HTTP/CSP --><meta h…

一种简单的图像分析

简介 一种简单的边界分析&#xff0c;通过相邻的像素的灰度进行判断&#xff0c;计算出边界。 测试1 原图 结果 测试2 原图 结果 代码说明 主要的技术在makeTable过程中&#xff0c;这个过程主要执行了以下几步 计算每个像素的灰度计算相邻多个像素的最大灰度差统计灰度差…

【IC验证】UVM实验lab03

1. TLM端口的创建、例化与使用 创建&#xff1a; uvm_get_blocking_port #(fmt_trans) mon_bp_port; 例化&#xff1a; function new(string name "mcdf_refmod", uvm_component parent);super.new(name, parent);fmt_trans new("fmt_trans", this);…

程序猿大战Python——面向对象——魔法方法

什么是魔法方法&#xff1f; 目标&#xff1a;了解什么是魔法方法&#xff1f; 魔法方法指的是&#xff1a;可以给Python类增加魔力的特殊方法。有两个特点&#xff1a; &#xff08;1&#xff09;总是被双下划线所包围&#xff1b; &#xff08;2&#xff09;在特殊时刻会被…

MySQL进阶——视图

目录 1基本语法 1.1创建 1.2 查询 1.3 修改 1.4 删除 2 检查选项 2.1 级联CASCADED 2.2本地LOCAL 3 更新及作用 3.1视图的更新条件 3.2视图的作用 4视图案例 1基本语法 视图&#xff08;View&#xff09;是一种虚拟存在的表。视图中的数据并不在数据库中实际存在&…

YYU-5/80-260mm型钢筋残余变形测试仪 电子引伸计

YYU-5/80-260mm型钢筋接头专用引伸计&#xff0c;是按照《JGJ 107 2010 钢筋技术连接技术规程》的技术要求设计的&#xff0c;专门用于测试钢筋接头残余变形的双向平均引伸计&#xff0c;其标距可以根据钢筋直径要求进行调整。 完全符合《JGJ 107 2010 钢筋技术连接技术规程》…

Java 超详细实现导入导出 (包含时间转换问题和样式)

序言 工作中遇到了导入导出问题&#xff0c;并且出现了导入或导出Excel时间格式变为数字的问题。通过学习解决实现了这些功能&#xff0c;记录总结分享给大家。本文将详细介绍如何使用 Java 编程语言和 Apache POI 库来实现这些功能。我们将通过一个示例项目演示如何从数据库中…

C语言| 数组倒置II

数组倒置第二种方法&#xff1a;直接在数组内进行倒置 第一个元素和最后一个元素交换&#xff0c; 第二个元素和倒数第二个元素交换 第三个元素和倒数第三个元素交换...... 数组元素个数为偶数&#xff0c;每个元素都能交换一次&#xff1b; 数组元素个数为奇数&#xff0c;最…