Python广为使用的并发处理库futures使用入门与内部原理_concurrent.futures-CSDN博客
ThreadPoolExecutor(max_workers=1)
池中至多创建max_workers个线程的池来同时异步执行,返回Executor实例、支持上下文,进入时返回自己,退出时调用submit(fn, *args, **kwargs)
提交执行的函数及其参数,如有空闲开启daemon线程,返回Future类的实例shutdown(wait=True)
清理池,wait表示是否等待到任务线程完成
future类
done()
如果调用被成功的取消或者执行完成,返回Truecancelled()
如果调用被成功的取消,返回Truerunning()
如果正在运行且不能被取消,返回Truecancel()
尝试取消调用。如果已经执行且不能取消返回False,否则返回Trueresult(timeout=None)
取返回的结果,timeout为None,一直等待返回;timeout设置到期,抛出concurrent.futures.TimeoutError 异常exception(timeout=None)
取返回的异常,timeout为None,一直等待返回;timeout设置到期,抛出concurrent.futures.TimeoutError 异常
# coding: utf8
# t.py
import time
import fire
import threading
from concurrent.futures import ThreadPoolExecutor, wait
# 分割子任务
def each_task(index):
time.sleep(1) # 睡1s,模拟IO
print("thread %s square %d" % (threading.current_thread().ident, index))
return index * index # 返回结果
def run(thread_num, task_num):
# 实例化线程池,thread_num个线程
executor = ThreadPoolExecutor(thread_num)
start = time.time()
fs = [] # future列表
for i in range(task_num):
fs.append(executor.submit(each_task, i)) # 提交任务
wait(fs) # 等待计算结束
end = time.time()
duration = end - start
s = sum([f.result() for f in fs]) # 求和
print("total result=%s cost: %.2fs" % (s, duration))
executor.shutdown() # 销毁线程池
if __name__ == '__main__':
fire.Fire(run)
python test5.py 5 10 设置了5个线程10个任务
# coding: utf8
# p.py
import os
import sys
import math
import time
import fire
from concurrent.futures import ProcessPoolExecutor, wait
# 分割子任务
def each_task(n):
# 按公式计算圆周率
s = 0.0
for i in range(n):
s += 1.0 / (i + 1) / (i + 1)
time.sleep(2)
# print(s)
pi = math.sqrt(6 * s)
print(str(os.getpid())+ "===="+str(pi))
# os.getpid可以获得子进程号
sys.stdout.write("process %s n=%d pi=%s\n" % (os.getpid(), n, pi))
return pi
def run(process_num, *ns): # 输入多个n值,分成多个子任务来计算结果
# 实例化进程池,process_num个进程
executor = ProcessPoolExecutor(process_num)
start = time.time()
fs = [] # future列表
for n in ns:
fs.append(executor.submit(each_task, int(n))) # 提交任务
print("len(fs)", len(fs))
wait(fs) # 等待计算结束
end = time.time()
duration = end - start
print("total cost: %.2fs" % duration)
executor.shutdown() # 销毁进程池
if __name__ == '__main__':
# fire.Fire(run)
run(2,6,10) # 2个线程2个任务
import time
from concurrent.futures import ThreadPoolExecutor, wait
import datetime
import logging
import threading
FORMAT = "%(asctime)s [%(processName)s %(threadName)s] %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
def calc(base):
sum = base
for i in range(5):
time.sleep(1)
sum += 1
logging.info(str(sum)+"==========="+str(threading.current_thread()))
print("sum为",sum)
return sum
start = datetime.datetime.now()
executor = ThreadPoolExecutor(3) # 设置3个线程
with executor: # 默认shutdown阻塞
fs = []
for i in range(3): # 设置了3个任务同时执行,互不影响
future = executor.submit(calc, i * 100)
fs.append(future)
# wait(fs) # 阻塞
print('-' * 30)
print("len(fs)",len(fs))
for f in fs:
print(f, f.done(), f.result()) # done不阻塞,result阻塞
print('=' * 30)
delta = (datetime.datetime.now() - start).total_seconds()
print(delta)
import time
from concurrent.futures import ProcessPoolExecutor, wait
import datetime
import logging
import os
FORMAT = "%(asctime)s [%(processName)s %(threadName)s] %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
def calc(base):
sum = base
for i in range(5):
sum += 1
time.sleep(2)
print(str(os.getpid())+"========="+str(sum))
logging.info(sum)
return sum
if __name__ == '__main__':
start = datetime.datetime.now()
executor = ProcessPoolExecutor(3)
with executor: # 默认shutdown阻塞
fs = []
for i in range(3):
future = executor.submit(calc, i * 100)
fs.append(future)
# wait(fs) # 阻塞
print('-' * 30)
for f in fs:
print(f, f.done(), f.result()) # done不阻塞,result阻塞
print('=' * 30)
delta = (datetime.datetime.now() - start).total_seconds()
print(delta)