爬虫学习记录

 1.概念

通过编写程序,模拟浏览器上网,然后让其去互联网上抓取数据的过程

  • 通用爬虫:抓取的是一整张页面数据
  • 聚焦爬虫:抓取的是页面中的特定局部内容
  • 增量式爬虫:监测网站中数据更新的情况,只会抓取网站中最新更新出来的数据

robots.txt协议:

君子协议,网站后面添加robotx.txt可进行查看

https://www.baidu.com/robots.txt

1.1 http协议

服务器和客户端进行数据交互的一种形式

常用的请求头信息:

  • User-Agent: 请求载体的身份标识,如: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36
  • Connection: 请求完毕完毕之后,是断开连接还是保持连接(close和keep-alive)

常用响应头信息:

  • Content-Type: 服务器响应客户端的数据类型 如:(text/html; charset=utf-8)

1.2 https协议

安全的超文本传输协议,在客户端和服务端进行数据传输和数据交互的过程中,数据是进行加密的.

数据加密的方式

  • 对称秘钥加密: 客户端制定加密方式,加加密的密文和生成的秘钥都传输给服务端.服务端根据秘钥对于密文进行解密.(但是密文和秘钥可能会被同时拦截)
  • 非对称秘钥加密:服务端生成秘钥对,将生成的公钥传递给客户端,然后将生成的密文传递给服务端,服务端再使用私钥进行解密(客户端拿到的秘钥,不一定是从服务端传递过来的)
  • 证书秘钥加密:服务器端制定加密方式,服务端将公钥传递给证书认证机构,认证机构将公钥通过认证之后,进行数字签名.将携带数字签名的公钥封装到证书当中,将证书一并发送给客户端

2. requests模块

作用:模拟浏览器发送请求

requests模块的编码流程:

  • 指定url
  • 发起请求(get/post)
  • 获取相应数据
  • 持久化存储

2.1 简易网页采集器

2.1.1 UA 伪装

将python脚本发送的请求,伪装成为浏览器发送的请求

2.1.2 Get请求携带参数

将url的参数封装成为字典,传递给params

# _*_ coding utf-8 _*_
# george
# time: 2024/12/24下午7:56
# name: test.py
# comment:
import requests

# 参考url
# https://www.baidu.com/s?wd=%E6%88%90%E5%8A%9F
url = "https://www.baidu.com/s?"

# UA伪装:模拟浏览器
header = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
}

word = input("enter your word:").strip()

# get请求携带的参数
param = {
    "wd":word
}

# 获取响应对象
response = requests.get(url, headers=header, params=param)

# 获取响应内容,可以通过response.text获取字符串形式的响应数据
page_text = response.content

# 持久化储存
file_name = word + ".html"
with open(file_name, "wb") as f:
    f.write(page_text)

print(f"{file_name} has been save successfully")

2.2 破解百度翻译

2.2.1 POST请求携带参数

将传递的参数封装成为字典,并且传递给data

180e7e948fa9407cb8ffb152aa6caa93.png

2.2.2 Ajax请求

单词输入后,局部页面就会刷新

Ajax(Asynchronous JavaScript and XML)是一种在Web应用程序中进行异步通信的技术,它使用JavaScript和XML(现在通常使用JSON)来实现在不刷新整个页面的情况下与服务器进行数据交换

97db308ad6cc45dfa31e7f16dcee5cc8.png

2.2.3 JSON模块的使用

  • 反序列化 loads:将json字符串转化为python对象字典
  • 序列化 dump: 将python对象字典转化为json字符串,并写入文件

4c96e3dfa22742a1a2ce45a179a8f6cd.png

# _*_ coding utf-8 _*_
# george
# time: 2024/12/24下午7:56
# name: test.py
# comment:
import requests
import json

# 参考url
url = "https://fanyi.baidu.com/sug"

# UA伪装:模拟浏览器
header = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36",
    "Referer":"https://fanyi.baidu.com/mtpe-individual/multimodal"
}

word = input("enter your word:").strip()

# get请求携带的参数
param = {
    "kw":word
}

# 获取响应对象
response = requests.post(url, headers=header,data=param)

# 获取响应内容
word_text = json.loads(response.text)

# 持久化储存
file_name = word + ".json"
with open(file_name, "w",encoding="utf-8") as f:
    json.dump(word_text["data"][0], f,ensure_ascii=False)

print(f"{file_name} has been save successfully")

f35d983fa23d484db43827f19e219270.png

2.3 电影 

2.4 公司url

  • 动态加载数据分析
  • 获取每家公司的url,但是发现每家公司的详情页面也是动态加载出来的

3.数据解析

  • 正则
  • bs4
  • xpath

数据解析原理:

解析局部的文本内容都会在标签之间或是标签的属性中进行存储

  • 进行指定标签的定位
  • 标签或是标签对应属性存储值的获取

3.1 正则解析

3.2 bs4解析

只可以被应用于python中

3.2.1 数据解析原理

  • 实例化BeautifulSoup对象,并且将页面源码数据加载到该对象里面
  • 通过调用BeautifulSoup对象中的相关的属性或是方法,进行标签定位和数据提取

3.2.2 环境安装

pip3 install beautifulsoup4  -i  https://mirrors.aliyun.com/pypi/simple
pip3 install lxml  -i  https://mirrors.aliyun.com/pypi/simple

3.2.3 bs4的具体使用

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/26 22:07
# @Author : George
from bs4 import BeautifulSoup
import re

fp = open("./result.html", "r", encoding="utf-8")
soup = BeautifulSoup(fp, "lxml")
# soup.tagName:返回的是html中第一次出现的tag标签
print(soup.div)
# -------------------------------------------
# 等同于soup.tagName
print(soup.find("a"))
# 属性定位
print(soup.find("a", href=re.compile(".*ip138.com/$")).text)
# # 加载源码中所有的tag标签组成的列表
print(soup.find_all("a"))
# -------------------------------------------
# 可以使用选择器,id/类/标签/选择器,返回的是一个列表
print(soup.select('.center'))
# 层级选择器,
# “ ”空格就是表示多个层级
# > 表示一个层级
for i in soup.select('.center > ul > li > a'):
    print(i.text)
print(soup.select('.center > ul a')[0])
# -------------------------------------------
# 获取标签之间的文本数据 soup.a.text/string/get_text()
# --text/get_text可以获取标签里面啊所有的文本内容
# --string只可以获取该标签下的直系文本内容
print(soup.select('.center > ul a')[0].string)
# -------------------------------------------
# 获取标签里面的属性值
print(soup.select('.center > ul a')[0]["href"])

3.2.4 bs4爬取三国演义

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/26 22:43
# @Author : George
# ==================================================
# <a href="/guwen/bookv_6dacadad4420.aspx">第一回</a>
# 第一回网址
# https://www.gushiwen.cn/guwen/bookv_6dacadad4420.aspx
# ==================================================

from bs4 import BeautifulSoup
import requests

url = "https://www.gushiwen.cn/guwen/book_46653FD803893E4F7F702BCF1F7CCE17.aspx"

# UA伪装:模拟浏览器
header = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
}
# 获取响应对象
response = requests.get(url, headers=header)
# 实例化bs对象
soup = BeautifulSoup(response.text, "lxml")
a_liat = soup.select(".bookcont >ul > span >a")
fp = open("三国.txt","w",encoding="utf-8")
# 解析章节标题和详情页面的url
for tag in a_liat:
    title = tag.text
    detail_url = "https://www.gushiwen.cn/"+tag["href"]
    # 对详情页面发起请求
    detail_page = requests.get(detail_url, headers=header)
    # 解析出详情页面中的内容
    detail_soup = BeautifulSoup(detail_page.text,"lxml")
    # 使用此种方法出现一个问题,就是文章都是在p标签里面,所以文章不会换行
    # content = detail_soup.find("div",class_="contson").text
    fp.write(title + ":")
    for line in detail_soup.select('.contson > p'):
        fp.write(line.text+"\n")
    fp.write("\n\n")
    print(f"{title}爬取成功")
fp.close()

效果:

2fed6d19354c427799d07f88da4f0cb3.png

3.3 xpath解析

最常用且是最便捷高效的一种解析方式

3.3.1 xpath解析原理

  • 实例化一个etree的对象,且将被解析的页面远吗数据加载到该对象中
  • 调用etree对象中的xpath方法结合xpath表达式实现标签的定位和内容的捕获

3.3.2 环境的安装

pip3 install lxml  -i  https://mirrors.aliyun.com/pypi/simple

3.3.3 具体使用

xpath只能够根据层级关系定位标签页面.

1d80a29970ae42d885f98586b60dd8f9.png

ba9263465f7742479051cd9574f5c8fd.png

<!DOCTYPE html>
<html lang="zh-CN">
<head>
<!--    <meta charset="UTF-8">-->
<!--    <meta name="viewport" content="width=device-width, initial-scale=1.0">-->
    <title>小型HTML页面示例</title>
</head>
<body>
    <div class="container">
        <div class="div1">
            <h2>第一个Div</h2>
            <p>这是第一个div的内容。它使用了类标签.div1进行样式定位。</p>
            <p>这是第一个div的内容。它使用了类标签.div2进行样式定位。</p>
            <p><title>这是第一个div的内容。它使用了类标签.div3进行样式定位。</title></p>
        </div>
        <div class="div2">
            <h2>第二个Div</h2>
            <p>这是第二个div的内容。它使用了类标签.div2进行样式定位。</p>
        </div>
        <div class="div3">
            <h2 class="div3_h2">第三个Div</h2>
            <p>这是第三个div的内容。它使用了类标签.div3进行样式定位。</p>
        </div>
    </div>
</body>
</html>
from lxml import etree

tree_ = etree.parse("./baidu.html")
# r = tree_.xpath("/html/head/title")  # => [<Element title at 0x10a79ee60>],返回的是定位出来的对象
# r = tree_.xpath("/html//title")  # => [<Element title at 0x10a510dc0>],定位出来的效果是一样的
# r = tree_.xpath("//title") # => [<Element title at 0x106b75dc0>],找到源码里面所有的title标签

# 属性定位
# r = tree_.xpath('//div[@class="div1"]') # => [<Element div at 0x101807e60>]

# 索引定位,索引位置从1开始
# r = tree_.xpath('//div[@class="div1"]/p[3]')

# 取文本 /text()获取的标签里面直系的文本内容, //text() 获取标签下面所有的文本内容
# text = tree_.xpath('//div[@class="div1"]/p[3]/text()')
# print(text)  # => ['这是第一个div的内容。它使用了类标签.div3进行样式定位。']
# text = tree_.xpath('//div[@class="div1"]//text()')
# print(text)

# 获取属性
# attr = tree_.xpath('//div[@class="div3"]/h2/@class')  # ['div3_h2']
# print(attr)

3.3.4 爬取ppt模板

已经成功,结果如下,但是是大学时多亏了它的免费模板,就不贴代码给它带来麻烦了

99221c29b8094f67b7b8f70744dd24fa.png

3.3.5 爬取美女图片

https://pic.netbian.com/4kmeinv/

爬取美女图片失败,开始进入网站总是有人机验证,进去什么都爬取不了,后面再试一下

db1321caf5304010a2a3d806cc43f60c.png

4.模拟登录

4.1 验证码识别 

要收费,自己写个图片文字识别

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/28 13:44
# @Author : George
from PIL import Image
import pytesseract

# 如果你没有将tesseract.exe添加到系统的PATH中,
# 你需要在这里指定tesseract可执行文件的完整路径
pytesseract.pytesseract.tesseract_cmd = r'D:\Tesseract-OCR\tesseract.exe'

# 打开一个图像文件
image_path = 'abc_1.png'  # 替换为你的图像路径
image = Image.open(image_path)

# 使用pytesseract进行OCR
text = pytesseract.image_to_string(image, lang='chi_sim')  # lang参数指定语言,例如'chi_sim'表示简体中文

# 打印识别出的文字
print(text)

4.2 模拟登录逻辑

5.cookie

5.1含义及作用

网页的Cookie是一种在Web开发中广泛使用的技术,用于在用户的计算机上存储小块的数据。这些数据通常由Web服务器发送给用户的浏览器(所以主要是服务器创建的),并在用户后续的访问中被浏览器返回给服务器。Cookie的主要功能和用途包括:

  1. 会话管理:Cookie可以用于保持用户的会话状态,例如在用户登录到网站后,服务器可以发送一个包含会话ID的Cookie到用户的浏览器。在用户后续的请求中,浏览器会自动包含这个会话ID,使得服务器能够识别并持续管理用户的会话。

  2. 个性化设置:Cookie可以用来存储用户的偏好设置,例如网站的语言、主题颜色、字体大小等。这样,当用户再次访问网站时,这些设置可以自动恢复,提高用户体验。

  3. 跟踪用户行为:通过Cookie,网站可以跟踪和分析用户的行为,例如用户访问了哪些页面、停留了多长时间、点击了哪些链接等。这些信息对于网站优化、广告定位等非常有用。

  4. 安全性:在某些情况下,Cookie还可以用于增强网站的安全性,例如通过存储加密的令牌来验证用户的身份。

Cookie具有以下几个特点:

  • 存储在客户端:Cookie数据存储在用户的浏览器上,而不是服务器上。这意味着即使用户关闭了浏览器或计算机,只要没有删除Cookie,数据仍然存在。
  • 自动发送:在用户访问与Cookie相关的网站时,浏览器会自动将相关的Cookie数据发送到服务器。
  • 有限的大小和数量:每个Cookie的大小和数量都有限制,这取决于浏览器和Web服务器的配置。
  • 过期时间:Cookie可以设置过期时间,在过期时间之前,Cookie一直有效。如果没有设置过期时间,Cookie就是一个会话Cookie,在用户关闭浏览器时自动删除。

5.2 session的含义及特点

  • 一、Session的基本概念

Session是服务器用于跟踪用户会话的一种机制。它允许服务器在多个请求之间识别同一个用户,并维护该用户的状态信息。这些状态信息可以包括用户的登录状态、购物车内容、偏好设置等。

  • 二、Session的工作原理
  1. 创建Session:当用户首次访问网站时,服务器会创建一个新的Session对象,并为其分配一个唯一的Session ID。
  2. 发送Session ID:服务器通常会将这个Session ID以Cookie的形式发送给客户端浏览器。客户端浏览器会在后续的请求中自动将这个Session ID附加在请求头中。
  3. 识别用户:服务器通过检查请求头中的Session ID来识别用户,并获取相应的Session数据。这样,服务器就可以在多个请求之间保持用户的状态信息。
  • 三、Session的作用
  1. 保持用户状态:Session允许服务器在多个请求之间跟踪用户的状态信息,如登录状态、购物车内容等。这使得用户可以在不同页面之间无缝切换,而无需重新认证或输入信息。
  2. 个性化服务:根据用户的喜好和历史行为,服务器可以为用户提供个性化的内容和服务。这有助于提高用户体验和满意度。
  3. 安全性:通过验证Session ID来确认用户的请求,服务器可以防止未授权访问和非法操作。这有助于保护用户的隐私和数据安全。
  • 四、Session与Cookie的关系

Session和Cookie是密切相关的两种技术。Cookie是服务器发送到客户端浏览器并保存在本地的一小块数据,而Session则是服务器用于跟踪用户会话的一种机制。Cookie通常用于存储Session ID,以便服务器在后续的请求中识别用户。因此,可以说Cookie是Session的一种实现方式。

  • 五、Session的管理

在网络请求中,管理Session是非常重要的。开发人员需要确保Session的安全性、有效性和可维护性。这包括:

  1. 设置合理的Session过期时间:为了避免用户长时间未操作而导致的会话过期问题,开发人员需要设置合理的Session过期时间。
  2. 保护Session ID:Session ID是用户身份的重要标识,开发人员需要采取措施来保护它免受攻击和泄露。例如,可以使用HTTPS协议来加密传输Session ID,或者使用更复杂的Session ID生成算法来提高安全性。
  3. 清理无效的Session:为了节省服务器资源和提高性能,开发人员需要定期清理无效的Session对象。这可以通过设置Session的失效时间、使用数据库存储Session数据并定期清理过期数据等方式来实现。

5.3 http和https协议的特点

无状态。即是说,即使你的第一次请求已经实现了自动登录。但是你第二次发送请求时,服务器端并不知道你的请求是基于第一次登录状态的。

cookie可以让服务器端记录客户端的相关状态

5.3 cookie值的处理

  • 手动抓包cookie值,将其封装到headers里面,但是这种方法面对cookie是动态变化的时候就很难处理了
  • session会话对象
    • 可以进行请求的发送
    • 如果请求过程中产生了cookie,则该cookie会被自动存储/携带在该session会话对象中
  • b3b252aa2fe644b3aef552a0ef159520.png

6.代理

需要绕过IP封锁、限制或进行大规模数据抓取时。代理服务器充当客户端和目标服务器之间的中介,可以隐藏你的真实IP地址,提供额外的安全性,有时还能加速请求

测试网址:

https://httpbin.org/get

现在基本上没有免费正常的代理可以被使用,我这个也是失败的。看到一个博主写建立代理ip池的python3之爬虫代理IP的使用+建立代理IP池_python爬虫代理池-CSDN博客,代码写的不错,爬取出来的ip也没什么能用的了。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/28 19:14
# @Author : George
import requests

url = "https://www.baidu.com/s?"

# 将爬虫程序伪装成为浏览器来发送请求
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
}

# 设置代理
proxies = {
    "http": "154.203.132.49:8080",
    "https":"49.73.4.158:8089"
}
param = {
    "wd":"ip"
}

response = requests.get(url,headers=headers,proxies=proxies,verify=False,params=param)
with open("proxy.html","wt",encoding="utf-8") as f:
    f.write(response.text)

公司的联网也是需要代理的

# _*_ coding utf-8 _*_
# george
# time: 2024/12/24下午7:56
# name: test.py
# comment:
import requests

user = ""
passwd = "

url = "https://www.baidu.com/"

# 模拟浏览器
header = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
}
# 指定代理,不指定代理,无法上网
proxies = {
    "http": f"http://{user}:{passwd}@ip:port",
    "https": f"https://{user}:{passwd}@ip:port"
}

# 获取响应对象
response = requests.get(url, headers=header, proxies=proxies)

# 获取响应内容,可以通过response.text获取字符串形式的响应数据
page_text = response.content

# 持久化储存
with open("baidu.html", "wb") as f:
    f.write(page_text)

7.异步爬虫(进程池)

本来是针对视频进行爬取的,但是ajax请求时的请求地址,看不懂mrd这个怎么来的,暂时跳过,我灰太狼一定会回来的!!!!!!!!!!!!

295b11c4eb6346fba989e6cdf2874352.png

d684c278cec246cf8b9eb62f1497660d.png

7.1 视频爬取

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/29 13:16
# @Author : George
import requests
import chardet
import os
from lxml import etree
"""
"https://www.pearvideo.com/category_1"
['video_1797596', 'video_1797399', 'video_1797404', 'video_1797260']

https://www.pearvideo.com/video_1797596

ajax请求
https://www.pearvideo.com/videoStatus.jsp?contId=1797596&mrd=0.4308675768914054
https://www.pearvideo.com/videoStatus.jsp?contId=1797399&mrd=0.6241695396585363
"""


class LiVideo(object):
    def __init__(self):
        #  定义输出ppt的文件夹
        self.extract_to_dir = f"./video"
        os.makedirs("./video", exist_ok=True)
        # 添加Referer防止反爬虫
        self.header = {
            "Referer": "https://www.pearvideo.com/",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
        }
        # 编码
        self.encoding = None
        self.home_url = "https://www.pearvideo.com/category_1"

    def get_HTML(self, url_param):
        response = requests.get(url_param, headers=self.header)
        if response.status_code == 200:
            # 使用 chardet 自动检测编码
            self.encoding = chardet.detect(response.content)['encoding']
            response.encoding = self.encoding
            # 创建etree对象
            tree = etree.HTML(response.text)
            return tree

    def deal(self):
        home_tree = self.get_HTML(self.home_url)
        home_url_list = home_tree.xpath("//*[@id='listvideoListUl']/li/div/a[1]/@href")
        name_list = home_tree.xpath("//*[@id='listvideoListUl']/li/div/a[1]/div[2]/text()")
        for name,url in zip(name_list,home_url_list):
            url = "https://www.pearvideo.com/"+url
            detail_tree = self.get_HTML(url)


if __name__ == '__main__':
    vi = LiVideo()
    vi.deal()

 7.2 诗文异步爬取

只要将任务提交给线程池,线程池就会自动安排一个线程来执行这个任务.同过线程池提交的任务是异步提交.异步提交的结果就是不等待任务的执行结果,继续往下执行 

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/29 14:26
# @Author : George
"""
https://www.gushiwen.cn/guwen/Default.aspx?p=1&type=%e5%b0%8f%e8%af%b4%e5%ae%b6%e7%b1%bb

第二层
https://www.gushiwen.cn/guwen/book_4e6b88d8a0bc.aspx
https://www.gushiwen.cn/guwen/book_a09880163008.aspx

第三層
https://www.gushiwen.cn/guwen/bookv_b630af160f65.aspx
"""
import os
import requests
import chardet
from lxml import etree
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
import time
from typing import Dict, List, Tuple

class NovelDownloader:
    def __init__(self):
        self.output_dir = "./novels"
        os.makedirs(self.output_dir, exist_ok=True)
        
        self.headers = {
            "Referer": "https://www.gushiwen.cn/guwen/Default.aspx?p=1",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
        }
        self.base_url = "https://www.gushiwen.cn"
        self.home_url = f"{self.base_url}/guwen/Default.aspx?p=1&type=%e5%b0%8f%e8%af%b4%e5%ae%b6%e7%b1%bb"

    def get_html_tree(self, url: str) -> etree._Element:
        """获取页面HTML并返回etree对象"""
        response = requests.get(url, headers=self.headers)
        if response.status_code != 200:
            raise Exception(f"Failed to get {url}, status code: {response.status_code}")
            
        encoding = chardet.detect(response.content)['encoding']
        response.encoding = encoding
        return etree.HTML(response.text)

    def get_chapter_details(self) -> Dict[str, Dict[str, str]]:
        """获取所有章节详情"""
        home_tree = self.get_html_tree(self.home_url)
        
        # 获取书籍链接和标题
        urls = home_tree.xpath("//*[@class='sonspic']/div[1]/p[1]/a[1]/@href")[1:3]
        titles = home_tree.xpath("//*[@class='sonspic']/div[1]/p[1]/a[1]/b/text()")[1:3]
        
        book_details = {}
        for title, url in zip(titles, urls):
            detail_tree = self.get_html_tree(f"{self.base_url}{url}")
            
            # 获取章节链接和标题
            chapter_urls = [f"{self.base_url}{url}" for url in 
                          detail_tree.xpath("//*[@class='bookcont']/ul/span/a/@href")]
            chapter_titles = detail_tree.xpath("//*[@class='bookcont']/ul/span/a/text()")
            
            book_details[title] = dict(zip(chapter_titles, chapter_urls))
            
        return book_details

    def download_novel(self, title: str, chapters: Dict[str, str]):
        """下载单本小说"""
        output_path = os.path.join(self.output_dir, f"{title}.txt")
        print(f"开始下载 {title}".center(100, "="))
        
        with open(output_path, "w", encoding="utf-8") as f:
            for chapter_title, chapter_url in chapters.items():
                response = requests.get(chapter_url, headers=self.headers)
                soup = BeautifulSoup(response.text, "lxml")
                
                f.write(f"{chapter_title}:\n")
                for paragraph in soup.select('.contson > p'):
                    f.write(f"{paragraph.text}\n")
                f.write("\n\n")
                
                print(f"{chapter_title} 下载完成".center(20, "-"))
                
        print(f"{title} 下载完成".center(100, "="))

def main():
    start_time = time.time()
    
    downloader = NovelDownloader()
    books = downloader.get_chapter_details()
    
    # 使用线程池并行下载
    with ThreadPoolExecutor(max_workers=10) as pool:
        futures = [pool.submit(downloader.download_novel, title, chapters) 
                  for title, chapters in books.items()]
        
    print(f"总耗时: {time.time() - start_time:.2f}秒")

if __name__ == '__main__':
    main()

8.异步爬虫(协程)

  协程内容不赘述:CSDN

c62484262bf6460187337af4d2ab3996.png

基于单线程和协程,实现异步爬虫。 

8.1. 基于flask搭建服务器

简单的学习了一下,感觉不是很复杂,后面等着详细学习

261a05e6ee984879838d6b8015f8b299.png

pip3 install Flask -i  https://mirrors.aliyun.com/pypi/simple
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/31 21:47
# @Author : George
"""
# @app.route('/'): 这是一个装饰器,用于告诉 Flask 哪个 URL 应该触发下面的函数。
在这个例子中,它指定了根 URL(即网站的主页)。

# return 'Hello, World!': 这行代码是 hello_world 函数的返回值。
当用户访问根 URL 时,这个字符串将被发送回用户的浏览器。

"""
from flask import Flask, request, jsonify
import time


def get_client_ip(request):
    # 如果使用了反向代理,优先从 X-Forwarded-For 头部获取 IP 地址
    x_forwarded_for = request.headers.get('X-Forwarded-For', '').split(',')
    if x_forwarded_for:
        client_ip = x_forwarded_for[0]  # 通常第一个 IP 地址是客户端的真实 IP 地址
    else:
        client_ip = request.remote_addr
    return client_ip


app = Flask(__name__)


@app.route('/')
def index():
    return 'Hello, World!'


@app.route('/bobo')
def index_bobo():
    # 获取client的user_agent和referer
    user_agent = request.headers.get('User-Agent')
    user_referer = request.headers.get('Referer')
    print(user_agent, user_referer)
    # client ip地址没有获取到
    client_ip = get_client_ip(request)
    print({'client_ip': client_ip})
    time.sleep(3)
    return 'bobo'


@app.route('/jar')
def index_jar():
    time.sleep(3)
    return 'jar'


@app.route('/test')
def index_test():
    time.sleep(3)
    return 'test'


if __name__ == '__main__':
    app.run(threaded=True)

8.2 基于aiohttp实现异步爬虫

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/31 22:08
# @Author : George
import asyncio
import requests
import time
import aiohttp
from threading import currentThread

"""
async def request(url): # 耗时: 9.054526805877686,requests是同步阻塞,必须替换为异步库提供的函数aiohttp
"""

urls = [
    "http://127.0.0.1:5000/jar",
    "http://127.0.0.1:5000/bobo",
    "http://127.0.0.1:5000/test"
]

start = time.time()


# 耗时: 9.054526805877686,requests是同步操作
# async def request(url):
#     print("开始下载", url)
#     response = requests.get(url=url)
#     print(response.text)
#     print("下载结束", url)
#     print("------------")

async def request_2(url):
    print("开始下载", url, currentThread())
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
        "Referer": "https://movie.douban.com/explore"
    }
    # 设置代理
    proxy = "http://8.220.204.215:8008"

    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as response:
            text = await response.text()
            print(text)
    print("下载结束", url)


async def main():
    tasks = []
    for url in urls:
        tasks.append(asyncio.create_task(request_2(url)))
    await asyncio.wait(tasks)


asyncio.run(main())

end = time.time()
print("耗时:", end - start)

9.selenuim使用

  • 便捷的获取网页中间动态加载的数据
  • 边界实现模拟登录

selenuim:基于浏览器自动化的一个模块

入门指南 | Selenium

b2505f3011124e5bb0606b9d5922d52f.png

基于selenium实现浏览器自动化,自动输入并播放动漫核心代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2025/1/1 16:04
# @Author : George
"""
在 Selenium 4 中,不再直接在 webself.driver.Chrome 中设置驱动程序路径,而是通过引入 Service 对象来设置。
这样可以避免弃用警告,并确保驱动程序的正确加载
"""
import os.path

from selenium import webdriver
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
import time
from lxml import etree
import requests
import chardet
import json
from log_test import logger

class VideoAutoPlay(object):
    def __init__(self):
        self.driver = {}
        self.count_num = 1
        self.movies_dict =None
        self.home_url = "https://www.agedm.org/"
        self.headers = {
            "Referer": "https://www.agedm.org/",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0"
        }

    def get_html_tree(self, url: str) -> etree._Element:
        """获取页面HTML并返回etree对象"""
        response = requests.get(url, headers=self.headers)
        if response.status_code != 200:
            raise Exception(f"Failed to get {url}, status code: {response.status_code}")

        encoding = chardet.detect(response.content)['encoding']
        response.encoding = encoding
        return etree.HTML(response.text)

    def movies_input(self, movie_name, n):
        try:
            if not self.driver:
                self.driver = self.setup_driver()

            self.driver.get(self.home_url)
            # 查找搜索框元素
            search_box = self.driver.find_element(By.ID, "query")

            # 输入搜索内容
            search_box.send_keys(movie_name)

            # 提交搜索表单
            search_box.send_keys(Keys.RETURN)

            # 等待搜索结果加载
            # WebDriverWait(self.driver, 10).until(
            #     EC.presence_of_element_located((By.CLASS_NAME, "content_left"))
            # )
            # 二级请求
            self.driver.get(f"https://www.agedm.org/search?query={movie_name}")

            # 查找在线播放btn
            # wait = WebDriverWait(self.driver, 10)  # 10秒超时
            # # titles = home_tree.xpath("//*[@class='sonspic']/div[1]/p[1]/a[1]/b/text()")
            button = self.driver.find_element(By.XPATH, "//*[@class='video_btns']/a[1]")
            # print(button)
            button.click()
            WebDriverWait(self.driver, 30).until(
                EC.presence_of_element_located((By.CLASS_NAME, "tab-content"))
            )
            # # 等待搜索结果加载
            detail_url = self.driver.current_url
            tree = self.get_html_tree(detail_url)
            url_dict = {}
            titles = tree.xpath("//*[@class='video_detail_episode']/li/a/text()")[n - 1:]
            urls = tree.xpath("//*[@class='video_detail_episode']/li/a/@href")[n - 1:]
            for title, url in zip(titles, urls):
                url_dict[title] = url
            # print(url_dict)
            return url_dict
        except Exception as e:
            logger.error(f"搜索过程发生错误: {str(e)}")
            if self.driver:
                self.driver.quit()
                self.driver = None
            return {}

    def setup_driver(self):
        # 优化视频播放性能的设置
        options = webdriver.EdgeOptions()
        options.add_argument('--disable-gpu')  # 禁用GPU加速
        options.add_argument('--no-sandbox')  # 禁用沙箱模式
        options.add_argument('--disable-dev-shm-usage')  # 禁用/dev/shm使用
        options.add_argument('--disable-software-rasterizer')  # 禁用软件光栅化
        options.add_argument('--disable-extensions')  # 禁用扩展
        options.add_argument('--disable-infobars')  # 禁用信息栏
        options.add_argument('--autoplay-policy=no-user-gesture-required')  # 允许自动播放
        options.add_experimental_option('excludeSwitches', ['enable-automation'])  # 禁用自动化提示
        options.add_experimental_option("useAutomationExtension", False)  # 禁用自动化扩展

        # 设置正确的驱动路径
        service = EdgeService(executable_path='./msedgedriver.exe')
        return webdriver.Edge(options=options, service=service)

    def video_player(self, movie_name, n):
        try:
            url_dict = self.movies_input(movie_name, n)
            if not url_dict:
                logger.error("未找到可播放的视频链接")
                return

            if not self.driver:
                self.driver = self.setup_driver()

            for title, url in url_dict.items():
                try:
                    # 打开网站
                    self.driver.get(url)
                    # 切换到视频iframe
                    self.driver.switch_to.frame("iframeForVideo")
                    logger.info(f"开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")

                    # 等待视频元素加载
                    wait = WebDriverWait(self.driver, 20)
                    video_element = wait.until(
                        EC.presence_of_element_located((By.CLASS_NAME, "art-video"))
                    )
                    logger.info(f"找到视频元素: {video_element}")
                    logger.info(f"{movie_name}:{title}集开始播放".center(10, "="))
                    # 等待视频加载完成后执行全屏
                    page_stage = True
                    while page_stage:
                        paused_state = self.driver.execute_script("return arguments[0].paused;", video_element)
                        print("paused_state", paused_state)
                        if not paused_state:
                            break
                    # 双击使得视频全屏显示
                    ActionChains(self.driver).double_click(video_element).perform()
                    time.sleep(3)
                    # 视频单击播放
                    paused_state = self.driver.execute_script("return arguments[0].paused;", video_element)
                    if paused_state:
                        logger.info(f"{movie_name}:{title}触发双击全屏")
                        ActionChains(self.driver).click(video_element).perform()
                    # # 点击视频开始播放
                    # action = ActionChains(self.driver)
                    # action.move_to_element_with_offset(video_element, 100, 100).click().perform()
                    print(f"点击播放时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")

                    # 等待视频播放完成
                    time.sleep(60*21)
                    n = n + 1
                    self.update_movie_count(movie_name,n)
                    logger.info(f"第{title}集播放完毕".center(10, "="))

                except Exception as e:
                    logger.error(f"播放第{title}集时发生错误: {str(e)}")
                    continue

        except Exception as e:
            logger.error(f"视频播放总体发生错误: {str(e)}")
        finally:
            if self.driver:
                self.driver.quit()
                self.driver = None

    def __del__(self):
        """确保在对象销毁时关闭driver"""
        if self.driver:
            try:
                self.driver.quit()
            except:
                pass

    def count_read(self, movies, n=1):
        # 将此文件作为播放内容的缓存
        if not os.path.exists("./count.json"):
            with open("./count.json", "w", encoding="utf-8") as f:
                self.movies_dict = {movies:n}
                json.dump({movies: n}, f, ensure_ascii=False, indent=4)
            return n
        else:
            with open("./count.json", "r", encoding="utf-8") as f:
                self.movies_dict = json.load(f)
            if not self.movies_dict.get(str(movies)):  # 读取不到movies
                self.movies_dict.update({str(movies):n})
                with open("./count.json", "w", encoding="utf-8") as f:
                    json.dump(self.movies_dict, f, ensure_ascii=False, indent=4)
                return n
            else:  # 读取到了movie
                self.count_num = self.movies_dict[str(movies)]
                return self.count_num

    # update movies count
    def update_movie_count(self, movies, n):
        with open("./count.json", "w", encoding="utf-8") as f:
            self.movies_dict[str(movies)] = n
            json.dump(self.movies_dict, f, ensure_ascii=False, indent=4)


if __name__ == "__main__":
    video = VideoAutoPlay()
    movie = "火影忍者 疾风传"
    # movie = "神之塔 第二季"
    count_num = video.count_read(movie, 1)
    # video.update_movie_count(movie, 4)
    video.video_player(movie, count_num)

10.scrapy框架

破电脑太老,安装不了!!

d684c278cec246cf8b9eb62f1497660d.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/950204.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

大语言模型训练所需的最低显存,联邦大语言模型训练的传输优化技术

联邦大语言模型训练的传输优化技术 目录 联邦大语言模型训练的传输优化技术大语言模型训练所需的最低显存大语言模型训练所需的最低显存 基于模型微调、压缩和分布式并行处理的方法,介绍了相关开源模型及技术应用 核心创新点 多维度优化策略:综合运用基于模型微调、模型压缩和…

主机A与主机B建立TCP连接的三次握手过程

&#xff08; 1 &#xff09;主机 A 的 TCP 向主机 B 发出连接请求 SYN 报文段&#xff08;第一次握手&#xff09;。&#xff08; 1 分&#xff09; &#xff08; 2 &#xff09;一旦包含 SYN 报文段的 IP 数据报到达主机 B &#xff0c; SYN 报文段被从数据报…

SpringCloud系列教程:微服务的未来(六)docker教程快速入门、常用命令

对于开发人员和运维工程师而言&#xff0c;掌握 Docker 的基本概念和常用命令是必不可少的。本篇文章将带你快速入门 Docker&#xff0c;并介绍一些最常用的命令&#xff0c;帮助你更高效地进行开发、测试和部署。 目录 前言 快速入门 docker安装 配置镜像加速 部署Mysql …

Express 加 sqlite3 写一个简单博客

例图&#xff1a; 搭建 命令&#xff1a; 前提已装好node.js 开始创建项目结构 npm init -y package.json:{"name": "ex01","version": "1.0.0","main": "index.js","scripts": {"test": &q…

C++:字符数组

一、字符数组介绍 数组的元素如果是字符类型&#xff0c;这种数组就是字符数组&#xff0c;字符数组可以是一维数组&#xff0c;可以是二维数组 &#xff08;多维数组&#xff09;。我们接下来主要讨论的是一维的字符数组。 char arr1[5]; //⼀维字符数组 char arr2[3][5];//⼆…

基于SpringBoot实现的保障性住房管理系统

&#x1f942;(❁◡❁)您的点赞&#x1f44d;➕评论&#x1f4dd;➕收藏⭐是作者创作的最大动力&#x1f91e; &#x1f496;&#x1f4d5;&#x1f389;&#x1f525; 支持我&#xff1a;点赞&#x1f44d;收藏⭐️留言&#x1f4dd;欢迎留言讨论 &#x1f525;&#x1f525;&…

分享3个国内使用正版GPT的网站【亲测有效!2025最新】

1. molica 传送入口&#xff1a;https://ai-to.cn/url/?umolica 2. 多帮AI 传送入口&#xff1a;https://aigc.openaicloud.cn?inVitecodeMYAAGGKXVK 3. 厉害猫 传送入口&#xff1a;https://ai-to.cn/url/?ulihaimao

LabVIEW瞬变电磁接收系统

利用LabVIEW软件与USB4432采集卡开发瞬变电磁接收系统。系统通过改进硬件配置与软件编程&#xff0c;解决了传统仪器在信噪比低和抗干扰能力差的问题&#xff0c;实现了高精度的数据采集和处理&#xff0c;特别适用于地质勘探等领域。 ​ 项目背景&#xff1a; 瞬变电磁法是探…

CM3/4启动流程

CM3/4启动流程 1. 启动模式2. 启动流程 1. 启动模式 复位方式有三种&#xff1a;上电复位&#xff0c;硬件复位和软件复位。 当产生复位&#xff0c;并且离开复位状态后&#xff0c;CM3/4 内核做的第一件事就是读取下列两个 32 位整数的值&#xff1a; 从地址 0x0000 0000 处取…

快手短剧播放器uniapp如何引入与对接?

uniApp前端微短剧项目开源分享 开源地址&#xff1a;git开源下载地址 文章目录 快手短剧播放器uniapp如何引入与对接&#xff1f;1.引入短剧播放器2.创建文件kscomponents组件3.local-stream.js文件说明4.用户行为事件4.local-stream.ksml文件参考如下 快手短剧播放器uniapp如何…

.NET AI 开发人员库 --AI Dev Gallery简单示例--问答机器人

资源及介绍接上篇 nuget引用以下组件 效果展示&#xff1a; 内存和cpu占有&#xff1a; 代码如下&#xff1a;路径换成自己的模型路径 模型请从上篇文尾下载 internal class Program{private static CancellationTokenSource? cts;private static IChatClient? model;privat…

如何构建多层决策树

构建一颗多层的决策树时&#xff0c;通过递归选择最佳划分特征&#xff08;依据 信息增益 或 基尼系数&#xff09;对数据集进行划分&#xff0c;直到满足停止条件&#xff08;例如叶节点纯度达到要求或树的深度限制&#xff09;。以下是基于 信息增益 和 基尼系数 的递推公式和…

VSCode 使用鼠标滚轮控制字体

一、 文件 | 首选项 | 设置 二、单击在 settings.json中编辑 "editor.mouseWheelZoom": true 注注注意&#xff1a;保存哦&#xff01;ctrlS 三、测试 按住ctrl鼠标滚轮&#xff0c;控制字体大小

十年后LabVIEW编程知识是否会过时?

在考虑LabVIEW编程知识在未来十年内的有效性时&#xff0c;我们可以从几个角度进行分析&#xff1a; ​ 1. 技术发展与软件更新 随着技术的快速发展&#xff0c;许多编程工具和平台不断更新和改进&#xff0c;LabVIEW也不例外。十年后&#xff0c;可能会有新的编程语言或平台…

注册中心如何选型?Eureka、Zookeeper、Nacos怎么选

这是小卷对分布式系统架构学习的第9篇文章&#xff0c;第8篇时只回答了注册中心的工作原理的内容&#xff0c;面试官的第二个问题还没回答&#xff0c;今天再来讲讲各个注册中心的原理&#xff0c;以及区别&#xff0c;最后如何进行选型 上一篇文章&#xff1a;如何设计一个注册…

C++ 复习总结记录三

C 复习总结记录三 主要内容 1、类的六个默认成员函数 2、构造函数 3、析构函数 4、拷贝构造函数 5、赋值运算符重载 6、const 成员函数 7、取地址及 const 取地址操作符重载 一 类的六个默认成员函数 如果一个类中什么成员都没有&#xff0c;简称为空类。空类中并不是…

【简博士统计学习方法】第1章:4. 模型的评估与选择

4. 模型的评估与选择 4.1 训练误差与测试误差 假如存在样本容量为 N N N的训练集&#xff0c;将训练集送入学习系统可以训练学习得到一个模型&#xff0c;我们将这么模型用决策函数的形式表达&#xff0c;也就是 y f ^ ( x ) y\hat{f}(x) yf^​(x)&#xff0c;关于模型的拟合…

计算机网络 (30)多协议标签交换MPLS

前言 多协议标签交换&#xff08;Multi-Protocol Label Switching&#xff0c;MPLS&#xff09;是一种在开放的通信网上利用标签引导数据高速、高效传输的新技术。 一、基本概念 MPLS是一种第三代网络架构技术&#xff0c;旨在提供高速、可靠的IP骨干网络交换。它通过将IP地址映…

【Java】JVM内存相关笔记

Java虚拟机在执行Java程序的过程中会把它所管理的内存划分为若干个不同的数据区域。这些区域有各自的用途&#xff0c;以及创建和销毁的时间&#xff0c;有的区域随着虚拟机进程的启动而一直存在&#xff0c;有些区域则是依赖用户线程的启动和结束而建立和销毁。 程序计数器&am…

鸿蒙 ArkUI实现地图找房效果

常用的地图找房功能&#xff0c;是在地图上添加区域、商圈、房源等一些自定义 marker&#xff0c;然后配上自己应用的一些筛选逻辑构成&#xff0c;在这里使用鸿蒙 ArkUI 简单实现下怎么添加区域/商圈、房源等 Marker. 1、开启地图服务 在华为开发者官网&#xff0c;注册应用&…