Environment:
Python 3.8
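The script relies on three third-party packages, visible in the imports below (the original post does not pin versions; any recent release should work):

pip install requests lxml pymysql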
Code:
# Scrape the post list from cnblogs (博客园)
# https://www.cnblogs.com/
from lxml import etree
import requests
import json
import threading
from queue import Queue
import pymysql
import time
class HeiMa:
    def __init__(self):
        # Request headers (note the trailing spaces: the three string
        # literals are concatenated, so each fragment must end in a space)
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/90.0.4430.212 Safari/537.36'}
        self.url_queue = Queue()      # request-URL queue
        self.html_queue = Queue()     # page-source queue
        self.content_queue = Queue()  # parsed-data queue
        # Create the database connection
        self.conn = pymysql.connect(
            host='127.0.0.1',    # or 'localhost' for the local machine
            port=3306,           # MySQL's default port
            user='root',         # database user
            password='root',     # database password
            database='boke',     # database to operate on
            charset='utf8'       # character set
        )
        # Create a cursor -- the object that executes SQL statements
        self.cursor = self.conn.cursor()
        # Start a transaction
        self.conn.begin()
    def get_url_queue(self):
        url_temp = "https://www.cnblogs.com/sitehome/p/{}"
        # Build the request URLs for list pages 1-99
        url_list = [url_temp.format(i) for i in range(1, 100)]
        for url in url_list:
            self.url_queue.put(url)  # enqueue each request URL
    def get_html_queue(self):
        while True:
            # Take a request URL from the URL queue
            url = self.url_queue.get()
            html_source_page = requests.get(url, headers=self.headers).text
            self.html_queue.put(html_source_page)
            # Tell the URL queue this task is done
            self.url_queue.task_done()
    def parse_html(self):
        while True:
            content_list = []
            html = self.html_queue.get()
            html_str = etree.HTML(html)
            # Each post is an <article> directly under div#post_list, so
            # iterate over the articles rather than the container div --
            # otherwise only the first post of each page is captured
            node_list = html_str.xpath('//div[@id="post_list"]/article')
            for node in node_list:
                # Post title
                title = node.xpath('./section/div/a/text()')[0]
                # Post link
                url = node.xpath('./section/div/a/@href')[0]
                # Post author
                author = node.xpath(
                    './section/footer/a[@class="post-item-author"]/span/text()')[0]
                # Publication time (date string)
                release_time = node.xpath(
                    './section/footer/span[@class="post-meta-item"]/span/text()')[0]
                item = {
                    "文章标题": title,
                    "文章链接": url,
                    "文章作者": author,
                    '发布时间': release_time,
                }
                content_list.append(item)
            self.content_queue.put(content_list)
            self.html_queue.task_done()
    def save_data(self):
        while True:
            content_list = self.content_queue.get()
            # Note: appending writes one JSON array per batch, so the file
            # is a concatenation of arrays rather than a single JSON document
            with open("thread-heima.json", "a+", encoding='utf-8') as f:
                f.write(json.dumps(content_list, ensure_ascii=False, indent=2))
            self.content_queue.task_done()
    def save_data_mysql(self):
        while True:
            content_list = self.content_queue.get()
            for item in content_list:
                # Insert via a parameterized query: pymysql escapes the
                # values itself, so quotes in titles need no manual
                # sanitizing and SQL injection is avoided
                sql = 'INSERT INTO news VALUES (NULL, %s, %s, %s, %s)'
                args = (item['文章标题'], item['文章链接'],
                        item['文章作者'], item['发布时间'])
                print(self.cursor.mogrify(sql, args))
                self.cursor.execute(sql, args)
            self.content_queue.task_done()
    def run(self):
        thread_list = []
        # Thread that builds the request URLs
        t_url = threading.Thread(target=self.get_url_queue)
        thread_list.append(t_url)
        # Threads that fetch the page source
        for _ in range(9):
            t_html = threading.Thread(target=self.get_html_queue)
            thread_list.append(t_html)
        # Threads that parse the fetched pages
        for _ in range(9):
            t_parse = threading.Thread(target=self.parse_html)
            thread_list.append(t_parse)
        # t_save = threading.Thread(target=self.save_data)
        t_save = threading.Thread(target=self.save_data_mysql)
        thread_list.append(t_save)
        for t in thread_list:
            t.daemon = True  # daemon threads exit together with the main thread
            t.start()
        for q in [self.url_queue, self.html_queue, self.content_queue]:
            # Queue.join() blocks the main thread until every item put on
            # the queue has been marked finished via task_done(); only then
            # does execution continue past this point
            q.join()
if __name__ == '__main__':
    heima = HeiMa()
    heima.run()
    try:
        # Commit the transaction so the inserted rows are written
        heima.conn.commit()
    except Exception:
        # Roll back on error so either all rows are inserted or none,
        # keeping the data consistent
        heima.conn.rollback()
    # Release database resources
    heima.cursor.close()
    heima.conn.close()
    print("Done scraping!")