在实际处理需求的过程中,博主比较偏爱使用多线程threading + 队列queue的方式 去开发代码。
(本文注重的是搭建模板框架,仅供参考)
举例:
以豆瓣电影的排行为例,写个简单的demo。
豆瓣链接:https://movie.douban.com/top250?start=0&filter=
一共也就10页!
直接上代码:
import json
import urllib3
import threading
from queue import Queue
from loguru import logger
urllib3.disable_warnings()
class CrawlDoubanMovie(object):
def __init__(self):
self.max_times = 5
self.queue_req = Queue()
# 创建塞入任务
def create_tasks(self):
for i in range(1, 11):
task_info = {"page_num": i}
self.queue_req.put(json.dumps(task_info, ensure_ascii=False))
logger.info(f"本次任务数量:{self.queue_req.qsize()}")
# 请求解析操作
def get_content_from_web(self, page):
pass
# 解析任务
def parse_task(self, task_info):
page_num = task_info["page_num"]
logger.info(f"正在访问豆瓣电影第【{page_num}】页!")
if "retry_times" not in task_info.keys():
task_info["retry_times"] = 0
if task_info["retry_times"] > self.max_times:
logger.error(f"current task crawl failed, page_num:{page_num}")
return
else:
try:
self.get_content_from_web(page_num)
except Exception as exc:
retry_task = {"page_num": page_num}
self.queue_req.put(json.dumps(retry_task, ensure_ascii=False))
logger.error(f"抓取失败, 重新放入队列中! page_num:{page_num}")
else:
logger.info(f"抓取成功,返回内容! page_num:{page_num}")
pass
def main_run(self):
while True:
task_info = self.queue_req.get()
if task_info is None:
self.queue_req.task_done()
break
else:
self.parse_task(json.loads(task_info))
self.queue_req.task_done()
if __name__ == "__main__":
threading_num = 6
crawl = CrawlDoubanMovie()
crawl.create_tasks()
for i in range(threading_num):
t = threading.Thread(target=crawl.main_run)
t.daemon = True
t.start()
crawl.queue_req.join()
模块和函数之间非常的清晰明了,
- create_tasks() 就负责初始化创建任务
- get_content_from_web()负责请求和解析数据
- parse_task() 负责解析任务,这里面还包括了错误重试并重新放入队列中。
- main_run()主函数 启动程序
搭配了多线程提升很多,也用到了队列。
运行效果图:
不仅使用方式简单实用,而且速度也是相当的快。