python多线程与多进程开发实践及填坑记(1)

1. 需求分析

1.1. 概述

基于Flask、Pika、Multiprocessing、Thread搭建一个架构,完成多线程、多进程工作。具体需求如下:

  1. 并行计算任务:使用multiprocessing模块实现并行计算任务,提高计算效率、计算能力。
  2. 消息侦听任务:使用threading模块完成RabbitMQ消息队列的侦听任务,将接收到的数据放入multiprocessing.Queue中,以便并行计算任务处理。
  3. Web服务:使用Flask框架实现Web API服务,提供启停消息侦听任务、启停并行计算任务以及动态调整参数的功能。
  4. 任务交互:通过multiprocessing.Queue实现消息侦听任务与并行计算任务之间的资源交互。
  5. 非阻塞运行:使用threading模块非阻塞地运行Flask Web服务。

1.2. 多线程与多进程

在Python环境中,多线程和多进程的区别主要体现在并发模型、资源利用、数据共享、以及适用场景等方面。

1.2.1. 并发模型

多线程 (Threading):

  • 并发性: 多线程是一种并发模型,多个线程共享同一进程的资源和内存空间,能够在单个进程内并发执行。
  • GIL 限制: 由于 Python 的全局解释器锁 (Global Interpreter Lock, GIL),同一时刻只有一个线程在执行 Python 字节码。这限制了多线程在 CPU 密集型任务中的并行性。
  • 适用任务: 适用于 I/O 密集型任务,如网络请求、文件操作等。

多进程 (Multiprocessing):

  • 并行性: 多进程是一种并行模型,每个进程都有独立的内存空间和资源,可以在多个 CPU 核心上并行执行。
  • 无 GIL 限制: 每个进程有自己的 Python 解释器和 GIL,能够真正实现并行计算,充分利用多核 CPU。
  • 适用任务: 适用于 CPU 密集型任务,如计算密集型数据处理。

1.2.2. 资源利用

多线程:

  • 内存利用: 线程共享同一进程的内存空间,内存开销较小。创建和销毁线程的成本较低。
  • CPU 利用: 由于 GIL 限制,多线程在 Python 中无法充分利用多核 CPU,尤其是在 CPU 密集型任务中。

多进程:

  • 内存利用: 每个进程有独立的内存空间,内存开销较大。创建和销毁进程的成本较高。
  • CPU 利用: 无 GIL 限制,可以充分利用多核 CPU,适合并行处理 CPU 密集型任务。

1.2.3. 数据共享和通信

多线程:

  • 数据共享: 线程共享同一进程的全局变量和内存空间,数据共享容易。
  • 同步机制: 由于共享内存,线程之间需要使用锁 (Lock)、条件变量 (Condition) 等同步机制来避免竞争条件和数据不一致。

多进程:

  • 数据共享: 进程间不共享内存,数据共享复杂。需要使用进程间通信 (IPC) 机制,如管道 (Pipe)、队列 (Queue)、共享内存 (Shared Memory) 等。
  • 同步机制: 使用 IPC 机制进行数据传递和同步。

1.2.4. 适用场景

多线程:

  • I/O 密集型任务: 例如网络爬虫、文件读写、数据库操作等。这些任务在等待 I/O 操作完成时可以切换到其他线程继续执行,提高效率。
  • 轻量级任务: 由于线程的创建和销毁成本较低,适合处理大量短时任务。

多进程:

  • CPU 密集型任务: 例如科学计算、图像处理、大数据分析等。这些任务需要大量计算资源,多进程可以充分利用多核 CPU 并行处理。
  • 隔离性要求高的任务: 进程间独立运行,互不影响,适合需要高隔离性的任务。

1.2.5. 常见问题与解决

多线程:

  • 死锁: 当多个线程互相等待对方释放锁时,会出现死锁情况。需要设计合理的锁机制或使用死锁检测工具。
  • GIL 限制: 对于 CPU 密集型任务,GIL 限制了多线程的并行性。可以通过多进程绕过 GIL 实现并行计算。

多进程:

  • 高内存消耗: 每个进程有独立的内存空间,内存使用较高。可以通过共享内存或减少进程数量优化内存使用。
  • 进程间通信复杂: 需要使用 IPC 机制进行进程间数据传递,设计和实现较为复杂。可以使用 Python 的 multiprocessing 模块提供的队列、管道等工具简化实现。

1.2.6. 总结

  • 多线程: 适合 I/O 密集型任务,内存使用高效,但受 GIL 限制。
  • 多进程: 适合 CPU 密集型任务,可以充分利用多核 CPU,但内存消耗较大,进程间通信复杂。

根据任务性质选择合适的并发模型,可以提高程序的效率和性能。

2. 我初步实现多线程多进程核心代码

2.1. 程序结构及代码说明

程序结构原理图:
在这里插入图片描述
函数说明:

序号名称说明备注
1compute_result计算函数,例如你的优化算法需要并行处理
2send_result_to_rabbitmq发送消息到RabbitMQ
3consume_from_rabbitmq_and_enqueue侦听RabbltMQ消息
4worker工作进程函数
5start_listening启动消息侦听函数Web API
6stop_listening停止消息侦听函数Web API
7start_worker启动工作函数Web API
8stop_worker停止工作函数Web API

2.2. 示例代码

from flask import Flask, jsonify
from threading import Thread, Event
import multiprocessing
import pika
import json
from loguru import logger

# 创建一个事件来控制侦听
stop_event = Event()

# 定义web服务
app = Flask(__name__)

# 假设这是你的计算函数  
def compute_result(data, pso_params):  
    # 进行计算逻辑,这里简化为返回数据本身
    return {"result": data}

# 发送结果到RabbitMQ的函数  
def send_result_to_rabbitmq(channel, exchange_name, queue_name, routing_key, result):  
    try:
        channel.queue_declare(queue=queue_name, durable=True)
        # 初始化交换机
        channel.exchange_declare(exchange=exchange_name, exchange_type='direct', durable=True)
        channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key=routing_key)
        channel.basic_publish(
            exchange=exchange_name,
            routing_key=routing_key,
            body=json.dumps(result, ensure_ascii=False),
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            )
        )
        logger.info(f"发送结果消息:{result} 到RabbitMQ")
    except pika.exceptions.AMQPError as e:
        logger.error(f"Error sending result to RabbitMQ: {e}")
        raise 

# 从RabbitMQ接收数据并放入队列的函数(生产者)  
def consume_from_rabbitmq_and_enqueue(rabbitmq_params, rabbitmq_queue, data_queue):
    try:
        rabbitmq_connection = pika.BlockingConnection(rabbitmq_params)
    except pika.exceptions.AMQPError as e:
        logger.error(f"Error connecting to RabbitMQ in main process: {e}")
        exit(1)
        
    try:
        channel = rabbitmq_connection.channel()
        channel.queue_declare(queue=rabbitmq_queue, durable=True)
        
        def callback(ch, method, properties, body):
            try:
                data_queue.put(body.decode('utf-8'))
                logger.info(f"接收到消息:{body.decode('utf-8')}")
            except Exception as e:
                logger.error(f"Error putting message into data_queue: {e}")

        channel.basic_consume(queue=rabbitmq_queue, on_message_callback=callback, auto_ack=True)
        
        try:
            while not stop_event.is_set():
                channel.connection.process_data_events(time_limit=1)  # 非阻塞地处理事件  
                # 注意:这里使用了process_data_events()而不是start_consuming(),因为我们需要非阻塞地运行  
        except KeyboardInterrupt:
            channel.stop_consuming()
        finally:
            rabbitmq_connection.close()
    except pika.exceptions.AMQPError as e:
        logger.error(f"Error consuming from RabbitMQ: {e}")
        raise

# 工作进程函数  
def worker(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params): 
    logger.info('Worker started') 
    while True:  
        try:  
            data = data_queue.get() 
            print(f'data_queue.get() is {data}') 
            if data is None:  
                break  
              
            result = compute_result(data, pso_params)  
            try:
                workerconnection = pika.BlockingConnection(rabbitmq_params)
                channel = workerconnection.channel()
            except pika.exceptions.AMQPError as e:
                logger.error(f"Error connecting to RabbitMQ in worker: {e}")
                return                          
            send_result_to_rabbitmq(channel, target_exchange, target_queue, routing_key, result)  
        except Exception as e:  
            print(f"An error occurred: {e}")  
    logger.info('Worker finished')

@app.route('/startlistening', methods=['GET'])
def start_listening():
    if stop_event.is_set():
        stop_event.clear()
    if not hasattr(app, 'pika_thread') or not app.pika_thread.is_alive():
        app.pika_thread = Thread(target=consume_from_rabbitmq_and_enqueue, args=(rabbitmq_params, rabbitmq_queue, data_queue))
        app.pika_thread.start()
        return jsonify({'status': 'listening'}), 200
    else:
        return jsonify({'status': 'already listening'}), 200

@app.route('/stoplistening', methods=['GET'])
def stop_listening():
    if hasattr(app, 'pika_thread') and app.pika_thread.is_alive():
        stop_event.set()
        app.pika_thread.join()
        del app.pika_thread
        return jsonify({'status': 'stopped'}), 200  
    else:
        return jsonify({'status': 'not running'}), 400

@app.route('/startworking', methods=['GET'])
def start_worker():
    if len(processes) == 0: 
        for _ in range(3):  
            p = multiprocessing.Process(target=worker, args=(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params))  
            p.start()  
            processes.append(p)   
            print(f'process id = {p.pid}')
        return jsonify({'status': 'started working'}), 200
    else:
        return jsonify({'status': 'already working'}), 202

@app.route('/stopworking', methods=['GET'])
def stop_worker():
    for p in processes:
        data_queue.put(None)
    for p in processes:
        p.join()
    processes.clear() 
    return jsonify({'status': 'stopped working'}), 200

if __name__ == "__main__":
    rabbitmq_queue = 'energyStorageStrategy.queue'
    target_queue = 'energyStorageStrategy.queue.typc-fpd-tysh'
    target_exchange = 'energyStorageStrategy.direct'
    routing_key = 'typc-fpd-tysh'
    pso_params = {}  # 假设你的PSO参数

    credentials = pika.PlainCredentials('rabbit', '****')  # mq用户名和密码
    rabbitmq_params = pika.ConnectionParameters('192.168.*.*', port=5671, virtual_host='/typc-fpd-dev', credentials=credentials)

    # 创建一个multiprocessing.Queue用于进程间通信  
    data_queue = multiprocessing.Queue()  

    # 创建工作进程列表
    processes = [] 

    print(' [*] Waiting for messages. To exit press CTRL+C')

    flask_thread = Thread(target=lambda: app.run(host='0.0.0.0', port=5002, debug=True))
    flask_thread.start()
    
    start_worker()
    start_listening()

3. 代码中问题及其他

3.1. 重新启动侦听线程失败

程序没有报错,但是,没有启动侦听服务线程。
详见后续文章,主题是消息侦听与线程。

3.2. 重新启动工作进行报错

AttributeError: Can’t get attribute ‘worker’ on <module ‘main’ (built-in)>
详见后续文章。

3.3. Flask 应用上下文之外操作问题

问题描述:

site-packages\werkzeug\local.py", line 508, in _get_current_object
    raise RuntimeError(unbound_message) from None
RuntimeError: Working outside of application context.

问题解析:
这个错误 RuntimeError: Working outside of application context. 通常发生在 Flask 应用中,当你尝试在 Flask 应用上下文之外执行某些操作时。在 Flask 中,应用上下文是一个用于存储当前应用相关信息的对象,比如配置和 URL 映射。当你调用 jsonify 或其他依赖于应用上下文的函数时,必须确保你处于应用上下文中。

在你的代码中,这个错误很可能是因为 start_listening 函数被设计为在 Flask 路由之外的地方被调用,或者是在 Flask 请求处理流程之外被调用。

解决办法:

    with app.app_context():
        start_worker()
        start_listening()

使用 app.app_context() 来手动创建一个应用上下文。

3.4. Linux环境防火墙

在Linux环境中,别忘了打开防火墙的端口:

root@DeepLearning pvet]# sudo iptables -I INPUT -p tcp --dport 5003 -j ACCEPT

4. 总结

  1. 模块化:将不同的功能模块化,便于维护和扩展。
  2. 多进程与多线程结合:使用multiprocessing实现并行计算任务,使用threading实现RabbitMQ消息侦听和Flask Web服务的非阻塞运行。
  3. 进程间通信:通过multiprocessing.Queue实现消息侦听任务与并行计算任务之间的资源交互。
  4. 事件控制:通过threading.Event控制消息侦听任务的启停。

这种架构设计能够满足需求,并且具有较好的扩展性和可维护性。如果有更多具体的需求或优化,可以在此基础上进一步完善。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/774833.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

大厂都在加急招人的大模型LLM,到底怎么学?

大模型如何入坑&#xff1f; 想要完全了解大模型&#xff0c;你首先要了解市面上的LLM大模型现状&#xff0c;学习Python语言、Prompt提示工程&#xff0c;然后深入理解Function Calling、RAG、LangChain 、Agents等 很多人不知道想要自学大模型&#xff0c;要按什么路线学&a…

电脑刚删除的文件怎么恢复?可使用这几种恢复方法!

在日常生活和工作中&#xff0c;我们时常会在电脑上进行各种文件操作&#xff0c;包括删除不需要的文件。然而&#xff0c;有时候我们可能会误删一些重要的文件&#xff0c;或者在删除后立刻意识到这些文件的重要性。 那么&#xff0c;电脑刚删除的东西怎么恢复呢&#xff1f;本…

神领物流项目第一天

文章目录 聚焦快递领域首先第一个是验证码模块流程登录接口权限管家 聚焦快递领域 首先第一个是验证码模块流程 首先生成验证码的流程 可以使用工具类去生成验证码 LineCaptcha lineCaptcha CaptchaUtil.createLineCaptcha(160, 60, 4, 26);// 获取值然后存入redis中 strin…

WEB04MyBatis

Mybatis mybatis查询 准备 准备工作 在目前的数据库中添加一张数据表emp 将资料中提供的day04-01-mybatis导入的目前的工程中 修改配置文件中的数据库的账户和密码 观察实体类中的属性和数据表中的字段的对应关系 查询结果封装 查询所有 SQL语句 select * from emp; …

基于Oauth2.0的OpenFeign远程调用

目录 前言 1.引入openfeign相关依赖 2.开启openFeign远程调用&#xff0c;在启动类头加上注解即可 3. 提供远程调用接口&#xff0c;接口名称必须与controler名称保持一致 4.远程调用关键代码 4.1 注入restTemplate 4.2 配置拦截器 4.3 设置请求头 4.4 获取请求结果 4.5 远…

两次叛国投敌,没有祸及子孙反而家族长盛不衰的传奇

这个人就是韩国国王韩王信&#xff0c;汉朝八大异姓王之一。 第一次叛国投敌&#xff0c;发生在楚汉争霸时期。有一次他的军队被项羽包围&#xff0c;于是选择了投降。不过&#xff0c;这是权宜之计&#xff0c;不久就借机回到刘邦阵营。 第二次叛国投敌&#xff0c;发生在西…

openssl交叉编译-移植ARM

OpenSSL是一个开源的密码学工具包&#xff0c;提供了一组用于网络安全的加密和解密算法、协议以及相关工具的库&#xff0c;它通过提供多种加密算法、协议和工具&#xff0c;为网络通信和数据存储提供了强大的安全保障。 主要功能 加密和解密&#xff1a; OpenSSL提供了多种对…

字节码编程javassist之结合hotswap在运行期动态修改方法返回值

写在前面 本文看下如何通过javassist结合hotswap在运行期动态修改方法的返回值。 1&#xff1a;代码 要修改的代码 public class ApiTest {public String m1(String info) {return "info is: " info;} }javasssit代码 package com.dahuyou.javassist.huohuo;im…

娱乐圈新瓜《庆余年》二皇子刘端端被曝“深夜秘事”

《庆余年》外&#xff0c;二皇子刘端端的“深夜冒险”网友笑称&#xff1a;权谋剧外更有戏话说《庆余年》里的二皇子李承泽&#xff0c; 那可是权谋与颜值并存的典范&#xff0c;但戏外的刘端端&#xff0c; 最近却成了“深夜冒险家”&#xff0c;让一众吃瓜群众笑中带惊&…

CNN文献综述

卷积神经网络&#xff08;Convolutional Neural Networks&#xff0c;简称CNN&#xff09;是深度学习领域中的一种重要模型&#xff0c;主要用于图像识别和计算机视觉任务。其设计灵感来自于生物学中视觉皮层的工作原理&#xff0c;能够高效地处理图像和语音等数据。 基本原理…

JVM专题之性能优化

运行时优化 方法内联 > 方法内联,是指 **JVM在运行时将调用次数达到一定阈值的方法调用替换为方法体本身** ,从而消除调用成本,并为接下来进一步的代码性能优化提供基础,是JVM的一个重要优化手段之一。 > > **注:** > > * **C++的inline属于编译后内联,…

特殊用途二极管+二极管故障检测+三极管(BJT)的工作原理+定时器的使用(小灯定时闪烁实现)

2024-7-5&#xff0c;星期五&#xff0c;17:27&#xff0c;天气&#xff1a;晴&#xff0c;心情&#xff1a;晴。今天没有什么特殊的事情发生&#xff0c;继续学习啦&#xff0c;加油加油&#xff01;&#xff01;&#xff01; 今日完成模电自选教材第二章内容的学习&#xff…

Linux-C语言实现一个进度条小项目

如何在linux中用C语言写一个项目来实现进度条&#xff1f;&#xff08;如下图所示&#xff09; 我们知道\r是回车&#xff0c;\n是换行&#xff08;且会刷新&#xff09; 我们可以用 \r 将光标移回行首&#xff0c;重新打印一样格式的内容&#xff0c;覆盖旧的内容&#xff0c;…

二重积分 - 包括计算方法和可视化

二重积分 - 包括计算方法和可视化 flyfish 计算在矩形区域 R [ 0 , 1 ] [ 0 , 2 ] R [0, 1] \times [0, 2] R[0,1][0,2] 下&#xff0c;函数 z 8 x 6 y z 8x 6y z8x6y 的二重积分。这相当于计算曲面 z 8 x 6 y z 8x 6y z8x6y 与 xy 平面之间的体积。 二重积分…

网页计算器的实现

简介 该项目实现了一个功能完备、交互友好的网页计算器应用。只使用了 HTML、CSS 和 JavaScript &#xff0c;用于检验web前端基础水平。 开发环境&#xff1a;Visual Studio Code开发工具&#xff1a;HTML5、CSS3、JavaScript实现效果 功能设计和模块划分 显示模块&#…

Chapter11让画面动起来——Shader入门精要学习笔记

Chapter11让画面动起来 一、Unity Shader中的内置变量&#xff08;时间篇&#xff09;二、纹理动画1.序列帧动画2.滚动背景 三、顶点动画1.流动的河流2.广告牌3.注意事项①批处理问题②阴影投射问题 一、Unity Shader中的内置变量&#xff08;时间篇&#xff09; Unity Shader…

Chiasmodon:一款针对域名安全的公开资源情报OSINT工具

关于Chiasmodon Chiasmodon是一款针对域名安全的公开资源情报OSINT工具&#xff0c;该工具可以帮助广大研究人员从各种来源收集目标域名的相关信息&#xff0c;并根据域名、Google Play应用程序、电子邮件地址、IP地址、组织和URL等信息进行有针对性的数据收集。 该工具可以提…

window系统openssl开发环境搭建(VS2017)

window系统openssl开发环境搭建 VS2017 一、下载openssl二、安装openssl三、openssl项目配置3.1 配置include文件3.2 配置openssl动态库四、编写openssl测试代码五、问题总结5.1 问题 一5.2 问题二一、下载openssl https://slproweb.com/products/Win32OpenSSL.html 根据自己…

如何查看MCU编译生成的elf(out)文件内容

一般地&#xff0c;我们想要知道单片机程序编译完后的结构我们可以查看map文件或者是elf/out文件&#xff0c;map文件不能看函数的汇编格式&#xff0c;只能查看编译完成后变量、代码的地址和占用空间大小&#xff0c;而elf文件里面更加详细&#xff0c;还包含了函数的汇编&…

CobaltStrike的内网安全

1.上线机器的Beacon的常用命令 2.信息收集和网站克隆 3.钓鱼邮件 4.CS传递会话到MSF 5.MSF会话传递到CS 1上线机器的Beacon的常用命令 介绍&#xff1a;CobaltStrike分为服务端和客户端&#xff0c;一般我们将服务端放在kali&#xff0c;客户端可以在物理机上面&#xff0…