「Python|aiohttp|并发与协程」将HTTP请求提速成百上千倍!一次性掌握把requests请求改成协程的通用方法

本文主要介绍如何通过使用aiohttp库将同步的http请求改成异步方式请求,从而降低等待网络IO过程中时间和计算资源的浪费。
主要包括如何将常见的requests请求改用aiohttp异步执行以及如何将异步的批量请求方法封装成普通方法/同步调用方式,给业务模块调用。

文章目录

  • 场景说明
    • 大量数据请求场景
    • 用协程解决网络请求密集的任务
  • 解决方案
    • 使用requests库发送请求
    • 异步请求代码的结构
    • 批量请求的同步方式和异步方式代码对照
    • 代码结构差异
      • 导包差异
      • send_single_request差异
      • send_batch_requests差异
      • 外部业务模块调用差异
    • 差异说明
    • 异步方式返回结果无序的解决方案
      • 方式一:按请求顺序排序(请求顺序作为返回数据标识)
      • 方式二:自定义返回数据标识,返回一个键值对
  • 源代码

场景说明

大量数据请求场景

在爬虫通过定制化接口获取数据或者后端处理大量数据请求时,由于数据量较大,所以如果单纯使用同步方式的requests请求,对单台机器的利用率不高,数据处理的速度较慢,同时如果要提速需要增加机器会导致成本提高。

在这种情况下,使用并发是一种成本比较低的方式。在并发中多进程的并发是用于处理计算逻辑较多计算量较大的计算密集型任务,数据请求属于网络IO等待时间占处理时间的大头,不属于计算密集型,而是属于IO密集型任务。

用协程解决网络请求密集的任务

处理IO密集型任务,使用多线程或协程来实现效率的提升。而由于python默认的cpython解释器有全局解释器锁GIL的限制,使得python的多线程会有效率提高,但是无法完全发挥多线程的优势。

而协程在处理IO密集型任务上相当合适,但是在写法上跟我们习惯的同步方式执行的代码写法有些差别。

解决方案

在python中,将http请求使用异步方式执行,主要是使用aiohttp库来快速实现。

我们接下来将明确把一个同步方式的requests请求有哪几个部分,然后通过将各个部分替换成异步的aiohttp请求,来展示将同步请求改成异步请求的步骤。

使用requests库发送请求

最简单的请求代码如下:

import requests

response = requests.get("http://httpbin.org/get")

当然我们在业务中会需要使用到其他参数来满足我们的业务需求,所以可以考虑请求分为三个部分:

  • 构造请求相关的参数/值
  • 发送请求(考虑复用与目标服务器的连接,会需要使用session发送请求)
  • 处理返回结果

代码结构如下:

"""请求过程分为三个部分:
1. 构造参数
2. 发送请求
3. 处理返回结果
"""

import requests

"""构造参数, 包括但不限于:
- headers
- url
- parameters
- proxy
- post请求的body
- ......
"""
headers = {
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Cache-Control': 'no-cache',
    'Pragma': 'no-cache',
    'Proxy-Connection': 'keep-alive',
    'Referer': 'http://httpbin.org/',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
    'accept': 'application/json',
}
url = 'http://httpbin.org/get'

"""
# 新建会话,使用会话发送请求
# 也可以直接requests.get()进行请求
"""
session = requests.session()
response = session.get(url, headers=headers, verify=False)

"""处理返回结果
主要是对返回结果的状态,返回值进行条件检查
在满足某些条件时执行对应的处理逻辑
"""
print(response.status_code)
if response.status_code == 200:
    print(response.json())

异步请求代码的结构

关于异步请求,我们只需要知道下面几点:

  • 同步函数就是用def定义的函数
  • 异步函数就是用async def定义的函数,同时函数体里面会包含await语句
  • 同步请求的逻辑是用循环语句一次接着一次进行"发起请求,等待返回,处理返回结果"的过程
  • 异步请求的逻辑是生成不同的请求任务,然后(几乎)同时发出这批请求,然后请求返回的先后顺序,依次处理各个请求的返回结果
  • 同步请求是一个def 函数传入所有要请求的数据,然后依次传给另一个def函数去执行单个请求的发送和处理逻辑
  • 异步请求是一个async def函数接收所有要请求的数据,然后将各个数据传入另一个async def函数来生成请求任务并批量执行这些请求任务。异步需要额外的一个def函数来启动这个外层的async def 函数

批量请求的同步方式和异步方式代码对照

我们分别展示一下完整的代码结构,然后解释二者的差异。
假设我们批量请求的requests代码结构如下:

"""请求分为三个部分:
1. 构造参数
2. 发送请求
3. 处理返回结果

"""

import requests


def send_single_request(session, parameters: dict):
    # 构造参数, 包括但不限于headers, url, parameters, proxy, post请求的body
    headers = {
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Cache-Control': 'no-cache',
        'Pragma': 'no-cache',
        'Proxy-Connection': 'keep-alive',
        'Referer': 'http://httpbin.org/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
        'accept': 'application/json',
    }
    url = 'http://httpbin.org/get'

    # 新建会话,使用会话发送请求
    # (也可以直接requests.get()进行请求)
    response = session.get(url, params=parameters, headers=headers)

    # 处理返回结果
    print(response.status_code)
    if response.status_code == 200:
        json_response = response.json()
        print(json_response)
        return json_response["args"]


def send_batch_requests(batch_data):
    all_result = []
    session = requests.session()

    for parameters in batch_data:
        request_result = send_single_request(session, parameters)
        all_result.append(request_result)

    return all_result


# 业务模块在有请求需求时调用接口的业务逻辑
def appliaction_logic_to_call_requests():
    batch_data_to_request = [{"num": num} for num in range(1, 11)]
    all_result = send_batch_requests(batch_data_to_request)
    # process the result


appliaction_logic_to_call_requests()

则改成批量的aiohttp异步请求的方式,代码结构如下:

"""请求分为三个部分:
1. 构造参数
2. 发送请求
3. 处理返回结果

"""

import aiohttp
import asyncio
import json


async def send_single_request(session, parameters: dict):
    # 构造参数, 包括但不限于headers, url, parameters, proxy, post请求的body
    headers = {
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Cache-Control': 'no-cache',
        'Pragma': 'no-cache',
        'Proxy-Connection': 'keep-alive',
        'Referer': 'http://httpbin.org/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
        'accept': 'application/json',
    }
    url = 'http://httpbin.org/get'

    # 新建会话,使用会话发送请求
    # (也可以直接requests.get()进行请求)
    async with session.get(url, params=parameters, headers=headers) as response:
        raw_response = await response.read()

        # 处理返回结果
        print(response.status)
        if response.status == 200:
            json_response = json.loads(raw_response)
            print(json_response)
            return json_response["args"]


async def send_batch_requests(batch_data):
    all_result = []
    async with aiohttp.ClientSession() as session:
        # # 我更喜欢使用列表推导式来生成任务,简单明了
        # tasks = [asyncio.create_task(send_single_request(session, parameters)) for parameters in batch_data]

        tasks = []
        for parameters in batch_data:
            single_task = asyncio.create_task(send_single_request(session, parameters))
            tasks.append(single_task)
        all_result = await asyncio.gather(*tasks)

    return all_result

"""异步方式需要增加一个额外的函数给外部调用
这样对外部调用逻辑来说,批量请求是同步还是异步是隐藏的细节(封装)
"""
def logic_to_start_async_tasks(batch_data):
    all_result = asyncio.run(send_batch_requests(batch_data))
    return all_result


# 业务模块在有请求需求时调用接口的业务逻辑
def appliaction_logic_to_call_requests():
    batch_data_to_request = [{"num": num} for num in range(1, 11)]
    all_result = logic_to_start_async_tasks(batch_data_to_request)
    # process the result


appliaction_logic_to_call_requests()

代码结构差异

用版本管理工具查看两个版本的代码,差异如下:
红色高亮代码为旧版本代码,即同步方式的requests结构代码
绿色高亮代码为新版本代码,即异步方式的aiohttp结构代码

导包差异

在这里插入图片描述

send_single_request差异

在这里插入图片描述

send_batch_requests差异

在这里插入图片描述

外部业务模块调用差异

在这里插入图片描述

差异说明

我们可以看到异步的的代码结构主要差异如下:

  • 具体进行异步请求的函数需要使用async def进行定义
  • async def的函数体必须包含await语句:
    • 批量创建和调度任务的函数send_batch_requestsawait用于等待任务的执行和返回:await asyncio.gather(*tasks)
    • 具体发送请求和处理返回结果的函数send_single_requestawait用于等待外部网络服务器返回awiat session.get()以及读取返回值await response.read()
    • send_single_request中也可以对response调用json()方法,但是也要使用await,(例:json_response = await response.json()),可以自行选择使用哪种用法。
  • session创建用的函数不同:
    • requestsrequests.session()创建一个可以复用的新会话
    • aiohttpaiohttp.ClientSession()创建一个可以复用的新会话
    • aiohttp的session不一定要搭配上下文管理器使用,也可以用session = aiohttp.ClientSession()创建session;使用上下文可以更好避免session没有关闭的问题
  • 可能存在一些属性或方法的名称有差异:
    • requests中的response状态码的属性名为status_code
    • aiohttp中的response状态码的属性名为status
  • 异步方法需要增加一个同步函数来启动异步创建和执行任务的逻辑
    • 为了让外部调用完全没有感知,我们在实际业务中会使用send_batch_requests作为同步函数的名称,异步函数可以改为async def create_and_execute_tasks,这样对于外部来说,在同步和异步两个版本中调用的API名称没有变化,都是调用的send_batch_requests方法

只要注意以上这些差异,就可以熟练地在异步和同步方式之间转换。

异步方式返回结果无序的解决方案

当我们使用同步的方式进行请求的时候,由于是完成上一个请求才会执行下一个请求,所以返回结果是有序的,如下:
在这里插入图片描述

而当我们使用异步的方式进行请求时,由于请求是直接接连发出的,不会等待上一个请求完成返回并处理后才发出下一个请求,并且请求返回的次序是不固定的(尽管我们是有序发出的请求),所以处理完的结果可能是无序的,如下:
在这里插入图片描述

由于我们在使用中需要知道哪个返回结果是对应哪个请求数据的,所以返回结果无序是需要解决的。解决的方法也很简单,异步请求的方法返回一个唯一标志,然后外部管理异步任务的异步方法根据返回的唯一标志进行排序或者其他处理

方式一:按请求顺序排序(请求顺序作为返回数据标识)

代码变动如下:
在这里插入图片描述
在这里插入图片描述
这样一来,最后返回的结果就是有序的:
在这里插入图片描述

方式二:自定义返回数据标识,返回一个键值对

使用数据请求顺序来说可以让返回结果有序,但从业务实际应用来说,这种方式应用场景较少。
因为实际数据使用都是通过数据内容来定位数据,而不是数据的顺序来定位数据。

举一个具体例子:

我们要请求10个用户的数据,由于对每一个用户可能是走同一套逻辑,所以这种情况可以是批量请求,然后按顺序把"用户ID"和"根据用户ID请求到的用户数据"传入后续的处理逻辑中。

但事实上用户数据的不同信息可能是由不同请求去获取的,比如用户信息包括地址和手机,而地址和手机分别存在不同的数据库表中,或者由不同的API获取,所以我们可以是发出get/user/addressget/user/telephone两个请求,然后返回结果是{"user_id": "xxx", "telephone": "xxxx"}{"user_id": "xxx", "address": "xxxxxx"},这种时候我们是知道要请求哪些信息项的,比如info_items = ["address", "telephone"]

所以可以有两种写法,如下:

"""写法一:主要还是用顺序作为数据标识以及排序依据
然后根据item在info_items的顺序与在结果的顺序一一对应的特点来返回最终结果
"""
info_items = ["address", "telephone"]

tasks = []
for index, item in enumerate(info_items):
	API = f"get/user/{item}"
    session = aiohttp.ClientSession()
    single_task = asyncio.create_task(send_single_request(session, item, identity=index))
    tasks.append(single_task)
all_result = await asyncio.gather(*tasks)
all_result = sorted(all_result, key=lambda result: int(result[0]))
all_result = [result[1] for result in all_result]

user_info = {
      info_item[index]: all_result[index] for index in range(len(info_items))
}
return user_info

"""方式二: 直接用item作为identity
然后直接用identity和返回结果生成键值对
"""
info_items = ["address", "telephone"]

tasks = []
for item in info_items:
    API = f"get/user/{item}"
    session = aiohttp.ClientSession()
    single_task = asyncio.create_task(send_single_request(session, item, identity=item))
    tasks.append(single_task)
all_result = await asyncio.gather(*tasks)

user_info = {
      result[0]: result[1] for result in all_result
}
return user_info

个人推荐第二种方式,编写上更符合python的风格,阅读上也更清晰明了

源代码

完整的异步代码结构如下:

"""请求分为三个部分:
1. 构造参数
2. 发送请求
3. 处理返回结果

"""

import aiohttp
import asyncio
import json


async def send_single_request(session, parameters: dict, identity):
    # 构造参数, 包括但不限于headers, url, parameters, proxy, post请求的body
    headers = {
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Cache-Control': 'no-cache',
        'Pragma': 'no-cache',
        'Proxy-Connection': 'keep-alive',
        'Referer': 'http://httpbin.org/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
        'accept': 'application/json',
    }
    url = 'http://httpbin.org/get'

    # 新建会话,使用会话发送请求
    # (也可以直接requests.get()进行请求)
    async with session.get(url, params=parameters, headers=headers) as response:
        raw_response = await response.read()

        # 处理返回结果
        # print(response.status)
        if response.status == 200:
            json_response = json.loads(raw_response)
            # json_response = await response.json()
            # print(json_response)
            # return identity, json_response
            return identity, {"args": json_response["args"], "url": json_response["url"]}


async def create_and_execute_tasks(batch_data):
    all_result = {}
    async with aiohttp.ClientSession() as session:
        # # 我更喜欢使用列表推导式来生成任务,简单明了
        # tasks = [asyncio.create_task(send_single_request(session, parameters)) for parameters in batch_data]

        tasks = []
        for index, parameters in enumerate(batch_data):
            single_task = asyncio.create_task(send_single_request(session, parameters, identity=index))
            tasks.append(single_task)
        identity_and_data_pairs = await asyncio.gather(*tasks)
        all_result = {pair[0]: pair[1] for pair in identity_and_data_pairs}

    return all_result


"""异步方式需要增加一个额外的函数给外部调用
这样对外部调用逻辑来说,批量请求是同步还是异步是隐藏的细节(封装)
"""


def send_batch_requests(batch_data):
    all_result = asyncio.run(create_and_execute_tasks(batch_data))
    return all_result


# 业务模块在有请求需求时调用接口的业务逻辑
def appliaction_logic_to_call_requests():
    batch_data_to_request = [{"num": num} for num in range(1, 11)]
    all_result = send_batch_requests(batch_data_to_request)
    return all_result
    # process the result


print("异步方式返回结果")
print(appliaction_logic_to_call_requests())

在实际业务开发中,根据具体的业务需求修改send_single_request以及create_and_execute_tasks即可,比如:

  • send_single_request中get请求方式改为post请求方式,并且增删使用的参数
  • create_and_execute_tasks从batch_data单个数据parameters中生成identity

掌握了上面这些,使用python写异步的http请求,就手到擒来了(●ˇ∀ˇ●)

好书推荐:

  • 流畅的python
  • Python编程 从入门到实践 第2版
  • Python数据结构与算法分析 第2版

好课推荐:

  • 零基础学python
  • python核心技术与实战
  • python自动化办公实战

写文不易,如果对你有帮助的话,来一波点赞、收藏、关注吧~👇

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/78314.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

去掉鼠标系列之一: 语雀快捷键使用指南

其实应该是系列之二了,因为前面写了一个关于Interlij IDEA的快捷键了。 为什么要写这个了,主要是觉得一会儿用鼠标,一会儿键盘,一点儿不酷,我希望可以一直用键盘,抛开鼠标。后面陆续记录一下各个软件的快捷…

如何在安卓设备上安装并使用 ONLYOFFICE 文档

您可以使用文档安卓版应用,在移动设备上访问存在您 ONLYOFFICE 帐号中的文件。阅读本文,了解如何操作。 什么是 ONLYOFFICE 文档安卓版 适用于 Android 系统的 ONLYOFFICE 文档是一款全面的办公工具,您可以使用它,查看、创建、编…

【Redis】Redis 的学习教程(五)之 SpringBoot 集成 Redis

在前几篇文章中,我们详细介绍了 Redis 的一些功能特性以及主流的 java 客户端 api 使用方法。 在当前流行的微服务以及分布式集群环境下,Redis 的使用场景可以说非常的广泛,能解决集群环境下系统中遇到的不少技术问题,在此列举几…

uniapp 小兔鲜儿 - 首页模块(1)

目录 自定义导航栏 静态结构 安全区域​ 通用轮播组件 静态结构 自动导入全局组件 全局组件类型声明 .d.ts文件 注册组件 vue/runtime-core 首页 – 轮播图指示点 首页 – 获取轮播图数据 首页 – 轮播图数据类型并渲染 首页 – 轮播图总结 首页分类 首页 – 前…

第九章 动态规划part11(代码随想录)

123.买卖股票的最佳时机III 给定一个数组,它的第 i 个元素是一支给定的股票在第 i 天的价格。 设计一个算法来计算你所能获取的最大利润。你最多可以完成 两笔 交易。 注意:你不能同时参与多笔交易(你必须在再次购买前出售掉之前的股票&…

RabbitMq-1基础概念

RabbitMq-----分布式中的一种通信手段 1. MQ的基本概念(message queue,消息队列) mq:消息队列,存储消息的中间件 分布式系统通信的两种方式:直接远程调用,借助第三方完成间接通信 消息的发送方是生产者&#xff0c…

部署piwigo网页 通过cpolar分享本地电脑上的图片

通过cpolar分享本地电脑上有趣的照片:发布piwigo网页 文章目录 通过cpolar分享本地电脑上有趣的照片:发布piwigo网页前言1. 设定一条内网穿透数据隧道2. 与piwigo网站绑定3. 在创建隧道界面填写关键信息4. 隧道创建完成 总结 前言 首先在本地电脑上部署…

linux下的lld命令

Linux下的lld命令的主要作用:用来查看程式运行所需的共享库(动态链接库),常用来解决程式因缺少某个库文件而不能运行的一些问题。 1、首先ldd不是一个可执行程序,而只是一个shell脚本 2、ldd 的使用 lld 可执行程序或者动态库…

MyBatis动态SQL:打造灵活可变的数据库操作

目录 if标签trim标签where标签set标签foreach标签 动态SQL就是根据不同的条件或需求动态地生成查询语句,比如动态搜索条件、动态表或列名、动态排序等。 if标签 在我们填写一些信息时,有些信息是必填字段,有的则是非必填的,这些…

SpringBoot-lombok

为什么要使用lombok? Lombok是一个通过注解以达到减少代码的Java库,如通过注解的方式减少getter,setter方法,构造方法等。通过注解的形式自动生成构造器、getter/setter、equals、hashcode、toString等方法,并可以自动化生成日志变量,简化java开发、提高…

c++--SLT六大组件之间的关系

1.SLT六大组件: 容器,迭代器,算法,仿函数,适配器,空间配置器 2.六大组件之间的关系 容器:容器是STL最基础的组件,没有容器,就没有数据,容器的作用就是用来存…

UI设计师个人工作总结范文精选

UI设计师个人工作总结范文(一) 在忙忙碌碌中,2019年又将过去了,在这一年当中,设计部无论是在运作模式、设计产值、还是人员结构,各方面的变化都比较大。 设计部的运作模式是从7月底开始进行调整的,以独立承包制的运营方…

Python学习笔记_基础篇(十一)_socket编程

python 线程与进程简介 进程与线程的历史 我们都知道计算机是由硬件和软件组成的。硬件中的CPU是计算机的核心,它承担计算机的所有任务。 操作系统是运行在硬件之上的软件,是计算机的管理者,它负责资源的管理和分配、任务的调度。 程序是运行…

应用在汽车前照灯系统中的环境光传感芯片

为了保证行车照明的安全性和方便性,减轻驾驶员的劳动强度。近年来,出现了许多新的照明控制系统,例如用于日间驾驶的自动照明系统、光束调节系统、延迟控制等。尤其是汽车自适应前照灯系统,它是一种能够自动改变两种以上的光型以适…

分类预测 | MATLAB实现WOA-CNN-BiLSTM-Attention数据分类预测

分类预测 | MATLAB实现WOA-CNN-BiLSTM-Attention数据分类预测 目录 分类预测 | MATLAB实现WOA-CNN-BiLSTM-Attention数据分类预测分类效果基本描述程序设计参考资料 分类效果 基本描述 1.MATLAB实现WOA-CNN-BiLSTM-Attention数据分类预测,运行环境Matlab2023b及以上…

【数据库系统】--【4】DBMS存储管理

DBMS存储管理 01存储介质概述02外存管理03数据缓冲区管理04共享缓冲区的并发控制05本地缓冲区管理 01存储介质概述 02外存管理 03数据缓冲区管理 04共享缓冲区的并发控制 05本地缓冲区管理 小结 ●存储介质概述 ●外存管理 ●共享数据缓冲区 缓冲区的组织结构缓冲区的替换策略…

易服客工作室:WordPress是什么?初学者的解释

目录 什么是WordPress? WordPress可以制作什么类型的网站? 谁制作了WordPress?它已经存在多久了? 谁使用 WordPress? 白宫网站 微软 滚石乐队 为什么要使用 WordPress? WordPress 是免费且…

JAVA基础知识(六)——异常处理

异常 一、异常概述与异常体系结构二、常见异常三、异常处理机制一:try-catch-finally四、异常处理机制二:throws五、手动抛出异常:throw六、用户自定义异常类七、开发中如何选择使用try-catch-finally还是使用throws八、如何看待代码中的编译…

ATF(TF-A) 威胁模型汇总

安全之安全(security)博客目录导读 目录计划如下,相关内容补充中,待完成后进行超链接,敬请期待,欢迎您的关注 1、通用威胁模型 2、SPMC威胁模型 3、EL3 SPMC威胁模型 4、fvp_r 平台威胁模型 5、RSS-AP接口威胁模型 威胁建模是安全…

Redis消息传递:发布订阅模式详解

目录 1.Redis发布订阅简介 2.发布/订阅使用 2.1 基于频道(Channel)的发布/订阅 2.2 基于模式(pattern)的发布/订阅 3.深入理解Redis的订阅发布机制 3.1 基于频道(Channel)的发布/订阅如何实现的? 3.2 基于模式(Pattern)的发布/订阅如何实现的? 3.3 Sp…