项目地址:https://gitee.com/wyu_001/myscrapy
我们继续完成返回结果处理类 MyResponse 的实现
先上类图:
主要功能:
json() 方法解析返回的 JSON 格式内容,转换为 Python 的字典(dict)对象
xpath()方法解析返回的html格式的内容,使用etree转换成MyNode对象
follow() 方法将需要继续访问的url补全为绝对地址并发起请求,再把返回结果交给回调函数处理;
get_xpath_node() : 获取节点下(包括子节点)的文本内容
get_xpath_text(): 获取节点下的文本内容
代码如下:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@author: spring.wang
@license:
@contact: wyu_01@163.com
@software: myscrapy
@file: myresponse.py
@time: 2022/3/22 8:48
@description:
'''
from lxml import etree
from utils.json import ConvertFromJson
from utils.tool import reduce,ltos
from urllib.parse import urljoin
from common.myobject import MyObject
from common.log import loger
import traceback
from config import setting
from common.myrequest import MyRequest
from common.mynode import MyNode
class MyResponse(MyObject):
    """Wrap a request's response object and body with JSON/XPath helpers.

    The body is parsed into an lxml HTML tree on the first XPath query and
    the tree is cached on the instance for later queries.
    """

    # XPath suffix selecting the text nodes of every descendant element
    # whose tag is neither <script> nor <style>.
    _VISIBLE_TEXT_SUFFIX = '//*[name() != "script" and name() != "style"]/text()'

    def __init__(self, response=None, content=None):
        """
        :param response: response returned by request (may be None)
        :param content: content of response
        """
        # NOTE(review): MyObject.__init__ is not invoked here; confirm the
        # base class needs no per-instance initialisation.
        self._response = response
        # ``response`` is optional, so only read its URL when it exists.
        self.request_url = response.url if response is not None else None
        self._content = content
        self._etree = None

    def _get_etree(self):
        """Return the parsed HTML tree of the body, parsing it on first use."""
        if self._etree is None:
            self._etree = etree.HTML(self._content)
        return self._etree

    def xpath(self, path, **kwargs):
        """
        Evaluate ``path`` against the parsed document.

        :param path: xpath expression
        :param kwargs: extra keyword arguments passed to etree's ``xpath``
        :return: the raw value for a non-list result (string, boolean or
                 number expression); ``MyNode([])`` for an empty result
                 list; a single ``MyNode`` wrapping the whole list when the
                 first item is a string; otherwise a list with one
                 ``MyNode`` per item
        """
        nodes = self._get_etree().xpath(path, **kwargs)
        # Scalar results (str, bool and the float produced by numeric
        # expressions such as count()) are handed back unchanged.
        if not isinstance(nodes, list):
            return nodes
        if not nodes:
            return MyNode([])
        # Only the first item is inspected to decide how to wrap the list.
        if isinstance(nodes[0], str):
            return MyNode(nodes)
        return [MyNode(node) for node in nodes]

    def json(self):
        """
        :return: the body converted by ``ConvertFromJson``
        :rtype: dict object
        """
        return ConvertFromJson(self._content)

    def get_xpath_text(self, path, node=None, **kwargs):
        """
        :param path: xpath expression
        :param node: optional node to query instead of the whole document
        :param kwargs: extra keyword arguments for the document-level query
                       (not forwarded when ``node`` is given)
        :return: the query result converted by ``ltos``
        """
        if node is not None:
            # The document tree is not needed when a node is supplied.
            return ltos(node.xpath(path))
        return ltos(self._get_etree().xpath(path, **kwargs))

    def get_xpath_node(self, path, node=None, **kwargs):
        """
        :param path: xpath expression
        :param node: optional node to query instead of the whole document
        :param kwargs: extra keyword arguments for the document-level query
                       (not forwarded when ``node`` is given)
        :return: the ``string(.)`` value of every matched node (text of the
                 node including its children), concatenated and then passed
                 through ``reduce``
        """
        if node is not None:
            matches = node.xpath(path)
        else:
            matches = self._get_etree().xpath(path, **kwargs)
        return reduce(''.join(match.xpath('string(.)') for match in matches))

    def get_xpath_node1(self, path, node=None, **kwargs):
        """
        :param path: xpath expression
        :param node: optional node to query instead of the whole document
        :param kwargs: extra keyword arguments for the document-level query
                       (not forwarded when ``node`` is given)
        :return: text nodes of all descendants of ``path`` except those
                 inside script/style elements, converted by ``ltos`` and
                 then passed through ``reduce``
        """
        query = path + self._VISIBLE_TEXT_SUFFIX
        if node is not None:
            texts = node.xpath(query)
        else:
            texts = self._get_etree().xpath(query, **kwargs)
        return reduce(ltos(texts))

    def base_url_join(self, link, base_url=None):
        """
        :param link: url of http (possibly relative)
        :param base_url: base url of http; falls back to the response url
        :return: ``link`` resolved against the base url, or ``link`` itself
                 when it is empty
        """
        if not link:
            return link
        return urljoin(base_url or self._response.url, link)

    def follow(self, url=None, callback=None, cb_kwargs=None, **rq_kwargs):
        """
        Request ``url`` (resolved against this response's url) and pass the
        resulting ``MyResponse`` to ``callback``.

        :param url: request http url
        :param callback: callable invoked as ``callback(myresponse, **cb_kwargs)``
        :param cb_kwargs: callback's keyword arguments, dict keyed by
                          parameter name
        :param rq_kwargs: keyword arguments forwarded to ``MyRequest.request``
        :raises ValueError: if ``url`` is None
        :raises TypeError: if ``callback`` is given but is not callable
        """
        if cb_kwargs is None:
            cb_kwargs = {}
        if url is None:
            raise ValueError("url cannot be empty")
        if callback is not None and not callable(callback):
            raise TypeError(f'callback must be a callable, got {type(callback).__name__}')

        url = urljoin(self._response.url, url)
        # NOTE(review): ``_key_url`` is not defined in this class (presumably
        # inherited from MyObject) and nothing here adds to it; verify
        # where visited urls get recorded.
        if setting.URL_DUPLICATE and url in self._key_url:
            loger.info(f'request url:{url} is duplicate ')
            return

        loger.info(f'request url:{url}')
        response, content = MyRequest.request(url, **rq_kwargs)
        if response is None:
            loger.info("response is None,following request failed!!")
            return

        if callback is None:
            # Nothing to hand the response to; avoid calling None.
            return

        myresponse = MyResponse(response, content)
        try:
            callback(myresponse, **cb_kwargs)
        except Exception:
            # Callback failures are logged, not propagated; KeyboardInterrupt
            # and SystemExit are deliberately left to bubble up.
            loger.error(traceback.format_exc())