Python多进程/多线程通信实例

在这里插入图片描述

Python多进程/多线程通信实例

1. 多进程/多线程

多线程的定义
多线程(Multithreading) 是一种并发执行的编程技术,在一个进程内创建和管理多个线程,每个线程可以独立执行任务。线程是进程中的一个执行单元,多个线程共享进程的资源(如内存、文件句柄等),但可以独立调度和执行。

多线程的优点包括:

响应更快:在 GUI 应用程序中,可以使用一个线程处理用户输入,另一个线程执行后台任务,这样可以提高应用程序的响应速度。
资源共享:线程共享进程的资源,可以更高效地利用系统资源。
更低的开销:相比于多进程,多线程创建和上下文切换的开销较低。
多进程的定义
多进程(Multiprocessing) 是指在操作系统中同时运行多个进程,每个进程都有自己独立的内存空间和资源。进程之间通过进程间通信(IPC)进行数据交换,如管道(Pipe)、消息队列、共享内存等。

多进程的优点包括:

独立性强:每个进程都有自己独立的内存空间,不会因为一个进程的崩溃影响到其他进程的执行。
充分利用多核 CPU:多个进程可以运行在不同的 CPU 核心上,真正实现并行计算,提高计算效率。
安全性高:进程间的资源独立性提高了程序的安全性,避免了资源争用和数据竞争问题。
多线程与多进程的关系
多线程和多进程都是实现并发编程的技术手段,但它们在实现方式、适用场景和性能特性上有所不同。
在这里插入图片描述

实现方式:

多线程在同一个进程内创建多个线程,这些线程共享进程的内存和资源。
多进程在操作系统中创建多个进程,每个进程有自己独立的内存空间和资源。
适用场景:

多线程适用于需要共享大量数据和资源、需要快速上下文切换的场景,如 GUI 应用程序、实时系统等。
多进程适用于需要充分利用多核 CPU 进行并行计算、需要高独立性和安全性的场景,如高性能计算、分布式系统等。
性能特性:

多线程的创建和上下文切换开销较小,但需要注意线程同步和数据竞争问题。
多进程的创建和上下文切换开销较大,但进程间隔离性强,不容易出现数据竞争问题。

2. 多线程通信方式

共享变量:通过共享变量进行线程间通信,但需要使用线程同步机制(如锁)来防止数据竞争。

import threading

class SharedCounter:
    def __init__(self):
        self.counter = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            self.counter += 1
            print(f"Counter: {self.counter}")

def worker(counter: SharedCounter):
    for _ in range(100):
        counter.increment()

if __name__ == "__main__":
    counter = SharedCounter()
    threads = [threading.Thread(target=worker, args=(counter,)) for _ in range(5)]
    
    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    print(f"Final counter value: {counter.counter}")

事件(Event):通过设置和等待事件来进行线程间的同步和通信。

import threading

def worker(event: threading.Event):
    print("Waiting for event to be set...")
    event.wait()
    print("Event received! Continuing work...")

if __name__ == "__main__":
    event = threading.Event()
    thread = threading.Thread(target=worker, args=(event,))
    
    thread.start()

    print("Main thread doing some work...")
    threading.Event().wait(2)  # 模拟主线程工作
    print("Setting event...")
    event.set()

    thread.join()

队列(Queue):使用线程安全的队列(如queue.Queue)进行通信,这是最常见的方式。

import threading
import queue

def producer(queue: queue.Queue):
    for i in range(5):
        print(f"Producing {i}")
        queue.put(i)
        threading.Event().wait(1)  # 模拟生产者工作

def consumer(queue: queue.Queue):
    while True:
        item = queue.get()
        if item is None:
            break  # 结束信号
        print(f"Consuming {item}")
        queue.task_done()

if __name__ == "__main__":
    q = queue.Queue()
    producer_thread = threading.Thread(target=producer, args=(q,))
    consumer_thread = threading.Thread(target=consumer, args=(q,))
    
    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    q.put(None)  # 发送结束信号
    consumer_thread.join()

4. 多进程通信方式

常见的多进程通信方式包括管道(Pipe)、消息队列(Message Queue)、共享内存(Shared Memory)、信号(Signal)、套接字(Socket)等。

  • 管道(Pipe)
    管道是一种半双工的通信方式,数据只能单向流动。通常用于父子进程之间的通信。
  • 消息队列(Message Queue)
    消息队列是一种先进先出(FIFO)的数据结构,允许多个进程将消息插入队列中并从队列中读取消息。
  • 共享内存(Shared Memory)
    共享内存允许多个进程访问同一块内存区域,需要借助同步机制(如锁)来避免数据竞争问题。
  • 信号(Signal)
    信号是一种进程间异步通信机制,通常用于通知进程某些事件的发生。信号主要用于进程间的简单通知,而不传递数据。
import os
import signal
import time

def signal_handler(signum, frame):
    print(f"Received signal: {signum}")

if __name__ == '__main__':
    signal.signal(signal.SIGUSR1, signal_handler)
    
    pid = os.fork()
    if pid == 0:  # 子进程
        os.kill(os.getppid(), signal.SIGUSR1)
    else:  # 父进程
        time.sleep(1)  # 等待子进程发送信号

  • 套接字(Socket)
    套接字用于在进程间进行网络通信,可以实现跨机器的进程通信。
import multiprocessing
import socket

def worker(host, port):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((host, port))
        s.sendall(b'Hello from child process!')
        data = s.recv(1024)
        print(f"Child process received: {data.decode()}")

def server(host, port):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, port))
        s.listen()
        conn, addr = s.accept()
        with conn:
            print('Connected by', addr)
            data = conn.recv(1024)
            print(f"Parent process received: {data.decode()}")
            conn.sendall(b'Hello from parent process!')

if __name__ == '__main__':
    host, port = '127.0.0.1', 65432
    p = multiprocessing.Process(target=worker, args=(host, port))
    p.start()
    
    server(host, port)
    
    p.join()

5. 源码实现

多线程源码说明
SimpleThreadedEnv 类:

__init__ 方法:初始化环境数量并调用 _spawn_workers 方法。
_spawn_workers 方法:创建读写队列并启动工作线程。返回从队列读取和写入数据的函数列表。
_worker 方法:工作线程执行的任务。读取任务,进行简单计算(这里是乘以2),然后将结果放入队列。
send_tasks 方法:发送任务给工作线程。
get_results 方法:从工作线程获取结果。
close 方法:发送关闭信号并等待线程结束。
使用示例:

创建 SimpleThreadedEnv 实例。
发送一组任务给线程处理。
获取并打印结果。
关闭环境。
Callable是一个类型提示,用于表示一个可调用的对象。这些对象可以是函数、方法,甚至是实现了__call__方法的类实例。在类型提示中使用Callable可以明确表明某个参数或变量应该是一个可以被调用的对象。

作用
Callable的主要作用是使代码更具可读性和可维护性,特别是在使用静态类型检查工具(如mypy)时,可以更早地发现潜在的错误。

用法
Callable是从typing模块引入的,可以用来注释函数参数和返回值。例如,Callable[[Arg1Type, Arg2Type], ReturnType]表示一个接受两个参数Arg1TypeArg2Type并返回ReturnType的可调用对象。

from queue import Queue
from threading import Thread
from typing import Callable, List, Tuple, Any

class SimpleThreadedEnv:
    def __init__(self, num_envs: int):
        self._num_envs = num_envs
        (
            self._connection_read_fns,
            self._connection_write_fns,
        ) = self._spawn_workers()

    def _spawn_workers(self) -> Tuple[List[Callable[[], Any]], List[Callable[[Any], None]]]:
        parent_read_queues, parent_write_queues = zip(
            *[(Queue(), Queue()) for _ in range(self._num_envs)]
        )
        self._workers = []
        for parent_read_queue, parent_write_queue in zip(
            parent_read_queues, parent_write_queues
        ):
            thread = Thread(
                target=self._worker,
                args=(
                    parent_write_queue.get,
                    parent_read_queue.put,
                ),
            )
            self._workers.append(thread)
            thread.daemon = True
            thread.start()
        return (
            [q.get for q in parent_read_queues],
            [q.put for q in parent_write_queues],
        )

    def _worker(self, get_fn: Callable[[], Any], put_fn: Callable[[Any], None]):
        while True:
            task = get_fn()
            if task is None:
                break  # Exit signal received
            result = task * 2  # Simple computation: multiply by 2
            put_fn(result)

    def send_tasks(self, tasks: List[Any]):
        for task, write_fn in zip(tasks, self._connection_write_fns):
            write_fn(task)

    def get_results(self) -> List[Any]:
        return [read_fn() for read_fn in self._connection_read_fns]

    def close(self):
        for write_fn in self._connection_write_fns:
            write_fn(None)
        for worker in self._workers:
            worker.join()

if __name__ == "__main__":
    num_envs = 4
    env = SimpleThreadedEnv(num_envs)

    tasks = [i for i in range(num_envs)]
    print(f"Sending tasks: {tasks}")
    env.send_tasks(tasks)

    results = env.get_results()
    print(f"Received results: {results}")

    env.close()

多进程通信源码实现说明
MultiprocessingExample 类: 管理多进程任务。
进程工作函数: 使用静态方法定义了 _worker_pipe_worker_queue_worker_shared_memory,分别处理管道、队列和共享内存的通信。
进程生成函数: 使用 _spawn_worker_pipe_spawn_worker_queue_spawn_worker_shared_memory 函数来生成并启动进程。
run 方法: 管理多进程任务的执行,展示如何使用不同的通信方式进行进程间通信。
资源管理: 使用上下文管理器模式(__enter____exit__ 方法)和析构函数(__del__ 方法)确保进程在使用完毕后正确关闭。

import multiprocessing as mp
from multiprocessing import Pipe, Queue, Value, Array
from threading import Thread
from typing import Any, Callable, List, Tuple, Union
import time
import os


class MultiprocessingExample:
    _workers: List[Union[mp.Process, Thread]]
    _is_waiting: bool
    _is_closed: bool

    def __init__(self, num_workers: int) -> None:
        self._num_workers = num_workers
        self._workers = []
        self._is_waiting = False
        self._is_closed = True

    @staticmethod
    def _worker_pipe(pipe_conn: Any) -> None:
        for i in range(5):
            msg = f"Message {i} from process {os.getpid()}"
            pipe_conn.send(msg)
            print(pipe_conn.recv())
            time.sleep(1)
        pipe_conn.close()

    @staticmethod
    def _worker_queue(queue: Any) -> None:
        for i in range(5):
            msg = f"Message {i} from process {os.getpid()}"
            queue.put(msg)
            print(queue.get())
            time.sleep(1)

    @staticmethod
    def _worker_shared_memory(val: Value, arr: Array) -> None:
        for i in range(5):
            with val.get_lock():
                val.value += 1
            with arr.get_lock():
                for j in range(len(arr)):
                    arr[j] += 1
            print(f"Process {os.getpid()} updated shared memory")
            time.sleep(1)

    def _spawn_worker_pipe(self) -> Tuple[List[Callable[[], Any]], List[Callable[[Any], None]]]:
        parent_conn, child_conn = Pipe()
        process = mp.Process(target=self._worker_pipe, args=(child_conn,))
        self._workers.append(process)
        process.start()
        return [parent_conn.recv], [parent_conn.send]

    def _spawn_worker_queue(self) -> Tuple[List[Callable[[], Any]], List[Callable[[Any], None]]]:
        queue = Queue()
        process = mp.Process(target=self._worker_queue, args=(queue,))
        self._workers.append(process)
        process.start()
        return [queue.get], [queue.put]

    def _spawn_worker_shared_memory(self) -> Tuple[Value, Array, mp.Process]:
        shared_val = Value('i', 0)
        shared_arr = Array('i', range(5))
        process = mp.Process(target=self._worker_shared_memory, args=(shared_val, shared_arr))
        self._workers.append(process)
        process.start()
        return shared_val, shared_arr, process

    def run(self) -> None:
        self._is_closed = False

        # Pipe example
        pipe_read_fns, pipe_write_fns = self._spawn_worker_pipe()
        for i in range(5):
            print(pipe_read_fns[0]())
            pipe_write_fns[0](f"Reply {i} from main process")

        # Queue example
        queue_read_fns, queue_write_fns = self._spawn_worker_queue()
        for i in range(5):
            print(queue_read_fns[0]())
            queue_write_fns[0](f"Reply {i} from main process")

        # Shared memory example
        shared_val, shared_arr, shared_mem_process = self._spawn_worker_shared_memory()
        shared_mem_process.join()

        print(f"Final shared value: {shared_val.value}")
        print(f"Final shared array: {list(shared_arr)}")

        self._close()

    def _close(self) -> None:
        if self._is_closed:
            return
        for process in self._workers:
            process.join()
        self._is_closed = True

    def __del__(self):
        self._close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close()


if __name__ == "__main__":
    with MultiprocessingExample(num_workers=3) as example:
        example.run()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/924161.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据结构(Java)—— ArrayList

1.线性表 线性表( linear list)是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使用的数据结构,常见的线性表:顺序表、链表、栈、队列... 线性表在逻辑上是线性结构,也就说是连续的一条直线。但是在…

【Qt】重写QComboBox下拉展示多列数据

需求 点击QComboBox时&#xff0c;下拉列表以多行多列的表格展示出来。 实现 直接上代码&#xff1a; #include <QComboBox> #include <QTableWidget> #include <QVBoxLayout> #include <QWidget> #include <QEvent> #include <QMouseEve…

NVR录像机汇聚管理EasyNVR多个NVR同时管理基于B/S架构的技术特点与能力应用

EasyNVR视频融合平台基于云边端协同设计&#xff0c;能够轻松接入并管理海量的视频数据。该平台兼容性强、拓展灵活&#xff0c;提供了视频监控直播、录像存储、云存储服务、回放检索以及平台级联等一系列功能。B/S架构使得EasyNVR实现了视频监控的多元化兼容与高效管理。 其采…

了解网络威胁情报:全面概述

网络威胁情报 CTI 是指系统地收集和分析与威胁相关的数据&#xff0c;以提供可操作的见解&#xff0c;从而增强组织的网络安全防御和决策过程。 在数字威胁不断演变的时代&#xff0c;了解网络威胁情报对于组织来说至关重要。复杂网络攻击的兴起凸显了制定强有力的策略以保护敏…

linux运行vue编译后的项目

如果你的 Vue 项目使用了 history 模式&#xff08;而非默认的 hash 模式&#xff09;&#xff0c;在纯静态服务器中会出现类似的问题。因为 Vue Router 的 history 模式要求所有未匹配的路径都重定向到 index.html&#xff0c;以便 Vue 前端处理路径。 首先在本地执行npm run…

基础入门-Web应用架构类别源码类别镜像容器建站模版编译封装前后端分离

知识点&#xff1a; 1、基础入门-Web应用-搭建架构上的技术要点 2、基础入门-Web应用-源码类别上的技术要点 一、演示案例-架构类别-模版&分离&集成&容器&镜像 1、套用模版型 csdn / cnblog / github / 建站系统等 安全测试思路上的不同&#xff1a; 一般…

使用Github Action将Docker镜像转存到阿里云私有仓库,供国内服务器使用,免费易用

文章目录 一、前言二、 工具准备&#xff1a;三、最终效果示例四、具体步骤第一大部分是配置阿里云1. 首先登录阿里云容器镜像服务 [服务地址](https://cr.console.aliyun.com/cn-hangzhou/instances)2. 选择个人版本3. 创建 命名空间4. 进入访问凭证来查看&#xff0c;用户名字…

Goland或Idea启动报错

Goland或Idea启动不了 报错如图&#xff1a; 原因&#xff1a;破解导致 解决方案 环境变量中有关Goland的全部删除

keepalived+lVS(dr)高可用集群

keepalivedlVS(dr)高可用集群 规划 服务器名称IP描述masterkeepalivedlvsVIP:192.168.238.100DIP:192.168.238.151keepalived的master节点和lvs负载均衡backupkeepalivedlvsVIP:192.168.238.100DIP:192.168.238.152keepalived的备份节点和lvs负载均衡server1VIP:192.168.238.…

探索.NET世界的无限可能——带你轻松了解.NET

前言 由于目前用到的技术栈有C#&#xff0c;而学习C#离不开.NET框架&#xff0c;正如学习Java离不开学习Spring框架一样。 .NET是微软开发的一个非常强大的框架&#xff0c;它不仅擅长桌面和移动开发&#xff0c;而且还能够支持Web开发和游戏引擎开发&#xff0c;在现在热门的…

web3.js + Ganache 模拟以太坊账户间转账

转账前&#xff1a; 转账后&#xff1a; async function interact() {const web3 new Web3(new Web3.providers.HttpProvider(http://127.0.0.1:7545))web3.eth.Contract.handleRevert trueconst accounts await web3.eth.getAccounts()console.log(accounts)let balance1, …

题解 洛谷 Luogu P1182 数列分段 Section II 二分答案 C/C++

题目传送门&#xff1a; P1182 数列分段 Section II - 洛谷 | 计算机科学教育新生态https://www.luogu.com.cn/problem/P1182思路&#xff1a; 二分答案&#xff0c;每次以区间 [l, r] 中点 m 为每段和的阈值 判断在此前提下&#xff0c;划分段数是否不大于 M 是就记录答案…

26.100ASK_T113-PRO 测试摄像头 输出信息

1.测试代码 读到摄象头参数 输出 video_test.c #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <sys/ioctl.h> #include <unistd.h> #include <stdio.h> #include <string.h> #include <linux/type…

git使用文档手册

创建一个本地代码工作空间&#xff0c;比如这里使用test目录作为工作目录 针对仓库地址 http://192.168.31.125:9557/poxiaoai-crm/project-crm.git。 1. 安装 Git 确保您的系统已经安装了 Git。如果未安装&#xff0c;请根据操作系统访问 Git 官网 下载并安装。 验证安装 …

HTML5和CSS3新增特性

HTML5的新特性 HTML5新增的语义化标签 HTML5 的新增特性主要是针对于以前的不足&#xff0c;增加了一些新的标签、新的表单和新的表单属性等。 这些新特性都有兼容性问题&#xff0c;基本是 IE9 以上版本的浏览器才支持&#xff0c;如果不考虑兼容性问题&#xff0c;可以大量…

BUUCTF—Reverse—不一样的flag(7)

是不是做习惯了常规的逆向题目&#xff1f;试试这道题&#xff0c;看你在能不能在程序中找到真正的flag&#xff01;注意&#xff1a;flag并非是flag{XXX}形式&#xff0c;就是一个’字符串‘&#xff0c;考验眼力的时候到了&#xff01; 注意&#xff1a;得到的 flag 请包上 f…

通信与网络安全之IPSEC

IPSec&#xff08;IP Security&#xff09;是IETF制定的为保证在Internet上传送数据的安全保密性能的三层隧道加密协议。IPSec在网络层对IP报文提供安全服务。IPSec协议本身定义了如何在IP数据包中增加字段来保证IP包的完整性、 私有性和真实性&#xff0c;以及如何加密数据包。…

树莓派搭建NextCloud:给数据一个安全的家

前言 NAS有很多方案&#xff0c;常见的有 Nextcloud、Seafile、iStoreOS、Synology、ownCloud 和 OpenMediaVault &#xff0c;以下是他们的特点&#xff1a; 1. Nextcloud 优势&#xff1a; 功能全面&#xff1a;支持文件同步、共享、在线文档编辑、视频会议、日历、联系人…

AWS账户注册未完成会收费吗?

在当今云计算的时代&#xff0c;亚马逊网络服务&#xff08;AWS&#xff09;已经成为众多企业和开发者的首选平台。然而&#xff0c;对于许多刚接触云服务的人来说&#xff0c;关于AWS账户注册的费用问题常常引发疑虑&#xff1a;如果我在注册过程中未能完成操作&#xff0c;是…

在线音乐播放器 —— 测试报告

自动化脚本源代码&#xff1a;Java: 利用Java解题与实现部分功能及小项目的代码集合 - Gitee.com 目录 前言 一、项目简介 1.项目背景 2.应用技术 &#xff08;1&#xff09;后端开发 &#xff08;2&#xff09;前端开发 &#xff08;3&#xff09;数据库 二、项目功能…