Pyppeteer简介
pyppeteer
是 Python 语言的一个库,它是对 Puppeteer 的一个非官方端口,Puppeteer 是一个 Node 库,Puppeteer是Google基于Node.js开发的一个工具,它提供了一种高层次的 API 来通过 DevTools 协议控制 Chrome 或 Chromium。pyppeteer
可以用来进行网页自动化处理,支持页面抓取、表单提交、UI测试、JavaScript执行等功能,非常适合用于网页爬虫或自动化测试。
在pyppeter中,实际上它背后有一个类似Chrome浏览器的Chromium浏览器在执行一些动作进行网页渲染。
Chrome与Chromium渊源。两款浏览器内核是一样的,实现方式也是一样,可以认为是开发版和正式版的区别,功能基本没有太大的区别。
环境安装
pip install pyppeteer
注意:支持异步需要3.5以上的解释器
import pyppeteer
print(pyppeteer.executablePath()) #查看chromium存放路径
print(pyppeteer.__chromium_revision__) #查看版本号
官方文档:
API Reference — Pyppeteer 0.0.25 documentationhttps://miyakogi.github.io/pyppeteer/reference.html
测试样例
from pyppeteer import launch
import asyncio
import time
async def main():
# 启动一个浏览器(headless默认是无头即无界面浏览器,改为false有界面)
browser = await launch(headless=False,args=['--disable-infobars','--window-size=1920,1080'])
# 创建一个页面
page = await browser.newPage()
# 设置页面视图大小
await page.setViewport({'width':1900,'height':1080})
# 跳转到百度
await page.goto('https://www.baidu.com')
# 输入要查询的关键字,type第一个参数是元素的selector(css),第二个是要输入的关键字
await page.type('#kw','pyppeteer')
# 点击提交按钮
await page.click('#su')
time.sleep(30)
await browser.close()
# 启动异步任务
asyncio.get_event_loop().run_until_complete(main())
基本配置
基本参数
params = {
# 关闭无头浏览器
"headless":False,
"dumpio":True,#防止浏览器卡住
r"userDataDir":"./cache-data", #用户文件地址
"args":[
'--disable-infobars', #关闭自动化提示框
'--window-size=1920,1080', #设置窗口大小
'--log-level=30', #日志保存等级,建议设置越小越好,要不然生成的日志占用的空间会很大30为waring级别
'--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'--no-sandbox', #关闭沙盒模式
'--start-maximized', #窗口最大化模式
'--proxy-server=http://localhost:1080' #代理
]
}
设置窗口
#UI模式 闭频警告
browser = await launch(headless = False,args=['--disable-infobars'])
page = await browser.newPage()
await page.setViewport({'width':1200,'height':800})
添加头部
网页截图
page.screenshot(path='example.png')
伪装浏览器绕过检测
object.defineProperty()方法会直接在一个对象上定义一个新属性,或者修改一个对象的现有属性,并返回此对象。
await page.evaluateOnNewDocument('()=>{Object.defineProperty(navigator, "webdriver", { get: () => false }); }');
案例演示触发JS
async def main():
# 启动一个浏览器
browser = await pyppeteer.launch(headless = False,args = ['--disable-infobars','--window-size=1920,1080'])
# 打开一个新页面
page = await browser.newPage()
# 添加用户代理
await page.setUserAgent('Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
await page.evaluateOnNewDocument('()=>{Object.defineProperty(navigator, "webdriver", { get: () => false }); }')
await page.goto('https://www.zhipin.com/web/geek/job?query=%E6%95%B0%E6%8D%AE%E5%88%86%E6%9E%90&city=100010000&page=')
dimensions = await page.evaluate('() => ({ cookie: document.cookie })')
headers = {
'User-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Cookie':dimensions['cookie']
}
url = 'https://www.zhipin.com/web/geek/job?query=%E6%95%B0%E6%8D%AE%E5%88%86%E6%9E%90&city=100010000&page='
resp = requests.get(url=url,headers=headers)
print(resp.text)
# 启动异步任务
asyncio.get_event_loop().run_until_complete(main())
滚动到页面底部
await page.evaluate(window.scrollBy(0,document.body.scrollHeight))
进阶使用
数据提取
获取属性
登录案例
import asyncio
from pyppeteer import launch
async def main():
# 启动浏览器,headless=False 表示非无头模式,也就是浏览器界面是可见的
browser = await launch(headless=False, args=['--disable-infobars', '--window-size=1920,1080'])
# 开启一个新的浏览器标签页
page = await browser.newPage()
# 访问指定的URL
await page.goto('https://www.captainbi.com/amz_login.html')
# 设置视窗大小
await page.setViewport(viewport={'width': 1356, 'height': 768})
# 输入用户名
await page.type('#username', '123456')
# 输入密码,假定密码输入框的ID为'password'
await page.type('#password', '123456') # 请确保选择器正确对应到密码输入框
# 单击登录按钮,假定按钮的ID为'submit'
# 如果按钮没有ID,则需要提供正确的CSS选择器
await page.click('#submit', options={'timeout': 3000})
# 运行 main 协程
asyncio.run(main())
综合案例
'''
抓取唯品会关于女性口红等数据
1搜索入口抓口红数据
2根据品牌做检索
3字段 原价-折扣价-品牌
4翻页
5保存入库
根据观察数据是动态加载。所以要使用自动化技术 把动态变静态 结合requests
'''
import requests
from lxml import etree
import pandas as pd
import asyncio
from pyppeteer import launch
from loguru import logger
class Wph(object):
def __init__(self,url,name):
self.url = url
self.name = name
self.headers = {
'User-Agent':'aaqabbbccc'
}
self.session = requests.session()
self.hadInone = lambda x:x[0] if x else ''
self.browser = None
async def main(self,url):
# 打开一个浏览器
self.browser = await launch()
# 创建一个窗口
page = await self.browser.newPage()
# 访问对应的url
await page.goto(url)
text = await page.content() # 返回页面html
return text
def spider(self):
df = pd.DataFrame(columns=['品牌','标题','原价','现价','折扣'])
# 发起请求
res = self.session.get(self.url,params={'keyword':self.name},headers=self.headers,verify=False)
html = etree.HTML(res.text)
url_list = html.xpath('.//div[@class="c-filter-group-content"]/div[contains(@class,"c-filter-group-scroll-brand")]/ul/li/a/@href')
# 迭代品牌URL地址
for i in url_list:
# 驱动浏览器请求
page_html = asyncio.get_event_loop().run_until_complete(self.main('http:'+i))
# 获取网页源代码
page = etree.HTML(page_html)
htmls = page.xpath('//section[@id="J_searchCatList"]/div')
for h in htmls[1:]:
# 品牌
pinpai = self.hadInone(h.xpath('//div[contains(@class,"c-goods-item__name--two-line")]/text()'))
# 标题
title = self.hadInone(h.xpath('//div[contains(@class,"c-goods-item__name--two-line")]/text()'))
# 原价
y_price = self.hadInone(h.xpath('//div[contains(@class,"J-goods-item__market-price")]/text()'))
# 卖价
x_price = self.hadInone(h.xpath('//div[contains(@class,"c-goods-item__sale-price")]/text()'))
# 折扣
zk = self.hadInone(h.xpath('div//div[contains(@class,"c-goods-item__discount")]/text()'))
logger.info(f'品牌{pinpai},标题{title},原价{y_price},现价{x_price},折扣{zk}')
pro = {
'品牌':pinpai,
'标题':title,
'原价':y_price,
'现价':x_price,
'折扣':zk,
}
df = df.append([pro])
print(pro)
# df.to_excel('唯品会数据.xlsx',index=False)
return df
# def __del__(self):
# if self.browser:
# asyncio.get_event_loop().run_until_complete(self.browser.close())
if __name__=='__main__':
url = 'https://category.vip.com/suggest.php'
name = '香水'
w = Wph(url,name)
w.spider()