Python Standard Library - Concurrent Execution
- 1. Overview
- 2. Code examples
- 2.1 threading example
- 2.2 multiprocessing example
- 2.3 concurrent.futures example
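1. Overview
The Python standard library is very extensive and offers a wide range of facilities; see the official reference at https://docs.python.org/zh-cn/3.9/library/index.html. It contains built-in modules (written in C) that give Python programmers access to system functionality such as file I/O, as well as a large number of modules written in Python that provide standardized solutions for many problems that occur in everyday programming.
We usually develop algorithms in Python because it is convenient to write and makes it quick to validate an idea. Once an algorithm has been verified as correct, if higher runtime performance is needed, the compute-intensive modules can be run with multiple threads or multiple processes to speed the code up. This is where the concurrent execution part of the Python standard library comes in: https://docs.python.org/zh-cn/3.9/library/concurrency.html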
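One caveat is worth keeping in mind when choosing between threads and processes. In the standard CPython build, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so threads mainly help with I/O-bound work, while CPU-bound pure-Python loops generally need separate processes to run in parallel. The sketch below is a rough way to observe this on your own machine; `cpu_bound` and `timed_run` are hypothetical helper names, and the timings depend on the hardware, so no expected output is given.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def cpu_bound(n):
    """Sum the integers in [0, n) with a pure-Python loop (CPU-bound)."""
    total = 0
    for i in range(n):
        total += i
    return total


def timed_run(executor_cls, inputs, workers=4):
    """Run cpu_bound over inputs with the given executor class.

    Returns (results, elapsed_seconds).
    """
    start = time.perf_counter()
    with executor_cls(max_workers=workers) as executor:
        results = list(executor.map(cpu_bound, inputs))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    inputs = [1_000_000] * 4
    _, thread_time = timed_run(ThreadPoolExecutor, inputs)
    _, process_time = timed_run(ProcessPoolExecutor, inputs)
    print("threads  :", round(thread_time, 3), "s")
    print("processes:", round(process_time, 3), "s")
```

Both executors share the same `map` interface, so switching between them is a one-word change once the worker function lives at module level.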
2. Code examples
The following examples are hands-on code I wrote based on the concurrent execution section of the Python standard library.
2.1 threading example
import queue
import threading

# Thread-safe queue that collects (thread_id, sum_value) pairs from all workers
g_result = queue.Queue()


# Calculate the sum of the integers in [start, end) and put it in the queue
def do_threading(start, end, thread_id):
    sum_value = 0
    for i in range(start, end):
        sum_value += i
    g_result.put((thread_id, sum_value))


thread_num = 16
thread_pool = []
# Create one thread per block of 100 consecutive integers
for task_id in range(thread_num):
    thread_task = threading.Thread(target=do_threading,
                                   args=(task_id * 100, (task_id + 1) * 100, task_id))
    thread_pool.append(thread_task)
# Start all threads, then wait for every one of them to finish
for thread_task in thread_pool:
    thread_task.start()
for thread_task in thread_pool:
    thread_task.join()
# Results come out in completion order, which may differ from thread_id order
for task_id in range(thread_num):
    result = g_result.get()
    print("threading no.", result[0], ", result is", result[1])
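Because the queue is filled in completion order, the printed lines are not guaranteed to be sorted by thread id. As a minimal sketch of one way to get ordered, checkable output, the version below drains the queue into a dict keyed by thread id and compares each partial sum with the closed-form value. `run_range_sums` and `expected_sum` are hypothetical helper names, not part of the original example.

```python
import queue
import threading


def run_range_sums(thread_num=16, chunk=100):
    """Sum [k*chunk, (k+1)*chunk) in thread k; return {thread_id: sum}."""
    results = queue.Queue()

    def worker(start, end, thread_id):
        results.put((thread_id, sum(range(start, end))))

    threads = [
        threading.Thread(target=worker, args=(k * chunk, (k + 1) * chunk, k))
        for k in range(thread_num)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # All workers have finished, so the queue holds exactly thread_num items
    sums = {}
    while not results.empty():
        thread_id, value = results.get()
        sums[thread_id] = value
    return sums


def expected_sum(start, end):
    """Closed-form sum of the integers in [start, end)."""
    return (start + end - 1) * (end - start) // 2


if __name__ == "__main__":
    sums = run_range_sums()
    for thread_id in sorted(sums):
        print("threading no.", thread_id, ", result is", sums[thread_id])
```

Keeping the queue local to the function also avoids the module-level global, which makes the helper easier to call more than once.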
2.2 multiprocessing example
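import multiprocessing


# Calculate the sum of the integers in [start, end)
def do_process(start, end):
    sum_value = 0
    for i in range(start, end):
        sum_value += i
    return sum_value


# The guard is needed on platforms that start workers with "spawn"
# (Windows, and macOS by default), where child processes re-import this module
if __name__ == "__main__":
    cpu_num = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=cpu_num)
    results = []
    task_num = 16
    # Submit one asynchronous task per block of 100 consecutive integers
    for task_id in range(task_num):
        results.append(pool.apply_async(do_process, (task_id * 100, (task_id + 1) * 100)))
    # No more tasks will be submitted; wait for the workers to finish
    pool.close()
    pool.join()
    for task_id in range(task_num):
        print("process no.", task_id, ", result is", results[task_id].get())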
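When every task calls the same function, the `apply_async` loop can often be written more compactly with `Pool.starmap`, which blocks until all tasks finish and returns the results in input order. The following is a sketch under that assumption; `range_sum` and `build_tasks` are hypothetical names introduced for illustration.

```python
import multiprocessing


def range_sum(start, end):
    """Sum of the integers in [start, end)."""
    return sum(range(start, end))


def build_tasks(task_num=16, chunk=100):
    """Return the (start, end) argument tuples, one per task."""
    return [(k * chunk, (k + 1) * chunk) for k in range(task_num)]


if __name__ == "__main__":
    tasks = build_tasks()
    # The with-block terminates the pool on exit; starmap has already
    # returned every result by then, in the same order as tasks
    with multiprocessing.Pool() as pool:
        sums = pool.starmap(range_sum, tasks)
    for task_id, value in enumerate(sums):
        print("process no.", task_id, ", result is", value)
```

Calling `multiprocessing.Pool()` without arguments uses the number of CPUs reported by the operating system, which matches the explicit `cpu_count()` call in the example above.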
2.3 concurrent.futures example
import time
import numpy as np
from concurrent.futures import ThreadPoolExecutor


# Read one raw binary file as a flat float32 array
def read_file(file_path):
    return np.fromfile(file_path, dtype=np.float32)


# Read all files concurrently with a thread pool; results keep the input order
def read_files_thread(files_seq, threads_num=4):
    # The with-block shuts the pool down once all reads have finished
    with ThreadPoolExecutor(max_workers=threads_num) as thread_pool:
        files_val_seq = list(thread_pool.map(read_file, files_seq))
    return files_val_seq


# Read all files one after another, as a baseline
def read_files(files_seq):
    files_val_seq = [read_file(i) for i in files_seq]
    return files_val_seq


# These files must already exist in the working directory
file_sequence = ['1.bin', '2.bin', '3.bin', '4.bin', '5.bin', '6.bin']
start_time = time.time()
method_1 = read_files_thread(file_sequence, 8)
print("method_1 time consume ", time.time() - start_time)
start_time = time.time()
method_2 = read_files(file_sequence)
print("method_2 time consume ", time.time() - start_time)
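The example above assumes that `1.bin` through `6.bin` already exist. For a version that runs anywhere, here is a self-contained sketch using only the standard library: it writes a few temporary files, then reads them through `executor.submit` and `as_completed`. The helper names, file count, and file size are hypothetical, and it reads raw bytes rather than NumPy arrays.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed


def read_bytes(file_path):
    """Read a whole file and return its contents as bytes."""
    with open(file_path, "rb") as f:
        return f.read()


def read_files_concurrently(paths, threads_num=4):
    """Read every path in a thread pool; return {path: bytes}."""
    contents = {}
    with ThreadPoolExecutor(max_workers=threads_num) as executor:
        # Map each future back to the path it was submitted for
        future_to_path = {executor.submit(read_bytes, p): p for p in paths}
        # as_completed yields futures as they finish, in no fixed order
        for future in as_completed(future_to_path):
            contents[future_to_path[future]] = future.result()
    return contents


def make_sample_files(directory, count=6, size=1024):
    """Create count files of size bytes each (hypothetical test data)."""
    paths = []
    for k in range(count):
        path = os.path.join(directory, f"{k + 1}.bin")
        with open(path, "wb") as f:
            f.write(bytes([k]) * size)
        paths.append(path)
    return paths


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp_dir:
        paths = make_sample_files(tmp_dir)
        contents = read_files_concurrently(paths, threads_num=8)
        for path in paths:
            print(os.path.basename(path), len(contents[path]), "bytes")
```

Unlike `executor.map`, `as_completed` hands back results as soon as each read finishes, which is why the sketch stores them in a dict keyed by path instead of relying on order.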