爬虫之多线程-了解
单线程爬虫的问题
- 因为爬虫多为IO密集型的程序,而IO处理速度并不是很快,因此速度不会太快
- 如果IO卡顿,直接影响速度
解决方案
考虑使用多线程、多进程
原理:
爬虫使用多线程来处理网络请求,使用线程来处理URL队列中的url,然后将url返回的结果保存在另一个队列中,其它线程在读取这个队列中的数据,然后写到文件中 。
主要组成部分
URL队列和结果队列
将将要爬去的url放在一个队列中,这里使用标准库Queue。访问url后的结果保存在结果队列中
初始化一个URL队列
from queue import Queue
urls_queue = Queue()
out_queue = Queue()
类包装
使用多个线程,不停的取URL队列中的url,并进行处理:
import threading
class ThreadCrawl(threading.Thread):
def __init__(self, queue, out_queue):
threading.Thread.__init__(self)
self.queue = queue
self.out_queue = out_queue
def run(self):
while True:
item = self.queue.get()
如果队列为空,线程就会被阻塞,直到队列不为空。处理队列中的一条数据后,就需要通知队列已经处理完该条数据
函数包装
from threading import Thread
def func(args)
pass
if __name__ == '__main__':
info_html = Queue()
t1 = Thread(target=func,args=
(info_html,))
线程池
# 简单往队列中传输线程数
import threading
import time
import queue
class Threadingpool():
def __init__(self,max_num = 10):
self.queue = queue.Queue(max_num)
for i in range(max_num):
self.queue.put(threading.Thread)
def getthreading(self):
return self.queue.get()
def addthreading(self):
self.queue.put(threading.Thread)
def func(p,i):
time.sleep(1)
print(i)
p.addthreading()
if __name__ == "__main__":
p = Threadingpool()
for i in range(20):
thread = p.getthreading()
t = thread(target = func, args =
(p,i))
t.start()
Queue模块中的常用方法
Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步
- Queue.qsize() 返回队列的大小
- Queue.empty() 如果队列为空,返回True,反之False
- Queue.full() 如果队列满了,返回True,反之False
- Queue.full 与 maxsize 大小对应
- Queue.get([block[, timeout]])获取队列,timeout等待时间
- Queue.get_nowait() 相当Queue.get(False)
- Queue.put(item) 写入队列,timeout等待时间
- Queue.put_nowait(item) 相当Queue.put(item, False)
- Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一
- 个信号
- Queue.join() 实际上意味着等到队列为空,再执行别的操作
爬虫之多进程-了解
multiprocessing是python的多进程管理包,和threading.Thread类似
multiprocessing模块
multiprocessing模块可以让程序员在给定的机器上充分的利用CPU
在multiprocessing中,通过创建Process对象生成进程,然后调用它的start()方法
from multiprocessing import Process
def func(name):
print('hello', name)
if __name__ == "__main__":
p = Process(target=func,args=('sxt',))
p.start()
p.join() # 等待进程执行完毕
Manager类,实现数据共享
在使用并发设计的时候最好尽可能的避免共享数据,尤其是在使用多进程的时候。 如果你真有需要 要共享数据,可以使用由Manager()返回的manager提供list, dict, Namespace, Lock, RLock,
Semaphore, BoundedSemaphore, Condition, Event, Barrier,Queue, Value and Array类型的支持
from multiprocessing import
Process,Manager,Lock
def print_num(info_queue,l,lo):
with lo:
for n in l:
info_queue.put(n)
def updata_num(info_queue,lo):
with lo:
while not info_queue.empty():
print(info_queue.get())
if __name__ == '__main__':
manager = Manager()
into_html = manager.Queue()
lock = Lock()
a = [1, 2, 3, 4, 5]
b = [11, 12, 13, 14, 15]
p1 = Process(target=print_num,args=
(into_html,a,lock))
p1.start()
p2 = Process(target=print_num,args=
(into_html,b,lock))
p2.start()
p3 = Process(target=updata_num,args=
(into_html,lock))
p3.start()
p1.join()
p2.join()
p3.join()
from multiprocessing import Process
from multiprocessing import Manager
import time
from fake_useragent import UserAgent
import requests
from time import sleep
def spider(url_queue):
while not url_queue.empty():
try:
url = url_queue.get(timeout = 1)
# headers = {'UserAgent':UserAgent().chrome}
print(url)
# resp =
requests.get(url,headers = headers)
# 处理响应结果
# for d in
resp.json().get('data'):
# print(f'tid:{d.get("tid")}
topic:{d.get("topicName")} content:
{d.get("content")}')
sleep(1)
# if resp.status_code == 200:
# print(f'成功获取第{i}页数据')
except Exception as e:
print(e)
if __name__ == '__main__':
url_queue = Manager().Queue()
for i in range(1,11):
url =
f'https://www.hupu.com/home/v1/news?pageNo=
{i}&pageSize=50'
url_queue.put(url)
all_process = []
for i in range(3):
p1 = Process(target=spider,args=
(url_queue,))
p1.start()
all_process.append(p1)
[p.join() for p in all_process]
进程池的使用
- 进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
- 进程池中有两个方法:
- apply同步执行-串行
- apply_async异步执行-并行
from multiprocessing import Pool,Manager
def print_num(info_queue,l):
for n in l:
info_queue.put(n)
def updata_num(info_queue):
while not info_queue.empty():
print(info_queue.get())
if __name__ == '__main__':
html_queue =Manager().Queue()
a=[11,12,13,14,15]
b=[1,2,3,4,5]
pool = Pool(3)
pool.apply_async(func=print_num,args=
(html_queue,a))
pool.apply_async(func=print_num,args=
(html_queue,b))
pool.apply_async(func=updata_num,args=
(html_queue,))
pool.close() #这里join一定是在close之后,且必须要加join,否则主进程不等待创建的子进程执行完毕
pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭
from multiprocessing import Pool,Manager
from time import sleep
def spider(url_queue):
while not url_queue.empty():
try:
url = url_queue.get(timeout = 1)
print(url)
sleep(1)
except Exception as e:
print(e)
if __name__ == '__main__':
url_queue = Manager().Queue()
for i in range(1,11):
url =
f'https://www.hupu.com/home/v1/news?pageNo=
{i}&pageSize=50'
url_queue.put(url)
pool = Pool(3)
pool.apply_async(func=spider,args=
(url_queue,))
pool.apply_async(func=spider,args=
(url_queue,))
pool.apply_async(func=spider,args=
(url_queue,))
pool.close()
pool.join()
爬虫之协程
网络爬虫速度效率慢,多部分在于阻塞IO这块(网络/磁盘)。在阻塞时,CPU的中内核是可以处理别的非IO操作。因此可以考虑使用协程来提升爬虫效率,这种操作的技术就是协程.
协程一种轻量级线程,拥有自己的寄存器上下文和栈,本质是一个进程
相对于多进程,无需线程上下文切换的开销,无需原子操作锁定及同步的开销
简单的说就是让阻塞的子程序让出CPU给可以执行的子程序
一个进程包含多个线程,一个线程可以包含多个协程
多个线程相对独立,线程的切换受系统控制。 多个协程也相对独立,但是其切换由程序自己控制
安装
pip install aiohttp
官网:https://docs.aiohttp.org/en/stable/
常用方法
属性或方法 | 功能 |
aiohttp.ClientSession() | 获取客户端函数 |
session.get(url) | 发送get请求 |
seesion.post(url) | 发送post请求 |
resp.status | 获取响应状态码 |
resp.url 获 | 取响应url地址 |
resp.cookies | 获取响应cookie内容 |
resp.headers | 获取响应头信息 |
resp.read() | 获取响应bytes类型 |
resp.text() | 获取响应文本内容 |
import aiohttp
import asyncio
async def first():
async with aiohttp.ClientSession() as
session: # aiohttp.ClientSession() ==
import requests 模块
async with
session.get('http://httpbin.org/get') as
resp:
rs = await resp.text()
print(rs)
headers = {'User-Agent':'aaaaaa123'}
async def test_header():
async with
aiohttp.ClientSession(headers= headers) as
session: # aiohttp.ClientSession() ==
import requests 模块
async with
session.get('http://httpbin.org/get') as
resp:
rs = await resp.text()
print(rs)
async def test_params():
async with
aiohttp.ClientSession(headers= headers) as
session: # aiohttp.ClientSession() ==
import requests 模块
async with
session.get('http://httpbin.org/get',params=
{'name':'bjsxt'}) as resp:
rs = await resp.text()
print(rs)
async def test_cookie():
async with
aiohttp.ClientSession(headers=
headers,cookies={'token':'sxt123id'}) as
session: # aiohttp.ClientSession() ==
import requests 模块
async with
session.get('http://httpbin.org/get',params=
{'name':'bjsxt'}) as resp:
rs = await resp.text()
print(rs)
async def test_proxy():
async with
aiohttp.ClientSession(headers=
headers,cookies={'token':'sxt123id'}) as
session: # aiohttp.ClientSession() ==
import requests 模块
async with
session.get('http://httpbin.org/get',params=
{'name':'bjsxt'},proxy =
'http://name:pwd@ip:port' ) as resp:
rs = await resp.text()
print(rs)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(test_cookie())