【python高级】asyncio 并发编程

【大家好,我是爱干饭的猿,本文重点介绍python高级篇的事件循环,task取消和协程嵌套、call_soon、call_later、call_at、 call_soon_threadsafe、asyncio模拟http请求、asyncio同步和通信、aiohttp实现高并发实践。

后续会继续分享其他重要知识点总结,如果喜欢这篇文章,点个赞👍,关注一下吧】

上一篇文章:《【python高级】多线程、多进程和线程池编程》

1. 事件循环

  • 包含各种特定系统实现的模块化事件循环
  • 传输和协议抽象
  • 对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持
  • 模仿futures模块但适用于事件循环使用的Future类
  • 基于yield from的协议和任务,可以让你用顺序的方式编写并发代码
  • 必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池
  • 模仿threading模块中的同步原语、可以用在单线程内的协程之间

1.1 开始一个协程

# 事件循环回调(驱动生成器)+epollIO多路复用)
# asyncio是python用于解决异io编程的一整套解决方案
# tornado, gevent, twisted (scrapy.django channels)
# tornado(实现web服务器) django+flask(uwsgi gunicorn+nginx)
# tornado可以直接部署,nginx+tornado

import asyncio
import time


async def get_html(url):
    print("start get url")
    # time.sleep(2)  执行此代码是顺序执行
    await asyncio.sleep(2)
    print("end get url")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = [get_html("www.baidu.com") for i in range(10)]

    start_time = time.time()
    loop.run_until_complete(asyncio.wait(tasks))
    end_time = time.time()

    print("耗时:{}".format(end_time-start_time))

1.2 获取协程返回值和callback逻辑

import asyncio
import time
from functools import partial

# 获取协程返回值和callback逻辑
async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    return "body"

def callback(url, future):
    print("url:{}".format(url))
    print("send email to me")

if __name__ == '__main__':
    start_time = time.time()

    loop = asyncio.get_event_loop()

    # 方式1
    # get_future = asyncio.ensure_future(get_html("www.baidu.com"))
    # loop.run_until_complete(get_future)
    # print(get_future.result())
    # print("耗时:{}".format(time.time()-start_time))

    # 方式2
    # task = loop.create_task(get_html("www.baidu.com"))
    # loop.run_until_complete(task)
    # print(task.result())
    # print("耗时:{}".format(time.time()-start_time))

    # 加入callback
    task = loop.create_task(get_html("www.baidu.com"))
    # task.add_done_callback(callback) callback未传入参数
    task.add_done_callback(partial(callback, "www.baidu.com"))
    loop.run_until_complete(task)
    print(task.result())
    print("耗时:{}".format(time.time()-start_time))

1.3 await 和 gather

import asyncio
import time

async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    start_time = time.time()
    # gather和wait的区别
    # gather更加high-leveL
    # gather可以进行分组

    # # 方法1
    # group1 = [get_html("www.baidu.com") for i in range(2)]
    # group2 = [get_html("www.baidu.com") for i in range(2)]
    # loop.run_until_complete(asyncio.gather(*group1, *group2))

    # 方法2
    group1 = [get_html("www.baidu.com") for i in range(2)]
    group2 = [get_html("www.baidu.com") for i in range(2)]
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)
    loop.run_until_complete(asyncio.gather(group1, group2))

    print("耗时:{}".format(time.time()-start_time))

2. task取消和协程嵌套

2.1 task 取消

import asyncio
import time

async def get_html(sleep_time):
    print("waiting")
    await asyncio.sleep(sleep_time)
    print("done after {}s".format(sleep_time))


if __name__ == '__main__':
    task1 = get_html(2)
    task2 = get_html(3)
    task3 = get_html(3)
    tasks = [task1, task2, task3]

    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        # 此报错可以在Linux中运行时按 ctrl+c 复现
        all_tasks = asyncio.all_tasks()
        for task in all_tasks:
            print("task cancel")
            print(task.cancel())
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()

2.2 协程嵌套

import asyncio


async def compute(x,y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y


async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))


loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

子协程调用原理:
在这里插入图片描述

3. call_soon、call_later、call_at、 call_soon_threadsafe

import asyncio


def callback(sleep_times):
    print("sleep {} success".format(sleep_times))


def stoploop(loop):
    loop.stop()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # 1. call_soon立即执行
    # loop.call_soon(callback, 2)
    # loop.call_soon(callback, 1)

    # 2. call_later 后于call_soon执行,多个call_later按照delay顺序执行
    # loop.call_later(2, callback, 2)
    # loop.call_later(1, callback, 1)
    # loop.call_later(3, callback, 3)

    # 3. call_at 指定时间执行
    # cur_time = loop.time()
    # loop.call_at(cur_time+2, callback, 2)
    # loop.call_at(cur_time+1, callback, 1)
    # loop.call_at(cur_time+3, callback, 3)

    # 4. call_soon_threadsafe和call_soon 使用方法一致,但是是线程安全的
    loop.call_soon_threadsafe(callback, 1)


    loop.call_soon(stoploop, loop)
    loop.run_forever()

4. ThreadPoolExecutor + asyncio

import asyncio
from concurrent.futures import ThreadPoolExecutor


def get_url(url):
    pass


if __name__ == "__main__ ":
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor()
    tasks = []
    for url in range(20):
        url = "http://shop.dd.com/goods/{}/".format(url)
        task = loop.run_in_executor(executor, get_url, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print("last time:{".format(time.time() - start_time))

5. asyncio模拟http请求

#requests -> urlib ->socket
import socket
import asyncio
from urllib.parse import urlparse

async def get_url(url ):
    # 通过socket请求htmL
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # 建立socket连接
    # client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 建立socket连接
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write("GET {HTTP/1.1r\nHost:{}r\nConnection:close\r\n\r\n".format(path, host).encode( "utf-8"))
    all_lines = []
    async for raw_line in reader:
        data = raw_line.decode("utf8")
        all_lines.append(data)
    html = "ln".join(all_lines)
    return html

if __name__ == '__main__':
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = []
    for url in range(20):
        url = "http: // shop.projectsedu.com/goods/{0}/".format(url)
        tasks.append(asyncio.ensure_future(get_url(url)))
    loop.run_until_complete(asyncio.wait(tasks))
    print('last time:{}'.format(time.time()))

6. asyncio同步和通信

import asyncio
from asyncio import Lock, Queue

import aiohttp

cache = {}
lock = Lock()
# queue = Queue() 和 queue = [], 如果需要流量限制就需要使用Queue()

async def get_stuff(url) :
    async with lock:
        if url in cache:
            return cache[url]
        stuff = await aiohttp.request('GET', url)
        cache[url] = stuff
        return stuff

async def parse_stuff():
    stuff = await get_stuff()
    #do some parsing

async def use_stuff():
    stuff = await get_stuff()
    #use stuff to do something interesting

7. aiohttp实现高并发实践

import re
import asyncio
import aiohttp
import aiomysql
from pyquery import PyQuery

start_url = 'https://developer.aliyun.com/article/698731'

waitting_urls = []
seen_urls = set()
stopping = False


async def fetch(url, session):
    async with session.get(url) as response:
        if response.status != 200:
            return None

        try:
            return await response.text(encoding='ISO-8859-1')
        except UnicodeDecodeError:
            return await response.read()


async def init_urls(url, session):
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)


def extract_urls(html):
    if html is None or not isinstance(html, str):
        return
    if not html.strip():  # 如果html内容为空或者只包含空白字符
        return

    try:
        urls = []
        pq = PyQuery(html)
        # 在这里继续处理pq对象
        pq = PyQuery(html)
        for link in pq.items("a"):
            url = link.attr("href")
            if url and url.startswith("http") and url not in seen_urls:
                urls.append(url)
                waitting_urls.append(url)
        return urls
    except Exception as e:
        print(f"Failed to parse HTML: {e}")


async def article_handler(url, session, pool):
    # 获取文章详情并解析入库
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)
    pq = PyQuery(html)
    title = pq("article-title").text()
    if len(title) == 0:
        print("No valid title found for article: {}".format(url))
    else:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 42;")
                insert_sql = "insert into article_test(title) values('{}')".format(title)
                await cur.execute(insert_sql)


async def consumer(pool):
    async with aiohttp.ClientSession() as session:
        while not stopping:
            if len(waitting_urls) == 0:
                await asyncio.sleep(1)
                continue

            url = waitting_urls.pop()
            print("start get url: {}".format(url))
            if re.match('https://world.taobao.com', url):
                continue
            if re.match('http://.*?developer.aliyun.com/article/\d+/', url):
                if url not in seen_urls:
                    asyncio.ensure_future(article_handler(url, session, pool))
                    await asyncio.sleep(2)  # 适当的等待时间
            else:
                if url not in seen_urls:
                    await init_urls(url, session)


async def main(loop):
    # 等待MySQL建立连接
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='*****',
                                      db='aiomysql_test', loop=loop,
                                      charset='utf8', autocommit=True)
    async with aiohttp.ClientSession() as session:
        html = await fetch(start_url, session)
        seen_urls.add(start_url)
        extract_urls(html)
    asyncio.ensure_future(consumer(pool))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        stopping = True
        loop.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/128700.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

leetcode刷题 - SQL - 中等

1. 176. 第二高的薪水 筛选出第二大 查询并返回 Employee 表中第二高的薪水 。如果不存在第二高的薪水,查询应该返回 null(Pandas 则返回 None) 。查询结果如下例所示。 666中等的第一题就上强度 强行解法 select max(salary) as SecondHighestSalary from Emp…

多维详述MediaBox互动直播AUI Kit低代码开发方案

本专栏将分享阿里云视频云MediaBox系列技术文章,深度剖析音视频开发利器的技术架构、技术性能、开发能效和最佳实践,一起开启音视频的开发之旅。本文为MediaBox最佳实践篇,重点从互动直播AUI Kit的核心能力、技术架构、快速集成等方面&#x…

Seata之AT模式

目录 AT模式的引进 AT模式前提 AT模式的工作流程 案例流程梳理 AT模式的原理 具体使用 优缺点 小结 AT模式的引进 我们XA模式有死锁(协议阻塞)问题:XA prepare 后,分支事务进入阻塞阶段,收到 XA commit 或 XA…

【手写模拟Spring底层原理】

文章目录 模拟Spring底层详解1、结合配置类,扫描类资源1.1、创建需要扫描的配置类AppConfig,如下:1.2、创建Spring容器对象LyfApplicationContext,如下1.3、Spring容器对象LyfApplicationContext扫描资源 2、结合上一步的扫描&…

vue3 文字轮播打字机效果

实现效果 1.安装依赖 npm install duskmoon/vue3-typed-js 2.html <div class"title_left_1"><Typed :options"options" class"typedClass"><div class"typing"></div></Typed> </div> 3.ts…

Vue 组件化编程 和 生命周期

目录 一、组件化编程 1.基本介绍 : 2.原理示意图 : 3.全局组件示例 : 4.局部组件示例 : 5.全局组件和局部组件的区别 : 二、生命周期 1.基本介绍 : 2.生命周期示意图 : 3.实例测试 : 一、组件化编程 1.基本介绍 : (1) 开发大型应用的时候&#xff0c;页面往往划分成…

行情分析——加密货币市场大盘走势(11.10)

大饼今日继续上涨&#xff0c;正如预期&#xff0c;跌不下来&#xff0c;思路就是逢低做多。现在已经上涨到36500附近&#xff0c;目前从MACD日线来看&#xff0c;后续还要继续上涨&#xff0c;当然稳健的可以不做。昨日的策略已经达到止盈&#xff0c;也是顺利的落袋为安啦。一…

局域网下搭建SVN服务器

文章目录 1. 下载SVN服务器(VisualSVN Server)2. 安装SVN服务器(VisualSVN Server)3. 下载并安装TortoiseSVN4. 搭建SVN服务器 1. 下载SVN服务器(VisualSVN Server) 下载地址 2. 安装SVN服务器(VisualSVN Server) 默认安装即可 Location&#xff1a;VisualSVN Server的安装…

【chat】2:vs2022 连接远程ubuntu服务器远程cmake开发

大神们是使用vs远程连接和调试的:C++搭建集群聊天室(三):配置远程代码编辑神器 VScode我尝试过vs++ 和 clion 都不错。在 Visual Studio 中配置 Linux CMake 项目 比较麻烦的就是要配置CMakeSettings.json ,而且会自动做复制指定远程 Linux 目标,则会将源复制到远程系统 …

《单链表》的实现(不含哨兵位的单向链表)

目录 ​编辑 前言&#xff1a; 链表的概念及结构&#xff1a; 链表的实现&#xff1a; 1.typedef数据类型&#xff1a; 2.打印链表 &#xff1a; 3.创建新节点&#xff1a; 4.尾插 &#xff1a; 5.头插&#xff1a; 6.尾删 &#xff1a; 7.头删&#xff1a; 8.查找节…

刚安装的MySQL使用Navicat操作数据库遇到的问题

刚安装的MySQL使用Navicat操作数据库遇到的问题 一、编辑连接保存报错二、打开数据表很慢三、MySQL的进程出现大量“sleep”状态的进程四、执行sql脚本报错&#xff0c;部分表导不进去五、当前MySQL配置文件 一、编辑连接保存报错 连接上了数据库&#xff0c;编辑连接保存报错…

winform打包默认安装路径设置

点击安装程序的 Application Folder 修改属性中的 DefaultLocation

竞赛选题 深度学习疲劳驾驶检测 opencv python

文章目录 0 前言1 课题背景2 实现目标3 当前市面上疲劳驾驶检测的方法4 相关数据集5 基于头部姿态的驾驶疲劳检测5.1 如何确定疲劳状态5.2 算法步骤5.3 打瞌睡判断 6 基于CNN与SVM的疲劳检测方法6.1 网络结构6.2 疲劳图像分类训练6.3 训练结果 7 最后 0 前言 &#x1f525; 优…

云效流水线docker部署 :node.js镜像部署VUE项目

文章目录 引言I 流水线配置1.1 项目dockerfile1.2 Node.js 镜像构建1.3 docker 部署引言 云效流水线配置实现docker 部署微服务项目:https://blog.csdn.net/z929118967/article/details/133687120?spm=1001.2014.3001.5501 配置dockerfile-> 镜像构建->docker部署。 …

linux中使用arthas进行jvm内存分析

1. 安装下载 首先在官方github地址选择合适的版本&#xff0c;下载后上传到对于服务器。 使用unzip arthas-bin.zip 解压文件。进入目录中&#xff0c;执行./install-local.sh进行安装。执行完成后提示succeed&#xff0c;即可使用。 2. 启动 进入目录&#xff0c;执行java…

【优选算法系列】【专题七分治】第一节.75. 颜色分类和912. 排序数组

文章目录 前言一、颜色分类 1.1 题目描述 1.2 题目解析 1.2.1 算法原理 1.2.2 代码编写二、排序数组 2.1 题目描述 2.2 题目解析 2.2.1 算法原理 2.2.2 代码编写总结 前言 一、颜色分类 1.1 题目描述 描述&…

续:将基于Nasm汇编的打字小游戏,移植到DOSBox

续&#xff1a;将基于Nasm汇编的打字小游戏&#xff0c;移植到DOSBox 文章目录 续&#xff1a;将基于Nasm汇编的打字小游戏&#xff0c;移植到DOSBox前情提要细说1 编译2 程序入口3 定位段 运行体验 前情提要 上一篇&#xff1a;【编程实践】黑框框里的打字小游戏&#xff0c;但…

我的月光宝盒初体验失败了

哈哈哈&#xff0c;我爱docker, docker 使我自由&#xff01;&#xff01;&#xff01; docker make me free! 菠萝菠萝蜜口号喊起来。 https://github.com/vivo/MoonBox/ windows上安装好了docker之后&#xff0c;docker-compose是自带的。 docker-compose -f docker-compo…

SpringBoot核心知识点总结【Spring Boot 复习】

文章目录 Spring Boot 精要1. 自动配置2. 起步依赖3. 命令行界面4. Actuator 开发SpringBoot程序1. 启动引导Spring2. 测试Spring Boot应用程序3. 配置应用程序属性2.2 使用起步依赖2.3 使用自动配置专注于应用程序功能 Spring Boot 精要 Spring Boot将很多魔法带入了Spring应…

KiB、MiB与KB、MB的区别

KiB、MiB与KB、MB的区别