在PyQt中,图形化界面(GUI)是运行在主线程中的,而多进程是在独立的进程中执行的。默认情况下,多进程之间是无法直接共享图形化界面的。
然而,有几种方法可以在多进程中与PyQt的图形化界面进行通信:
-
使用进程间通信(Inter-Process Communication,IPC)机制,如管道(Pipe)、共享内存(Shared Memory)或消息队列(Message Queue)。你可以在主线程中创建一个IPC对象,然后将其传递给子进程,子进程可以使用该对象与主线程进行通信。这样,你可以将任务的进度或结果发送回主线程,然后更新图形化界面。
无论使用哪种方法,都需要小心处理线程或进程间的同步和互斥,以避免出现竞争条件或其他并发问题。
需要注意的是,多进程会引入额外的开销和复杂性,因此在决定使用多进程之前,建议先考虑是否有其他的优化方案,例如使用多线程、异步编程或者优化算法等。
进程间通信
import sys
from PyQt5.QtCore import QProcess, pyqtSignal, pyqtSlot
from PyQt5.QtWidgets import QApplication, QMainWindow, QPushButton, QVBoxLayout, QWidget
import time
# 子进程类
class WorkerProcess(QProcess): # 继承自QProcess,用于表示子进程
resultReady = pyqtSignal(str)
def __init__(self, parent=None):
super().__init__(parent)
self.readyReadStandardOutput.connect(self.handle_output)
def handle_output(self):
data = self.readAllStandardOutput().data().decode()
self.resultReady.emit(data)
# 主窗口类
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.init_ui()
def init_ui(self):
self.setWindowTitle('IPC Example')
self.button = QPushButton('Start', self)
self.button.clicked.connect(self.start_worker)
layout = QVBoxLayout()
layout.addWidget(self.button)
widget = QWidget(self)
widget.setLayout(layout)
self.setCentralWidget(widget)
def start_worker(self):
self.button.setEnabled(False)
self.worker_process = WorkerProcess()
self.worker_process.finished.connect(self.worker_finished)
self.worker_process.resultReady.connect(self.handle_result)
self.worker_process.start('python', ['worker.py'])
@pyqtSlot(str)
def handle_result(self, result):
print(f'Result: {result}')
# 在这里处理子进程的结果
def worker_finished(self):
self.button.setEnabled(True)
if __name__ == '__main__':
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec_())
将在终端的的显示结果显示在PyQt上
import sys
from PyQt5.QtCore import QProcess, pyqtSignal, pyqtSlot
from PyQt5.QtWidgets import QApplication, QMainWindow, QPushButton, QVBoxLayout, QWidget, QTextEdit
# 子进程类
class WorkerProcess(QProcess):
resultReady = pyqtSignal(str)
def __init__(self, parent=None):
super().__init__(parent)
self.readyReadStandardOutput.connect(self.handle_output)
def handle_output(self):
data = self.readAllStandardOutput().data().decode()
self.resultReady.emit(data)
# 主窗口类
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.init_ui()
def init_ui(self):
self.setWindowTitle('IPC Example')
self.button = QPushButton('Start', self)
self.button.clicked.connect(self.start_worker)
self.text_edit = QTextEdit(self)
layout = QVBoxLayout()
layout.addWidget(self.button)
layout.addWidget(self.text_edit)
widget = QWidget(self)
widget.setLayout(layout)
self.setCentralWidget(widget)
def start_worker(self):
self.button.setEnabled(False) # 首先禁用按钮
self.worker_process = WorkerProcess() # 创建一个WorkerProcess对象作为子进程。
self.worker_process.finished.connect(self.worker_finished) # 子进程的finished信号连接到worker_finished槽函数
self.worker_process.resultReady.connect(self.handle_result) # 将子进程的resultReady信号连接到handle_result槽函数
self.worker_process.start('python', ['worker.py']) # 使用start方法启动子进程,执行worker.py脚本
@pyqtSlot(str)
def handle_result(self, result): # 槽函数 接收到子进程的输出结果
self.text_edit.append(result) # 将结果添加到文本编辑框中
def worker_finished(self): # 当子进程完成时,将执行worker_finished函数
self.button.setEnabled(True)
if __name__ == '__main__':
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec_())
worker.py
import time
# 模拟耗时任务
def do_work():
for i in range(20):
time.sleep(1)
print(f'Progress: {i+1}')
if __name__ == '__main__':
do_work()
进程与进程之间通信
在多进程编程中,进程与进程之间可以使用多种方式进行通信,消息队列只是其中一种方式。以下是一些常用的进程间通信(IPC)机制:
-
消息队列(Message Queues):进程可以通过消息队列发送和接收消息。消息队列提供了一种异步的通信方式,进程之间可以通过队列传递数据。
-
共享内存(Shared Memory):进程可以通过共享内存区域在它们之间共享数据。多个进程可以访问同一块内存,从而实现数据共享。
-
管道(Pipes):管道是一种单向的通信方式,用于在两个进程之间传递数据。一个进程充当管道的写入端,另一个进程充当管道的读取端。
-
套接字(Sockets):套接字是一种网络编程中常用的通信方式,但它也可以用于进程间通信。进程可以通过套接字在网络上或本地主机上进行通信。
-
文件(Files):进程可以通过读写文件的方式进行通信。一个进程可以将数据写入文件,另一个进程可以读取该文件来获取数据。
这些是一些常见的进程间通信方式,每种方式都有其适用的场景和特点。选择适当的通信方式取决于具体的应用需求和设计考虑。
消息队列queue
案例1:
在PyQt中,可以使用信号(Signal)和槽(Slot)机制实现多进程通信。通过使用QObject
类及其子类的pyqtSignal
信号对象,可以在多个进程之间传递数据。
以下是一个示例代码,展示了如何在多进程中使用emit
发送信号:
from PyQt5.QtCore import QObject, pyqtSignal
from multiprocessing import Process, Queue
class Worker(QObject):
finished = pyqtSignal() # 自定义信号,用于通知任务完成
result = pyqtSignal(int) # 自定义信号,用于传递结果
def __init__(self, queue):
super().__init__()
self.queue = queue
def do_work(self):
while True:
item = self.queue.get()
if item is None:
break
# 执行任务
result = item * item
# 发送结果信号
self.result.emit(result)
# 发送完成信号
self.finished.emit()
def worker_process(queue):
# 创建Worker对象
worker = Worker(queue)
# 连接信号与槽
worker.result.connect(print)
worker.finished.connect(lambda: print("Worker finished"))
# 执行任务
worker.do_work()
if __name__ == '__main__':
# 创建进程间通信的队列
queue = Queue()
# 创建子进程
process = Process(target=worker_process, args=(queue,))
# 启动子进程
process.start()
# 向队列中放入任务
queue.put(2)
queue.put(5)
queue.put(None) # 任务结束的标志
# 等待子进程结束
process.join()
在上述代码中,我们定义了一个Worker
类,继承自QObject
,并包含了两个自定义信号:finished
和result
。do_work
方法中,我们通过调用self.result.emit(result)
发送结果信号,通过调用self.finished.emit()
发送完成信号。
在主进程中,我们创建了一个Queue
对象用于进程间通信,并创建了一个子进程,将该队列作为参数传递给子进程。接着,我们向队列中放入任务,子进程会从队列中取出任务并执行。子进程通过连接信号与槽,将结果打印出来。
需要注意的是,由于PyQt的信号与槽机制是基于事件循环的,因此在多进程中使用时,需要确保每个进程都有自己的事件循环。在上述示例中,我们在子进程中创建了一个Worker
对象,并在子进程的事件循环中执行任务。
案例2:
在queue中放置不同数据类型
from multiprocessing import Process, Queue
def worker(queue):
# 向队列中放置不同数据类型
queue.put(42) # 整数
queue.put('Hello') # 字符串
queue.put([1, 2, 3]) # 列表
if __name__ == '__main__':
# 创建进程间通信的队列
queue = Queue()
# 创建子进程
process = Process(target=worker, args=(queue,))
# 启动子进程
process.start()
# 从队列中获取数据
data1 = queue.get() # 获取整数
data2 = queue.get() # 获取字符串
data3 = queue.get() # 获取列表
# 打印获取到的数据
print(data1) # 输出: 42
print(data2) # 输出: Hello
print(data3) # 输出: [1, 2, 3]
# 等待子进程结束
process.join()
在上述代码中,我们创建了一个multiprocessing.Queue
对象,然后在子进程中通过queue.put()
方法将不同的数据类型放入队列中。在主进程中,我们通过queue.get()
方法从队列中获取数据,并打印出来。
因为multiprocessing.Queue
是进程安全的,所以可以在多个进程之间安全地传递不同的数据类型。
共享一个列表对象(共享内存)
from multiprocessing import Process, Value, Array
def modify_list(shared_list):
# 在共享内存中修改列表
shared_list[0] = 10
shared_list[1] = 20
shared_list[2] = 30
if __name__ == '__main__':
# 创建共享内存的列表
shared_list = Array('i', [0, 0, 0])
# 创建子进程,传递共享内存对象
p = Process(target=modify_list, args=(shared_list,))
# 启动子进程
p.start()
# 等待子进程结束
p.join()
# 在主进程中访问共享内存中的列表
print(shared_list[:])
在上述示例中,我们首先导入了 multiprocessing
模块中的 Process
、Value
和 Array
对象。然后,我们定义了一个名为 modify_list
的函数,该函数接受一个共享内存的列表对象,并在列表中修改了几个元素的值。
在主程序中,我们使用 Array
对象创建了一个共享内存的列表 shared_list
,初始值为 [0, 0, 0]
。然后,我们创建了一个子进程 p
,并将共享内存对象 shared_list
作为参数传递给子进程的函数 modify_list
。
接下来,我们启动子进程,并使用 join()
方法等待子进程结束。最后,在主进程中访问共享内存中的列表,并打印出列表的值。
在运行该示例时,你会看到输出结果为 [10, 20, 30]
,表示子进程成功地修改了共享内存中的列表。
需要注意的是,共享内存是一种低级别的操作,需要谨慎使用,并确保在不同进程之间正确地同步对共享内存的访问。
multiprocessing
模块中的Manager
当涉及到子进程修改对象并自动更新到主进程时,可以使用multiprocessing
模块中的Manager
来创建一个可以在主进程和子进程之间共享的对象。下面是一个完整的示例代码:
案例1:
self.shared_dict = manager.dict()
import multiprocessing
from PyQt5 import QtWidgets
class MyProcess(multiprocessing.Process):
def __init__(self, shared_dict):
super().__init__()
self.shared_dict = shared_dict
def run(self):
# 在子进程中修改共享字典的值
self.shared_dict['value'] = 'Updated value'
class MyWidget(QtWidgets.QWidget):
def __init__(self):
super().__init__()
# 创建一个可以在主进程和子进程之间共享的字典
manager = multiprocessing.Manager()
self.shared_dict = manager.dict()
self.start_child_process()
def start_child_process(self):
process = MyProcess(self.shared_dict)
process.start()
process.join()
# 在主进程中访问共享字典的值
print(self.shared_dict['value']) # 输出: Updated value
if __name__ == '__main__':
app = QtWidgets.QApplication([])
widget = MyWidget()
widget.show()
app.exec_()
process.join()是什么意思
在multiprocessing
模块中,join()
方法用于等待子进程结束。当调用join()
方法时,主进程会阻塞,直到子进程执行完毕才会继续执行主进程的后续代码。
join()
方法的作用是将主进程与子进程进行同步,确保在主进程继续执行之前,子进程已经完成了任务。如果不调用join()
方法,主进程将继续执行后续代码,可能会导致子进程还未完成时主进程已经结束。
在示例代码中,process.join()
的作用是等待子进程执行完毕。这样可以确保在访问共享字典之前,子进程已经完成了对字典的修改。
关于join
一词在英语中确实有“加入”的意思,但在这里的上下文中,它表示主进程等待子进程的结束。术语可能有些混淆,但在编程中,join()
方法的含义是等待子进程的完成。
案例2
manager.list(["Item 1", "Item 2"])
from multiprocessing import Process, Manager
def worker_func(shared_list):
shared_list.append("New item from worker")
if __name__ == "__main__":
manager = Manager()
shared_list = manager.list(["Item 1", "Item 2"]) # 创建共享的列表对象
print("Before worker:", shared_list)
worker = Process(target=worker_func, args=(shared_list,))
worker.start()
worker.join()
print("After worker:", shared_list)
如果是两个进程之间通信,可以通过字典传递各自的对象吗
在两个进程之间进行通信时,直接通过字典传递对象是不可行的。这是因为不同进程拥有各自独立的内存空间,对象的地址在不同的进程中是不一样的,无法直接共享。
在进程间通信时,可以考虑使用一些特定的机制来传递对象,例如序列化和反序列化、共享内存、套接字等。
序列化和反序列化:可以使用 pickle 或其他序列化库将对象序列化为字节流,然后在进程之间传递字节流,最后在接收进程中反序列化为对象。这样可以实现对象的传递,但需要注意对象是否支持序列化,并且在反序列化时需要保证对象的类定义存在。
共享内存:可以使用共享内存来在进程之间共享数据,包括对象。可以使用 multiprocessing 模块中的共享内存对象,如 Value 和 Array,或者使用第三方库,如 mmap。在共享内存中存储对象时,需要确保对象的数据结构在不同进程之间是可访问和可理解的。
套接字:可以使用套接字进行进程间通信,例如使用 TCP 或 UDP 套接字。在这种情况下,可以将对象的数据转换为字节流,并通过套接字发送给接收进程,然后在接收进程中将字节流转换回对象。
需要注意的是,在进行进程间对象传递时,需要考虑对象的序列化和反序列化过程,并确保进程之间的同步。此外,还需要注意对象的大小和性能影响,以及对象所包含的其他对象是否也需要进行传递。
总之,直接通过字典传递对象是不可行的,但可以使用其他适当的机制来在进程间进行对象传递。具体选择哪种方式取决于应用程序的需求和环境。
进程池
进程池中有两个方法:
apply:同步,一般不使用
apply_async:异步
案例1
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow, QPushButton, QLabel, QVBoxLayout, QWidget
from PyQt5.QtCore import Qt, QThreadPool, QRunnable, pyqtSlot
import multiprocessing
# 自定义任务类
class Task(QRunnable):
def __init__(self, num):
super().__init__()
self.num = num
@pyqtSlot()
def run(self):
result = self.num * self.num
print(f"Task {self.num}: {result}")
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("PyQt Process Pool Example")
self.central_widget = QWidget(self)
self.setCentralWidget(self.central_widget)
self.layout = QVBoxLayout()
self.central_widget.setLayout(self.layout)
self.label = QLabel("Click the button to start tasks.", self)
self.layout.addWidget(self.label)
self.button = QPushButton("Start Tasks", self)
self.button.clicked.connect(self.start_tasks)
self.layout.addWidget(self.button)
self.pool = multiprocessing.Pool()
def start_tasks(self):
for i in range(1, 6):
self.pool.apply_async(task_function, args=(i,))
def closeEvent(self, event):
self.pool.close()
self.pool.join()
event.accept()
def task_function(num):
result = num * num
print(f"Task {num}: {result}")
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec_())
案例2
from multiprocessing import Process,Pool
import os, time, random
def fun1(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
pool = Pool(5) #创建一个5个进程的进程池
for i in range(10):
pool.apply_async(func=fun1, args=(i,))
pool.close()
pool.join()
print('结束测试')
Run task 0 (37476)...
Run task 1 (4044)...
Task 0 runs 0.03 seconds.
Run task 2 (37476)...
Run task 3 (17252)...
Run task 4 (16448)...
Run task 5 (24804)...
Task 2 runs 0.27 seconds.
Run task 6 (37476)...
Task 1 runs 0.58 seconds.
Run task 7 (4044)...
Task 3 runs 0.98 seconds.
Run task 8 (17252)...
Task 5 runs 1.13 seconds.
Run task 9 (24804)...
Task 6 runs 1.46 seconds.
Task 4 runs 2.73 seconds.
Task 8 runs 2.18 seconds.
Task 7 runs 2.93 seconds.
Task 9 runs 2.93 seconds.
结束测试
案例3
import sys
from multiprocessing import Pool
from PyQt5.QtWidgets import *
class Window(QWidget):
def __init__(self):
super().__init__()
self.setWindowTitle("hello")
self.resize(800, 600)
self.btn = QPushButton('Start')
self.setup_ui()
def setup_ui(self):
layout = QVBoxLayout()
self.setLayout(layout)
layout.addWidget(self.btn)
def updater(num):
print(num)
def main_tracker():
p = Pool(processes=4)
p.map(updater, range(0, 100))
p.close()
p.join()
if __name__ == "__main__":
app = QApplication(sys.argv)
window = Window()
# 建立槽连接
window.btn.clicked.connect(main_tracker)
window.show()
sys.exit(app.exec_())
进程池一定会用到map吗
不,进程池并不一定需要使用map
方法。map
方法是用于将函数应用于可迭代对象的每个元素,并返回结果列表。它是进程池中常用的方法之一,但并不是唯一的方法。
在你的代码中,你可以看到使用了apply_async
方法来异步地将任务提交给进程池。这个方法不同于map
,它允许你提交单个任务并立即返回一个AsyncResult
对象,而不需要等待任务完成。你可以使用AsyncResult
对象的get
方法来获取任务的结果。
所以,进程池的使用并不一定需要使用map
方法,你可以根据具体的需求选择适合的方法来提交任务和获取结果。