本节聚焦于使用协程、线程、进程实现并发爬虫任务。 Python 线程受全局解释器锁(GIL)制约,同一时刻仅能执行一个线程,无法充分利用多核 CPU 优势,且频繁切换线程会增加开销,影响爬虫性能。 协程是轻量级线程,由程序自主调度,可在 I/O 等待时切换任务,避免线程切换开销,提升 CPU 利用率与爬虫并发性能,能弥补线程性能不足。 除协程外,线程池和进程池也是实现并发的有效方式。线程池预先创建线程,减少创建与销毁开销;进程池创建多个进程,利用多核 CPU 优势,适合 CPU 密集型爬虫任务。 接下来,我们会通过代码示例展开介绍。
1.asyncio结合requests完成爬虫任务
requests 是 python 中的同步网络爬虫库,并不能直接使用asyncio运行。所以我们使用 asyncio 中的 run_in_executor 方法创建线程池完成并发。
"""
requests同步爬虫库:本身不支持协程环境
"""
import asyncio
import requests
from functools import partial
from bs4 import BeautifulSoup
url='https://movie.douban.com/top250?start={}&filter='
headers={'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'}
# 异步函数,用于获取电影信息
async def get_movie_info(page_num):
# 创建一个事件循环对象
loop=asyncio.get_running_loop()
# 使用线程池执行 requests.get请求
# partial 是偏函数,用于固定函数的部分参数
res=await loop.run_in_executor(None,partial(requests.get,url.format(page_num*25),headers))
soup = BeautifulSoup(res.text, 'lxml')
div_list = soup.find_all('div', class_='hd')
for title in div_list:
print(title.get_text())
# 主函数,是哦那个asyncio并发执行多个任务
async def main():
# 创建任务列表,每个任务对应一个页面的电影信息获取
tasks=[asyncio.create_task(get_movie_info(i)) for i in range(10)]
# 等待所有任务完成
await asyncio.wait(tasks)
if __name__ == '__main__':
# 使用asyncio.run运行主函数
asyncio.run(main())
注意点:
1. partial(requests.get,url.format(page*25),headers=headers)等同以下代码
- requests.get(url.format(page*25),headers=headers)
2.functools.partial:
- 可以帮你创建一个新的函数,这个函数在调用时会自动将某些参数给原函数
2.使用aiohttp完成爬虫任务
由于 requests 爬虫库本身不支持异步,在 asyncio 中需要开启线程池才能使用。在使用上稍微有些麻烦,为了解决这个问题,我们使用支持异步操作的 aiohttp 来完成爬虫任务。
2.1 介绍与安装
aiohttp 是一个异步的网络库,可以实现 HTTP 客户端,也可以实现 HTTP 服务器,爬虫阶段我们只能用它来实现 HTTP 客户端功能。
官网:https://docs.aiohttp.org/en/stable/
aiohttp 客户端相关的官方文档:
https://docs.aiohttp.org/en/stable/client.html#aiohttp-client
pip install aiohttp -i https://pypi.tuna.tsinghua.edu.cn/simple
2.2 基本使用
import asyncio
import aiohttp
url = 'https://www.baidu.com'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'}
async def get_baidu():
# 方法一:
# 1.创建请求对象
# session = aiohttp.ClientSession()
# 2.使用请求对象发送网络请求 - get(get方法回返回一个响应对象)
# res = await session.get(url, headers=headers)
# 3.获取响应对象中的结果
# 协程函数需要await来获取返回的结果
# result = await res.text()
# print(result)
# 4.关闭资源
# await session.close()
# res.close()
# 方法二:利用异步上下文的方式释放请求资源
async with aiohttp.ClientSession() as session:
async with session.get(url,headers=headers) as res:
result=await res.text()
print(result)
if __name__ == '__main__':
asyncio.run(get_baidu())
2.3 并发操作
import asyncio
import aiohttp
# 回调函数:任务完成后打印返回结果
def download_completed_callback(test_obj):
print('下载的内容为:',test_obj.result())
async def baidu():
print('百度爬虫')
url='https://www.baidu.com'
async with aiohttp.ClientSession()as session:
async with session.get(url) as r:
return await r.text()
async def sougou():
print('搜狗爬虫')
url='https://www.sogou.com'
async with aiohttp.ClientSession()as session:
async with session.get(url) as r:
return await r.text()
async def jingdou():
print('京东爬虫')
url='https://www.jd.com'
async with aiohttp.ClientSession() as session:
async with session.get(url) as r:
return await r.text()
async def main():
# 创建多个Task,且添加回调函数
task_baidu=asyncio.create_task(baidu())
task_baidu.add_done_callback(download_completed_callback)
task_sougou=asyncio.create_task(sougou())
task_sougou.add_done_callback(download_completed_callback)
task_jingdou=asyncio.create_task(jingdou())
task_jingdou.add_done_callback(download_completed_callback)
tasks=[task_baidu,task_sougou,task_jingdou]
# 等待下载
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())
练习:使用aiohttp抓取搜狗标题
网址:https://weixin.sogou.com/weixin?_sug_type_=1&type=2&query=python
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def get_sougou_info(page):
url = f'https://weixin.sogou.com/weixin?query=python&_sug_type_=1&type=2&page={page}&ie=utf8'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'
}
try:
async with aiohttp.ClientSession() as session:
# 修正参数传递方式,将 headers 作为关键字参数传递
async with session.get(url, headers=headers) as r:
r.encoding = 'utf-8' # 显式指定编码
# 调用 r.text() 方法获取响应文本 (这一步要注意)
text = await r.text()
soup = BeautifulSoup(text, 'lxml')
title_list = soup.select('div[class="txt-box"] a')
for i in title_list:
title = i.get_text()
print(title)
except aiohttp.ClientError as e:
print(f"请求发生错误: {e}")
except Exception as e:
print(f"发生其他错误: {e}")
async def main():
tasks = [asyncio.create_task(get_sougou_info(i)) for i in range(10)]
# 等待所有任务完成
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())
3.aiomysql的使用
安装:
pip install aiomysql
利用 python3 中新加入的异步关键词 async/await,我们使用各种异步操作作为来执行各种异步的操作,如使用 aiohttp 来替代 requests 来执行异步的网络请求操作,使用 motor 来代替同步的 pymongo 库来操作 mogo 数据库,我们在开发同步的 python 程序时,我们会使用 PyMySQL来操作 mysql 数据库,同样,我们会使用 aiomysql 来异步操作 mysql 数据库。
aiomysql的简单使用:
import aiomysql
import asyncio
async def get_tx_work():
# # 1.创建数据库链接对象
# db=await aiomysql.connect(host='localhost',port=3306,user='root',password='123456',db='py_spider')
#
# # 2.创建游标对象
# cursor=await db.cursor()
#
# # 3.查询数据
# sql='select * from tx_work'
#
# await cursor.execute(sql)
# result=await cursor.fetchall()
# print(result)
#
# # 4.关闭资源
# await cursor.close()
# db.close()
# 方法二:利用异步上下文的方式释放请求资源
async with aiomysql.connect(host='localhost',port=3306,user='root',password='123456',db='py_spider')as db:
async with db.cursor()as cursor:
await cursor.execute('select * from tx_work;')
result=await cursor.fetchall()
print(result)
if __name__ == '__main__':
asyncio.run(get_tx_work())
通过异步爬虫完成数据存储(案例:二手车)
使用
asyncio
完成汽车之家的汽车参数信息并保存到mysql
数据库中网址:二手车_二手车之家
思路分析:
- 当前页面数据为静态数据,在翻页时
url
中的sp1
会变更为sp2
,所以当前页面可以使用xpath
提取数据。 - 通过首页进入到详情页有当前汽车的配置信息,汽车配置信息页中的数据是动态数据,可以使用抓包的方式获取
api
。 - 根据获取的
api
链接发现当前链接中存在查询字符串:specid
- 回到首页,在汽车列表中通过元素发现
li
标签中存在汽车的id
值,获取id
值拼接api
链接地址。 - 构造请求访问构造好的
api
地址获取数据。
注意点:
- 查看
api
接口返回的数据我们发现当前返回的数据类型并不是json
数据,需要对返回的数据进行处理。处理方式有以下两种:
-
- 拿到返回数据后进行字符串切片,保留
json
数据部分 - 将
api
链接中的callback=configTitle
查询字符串参数删除
- 拿到返回数据后进行字符串切片,保留
- 汽车之家页面编码格式会随机变换,需要使用
chardet
第三方包实时监测编码格式,并且当页面编码格式为UTF-8-SIG
时specid
数据不存在。
pip install chardet
import redis
import chardet # 用于判断返回的页面的编码集合,如果你请求频繁,则会返回404页面--ut-8。真正返回的页面 --gbk
import aiohttp
import asyncio
import aiomysql
import hashlib
from lxml import etree
from fake_useragent import UserAgent
class CarInfo():
redis_client=redis.Redis()
def __init__(self):
self.url = 'https://www.che168.com/china/a0_0msdgscncgpi1ltocsp{}exf4x0/?pvareaid=102179#currengpostion'
self.detail_url = 'https://cacheapigo.che168.com/CarProduct/GetParam.ashx?specid={}'
self.ua = UserAgent()
def __del__(self):
print('redis数据库连接即将关闭...')
self.redis_client.close()
async def get_car_specid(self, page, session,pool):
headers = {'User-Agent': self.ua.random}
async with session.get(self.url.format(page), headers=headers)as r:
content = await r.read()
encoding = chardet.detect(content)['encoding']
if encoding == 'UTF-8-SIG' or encoding == 'GB2312' or encoding == 'ISO-8859-1':
result = content.decode('gbk')
else:
result = content.decode(encoding)
print('被反爬了....')
html = etree.HTML(result)
specid_list = html.xpath('//ul[@class="viewlist_ul"]/li/@specid')
if specid_list:
tasks = [asyncio.create_task(self.get_car_info(specid, session,pool)) for specid in specid_list]
await asyncio.wait(tasks)
else:
print('id为空')
async def get_car_info(self, specid, session,pool):
headers = {'User-Agent': self.ua.random}
async with session.get(url=self.detail_url.format(specid), headers=headers)as r:
content = await r.json()
item = dict()
if content['result']['paramtypeitems']:
item['name'] = content['result']['paramtypeitems'][0]['paramitems'][0]['value']
item['price'] = content['result']['paramtypeitems'][0]['paramitems'][1]['value']
item['brand'] = content['result']['paramtypeitems'][0]['paramitems'][3]['value']
item['length'] = content['result']['paramtypeitems'][1]['paramitems'][0]['value']
item['breadth'] = content['result']['paramtypeitems'][1]['paramitems'][1]['value']
item['altitude'] = content['result']['paramtypeitems'][1]['paramitems'][2]['value']
await self.save_car_info(item,pool)
# 数据去重
@staticmethod
def hashlib(item_dict):
md5=hashlib.md5()
md5.update(str(item_dict).encode('utf-8'))
return md5.hexdigest()
# 保存数据
async def save_car_info(self,item,pool):
async with pool.acquire()as conn:
async with conn.cursor() as cursor:
md5_hash=self.hashlib(item)
# sadd 方法的返回值是一个整数,表示实际添加到集合中的新成员的数量。如果元素已经存在于集合中,sadd 方法不会重复添加,返回值为 0;如果元素是新添加到集合中的,返回值为 1。
redis_result=self.redis_client.sadd('car:filter',md5_hash)
if redis_result:
sql="""insert into car_info(id,name,price,brand,length,breadth,altitude)values (%s,%s,%s,%s,%s,%s,%s)"""
try:
await cursor.execute(sql,(0,item['name'],item['price'],item['brand'],item['length'],item['breadth'],item['altitude']))
await conn.commit()
print('插入成功')
except Exception as e:
print('插入失败',e)
else:
print('数据重复...')
async def main(self):
async with aiomysql.create_pool(user='root',password='123456',db='py_spider')as pool:
async with pool.acquire() as conn:
async with conn.cursor()as cursor:
sql= """
create table car_info(
id int primary key auto_increment,
name varchar(100),
price varchar(100),
brand varchar(100),
length varchar(100),
breadth varchar(100),
altitude varchar(100)
)
"""
# 在异步代码中必须先要检测表是否存在,直接使用if not 语句无效
check_table_query="show tables like 'car_info'"
result=await cursor.execute(check_table_query)# 如果表存在返回1,不存在返回0
if not result:
await cursor.execute(sql)
async with aiohttp.ClientSession() as session:
tasks = [asyncio.create_task(self.get_car_specid(i, session,pool)) for i in range(10)]
await asyncio.wait(tasks)
if __name__ == '__main__':
car = CarInfo()
asyncio.run(car.main())
4.使用多线程完成并发爬虫
在上一小节中我们使用了asyncio
的方式完成了并发爬虫,但是大多数时候最常用的还是基于多线程的方式来完成爬虫需求。
import requests
import threading
from lxml import etree
url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}
def get_movie_info(page):
response = requests.get(url.format(page * 25), headers=headers).text
tree = etree.HTML(response)
result = tree.xpath("//div[@class='hd']/a/span[1]/text()")
print(result)
if __name__ == '__main__':
thread_obj_list = [threading.Thread(target=get_movie_info, args=(page,)) for page in range(10)]
for thread_obj in thread_obj_list:
thread_obj.start()
5.使用线程池完成并发爬虫
import requests
from lxml import etree
from concurrent.futures import ThreadPoolExecutor, as_completed
url = 'https://movie.douban.com/top250?start={}&filter='
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}
def get_movie_info(page):
response = requests.get(url.format(page * 25), headers=headers).text
tree = etree.HTML(response)
result = tree.xpath("//div[@class='hd']/a/span[1]/text()")
return result
if __name__ == '__main__':
with ThreadPoolExecutor(max_workers=5) as pool:
futures = [pool.submit(get_movie_info, page) for page in range(10)]
# future对象获取返回值会造成主线程堵塞
# for future in futures:
# print(future.result())
# as_completed会立即返回处理完成的结果而不会堵塞主线程
for future in as_completed(futures):
print(future.result())
6.使用多进程完成并发爬虫
因为在Python
中存在GIL
锁,无法充分利用多核优势。所以为了能够提高程序运行效率我们也会采用进程的方式来完成代码需求。
进程的使用:
from multiprocessing import Process
# 创建进程对象
p = Process(target=func, args=(,))
# 设置守护进程
p.daemon = True
# 启动进程
p.start()
进程中的队列:
多进程中使用普通的队列模块会发生阻塞,对应的需要使用multiprocessing
提供的JoinableQueue
模块,其使用过程和在线程中使用的queue
方法相同。
接下来我们通过腾讯招聘代码案例学习如何在进程中使用JoinableQueue
队列模块。
页面地址:搜索 | 腾讯招聘
import time
import pymongo
import requests
import jsonpath
from multiprocessing import Process, JoinableQueue as Queue
url = 'https://careers.tencent.com/tencentcareer/api/post/Query'
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}
def get_work_info_json(page_num, queue):
params = {
'timestamp': 1696774900608,
'countryId': '',
'cityId': '',
'bgIds': '',
'productId': '',
'categoryId': '',
'parentCategoryId': '',
'attrId': '',
'keyword': 'python',
'pageIndex': page_num,
'pageSize': 10,
'language': 'zh-cn',
'area': 'cn'
}
response = requests.get(url, headers=headers, params=params).json()
# 在某些页面中不存在当前的json数据会跑出异常
try:
for info in response['Data']['Posts']:
work_info_dict = dict()
work_info_dict['recruit_post_name'] = jsonpath.jsonpath(info, '$..RecruitPostName')[0]
work_info_dict['country_name'] = jsonpath.jsonpath(info, '$..CountryName')[0]
work_info_dict['location_name'] = jsonpath.jsonpath(info, '$..LocationName')[0]
work_info_dict['category_name'] = jsonpath.jsonpath(info, '$..CategoryName')[0]
work_info_dict['responsibility'] = jsonpath.jsonpath(info, '$..Responsibility')[0]
work_info_dict['last_update_time'] = jsonpath.jsonpath(info, '$..LastUpdateTime')[0]
queue.put(work_info_dict)
except TypeError:
print('数据不存在:', params.get('pageIndex'))
def save_work_info(queue):
mongo_client = pymongo.MongoClient()
collection = mongo_client['py_spider']['tx_work']
while True:
dict_data = queue.get()
print(dict_data)
collection.insert_one(dict_data)
# 计数器减1, 为0解堵塞
queue.task_done()
if __name__ == '__main__':
dict_data_queue = Queue()
# 创建进程对象列表
process_list = list()
for page in range(1, 50):
p_get_info = Process(target=get_work_info_json, args=(page, dict_data_queue))
process_list.append(p_get_info)
# get_work_info_json不是无限循环任务, 无需设置守护进程直接启动即可
for process_obj in process_list:
process_obj.start()
# save_work_info是无限循环任务, 则需要设置为守护进程让主进程可以正常退出
p_save_work = Process(target=save_work_info, args=(dict_data_queue,))
p_save_work.daemon = True
p_save_work.start()
# 让主进程等待有限任务执行完毕
for process_obj in process_list:
process_obj.join()
# 等待队列任务完成
dict_data_queue.join()
print('爬虫任务完成...')
7.并发爬虫的实战
7.1 使用多线程抓取爱奇艺视频信息
网站地址:内地电视剧大全-好看的内地电视剧排行榜-爱奇艺
# title playUrl description
import requests
import pymongo
import threading
from fake_useragent import UserAgent
from queue import Queue
ua=UserAgent()
class AiQiYi_Info():
def __init__(self):
self.mongo_client=pymongo.MongoClient(host='localhost',port=27017)
self.collection=self.mongo_client['py_spider']['AiQiYi_info']
self.api_url='https://pcw-api.iqiyi.com/search/recommend/list?channel_id=2&data_type=1&mode=11&page_id={}&ret_num=48&session=4add98a45400dbbaf9c51312d0bb2a5f&three_category_id=15;must'
self.headers={'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'}
# 创建队列
self.url_queue=Queue()
self.json_queue=Queue()
self.dict_queue=Queue()
def get_url(self):
for page in range(10):
self.url_queue.put(self.api_url.format(page))
def get_response(self):
while True:
url=self.url_queue.get()
res=requests.get(url=url,headers=self.headers)
self.json_queue.put(res.json())
self.url_queue.task_done()
def parse_vedio_info(self):
while True:
data=self.json_queue.get()['data']['list']
for i in data:
item = dict()
item['title']=i['name']
item['playUrl']=i['playUrl']
item['description']=i['description']
self.dict_queue.put(item)
self.json_queue.task_done()
def save_vedio_info(self):
while True:
item = self.dict_queue.get()
if item is None:
break
try:
# 移除手动设置的 _id 字段
if '_id' in item:
del item['_id']
self.collection.insert_one(item)
print(f"插入成功: {item}")
except Exception as e:
print(f"插入失败: {e}")
self.dict_queue.task_done()
def main(self):
# 初始化线程对象列表
thread_obj_list=[]
# 有限循环任务无需使用守护线程,直接启动即可
t_url=threading.Thread(target=self.get_url)
t_url.start()
t_url.join()
# 创建发送请求的线程对象并加入到线程对象列表中
for _ in range(3):
t_get_json = threading.Thread(target=self.get_response)
thread_obj_list.append(t_get_json)
# 创建数据提取的线程对象并加入到线程对象列表中
t_parse_info=threading.Thread(target=self.parse_vedio_info)
thread_obj_list.append(t_parse_info)
# 创建保存数据的线程对象并加入到线程对象列表中
t_save_info=threading.Thread(target=self.save_vedio_info)
thread_obj_list.append(t_save_info)
# 循环线程列表设置线程对象为守护线程并启动
for t_obj in thread_obj_list:
t_obj.daemon=True
t_obj.start()
# 判断所有队列中的计数器是否为零,如果为零则退出程序,否则让主线程堵塞
for q in [self.url_queue,self.json_queue,self.dict_queue]:
q.join()
print('主线程结束')
if __name__ == '__main__':
a=AiQiYi_Info()
a.main()
7.2 使用线程池完成百度招聘信息
网站地址:百度校园招聘
注意点:当前网站
api
请求方式为post
这个网站有反爬,打开开发者工具则会跳转到一个空白界面。解决这个问题的方法:
步骤一:请复制网址,打开无痕浏览器,在不进行搜索操作的前提下,先调出开发者工具。然后在开发者工具的源代码界面中,找到并勾选中脚本选项,具体操作可参照下图所示步骤进行:
步骤二:请复制空白页的网址,接着打开无痕窗口。在无痕窗口中发送请求,此时若发现有断点卡住,不要进行其他操作。然后使用快捷键“Ctrl+F”,在该页面进行全局搜索,输入之前所复制的空白页网址,具体操作步骤可参考下图:
步骤三:当发现调试程序处于暂停状态时,点击显示为蓝色的“继续执行脚本”按钮(即已在调试程序中暂停的蓝色按钮)。程序继续运行后,会逐步跳转相关的文件,在此过程中持续留意跳转情况,不断重复点击该蓝色按钮让程序继续执行,直至跳转到包含之前所复制空白页网址的 JavaScript(js)文件中,如下图所示:
步骤四:如下图所示
import requests
import pymysql
from dbutils.pooled_db import PooledDB
from fake_useragent import UserAgent
from concurrent.futures import ThreadPoolExecutor,as_completed
class BuDu_recruit():
def __init__(self):
self.pool=PooledDB(
creator=pymysql,
maxconnections=6,
mincached=2,
maxcached=5, # 连接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享
blocking=True, # 连接池中如果没有可用的链接后,是否阻塞等待。False,不等待直接报错,等待直到有可用链接,再返回。
host='127.0.0.1',
port=3306,
user='root',
password='123456',
database='py_spider',
charset='utf8'
)
self.url='https://talent.baidu.com/httservice/getPostListNew'
self.ua=UserAgent()
self.headers={
'User-Agent':self.ua.random,
'referer':'https://talent.baidu.com/jobs/social-list?',
}
def get_recruit_info(self,page):
data={
'recruitType':'SOCIAL',
'pageSize':'10',
'keyWord':'python',
'curPage':f'{page}',
'projectType':'',
}
res=requests.post(url=self.url,headers=self.headers,data=data).json()
return res
def parse_recruit_info(self,res):
info_list=res['data']['list']
for info in info_list:
education=info['education']
name=info['name']
service_condition=info['serviceCondition']
self.save_recruit_info(education,name,service_condition)
def create_table(self):
with self.pool.connection()as conn:
with conn.cursor()as cursor:
sql="""
create table if not exists baidu_recruit_info(
id int primary key auto_increment,
educate varchar(50),
name varchar(200),
service_condition text
)
"""
try:
cursor.execute(sql)
print('表创建成功')
except Exception as e:
print('表创建失败',e)
def save_recruit_info(self,education,name,server_name):
with self.pool.connection() as conn:
with conn.cursor()as cursor:
sql = """insert into baidu_recruit_info(educate, name, service_condition) values (%s, %s, %s)"""
try:
cursor.execute(sql,(education,name,server_name))
print('插入成功:',(education,name,server_name))
conn.commit()# 提交事务
except pymysql.Error as e:
print(f'插入失败,错误代码: {e.args[0]}, 错误信息: {e.args[1]}')
conn.rollback()
def main(self):
self.create_table()
with ThreadPoolExecutor(max_workers=5) as pool:
futures=[pool.submit(self.get_recruit_info,page) for page in range(1,38)]
"""futures.result()方法会造成主线程堵塞需要使用as_completed转为非阻塞"""
for future in as_completed(futures):
pool.submit(self.parse_recruit_info,future.result())
if __name__ == '__main__':
bd=BuDu_recruit()
bd.main()
7.3使用多进程抓取芒果视频信息
网站网址:芒果TV
注意:在多进程环境中,数据库连接对象不能在__init__
方法中执行,会出现序列化失败的问题,需要将连接方法放置在类属性中。
import requests
import pymongo
import redis
import hashlib
from multiprocessing import Process,JoinableQueue as Queue
class MGTV():
# 进程环境下链接对象需要声明成类属性
redis_client=redis.Redis()
mogo_client=pymongo.MongoClient(host='localhost',port=27017)
collections=mogo_client['py_spider']['tv_info']
def __init__(self):
self.url='https://pianku.api.mgtv.com/rider/list/pcweb/v3?allowedRC=1&'
self.headers={'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'}
self.params_queue=Queue()
self.json_queue=Queue()
self.dict_queue=Queue()
def get_params(self):
for page in range(6):
params={
'allowedRC':'1',
'platform':'pcweb',
'channelId':'2',
'pn':f'{page}',
'pc':'80',
'hudong':'1',
'_support':'10000000',
'kind':'19',
'area':'10',
'year':'all',
'chargeInfo':'a1',
'sort':'c2',
'feature':'all',
}
self.params_queue.put(params)
def get_tv_info(self):
while True:
params=self.params_queue.get()
res=requests.get(url=self.url,headers=self.headers,params=params)
self.json_queue.put(res.json())
self.params_queue.task_done()
def parse_tv_info(self):
while True:
tv_json=self.json_queue.get()
for i in tv_json['data']['hitDocs']:
item = dict()
item['title']=i['title']
item['subtitle']=i['subtitle']
item['story']=i['story']
self.dict_queue.put(item)
self.json_queue.task_done()
# 数据去重
@staticmethod
def get_haslib(dict_item):
md5=hashlib.md5()
md5.update(str(dict_item).encode('utf-8'))
return md5.hexdigest()
def save_tv_info(self):
while True:
item=self.dict_queue.get()
md5_haslib=self.get_haslib(item)
redis_result=self.redis_client.sadd('tv:filter',md5_haslib)
if redis_result:
# 确保插入的数据没有 _id 字段
# item.pop('_id', None)
self.collections.insert_one(item)
print('保存成功:',item)
else:
print('数据重复')
self.dict_queue.task_done()
def close(self):
print('爬虫即将退出,准备关闭数据库连接')
self.mogo_client.close()
self.redis_client.close()
def main(self):
process_list=[]
get_params=Process(target=self.get_params)
get_params.start()
for _ in range(3):
get_tv_info=Process(target=self.get_tv_info)
process_list.append(get_tv_info)
parse_tv_info=Process(target=self.parse_tv_info)
process_list.append(parse_tv_info)
save_tv_info=Process(target=self.save_tv_info)
process_list.append(save_tv_info)
for process_obj in process_list:
process_obj.daemon=True
process_obj.start()
# 让主进程等待有限任务执行完毕后解堵塞
get_params.join()
# 如果队列中的任务没有完成则放在住进程退出
for q in [self.params_queue,self.json_queue,self.dict_queue]:
q.join()
self.close()
if __name__ == '__main__':
tv=MGTV()
tv.main()
7.4使用进程池抓取芒果视频信息
import requests
import pymongo
import redis
import hashlib
from concurrent.futures import ProcessPoolExecutor,as_completed
class MGTV():
# 进程环境下链接对象需要声明成类属性
redis_client=redis.Redis()
mogo_client=pymongo.MongoClient(host='localhost',port=27017)
collections=mogo_client['py_spider']['tv_info']
def __init__(self):
self.url='https://pianku.api.mgtv.com/rider/list/pcweb/v3?allowedRC=1&'
self.headers={'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'}
def get_params(self):
for page in range(6):
params_list=[]
params={
'allowedRC':'1',
'platform':'pcweb',
'channelId':'2',
'pn':f'{page}',
'pc':'80',
'hudong':'1',
'_support':'10000000',
'kind':'19',
'area':'10',
'year':'all',
'chargeInfo':'a1',
'sort':'c2',
'feature':'all',
}
params_list.append(params)
return params_list
def get_tv_info(self,params):
res=requests.get(url=self.url,headers=self.headers,params=params)
return res.json()
@staticmethod
def parse_tv_info(res):
items=[]
for i in res['data']['hitDocs']:
item =dict()
item['title']=i['title']
item['subtitle']=i['subtitle']
item['story']=i['story']
items.append(item)
return items
# 数据去重
@staticmethod
def get_haslib(dict_item):
md5=hashlib.md5()
md5.update(str(dict_item).encode('utf-8'))
return md5.hexdigest()
def save_tv_info(self,items):
for item in items:
md5_haslib=self.get_haslib(item)
redis_result=self.redis_client.sadd('tv:filter',md5_haslib)
if redis_result:
# 确保插入的数据没有 _id 字段
# item.pop('_id', None)
self.collections.insert_one(item)
print('保存成功:',item)
else:
print('数据重复')
def close(self):
print('爬虫即将退出,准备关闭数据库连接')
self.mogo_client.close()
self.redis_client.close()
def main(self):
params_list=self.get_params()
with ProcessPoolExecutor(max_workers=5) as executor:
# 提交请求数据的任务
future_to_params={executor.submit(self.get_tv_info,params): params for params in params_list}
# 处理请求数据的结果
for future in as_completed(future_to_params):
res=future.result()
items=self.parse_tv_info(res)
self.save_tv_info(items)
self.close()
if __name__ == '__main__':
tv=MGTV()
tv.main()
7.5使用协程完成王者荣耀英雄图片下载
采集王者荣耀官网中所有英雄的图片信息
网站地址:英雄资料列表页-英雄介绍-王者荣耀官方网站-腾讯游戏
import os
import aiofile
import aiohttp
import asyncio
class HeroSkin:
def __init__(self):
self.json_url = 'https://pvp.qq.com/web201605/js/herolist.json'
self.skin_url = 'https://game.gtimg.cn/images/yxzj/img201606/skin/hero-info/{}/{}-bigskin-{}.jpg'
self.headers = {
'User-Agent':
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
}
async def get_image_content(self, session, e_name, c_name):
# 因为不确定每个英雄具体的皮肤数量, 所以设置一个超出英雄皮肤数量的最大值
# 根据链接状态码判断是否请求成功, 如果请求失败则终止循环并获取下一个英雄的皮肤图片
for skin_id in range(1, 31):
async with session.get(self.skin_url.format(e_name, e_name, skin_id), headers=self.headers) as response:
if response.status == 200:
content = await response.read()
async with aiofile.async_open('./images/' + c_name + '-' + str(skin_id) + '.jpg', 'wb') as f:
await f.write(content)
print('保存成功:', c_name)
else:
break
async def main(self):
tasks = list()
async with aiohttp.ClientSession() as session:
async with session.get(self.json_url, headers=self.headers) as response:
result = await response.json(content_type=None)
for item in result:
e_name = item['ename']
c_name = item['cname']
# print(e_name, c_name)
coro_obj = self.get_image_content(session, e_name, c_name)
tasks.append(asyncio.create_task(coro_obj))
await asyncio.wait(tasks)
if __name__ == '__main__':
if not os.path.exists('./images'):
os.mkdir('./images')
hero_skin = HeroSkin()
asyncio.run(hero_skin.main())