五、并发爬虫

本节聚焦于使用协程、线程、进程实现并发爬虫任务。 Python 线程受全局解释器锁(GIL)制约,同一时刻仅能执行一个线程,无法充分利用多核 CPU 优势,且频繁切换线程会增加开销,影响爬虫性能。 协程是轻量级线程,由程序自主调度,可在 I/O 等待时切换任务,避免线程切换开销,提升 CPU 利用率与爬虫并发性能,能弥补线程性能不足。 除协程外,线程池和进程池也是实现并发的有效方式。线程池预先创建线程,减少创建与销毁开销;进程池创建多个进程,利用多核 CPU 优势,适合 CPU 密集型爬虫任务。 接下来,我们会通过代码示例展开介绍。

1.asyncio结合requests完成爬虫任务

requests 是 python 中的同步网络爬虫库,并不能直接使用asyncio运行。所以我们使用 asyncio 中的 run_in_executor 方法创建线程池完成并发。

"""
requests同步爬虫库:本身不支持协程环境
"""

import asyncio
import requests
from functools import partial
from bs4 import BeautifulSoup

url='https://movie.douban.com/top250?start={}&filter='
headers={'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'}


# 异步函数,用于获取电影信息
async def get_movie_info(page_num):
    # 创建一个事件循环对象
    loop=asyncio.get_running_loop()
    # 使用线程池执行 requests.get请求
    # partial 是偏函数,用于固定函数的部分参数
    res=await loop.run_in_executor(None,partial(requests.get,url.format(page_num*25),headers))
    soup = BeautifulSoup(res.text, 'lxml')
    div_list = soup.find_all('div', class_='hd')
    for title in div_list:
        print(title.get_text())
# 主函数,是哦那个asyncio并发执行多个任务
async def main():
    # 创建任务列表,每个任务对应一个页面的电影信息获取
    tasks=[asyncio.create_task(get_movie_info(i)) for i in range(10)]
    # 等待所有任务完成
    await asyncio.wait(tasks)

if __name__ == '__main__':
    # 使用asyncio.run运行主函数
    asyncio.run(main())

注意点:

1. partial(requests.get,url.format(page*25),headers=headers)等同以下代码

  • requests.get(url.format(page*25),headers=headers)

2.functools.partial:

  • 可以帮你创建一个新的函数,这个函数在调用时会自动将某些参数给原函数

2.使用aiohttp完成爬虫任务

由于 requests 爬虫库本身不支持异步,在 asyncio 中需要开启线程池才能使用。在使用上稍微有些麻烦,为了解决这个问题,我们使用支持异步操作的 aiohttp 来完成爬虫任务。

2.1 介绍与安装

aiohttp 是一个异步的网络库,可以实现 HTTP 客户端,也可以实现 HTTP 服务器,爬虫阶段我们只能用它来实现 HTTP 客户端功能。

官网:https://docs.aiohttp.org/en/stable/

aiohttp 客户端相关的官方文档:

https://docs.aiohttp.org/en/stable/client.html#aiohttp-client

pip install aiohttp -i https://pypi.tuna.tsinghua.edu.cn/simple

2.2 基本使用

import asyncio
import aiohttp

url = 'https://www.baidu.com'
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'}


async def get_baidu():
    # 方法一:
    # 1.创建请求对象
    # session = aiohttp.ClientSession()

    # 2.使用请求对象发送网络请求 - get(get方法回返回一个响应对象)
    # res = await session.get(url, headers=headers)

    # 3.获取响应对象中的结果
    # 协程函数需要await来获取返回的结果
    # result = await res.text()
    # print(result)

    # 4.关闭资源
    # await session.close()
    # res.close()

    # 方法二:利用异步上下文的方式释放请求资源
    async with aiohttp.ClientSession() as session:
        async with session.get(url,headers=headers) as res:
            result=await res.text()
            print(result)

if __name__ == '__main__':
    asyncio.run(get_baidu())

2.3 并发操作

import asyncio
import aiohttp

# 回调函数:任务完成后打印返回结果
def download_completed_callback(test_obj):
    print('下载的内容为:',test_obj.result())

async def baidu():
    print('百度爬虫')
    url='https://www.baidu.com'
    async with aiohttp.ClientSession()as session:
        async with session.get(url) as r:
            return await r.text()

async def sougou():
    print('搜狗爬虫')
    url='https://www.sogou.com'
    async with aiohttp.ClientSession()as session:
        async with session.get(url) as r:
            return await r.text()

async def jingdou():
    print('京东爬虫')
    url='https://www.jd.com'
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as r:
            return await r.text()
async def main():
    # 创建多个Task,且添加回调函数
    task_baidu=asyncio.create_task(baidu())
    task_baidu.add_done_callback(download_completed_callback)

    task_sougou=asyncio.create_task(sougou())
    task_sougou.add_done_callback(download_completed_callback)

    task_jingdou=asyncio.create_task(jingdou())
    task_jingdou.add_done_callback(download_completed_callback)

    tasks=[task_baidu,task_sougou,task_jingdou]
    # 等待下载
    await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(main())

练习:使用aiohttp抓取搜狗标题

网址:https://weixin.sogou.com/weixin?_sug_type_=1&type=2&query=python

import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def get_sougou_info(page):
    url = f'https://weixin.sogou.com/weixin?query=python&_sug_type_=1&type=2&page={page}&ie=utf8'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'
    }
    try:
        async with aiohttp.ClientSession() as session:
            # 修正参数传递方式,将 headers 作为关键字参数传递
            async with session.get(url, headers=headers) as r:
                r.encoding = 'utf-8'  # 显式指定编码
                # 调用 r.text() 方法获取响应文本 (这一步要注意)
                text = await r.text()
                soup = BeautifulSoup(text, 'lxml')
                title_list = soup.select('div[class="txt-box"] a')
                for i in title_list:
                    title = i.get_text()
                    print(title)
    except aiohttp.ClientError as e:
        print(f"请求发生错误: {e}")
    except Exception as e:
        print(f"发生其他错误: {e}")

async def main():
    tasks = [asyncio.create_task(get_sougou_info(i)) for i in range(10)]
    # 等待所有任务完成
    await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(main())

3.aiomysql的使用

安装:

pip install aiomysql

利用 python3 中新加入的异步关键词 async/await,我们使用各种异步操作作为来执行各种异步的操作,如使用 aiohttp 来替代 requests 来执行异步的网络请求操作,使用 motor 来代替同步的 pymongo 库来操作 mogo 数据库,我们在开发同步的 python 程序时,我们会使用 PyMySQL来操作 mysql 数据库,同样,我们会使用 aiomysql 来异步操作 mysql 数据库。

aiomysql的简单使用:

import aiomysql
import asyncio

async def get_tx_work():
    # # 1.创建数据库链接对象
    # db=await aiomysql.connect(host='localhost',port=3306,user='root',password='123456',db='py_spider')
    #
    # # 2.创建游标对象
    # cursor=await db.cursor()
    #
    # # 3.查询数据
    # sql='select * from tx_work'
    #
    # await cursor.execute(sql)
    # result=await cursor.fetchall()
    # print(result)
    #
    # # 4.关闭资源
    # await cursor.close()
    # db.close()

    # 方法二:利用异步上下文的方式释放请求资源
    async with aiomysql.connect(host='localhost',port=3306,user='root',password='123456',db='py_spider')as db:
        async with db.cursor()as cursor:
            await cursor.execute('select * from tx_work;')
            result=await cursor.fetchall()
            print(result)

if __name__ == '__main__':
    asyncio.run(get_tx_work())

通过异步爬虫完成数据存储(案例:二手车)

使用asyncio完成汽车之家的汽车参数信息并保存到mysql数据库中

网址:二手车_二手车之家

思路分析:

  1. 当前页面数据为静态数据,在翻页时url中的sp1会变更为sp2,所以当前页面可以使用xpath提取数据。
  2. 通过首页进入到详情页有当前汽车的配置信息,汽车配置信息页中的数据是动态数据,可以使用抓包的方式获取api
  3. 根据获取的api链接发现当前链接中存在查询字符串:specid
  4. 回到首页,在汽车列表中通过元素发现li标签中存在汽车的id值,获取id值拼接api链接地址。
  5. 构造请求访问构造好的api地址获取数据。

注意点:

  • 查看api接口返回的数据我们发现当前返回的数据类型并不是json数据,需要对返回的数据进行处理。处理方式有以下两种:
    • 拿到返回数据后进行字符串切片,保留json数据部分
    • api链接中的callback=configTitle查询字符串参数删除
  • 汽车之家页面编码格式会随机变换,需要使用chardet第三方包实时监测编码格式,并且当页面编码格式为UTF-8-SIGspecid数据不存在。
pip install chardet
import redis
import chardet  # 用于判断返回的页面的编码集合,如果你请求频繁,则会返回404页面--ut-8。真正返回的页面 --gbk
import aiohttp
import asyncio
import aiomysql
import hashlib
from lxml import etree
from fake_useragent import UserAgent


class CarInfo():
    redis_client=redis.Redis()

    def __init__(self):
        self.url = 'https://www.che168.com/china/a0_0msdgscncgpi1ltocsp{}exf4x0/?pvareaid=102179#currengpostion'
        self.detail_url = 'https://cacheapigo.che168.com/CarProduct/GetParam.ashx?specid={}'
        self.ua = UserAgent()


    def __del__(self):
        print('redis数据库连接即将关闭...')
        self.redis_client.close()

    async def get_car_specid(self, page, session,pool):
        headers = {'User-Agent': self.ua.random}
        async with session.get(self.url.format(page), headers=headers)as r:
            content = await r.read()
            encoding = chardet.detect(content)['encoding']
            if encoding == 'UTF-8-SIG' or encoding == 'GB2312' or encoding == 'ISO-8859-1':
                result = content.decode('gbk')

            else:
                result = content.decode(encoding)
                print('被反爬了....')
            html = etree.HTML(result)
            specid_list = html.xpath('//ul[@class="viewlist_ul"]/li/@specid')
            if specid_list:
                tasks = [asyncio.create_task(self.get_car_info(specid, session,pool)) for specid in specid_list]
                await asyncio.wait(tasks)
            else:
                print('id为空')

    async def get_car_info(self, specid, session,pool):
        headers = {'User-Agent': self.ua.random}
        async with session.get(url=self.detail_url.format(specid), headers=headers)as r:
            content = await r.json()
            item = dict()
            if content['result']['paramtypeitems']:
                item['name'] = content['result']['paramtypeitems'][0]['paramitems'][0]['value']
                item['price'] = content['result']['paramtypeitems'][0]['paramitems'][1]['value']
                item['brand'] = content['result']['paramtypeitems'][0]['paramitems'][3]['value']
                item['length'] = content['result']['paramtypeitems'][1]['paramitems'][0]['value']
                item['breadth'] = content['result']['paramtypeitems'][1]['paramitems'][1]['value']
                item['altitude'] = content['result']['paramtypeitems'][1]['paramitems'][2]['value']
                await self.save_car_info(item,pool)

    # 数据去重
    @staticmethod
    def hashlib(item_dict):
        md5=hashlib.md5()
        md5.update(str(item_dict).encode('utf-8'))
        return md5.hexdigest()

    # 保存数据
    async def save_car_info(self,item,pool):
        async with pool.acquire()as conn:
            async with conn.cursor() as cursor:
                md5_hash=self.hashlib(item)
                # sadd 方法的返回值是一个整数,表示实际添加到集合中的新成员的数量。如果元素已经存在于集合中,sadd 方法不会重复添加,返回值为 0;如果元素是新添加到集合中的,返回值为 1。
                redis_result=self.redis_client.sadd('car:filter',md5_hash)
                if redis_result:
                    sql="""insert into car_info(id,name,price,brand,length,breadth,altitude)values (%s,%s,%s,%s,%s,%s,%s)"""
                    try:
                        await cursor.execute(sql,(0,item['name'],item['price'],item['brand'],item['length'],item['breadth'],item['altitude']))
                        await conn.commit()
                        print('插入成功')
                    except Exception as e:
                        print('插入失败',e)
                else:
                    print('数据重复...')

    async def main(self):
        async with aiomysql.create_pool(user='root',password='123456',db='py_spider')as pool:
            async with pool.acquire() as conn:
                async with conn.cursor()as cursor:
                    sql= """
                            create table car_info(
                                id int primary key auto_increment,
                                name varchar(100),
                                price varchar(100),
                                brand varchar(100),
                                length varchar(100),
                                breadth varchar(100),
                                altitude varchar(100)
                            )
                        """
                    # 在异步代码中必须先要检测表是否存在,直接使用if not 语句无效
                    check_table_query="show tables like 'car_info'"
                    result=await cursor.execute(check_table_query)# 如果表存在返回1,不存在返回0
                    if not result:
                        await cursor.execute(sql)

            async with  aiohttp.ClientSession() as session:
                tasks = [asyncio.create_task(self.get_car_specid(i, session,pool)) for i in range(10)]
                await asyncio.wait(tasks)


if __name__ == '__main__':
    car = CarInfo()
    asyncio.run(car.main())

4.使用多线程完成并发爬虫

在上一小节中我们使用了asyncio的方式完成了并发爬虫,但是大多数时候最常用的还是基于多线程的方式来完成爬虫需求。

import requests
import threading
from lxml import etree

url = 'https://movie.douban.com/top250?start={}&filter='

headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}


def get_movie_info(page):
    response = requests.get(url.format(page * 25), headers=headers).text
    tree = etree.HTML(response)
    result = tree.xpath("//div[@class='hd']/a/span[1]/text()")
    print(result)


if __name__ == '__main__':
    thread_obj_list = [threading.Thread(target=get_movie_info, args=(page,)) for page in range(10)]
    for thread_obj in thread_obj_list:
        thread_obj.start()

5.使用线程池完成并发爬虫

import requests
from lxml import etree
from concurrent.futures import ThreadPoolExecutor, as_completed

url = 'https://movie.douban.com/top250?start={}&filter='

headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}


def get_movie_info(page):
    response = requests.get(url.format(page * 25), headers=headers).text
    tree = etree.HTML(response)
    result = tree.xpath("//div[@class='hd']/a/span[1]/text()")
    return result


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=5) as pool:
        futures = [pool.submit(get_movie_info, page) for page in range(10)]
        # future对象获取返回值会造成主线程堵塞
        # for future in futures:
        #     print(future.result())

        # as_completed会立即返回处理完成的结果而不会堵塞主线程
        for future in as_completed(futures):
            print(future.result())

6.使用多进程完成并发爬虫

因为在Python中存在GIL锁,无法充分利用多核优势。所以为了能够提高程序运行效率我们也会采用进程的方式来完成代码需求。

进程的使用:

from multiprocessing import Process


# 创建进程对象
p = Process(target=func, args=(,))

# 设置守护进程
p.daemon = True

# 启动进程
p.start()

进程中的队列:

多进程中使用普通的队列模块会发生阻塞,对应的需要使用multiprocessing提供的JoinableQueue模块,其使用过程和在线程中使用的queue方法相同。

接下来我们通过腾讯招聘代码案例学习如何在进程中使用JoinableQueue队列模块。

页面地址:搜索 | 腾讯招聘

import time
import pymongo
import requests
import jsonpath
from multiprocessing import Process, JoinableQueue as Queue

url = 'https://careers.tencent.com/tencentcareer/api/post/Query'

headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}


def get_work_info_json(page_num, queue):
    params = {
        'timestamp': 1696774900608,
        'countryId': '',
        'cityId': '',
        'bgIds': '',
        'productId': '',
        'categoryId': '',
        'parentCategoryId': '',
        'attrId': '',
        'keyword': 'python',
        'pageIndex': page_num,
        'pageSize': 10,
        'language': 'zh-cn',
        'area': 'cn'
    }

    response = requests.get(url, headers=headers, params=params).json()
    # 在某些页面中不存在当前的json数据会跑出异常
    try:
        for info in response['Data']['Posts']:
            work_info_dict = dict()
            work_info_dict['recruit_post_name'] = jsonpath.jsonpath(info, '$..RecruitPostName')[0]
            work_info_dict['country_name'] = jsonpath.jsonpath(info, '$..CountryName')[0]
            work_info_dict['location_name'] = jsonpath.jsonpath(info, '$..LocationName')[0]
            work_info_dict['category_name'] = jsonpath.jsonpath(info, '$..CategoryName')[0]
            work_info_dict['responsibility'] = jsonpath.jsonpath(info, '$..Responsibility')[0]
            work_info_dict['last_update_time'] = jsonpath.jsonpath(info, '$..LastUpdateTime')[0]

            queue.put(work_info_dict)
    except TypeError:
        print('数据不存在:', params.get('pageIndex'))


def save_work_info(queue):
    mongo_client = pymongo.MongoClient()
    collection = mongo_client['py_spider']['tx_work']
    while True:
        dict_data = queue.get()
        print(dict_data)
        collection.insert_one(dict_data)
        # 计数器减1, 为0解堵塞
        queue.task_done()


if __name__ == '__main__':
    dict_data_queue = Queue()
    # 创建进程对象列表
    process_list = list()

    for page in range(1, 50):
        p_get_info = Process(target=get_work_info_json, args=(page, dict_data_queue))
        process_list.append(p_get_info)

    # get_work_info_json不是无限循环任务, 无需设置守护进程直接启动即可
    for process_obj in process_list:
        process_obj.start()

    # save_work_info是无限循环任务, 则需要设置为守护进程让主进程可以正常退出
    p_save_work = Process(target=save_work_info, args=(dict_data_queue,))
    p_save_work.daemon = True
    p_save_work.start()

    # 让主进程等待有限任务执行完毕
    for process_obj in process_list:
        process_obj.join()

    # 等待队列任务完成
    dict_data_queue.join()
    print('爬虫任务完成...')

7.并发爬虫的实战

7.1 使用多线程抓取爱奇艺视频信息

网站地址:内地电视剧大全-好看的内地电视剧排行榜-爱奇艺

# title playUrl description

import requests
import pymongo
import threading
from fake_useragent import UserAgent
from queue import Queue

ua=UserAgent()


class AiQiYi_Info():
    def __init__(self):
        self.mongo_client=pymongo.MongoClient(host='localhost',port=27017)
        self.collection=self.mongo_client['py_spider']['AiQiYi_info']
        self.api_url='https://pcw-api.iqiyi.com/search/recommend/list?channel_id=2&data_type=1&mode=11&page_id={}&ret_num=48&session=4add98a45400dbbaf9c51312d0bb2a5f&three_category_id=15;must'
        self.headers={'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'}

        # 创建队列
        self.url_queue=Queue()
        self.json_queue=Queue()
        self.dict_queue=Queue()

    def get_url(self):
        for page in range(10):
            self.url_queue.put(self.api_url.format(page))

    def get_response(self):
        while True:
            url=self.url_queue.get()
            res=requests.get(url=url,headers=self.headers)
            self.json_queue.put(res.json())
            self.url_queue.task_done()

    def parse_vedio_info(self):
        while True:
            data=self.json_queue.get()['data']['list']
            for i in data:
                item = dict()
                item['title']=i['name']
                item['playUrl']=i['playUrl']
                item['description']=i['description']
                self.dict_queue.put(item)
            self.json_queue.task_done()

    def save_vedio_info(self):
        while True:
            item = self.dict_queue.get()
            if item is None:
                break
            try:
                # 移除手动设置的 _id 字段
                if '_id' in item:
                    del item['_id']
                self.collection.insert_one(item)
                print(f"插入成功: {item}")
            except Exception as e:
                print(f"插入失败: {e}")
            self.dict_queue.task_done()

    def main(self):
        # 初始化线程对象列表
        thread_obj_list=[]

        # 有限循环任务无需使用守护线程,直接启动即可
        t_url=threading.Thread(target=self.get_url)
        t_url.start()
        t_url.join()

        # 创建发送请求的线程对象并加入到线程对象列表中
        for _ in range(3):
            t_get_json = threading.Thread(target=self.get_response)
            thread_obj_list.append(t_get_json)

        # 创建数据提取的线程对象并加入到线程对象列表中
        t_parse_info=threading.Thread(target=self.parse_vedio_info)
        thread_obj_list.append(t_parse_info)

        # 创建保存数据的线程对象并加入到线程对象列表中
        t_save_info=threading.Thread(target=self.save_vedio_info)
        thread_obj_list.append(t_save_info)

        # 循环线程列表设置线程对象为守护线程并启动
        for t_obj in thread_obj_list:
            t_obj.daemon=True
            t_obj.start()

        # 判断所有队列中的计数器是否为零,如果为零则退出程序,否则让主线程堵塞
        for q in [self.url_queue,self.json_queue,self.dict_queue]:
            q.join()
        print('主线程结束')


if __name__ == '__main__':
    a=AiQiYi_Info()
    a.main()

7.2 使用线程池完成百度招聘信息

网站地址:百度校园招聘

注意点:当前网站api请求方式为post

这个网站有反爬,打开开发者工具则会跳转到一个空白界面。解决这个问题的方法:

步骤一:请复制网址,打开无痕浏览器,在不进行搜索操作的前提下,先调出开发者工具。然后在开发者工具的源代码界面中,找到并勾选中脚本选项,具体操作可参照下图所示步骤进行:

 步骤二:请复制空白页的网址,接着打开无痕窗口。在无痕窗口中发送请求,此时若发现有断点卡住,不要进行其他操作。然后使用快捷键“Ctrl+F”,在该页面进行全局搜索,输入之前所复制的空白页网址,具体操作步骤可参考下图:

步骤三:当发现调试程序处于暂停状态时,点击显示为蓝色的“继续执行脚本”按钮(即已在调试程序中暂停的蓝色按钮)。程序继续运行后,会逐步跳转相关的文件,在此过程中持续留意跳转情况,不断重复点击该蓝色按钮让程序继续执行,直至跳转到包含之前所复制空白页网址的 JavaScript(js)文件中,如下图所示:

 步骤四:如下图所示

import requests
import pymysql
from dbutils.pooled_db import PooledDB
from fake_useragent import UserAgent
from concurrent.futures import ThreadPoolExecutor,as_completed

class BuDu_recruit():
    def __init__(self):
        self.pool=PooledDB(
            creator=pymysql,
            maxconnections=6,
            mincached=2,
            maxcached=5,  # 连接池中最多闲置的链接,0和None不限制
            maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享
            blocking=True,  # 连接池中如果没有可用的链接后,是否阻塞等待。False,不等待直接报错,等待直到有可用链接,再返回。
            host='127.0.0.1',
            port=3306,
            user='root',
            password='123456',
            database='py_spider',
            charset='utf8'
        )
        self.url='https://talent.baidu.com/httservice/getPostListNew'
        self.ua=UserAgent()
        self.headers={
            'User-Agent':self.ua.random,
            'referer':'https://talent.baidu.com/jobs/social-list?',
        }

    def get_recruit_info(self,page):
        data={
            'recruitType':'SOCIAL',
            'pageSize':'10',
            'keyWord':'python',
            'curPage':f'{page}',
            'projectType':'',
        }
        res=requests.post(url=self.url,headers=self.headers,data=data).json()
        return res

    def parse_recruit_info(self,res):
        info_list=res['data']['list']
        for info in info_list:
            education=info['education']
            name=info['name']
            service_condition=info['serviceCondition']
            self.save_recruit_info(education,name,service_condition)

    def create_table(self):
        with self.pool.connection()as conn:
            with conn.cursor()as cursor:
                sql="""
                    create table if not exists baidu_recruit_info(
                        id int primary key auto_increment,
                        educate varchar(50),
                        name varchar(200),
                        service_condition text
                    )
                """
                try:
                    cursor.execute(sql)
                    print('表创建成功')
                except Exception as e:
                    print('表创建失败',e)


    def save_recruit_info(self,education,name,server_name):
        with self.pool.connection() as conn:
            with conn.cursor()as cursor:
                sql = """insert into baidu_recruit_info(educate, name, service_condition) values (%s, %s, %s)"""
                try:
                    cursor.execute(sql,(education,name,server_name))
                    print('插入成功:',(education,name,server_name))
                    conn.commit()# 提交事务
                except pymysql.Error as e:
                    print(f'插入失败,错误代码: {e.args[0]}, 错误信息: {e.args[1]}')
                    conn.rollback()

    def main(self):
        self.create_table()
        with ThreadPoolExecutor(max_workers=5) as pool:
            futures=[pool.submit(self.get_recruit_info,page) for page in range(1,38)]
            """futures.result()方法会造成主线程堵塞需要使用as_completed转为非阻塞"""
            for future in as_completed(futures):
                pool.submit(self.parse_recruit_info,future.result())

if __name__ == '__main__':
    bd=BuDu_recruit()
    bd.main()

7.3使用多进程抓取芒果视频信息

 网站网址:芒果TV

注意:在多进程环境中,数据库连接对象不能在__init__方法中执行,会出现序列化失败的问题,需要将连接方法放置在类属性中。 

import requests
import pymongo
import redis
import hashlib
from multiprocessing import Process,JoinableQueue as Queue


class MGTV():
    # 进程环境下链接对象需要声明成类属性
    redis_client=redis.Redis()
    mogo_client=pymongo.MongoClient(host='localhost',port=27017)
    collections=mogo_client['py_spider']['tv_info']

    def __init__(self):
        self.url='https://pianku.api.mgtv.com/rider/list/pcweb/v3?allowedRC=1&'
        self.headers={'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'}
        self.params_queue=Queue()
        self.json_queue=Queue()
        self.dict_queue=Queue()

    def get_params(self):
        for page in range(6):
            params={
                    'allowedRC':'1',
                    'platform':'pcweb',
                    'channelId':'2',
                    'pn':f'{page}',
                    'pc':'80',
                    'hudong':'1',
                    '_support':'10000000',
                    'kind':'19',
                    'area':'10',
                    'year':'all',
                    'chargeInfo':'a1',
                    'sort':'c2',
                    'feature':'all',
            }
            self.params_queue.put(params)

    def get_tv_info(self):
        while True:
            params=self.params_queue.get()
            res=requests.get(url=self.url,headers=self.headers,params=params)
            self.json_queue.put(res.json())
            self.params_queue.task_done()

    def parse_tv_info(self):
        while True:
            tv_json=self.json_queue.get()
            for i in tv_json['data']['hitDocs']:
                item = dict()
                item['title']=i['title']
                item['subtitle']=i['subtitle']
                item['story']=i['story']
                self.dict_queue.put(item)
            self.json_queue.task_done()

    # 数据去重
    @staticmethod
    def get_haslib(dict_item):
        md5=hashlib.md5()
        md5.update(str(dict_item).encode('utf-8'))
        return md5.hexdigest()

    def save_tv_info(self):
        while True:
            item=self.dict_queue.get()
            md5_haslib=self.get_haslib(item)
            redis_result=self.redis_client.sadd('tv:filter',md5_haslib)
            if redis_result:
                # 确保插入的数据没有 _id 字段
                # item.pop('_id', None)
                self.collections.insert_one(item)
                print('保存成功:',item)
            else:
                print('数据重复')
            self.dict_queue.task_done()

    def close(self):
        print('爬虫即将退出,准备关闭数据库连接')
        self.mogo_client.close()
        self.redis_client.close()


    def main(self):
        process_list=[]

        get_params=Process(target=self.get_params)
        get_params.start()

        for _ in range(3):
            get_tv_info=Process(target=self.get_tv_info)
            process_list.append(get_tv_info)

        parse_tv_info=Process(target=self.parse_tv_info)
        process_list.append(parse_tv_info)

        save_tv_info=Process(target=self.save_tv_info)
        process_list.append(save_tv_info)

        for process_obj in process_list:
            process_obj.daemon=True
            process_obj.start()

        # 让主进程等待有限任务执行完毕后解堵塞
        get_params.join()

        # 如果队列中的任务没有完成则放在住进程退出
        for q in [self.params_queue,self.json_queue,self.dict_queue]:
            q.join()

        self.close()



if __name__ == '__main__':
    tv=MGTV()
    tv.main()

7.4使用进程池抓取芒果视频信息

import requests
import pymongo
import redis
import hashlib
from concurrent.futures import ProcessPoolExecutor,as_completed

class MGTV():
    # 进程环境下链接对象需要声明成类属性
    redis_client=redis.Redis()
    mogo_client=pymongo.MongoClient(host='localhost',port=27017)
    collections=mogo_client['py_spider']['tv_info']

    def __init__(self):
        self.url='https://pianku.api.mgtv.com/rider/list/pcweb/v3?allowedRC=1&'
        self.headers={'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36'}

    def get_params(self):
        for page in range(6):
            params_list=[]
            params={
                    'allowedRC':'1',
                    'platform':'pcweb',
                    'channelId':'2',
                    'pn':f'{page}',
                    'pc':'80',
                    'hudong':'1',
                    '_support':'10000000',
                    'kind':'19',
                    'area':'10',
                    'year':'all',
                    'chargeInfo':'a1',
                    'sort':'c2',
                    'feature':'all',
            }
            params_list.append(params)
            return params_list

    def get_tv_info(self,params):
        res=requests.get(url=self.url,headers=self.headers,params=params)
        return res.json()

    @staticmethod
    def parse_tv_info(res):
        items=[]
        for i in res['data']['hitDocs']:
            item =dict()
            item['title']=i['title']
            item['subtitle']=i['subtitle']
            item['story']=i['story']
            items.append(item)
        return items

    # 数据去重
    @staticmethod
    def get_haslib(dict_item):
        md5=hashlib.md5()
        md5.update(str(dict_item).encode('utf-8'))
        return md5.hexdigest()

    def save_tv_info(self,items):
        for item in items:
            md5_haslib=self.get_haslib(item)
            redis_result=self.redis_client.sadd('tv:filter',md5_haslib)
            if redis_result:
                # 确保插入的数据没有 _id 字段
                # item.pop('_id', None)
                self.collections.insert_one(item)
                print('保存成功:',item)
            else:
                print('数据重复')

    def close(self):
        print('爬虫即将退出,准备关闭数据库连接')
        self.mogo_client.close()
        self.redis_client.close()


    def main(self):
        params_list=self.get_params()

        with ProcessPoolExecutor(max_workers=5) as executor:
            # 提交请求数据的任务
            future_to_params={executor.submit(self.get_tv_info,params): params for params in params_list}

            # 处理请求数据的结果
            for future in as_completed(future_to_params):
                res=future.result()
                items=self.parse_tv_info(res)
                self.save_tv_info(items)
            self.close()

if __name__ == '__main__':
    tv=MGTV()
    tv.main()

7.5使用协程完成王者荣耀英雄图片下载

采集王者荣耀官网中所有英雄的图片信息

网站地址:英雄资料列表页-英雄介绍-王者荣耀官方网站-腾讯游戏

import os
import aiofile
import aiohttp
import asyncio


class HeroSkin:
    def __init__(self):
        self.json_url = 'https://pvp.qq.com/web201605/js/herolist.json'
        self.skin_url = 'https://game.gtimg.cn/images/yxzj/img201606/skin/hero-info/{}/{}-bigskin-{}.jpg'
        self.headers = {
            'User-Agent':
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
        }

    async def get_image_content(self, session, e_name, c_name):
        # 因为不确定每个英雄具体的皮肤数量, 所以设置一个超出英雄皮肤数量的最大值
        # 根据链接状态码判断是否请求成功, 如果请求失败则终止循环并获取下一个英雄的皮肤图片
        for skin_id in range(1, 31):
            async with session.get(self.skin_url.format(e_name, e_name, skin_id), headers=self.headers) as response:
                if response.status == 200:
                    content = await response.read()
                    async with aiofile.async_open('./images/' + c_name + '-' + str(skin_id) + '.jpg', 'wb') as f:
                        await f.write(content)
                        print('保存成功:', c_name)
                else:
                    break

    async def main(self):
        tasks = list()
        async with aiohttp.ClientSession() as session:
            async with session.get(self.json_url, headers=self.headers) as response:
                result = await response.json(content_type=None)
                for item in result:
                    e_name = item['ename']
                    c_name = item['cname']
                    # print(e_name, c_name)
                    coro_obj = self.get_image_content(session, e_name, c_name)
                    tasks.append(asyncio.create_task(coro_obj))
                await asyncio.wait(tasks)


if __name__ == '__main__':
    if not os.path.exists('./images'):
        os.mkdir('./images')

    hero_skin = HeroSkin()
    asyncio.run(hero_skin.main())

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/981799.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Avalonia 中文乱码

代码字体文件设置成支持中文的,但是编译的代码还是显示的乱码,原因是代码文件的文件编码格式不支持中文导致的。 如下面的2个页面一部分中文显示正常,一部分显示正常,一部分显示乱码。

Verilog学习方法—基础入门篇(一)

前言: 在FPGA开发中,Verilog HDL(硬件描述语言)是工程师必须掌握的一项基础技能。它不仅用于描述数字电路,还广泛应用于FPGA的逻辑设计与验证。对于初学者来说,掌握Verilog的核心概念和基本语法&#xff0…

PCB电路板基础知识与应用详解:结构与工作原理

电路板,简称PCB(Printed Circuit Board),是电子设备的核心部分,几乎所有现代电子产品都离不开电路板的支撑。本文将带您全面了解电路板的基本结构、工作原理及其在电子工程中的重要作用。 什么是电路板? 电…

使用Qt调用HslCommunication(C++调用C#库)

使用C/CLI 来调用C#的dll 任务分解: 1、实现C#封装一个调用hsl的dll; 2、实现C控制台调用C#的dll库; 3、把调用C#的dll用C再封装为一个dll; 4、最后再用Qt调用c的dll; 填坑: 1、开发时VS需要安装CLI项目库…

标签的ref属性 vue中为什么不用id标记标签

标签的ref属性 vue中为什么不用id标记标签 假设有一对父子组件,如果父组件和子组件中存在id相同的标签,会产生冲突。通过id获取标签会获取到先加载那个标签。 标签的ref属性的用法 在父组件App中,引入了子组件Person。 并使用ref标记了Pe…

嵌入式硬件发展历程

微型计算机架构:CPURAM存储设备 以前常把CPU称为MPU,但现在随着发展,分为两条道路: 一、发展历程 1、集成 然后把CPURAMFlash其他模块集成在一起,就称为MCU也称单片机,他们Flash和RAM比较小,运行裸机程…

Java进阶:Zookeeper相关笔记

概要总结: ●Zookeeper是一个开源的分布式协调服务,需要下载并部署在服务器上(使用cmd启动,windows与linux都可用)。 ●zookeeper一般用来实现诸如数据订阅/发布、负载均衡、命名服务、集群管理、分布式锁和分布式队列等功能。 ●有多台服…

Java spring客户端操作Redis

目录 一、创建项目引入依赖 二、controller层编写 (1)String类型相关操作测试: (2)List类型相关操作测试: (3)Set类型相关操作测试: (4)Has…

TMS320F28P550SJ9学习笔记1:CCS导入工程以及测试连接单片机仿真器

学习记录如何用 CCS导入工程以及测试连接单片机仿真器 以下为我的CCS 以及驱动库C2000ware 的版本 CCS版本: Code Composer Studio 12.8.1 C2000ware :C2000Ware_5_04_00_00 目录 CCS导入工程: 创建工程: 添加工程: C…

【Java学习】String类变量

面向对象系列七 一、String类似复刻变量 1.似复刻变量 1.1结构 1.2常量池检查 1.3构造方法 1.4""形式 1.5引用 2、字符数组 2.1不可变性 2.2常创性 二、String类变量里的方法 1.获取 1.1引用获取: 1.2字符获取: 1.3数组获取 1.…

3.1、密码学基础

目录 密码学概念与法律密码安全分析密码体制分类 - 私钥密码/对称密码体制密码体制分类 - 公钥密码/非对称密码体制密码体制分类 - 混合密码体制 密码学概念与法律 密码学主要是由密码编码以及密码分析两个部分组成,密码编码就是加密,密码分析就是把我们…

【问题解决】Jenkins使用File的exists()方法判断文件存在,一直提示不存在的问题

小剧场 最近为了给项目组提供一个能给Java程序替换前端、后端的增量的流水线,继续写上了声明式流水线。 替换增量是根据JSON配置文件去增量目录里去取再替换到对应位置的,替换前需要判断增量文件是否存在。 判断文件是否存在?作为一个老Ja…

Vue中实现大文件的切片并发下载和下载进度展示

Vue中实现大文件的切片下载 切片下载需要后端提供两个接口,第一个接口用来获取当前下载文件的总切片数,第二个接口用来获取具体某一个切片的内容。 界面展示 数据流展示 代码 接口 // 切片下载-获取文件的总切片数 export function getChunkDownload…

Hive-数据倾斜优化

数据倾斜的原因 1)key分布不均匀,本质上就是业务数据有可能会存在倾斜 2)某些SQL语句本身就有数据倾斜 关键词 情形 后果 Join A、其中一个表较小,但是key集中; B、两张表都是大表,key不均 分发到…

java通过lombok自动生成getter/setter方法、无参构造器、toString方法

文章目录 在IDEA打开允许注解在类名上面使用Data注解 在IDEA打开允许注解 打开设置 在类名上面使用Data注解 按住AltEnter键 等依赖下载完成后上面会新增一行import lombok.Data; 完整代码如下: package com.itheima.extendss;import lombok.AllArgsConstru…

RabbitMQ 2025/3/5

高性能异步通信组件。 同步调用 以支付为例: 可见容易发生雪崩。 异步调用 以支付为例: 支付服务当甩手掌柜了,不管后面的几个服务的结果。只管库库发,后面那几个服务想取的时候就取,因为消息代理里可以一直装&#x…

Element UI-Select选择器结合树形控件终极版

Element UI Select选择器控件结合树形控件实现单选和多选&#xff0c;并且通过v-model的方式实现节点的双向绑定&#xff0c;封装成vue组件&#xff0c;文件名为electricity-meter-tree.vue&#xff0c;其代码如下&#xff1a; <template><div><el-select:valu…

9.RabbitMQ消息的可靠性

九、消息的可靠性 1、生产者确认 9.1.1、Confirm模式简介 可能因为网络或者Broker的问题导致①失败,而此时应该让生产者知道消息是否正确发送到了Broker的exchange中&#xff1b; 有两种解决方案&#xff1a; 第一种是开启Confirm(确认)模式&#xff1b;(异步) 第二种是开…

探秘基带算法:从原理到5G时代的通信变革【四】Polar 编解码(二)

文章目录 2.3.3 极化编码巴氏参数与信道可靠性比特混合生成矩阵编码举例 2.3.4 极化译码最小单元译码串行抵消译码&#xff08;SC译码&#xff09;算法SCL译码算法 2.3.5 总结**Polar 码的优势****Polar 码的主要问题****Polar 码的应用前景** 2.3.6 **参考文档** 本博客为系列…

【我的 PWN 学习手札】House of Emma

House of Emma 参考文献 第七届“湖湘杯” House _OF _Emma | 设计思路与解析-安全KER - 安全资讯平台 文章 - house of emma 心得体会 - 先知社区 前一篇博客【我的 PWN 学习手札】House of Kiwi-CSDN博客的利用手法有两个关键点&#xff0c;其一是利用__malloc_assert进入…