一、核心要义
主要以三个模拟网络下载的代码-分别是依序下载、使用concurrent.futures模块中的Executor.map方法、以及使用该模块的executor.submit和futures.as_completed方法,来展示Python实现并发编程的其中一种方式。
二、代码示例
1、依序下载的脚本
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/4 20:10
# @Author : Maple
# @File : 01-依序下载的脚本.py
# @Software: PyCharm
from random import randint
from concurrent import futures
import time
"""使用concurrent.futures模块,该类的主要特色:
1. ThreadPoolExecutor和ProcessPoolExecutor类,这两个类实现的接口分别能在不同的线程或进程中执行可调用的对象。
这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列
"""
# 使用futures的map函数
MAX_WORKERS = 3
CC_LIST = [1, 2, 3, 4, 5, 6]
def get_randint():
return randint(1, 10)
def download_one_img(id):
print('***', id, '号图片开始下载***')
download_time = get_randint()
time.sleep(download_time)
print('***{}号图片下载完成,花费时长{}s***'.format(id, download_time))
return str(id) + '号图片内容'
def download_many_img(cc_list):
for id in cc_list:
download_one_img(id)
return len(list(cc_list))
def main(download_many_img):
t0 = time.time()
count = download_many_img(CC_LIST)
elapsed = time.time() - t0
msg = '\n{} flags downloaded in {:.2f}s'
print(msg.format(count, elapsed))
if __name__ == '__main__':
main(download_many_img)
2、futures模块中Executor.map方法实现并发
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/4 20:13
# @Author : Maple
# @File : 02-使用future实现并发下载(1).py
# @Software: PyCharm
from random import randint
from concurrent import futures
import time
"""使用concurrent.futures模块,该类的主要特色:
1. ThreadPoolExecutor和ProcessPoolExecutor类,这两个类实现的接口分别能在不同的线程或进程中执行可调用的对象。
这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列
"""
# 使用futures的map函数
MAX_WORKERS = 3
CC_LIST = [1, 2, 3, 4, 5, 6]
def get_randint():
return randint(1, 10)
def download_one_img(id):
print('***', id, '号图片开始下载***')
download_time = get_randint()
time.sleep(download_time)
print('***{}号图片下载完成,花费时长{}s***'.format(id, download_time))
return str(id) + '号图片内容'
def download_many_img(cc_list):
workers = min(MAX_WORKERS, len(cc_list))
# excutor的__exit__方法会在调用executor.shutdown(wait=True)方法,它会在所有线程都执行完毕前阻塞线程
with futures.ThreadPoolExecutor(workers) as excutor:
# map方法返回一个生成器,因此可以通过迭代,获取每个函数的返回值
# res是一个迭代器,通过遍历获取每一个函数的返回值
# 调用map函数,任务立刻就开始执行,但是因为workers = 3,所以最开始只会有3个任务启动执行
# 直到某一个任务执行完成,该线程才会退让给另外一个任务
res = excutor.map(download_one_img, cc_list)
return len(list(res))
def main(download_many_img):
t0 = time.time()
count = download_many_img(CC_LIST)
elapsed = time.time() - t0
msg = '\n{} flags downloaded in {:.2f}s'
print(msg.format(count, elapsed))
if __name__ == '__main__':
main(download_many_img)
执行阶段1:3个线程,开始执行前3个任务
执行阶段2:1号图片完成下载,被占用的线程被释放,开始下载4号图片
后续阶段,与上面类似。总之,最多同时只有3个任务执行。
3、futures模块中Executor.submit和as_completed方法实现并发
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/4 20:13
# @Author : Maple
# @File : 02-使用future实现并发下载(1).py
# @Software: PyCharm
from random import randint
from concurrent import futures
import time
import threading
"""使用concurrent.futures模块,该类的主要特色:
1. ThreadPoolExecutor和ProcessPoolExecutor类,这两个类实现的接口分别能在不同的线程或进程中执行可调用的对象。
这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列
"""
# 使用futures的submit和as_completed函数
MAX_WORKERS = 3
CC_LIST = [1, 2, 3, 4, 5, 6]
def get_randint():
return randint(1, 10)
def download_one_img(id):
t = threading.current_thread()
print('***{}号图片开始下载,线程id为{}***'.format(id,t.ident))
download_time = get_randint()
time.sleep(download_time)
print('***{}号图片下载完成,花费时长{}s***'.format(id, download_time))
return str(id) + '号图片内容'
def download_many_img(cc_list):
workers = min(MAX_WORKERS, len(cc_list))
# excutor的__exit__方法会在调用executor.shutdown(wait=True)方法,它会在所有线程都执行完毕前阻塞线程
with futures.ThreadPoolExecutor(max_workers=workers) as excutor:
to_do = []
for cc in sorted(cc_list):
# 获取当前线程
t = threading.current_thread()
# 提交任务,任务立刻就开始执行,但是因为workers = 3,所以最开始只会有3个任务启动执行
# 直到某一个任务执行完成,该线程才会退让给另外一个任务
future = excutor.submit(download_one_img, cc)
to_do.append(future)
msg = 'Scheduled for {}: {},thread_id:{}'
print(msg.format(cc, future,t.ident))
results = []
for furture in futures.as_completed(to_do):
res = furture.result()
msg = '{} result: {!r}'
print(msg.format(future, res))
results.append(res)
return len(results)
def main(download_many_img):
t0 = time.time()
count = download_many_img(CC_LIST)
elapsed = time.time() - t0
msg = '\n{} flags downloaded in {:.2f}s'
print(msg.format(count, elapsed))
if __name__ == '__main__':
main(download_many_img)
注意观察结果:
(1) 与Executor.map方法类似,必须等到3号图片下载完成,才会开始下载4号图片
(2) 通过观察线程id,也可以发现一共有3个线程负责图片下载任务
(3) download_one_img的调用方download_many_img,启动的则是另外一个单独的线程
4、futures模块中Executor.map方法详测
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/5 19:50
# @Author : Maple
# @File : 04-补充测试Executor-map方法(1).py
# @Software: PyCharm
from random import randint
from time import strftime, sleep
from concurrent import futures
def get_randint():
return randint(1, 10)
def display(*args):
print(strftime('[%H:%M:%S]'), end=' ')
print(*args)
def loiter(n):
msg = '{}loiter({}): doing nothing for {}s'
display(msg.format('\t' * n, n, n))
if n == 0:
sleep(10)
else:
sleep(n)
msg = '{} loiter({}): done'
display(msg.format('\t' * n, n))
return n * 10
def main():
display('Script starting')
executor = futures.ThreadPoolExecutor(max_workers=3)
# executor.map,被调用函数返回结果的顺序和调用的顺序必然一致
# 比如此例中第一个调用的函数时loiter(0),因为其会sleep(10),导致虽然在这10s内其它的函数已经执行完毕了
# 都必须等其执行返回,返回结果后,才依序返回其它函数的结果
results = executor.map(loiter, range(5))
display('results:', results)
display('waiting for individual result')
for i, result in enumerate(results):
display('result {}:{}'.format(i, result))
if __name__ == '__main__':
main()
执行阶段1:主线程先打印Script starting, 然后执行到executor.map(开启另外3个子线程, 由于最大线程数是3,会立刻先启动3个task) , 然后主程序继续向下执行,打印results:xx 和 waiting for....
执行阶段2:等到第一个子线程的任务执行完成(注意,此时并没有返回结果),该线程会开启下一个任务,之后的阶段都相同。
执行阶段3: 按照调用任务的顺序,返回任务的结果。注意:特意为`编号为0`的任务设置了一个比较长的休眠时间(10s), 导致后面所有任务的返回结果都阻塞, 要一直等到该任务休眠结束,返回结果,后面任务的结果才依序返回。
5、futures模块中Executor.submit和as_complete方法详测
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/5 19:50
# @Author : Maple
# @File : 04-补充测试Executor-map方法(1).py
# @Software: PyCharm
from random import randint
from time import strftime, sleep
from concurrent import futures
def get_randint():
return randint(1, 10)
def display(*args):
print(strftime('[%H:%M:%S]'), end=' ')
print(*args)
def loiter(n):
msg = '{}loiter({}): doing nothing for {}s'
display(msg.format('\t' * n, n, n))
if n == 0:
sleep(10)
else:
sleep(n)
msg = '{} loiter({}): done'
display(msg.format('\t' * n, n))
# 返回一个元组:第一个元素记录调用函数的id
return (n,n * 10)
def main():
display('Script starting')
executor = futures.ThreadPoolExecutor(max_workers=3)
to_do = []
for i in sorted(range(5)):
# 通过submit和as_completed方式,函数返回结果的顺序,不受调用顺序的限制:谁先执行完毕,就可以先返回结果
future = executor.submit(loiter, i)
to_do.append(future)
for i,furture in enumerate(futures.as_completed(to_do)):
res = furture.result()
# 记录返回结果的id序号及结果
display('result {}:{}'.format(i,res))
if __name__ == '__main__':
main()
第一阶段:与Executor.map方法相同
第二阶段:与Executor.map方法不同的地方在于,任务1执行完毕(done)后,直接先返回结果了。不需要等任务编号为0的任务休眠完毕(10s),再返回result结果。
6、futures并发编程捕获异常
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/3/6 20:46
# @Author : Maple
# @File : 07-并发异常捕获.py
# @Software: PyCharm
import threading
import time
from concurrent import futures
from random import randint
MAX_WORKERS = 3
CC_LIST = [1, 2, 3, 4, 5, 6]
class Status_404(Exception):
def __init__(self,msg):
self.msg = msg
class MyConnectionError(Exception):
def __init__(self,msg):
self.msg = msg
def get_randint():
return randint(1, 10)
def download_one_img(id):
try:
if id !=3 and id !=4:
print('***', id, '号图片开始下载***\n')
elif id == 4:
# 给4号图片制造一个连接异常
print('***', id, '号图片开始下载***\n')
raise MyConnectionError("遇到一个连接异常")
else:
# 给3号图片制造一个404异常
print('***', id, '号图片开始下载***\n')
raise Status_404("遇到了一个404异常")
except Status_404 as e:
# 捕获404异常
print(e.msg)
except Exception as e:
# 其它异常往上抛(调用方)
raise
else:
download_time = get_randint()
time.sleep(download_time)
print('***{}号图片下载完成,花费时长{}s***'.format(id, download_time))
return str(id) + '号图片内容'
def download_many_img(cc_list):
workers = min(MAX_WORKERS, len(cc_list))
with futures.ThreadPoolExecutor(max_workers=workers) as excutor:
to_do = []
for cc in sorted(cc_list):
future = excutor.submit(download_one_img, cc)
to_do.append(future)
results = []
for furture in futures.as_completed(to_do):
try:
res = furture.result()
msg = '{} result: {!r}'
print(msg.format(future, res))
results.append(res)
# 捕获连接异常
except MyConnectionError as e:
print(e.msg)
return len(results)
def main(download_many_img):
t0 = time.time()
count = download_many_img(CC_LIST)
elapsed = time.time() - t0
msg = '\n{} flags downloaded in {:.2f}s'
print(msg.format(count, elapsed))
if __name__ == '__main__':
main(download_many_img)