Python多进程/多线程通信实例
1. 多进程/多线程
多线程的定义
多线程(Multithreading) 是一种并发执行的编程技术,在一个进程内创建和管理多个线程,每个线程可以独立执行任务。线程是进程中的一个执行单元,多个线程共享进程的资源(如内存、文件句柄等),但可以独立调度和执行。
多线程的优点包括:
响应更快:在 GUI 应用程序中,可以使用一个线程处理用户输入,另一个线程执行后台任务,这样可以提高应用程序的响应速度。
资源共享:线程共享进程的资源,可以更高效地利用系统资源。
更低的开销:相比于多进程,多线程创建和上下文切换的开销较低。
多进程的定义
多进程(Multiprocessing) 是指在操作系统中同时运行多个进程,每个进程都有自己独立的内存空间和资源。进程之间通过进程间通信(IPC)进行数据交换,如管道(Pipe)、消息队列、共享内存等。
多进程的优点包括:
独立性强:每个进程都有自己独立的内存空间,不会因为一个进程的崩溃影响到其他进程的执行。
充分利用多核 CPU:多个进程可以运行在不同的 CPU 核心上,真正实现并行计算,提高计算效率。
安全性高:进程间的资源独立性提高了程序的安全性,避免了资源争用和数据竞争问题。
多线程与多进程的关系
多线程和多进程都是实现并发编程的技术手段,但它们在实现方式、适用场景和性能特性上有所不同。
实现方式:
多线程在同一个进程内创建多个线程,这些线程共享进程的内存和资源。
多进程在操作系统中创建多个进程,每个进程有自己独立的内存空间和资源。
适用场景:
多线程适用于需要共享大量数据和资源、需要快速上下文切换的场景,如 GUI 应用程序、实时系统等。
多进程适用于需要充分利用多核 CPU 进行并行计算、需要高独立性和安全性的场景,如高性能计算、分布式系统等。
性能特性:
多线程的创建和上下文切换开销较小,但需要注意线程同步和数据竞争问题。
多进程的创建和上下文切换开销较大,但进程间隔离性强,不容易出现数据竞争问题。
2. 多线程通信方式
共享变量:通过共享变量进行线程间通信,但需要使用线程同步机制(如锁)来防止数据竞争。
import threading
class SharedCounter:
def __init__(self):
self.counter = 0
self.lock = threading.Lock()
def increment(self):
with self.lock:
self.counter += 1
print(f"Counter: {self.counter}")
def worker(counter: SharedCounter):
for _ in range(100):
counter.increment()
if __name__ == "__main__":
counter = SharedCounter()
threads = [threading.Thread(target=worker, args=(counter,)) for _ in range(5)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print(f"Final counter value: {counter.counter}")
事件(Event):通过设置和等待事件来进行线程间的同步和通信。
import threading
def worker(event: threading.Event):
print("Waiting for event to be set...")
event.wait()
print("Event received! Continuing work...")
if __name__ == "__main__":
event = threading.Event()
thread = threading.Thread(target=worker, args=(event,))
thread.start()
print("Main thread doing some work...")
threading.Event().wait(2) # 模拟主线程工作
print("Setting event...")
event.set()
thread.join()
队列(Queue):使用线程安全的队列(如queue.Queue)进行通信,这是最常见的方式。
import threading
import queue
def producer(queue: queue.Queue):
for i in range(5):
print(f"Producing {i}")
queue.put(i)
threading.Event().wait(1) # 模拟生产者工作
def consumer(queue: queue.Queue):
while True:
item = queue.get()
if item is None:
break # 结束信号
print(f"Consuming {item}")
queue.task_done()
if __name__ == "__main__":
q = queue.Queue()
producer_thread = threading.Thread(target=producer, args=(q,))
consumer_thread = threading.Thread(target=consumer, args=(q,))
producer_thread.start()
consumer_thread.start()
producer_thread.join()
q.put(None) # 发送结束信号
consumer_thread.join()
4. 多进程通信方式
常见的多进程通信方式包括管道(Pipe)、消息队列(Message Queue)、共享内存(Shared Memory)、信号(Signal)、套接字(Socket)等。
- 管道(Pipe)
管道是一种半双工的通信方式,数据只能单向流动。通常用于父子进程之间的通信。 - 消息队列(Message Queue)
消息队列是一种先进先出(FIFO)的数据结构,允许多个进程将消息插入队列中并从队列中读取消息。 - 共享内存(Shared Memory)
共享内存允许多个进程访问同一块内存区域,需要借助同步机制(如锁)来避免数据竞争问题。 - 信号(Signal)
信号是一种进程间异步通信机制,通常用于通知进程某些事件的发生。信号主要用于进程间的简单通知,而不传递数据。
import os
import signal
import time
def signal_handler(signum, frame):
print(f"Received signal: {signum}")
if __name__ == '__main__':
signal.signal(signal.SIGUSR1, signal_handler)
pid = os.fork()
if pid == 0: # 子进程
os.kill(os.getppid(), signal.SIGUSR1)
else: # 父进程
time.sleep(1) # 等待子进程发送信号
- 套接字(Socket)
套接字用于在进程间进行网络通信,可以实现跨机器的进程通信。
import multiprocessing
import socket
def worker(host, port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((host, port))
s.sendall(b'Hello from child process!')
data = s.recv(1024)
print(f"Child process received: {data.decode()}")
def server(host, port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((host, port))
s.listen()
conn, addr = s.accept()
with conn:
print('Connected by', addr)
data = conn.recv(1024)
print(f"Parent process received: {data.decode()}")
conn.sendall(b'Hello from parent process!')
if __name__ == '__main__':
host, port = '127.0.0.1', 65432
p = multiprocessing.Process(target=worker, args=(host, port))
p.start()
server(host, port)
p.join()
5. 源码实现
多线程源码说明
SimpleThreadedEnv
类:
__init__
方法:初始化环境数量并调用 _spawn_workers
方法。
_spawn_workers
方法:创建读写队列并启动工作线程。返回从队列读取和写入数据的函数列表。
_worker
方法:工作线程执行的任务。读取任务,进行简单计算(这里是乘以2),然后将结果放入队列。
send_tasks
方法:发送任务给工作线程。
get_results
方法:从工作线程获取结果。
close
方法:发送关闭信号并等待线程结束。
使用示例:
创建 SimpleThreadedEnv
实例。
发送一组任务给线程处理。
获取并打印结果。
关闭环境。
Callable
是一个类型提示,用于表示一个可调用的对象。这些对象可以是函数、方法,甚至是实现了__call__
方法的类实例。在类型提示中使用Callable
可以明确表明某个参数或变量应该是一个可以被调用的对象。
作用
Callable
的主要作用是使代码更具可读性和可维护性,特别是在使用静态类型检查工具(如mypy)时,可以更早地发现潜在的错误。
用法
Callable
是从typing
模块引入的,可以用来注释函数参数和返回值。例如,Callable[[Arg1Type, Arg2Type], ReturnType]
表示一个接受两个参数Arg1Type
和Arg2Type
并返回ReturnType
的可调用对象。
from queue import Queue
from threading import Thread
from typing import Callable, List, Tuple, Any
class SimpleThreadedEnv:
def __init__(self, num_envs: int):
self._num_envs = num_envs
(
self._connection_read_fns,
self._connection_write_fns,
) = self._spawn_workers()
def _spawn_workers(self) -> Tuple[List[Callable[[], Any]], List[Callable[[Any], None]]]:
parent_read_queues, parent_write_queues = zip(
*[(Queue(), Queue()) for _ in range(self._num_envs)]
)
self._workers = []
for parent_read_queue, parent_write_queue in zip(
parent_read_queues, parent_write_queues
):
thread = Thread(
target=self._worker,
args=(
parent_write_queue.get,
parent_read_queue.put,
),
)
self._workers.append(thread)
thread.daemon = True
thread.start()
return (
[q.get for q in parent_read_queues],
[q.put for q in parent_write_queues],
)
def _worker(self, get_fn: Callable[[], Any], put_fn: Callable[[Any], None]):
while True:
task = get_fn()
if task is None:
break # Exit signal received
result = task * 2 # Simple computation: multiply by 2
put_fn(result)
def send_tasks(self, tasks: List[Any]):
for task, write_fn in zip(tasks, self._connection_write_fns):
write_fn(task)
def get_results(self) -> List[Any]:
return [read_fn() for read_fn in self._connection_read_fns]
def close(self):
for write_fn in self._connection_write_fns:
write_fn(None)
for worker in self._workers:
worker.join()
if __name__ == "__main__":
num_envs = 4
env = SimpleThreadedEnv(num_envs)
tasks = [i for i in range(num_envs)]
print(f"Sending tasks: {tasks}")
env.send_tasks(tasks)
results = env.get_results()
print(f"Received results: {results}")
env.close()
多进程通信源码实现说明
MultiprocessingExample
类: 管理多进程任务。
进程工作函数: 使用静态方法定义了 _worker_pipe
、_worker_queue
和 _worker_shared_memory
,分别处理管道、队列和共享内存的通信。
进程生成函数: 使用 _spawn_worker_pipe
、_spawn_worker_queue
和 _spawn_worker_shared_memory
函数来生成并启动进程。
run 方法: 管理多进程任务的执行,展示如何使用不同的通信方式进行进程间通信。
资源管理: 使用上下文管理器模式(__enter__
和 __exit__
方法)和析构函数(__del__
方法)确保进程在使用完毕后正确关闭。
import multiprocessing as mp
from multiprocessing import Pipe, Queue, Value, Array
from threading import Thread
from typing import Any, Callable, List, Tuple, Union
import time
import os
class MultiprocessingExample:
_workers: List[Union[mp.Process, Thread]]
_is_waiting: bool
_is_closed: bool
def __init__(self, num_workers: int) -> None:
self._num_workers = num_workers
self._workers = []
self._is_waiting = False
self._is_closed = True
@staticmethod
def _worker_pipe(pipe_conn: Any) -> None:
for i in range(5):
msg = f"Message {i} from process {os.getpid()}"
pipe_conn.send(msg)
print(pipe_conn.recv())
time.sleep(1)
pipe_conn.close()
@staticmethod
def _worker_queue(queue: Any) -> None:
for i in range(5):
msg = f"Message {i} from process {os.getpid()}"
queue.put(msg)
print(queue.get())
time.sleep(1)
@staticmethod
def _worker_shared_memory(val: Value, arr: Array) -> None:
for i in range(5):
with val.get_lock():
val.value += 1
with arr.get_lock():
for j in range(len(arr)):
arr[j] += 1
print(f"Process {os.getpid()} updated shared memory")
time.sleep(1)
def _spawn_worker_pipe(self) -> Tuple[List[Callable[[], Any]], List[Callable[[Any], None]]]:
parent_conn, child_conn = Pipe()
process = mp.Process(target=self._worker_pipe, args=(child_conn,))
self._workers.append(process)
process.start()
return [parent_conn.recv], [parent_conn.send]
def _spawn_worker_queue(self) -> Tuple[List[Callable[[], Any]], List[Callable[[Any], None]]]:
queue = Queue()
process = mp.Process(target=self._worker_queue, args=(queue,))
self._workers.append(process)
process.start()
return [queue.get], [queue.put]
def _spawn_worker_shared_memory(self) -> Tuple[Value, Array, mp.Process]:
shared_val = Value('i', 0)
shared_arr = Array('i', range(5))
process = mp.Process(target=self._worker_shared_memory, args=(shared_val, shared_arr))
self._workers.append(process)
process.start()
return shared_val, shared_arr, process
def run(self) -> None:
self._is_closed = False
# Pipe example
pipe_read_fns, pipe_write_fns = self._spawn_worker_pipe()
for i in range(5):
print(pipe_read_fns[0]())
pipe_write_fns[0](f"Reply {i} from main process")
# Queue example
queue_read_fns, queue_write_fns = self._spawn_worker_queue()
for i in range(5):
print(queue_read_fns[0]())
queue_write_fns[0](f"Reply {i} from main process")
# Shared memory example
shared_val, shared_arr, shared_mem_process = self._spawn_worker_shared_memory()
shared_mem_process.join()
print(f"Final shared value: {shared_val.value}")
print(f"Final shared array: {list(shared_arr)}")
self._close()
def _close(self) -> None:
if self._is_closed:
return
for process in self._workers:
process.join()
self._is_closed = True
def __del__(self):
self._close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._close()
if __name__ == "__main__":
with MultiprocessingExample(num_workers=3) as example:
example.run()