Python 全栈系列249 IO并发:异步、线程与协程

说明

很久没有关注这方面的问题了,平时大部分时候还是做批量操作 。在这种情况下(CPU密集),异步、协程这些意义就不大了,甚至可能进一步拖慢处理时间。

但是在IO这一块的零碎处理是比较重要的,可以更快,且更省资源。很早的时候,曾经在执行规则引擎之前要分布的从mysql取数,结果处理时间特别慢;后来改用了asyncio和aiomysql,速度大幅提升,这给我了很深的印象:什么资源都没加,速度就是快了。

后来我主要还是集中在批次处理数据下,每次都是万条的密集操作,这时候主要就用数据库本身的功能;而面对大量零散需求的时候用了redis,这样看起来也不存在瓶颈。

事实上,主要是由于一些零碎的请求实在是太少了,完全没有达到redis-py处理的上线,所以我也忽略了,其实在应对大量请求上,还是可以使用aioredis的。
在这里插入图片描述

内容

1 基本概念

进程、线程与协程。chatgpt说的又快又好

进程、线程与协程是三种常见的计算机并发执行模型。它们各自有不同的特点和应用场景。下面对它们进行详细解释:

进程(Process)
定义
进程是操作系统分配资源的基本单位,每个进程都有独立的内存空间、数据段、代码段和系统资源。
特点
隔离性:进程之间是相互独立的,一个进程崩溃不会影响其他进程。
开销大:由于进程间资源独立,创建和切换进程的开销较大。
并行性:在多核系统上,不同进程可以真正并行运行。
应用场景
适用于需要高隔离性和稳定性的场景,如不同服务模块的隔离运行。
使用多进程来充分利用多核 CPU 的性能。

import multiprocessing

def worker(num):
    print(f'Worker: {num}')

if __name__ == '__main__':
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        p.start()
        p.join()

线程(Thread)
定义
线程是进程的子单元,是 CPU 调度和执行的基本单位。一个进程可以包含多个线程,这些线程共享进程的内存空间和系统资源。
特点
共享资源:同一进程内的线程共享内存和文件等资源,线程之间通信和切换的开销较小。
并发执行:在单核 CPU 上,线程通过时间片轮转并发执行;在多核 CPU 上,线程可以并行执行。
同步问题:由于线程共享资源,需要考虑线程同步和互斥问题,避免竞态条件。
应用场景
适用于需要高效并发处理和共享资源的场景,如 Web 服务器处理并发请求、后台任务调度等。

import threading

def worker(num):
    print(f'Worker: {num}')

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

协程(Coroutine
定义
协程是比线程更加轻量级的执行单元。协程在用户态进行调度和切换,不依赖于操作系统提供的线程调度机制。
特点
轻量级:协程创建和切换的开销极小,因为它们不需要进行系统调用。
协作式调度:协程通过显式的让出控制权来实现调度,不同于线程的抢占式调度。即,协程在执行过程中需要显式地调用挂起操作,才能切换到其他协程。
不适合多核并行:由于协程是单线程运行的,无法利用多核 CPU 的并行能力。
应用场景
适用于大量 I/O 密集型任务,如网络编程、异步 I/O 操作等。
使用协程可以避免线程同步问题,简化并发编程模型。
比较与总结
进程:独立性强,隔离性好,但创建和切换开销大,适合多核并行。
线程:共享资源,创建和切换开销小,但需要解决同步问题,适合高并发和资源共享的场景。
协程:轻量级,无需系统调用,调度开销极小,但无法利用多核并行,适合大量 I/O 密集型任务。

import asyncio

async def worker(num):
    print(f'Worker: {num}')
    await asyncio.sleep(1)

async def main():
    tasks = [worker(i) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

关于进程,我的理解和体会。进程就是CPU的核,就是一个jupyter服务,就是一个容器,虽然这么说不严密,但是挺容易记的。在这个级别并行的方法太多了,multiprocessing没啥大用。

  • 1 服务级别,采用nginx发挥多核作用。
  • 2 单服务,tornado之类的可以直接发挥多核
  • 3 程序级,pandas的apply可以发挥多核作用(对于可向量化的操作)

还有就是采用GPU那种根本性的并行器件。

关于线程,刚好有个实际的体会。我有一个tornado,里面允许临时给一个参数字典加参数,然后我就发现调用过程失灵时不灵。原因是我启动了多核,这个参数字典其实给了某一个线程,在python里,线程也就是进程。然后进程间是隔离的,所以对于很多进程,根本没有参数。

所以从整体性能上,在核/线程基本我还算利用的可以,底下的IO密集并发还做的很不够。现在虽然有了celery,不过那种是偏异步的利用。

最后,协程在IO并发上的性价比应该是远高于线程的,所以这点我看到java的多线程就感觉太浪费了。

2 简单梳理

我把chatpt给我的一些有用的示例记一下,其实也就是这些写的比价有用,才快速攒这篇文章。

首先,我用了大量的微服务,特别是很多的agent: MongoAgent, RedisAgent, MysqlAgent… 这些服务都采用了同步的包,因为我原来处理的核心就是大批量数据:在CPU已经密集的情况下,IO并发也就没有意义了

考虑到现在越来越多的轻处理(sniffer),所以突然间感觉异步就变得越来越重要了。

2.1 在服务端异步

这个可以参考这篇文章

用sleep模拟了耗时操作,实测是蛮好用的。tornado本身也是基于asyncio做的。

import time
from concurrent.futures.thread import ThreadPoolExecutor

from tornado import web, ioloop
from tornado.concurrent import run_on_executor


class SyncToAsyncThreadHandler(web.RequestHandler):

    executor = ThreadPoolExecutor(max_workers=2)

    @run_on_executor
    def sleep(self):
        print("休息1...start")
        time.sleep(5)
        print("休息1...end")
        return 'ok'

    async def get(self):
        res = await self.sleep()
        self.write(res)

url_map = [
    ("/?", SyncToAsyncThreadHandler)
]

if __name__ == '__main__':
    app = web.Application(url_map, debug=True)
    app.listen(8888)
    print('started...')
    ioloop.IOLoop.current().start()

2.2 在客户端请求

用线程池发起并发,虽然效率不那么搞,但看着是同步方式,比较简单。

import requests
from concurrent.futures import ThreadPoolExecutor

def make_request():
    response = requests.get('http://localhost:8888')
    print(response.text)

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(make_request) for _ in range(5)]
    for future in futures:
        future.result()

这是另一个变体,同时发起多个url的请求。

import time
import logging
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

logging.basicConfig(level=logging.INFO)

def fetch_url(url):
    logging.info(f"Fetching {url}...")
    response = requests.get(url)
    logging.info(f"Completed {url}")
    return response.text

urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
    "https://httpbin.org/uuid",
    "https://httpbin.org/headers",
]

def main():
    max_workers = 10  # 可以根据需要调整 max_workers 的数量
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(fetch_url, url): url for url in urls}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
                logging.info(f"Result from {url}: {data[:60]}...")
            except Exception as exc:
                logging.error(f"{url} generated an exception: {exc}")

if __name__ == "__main__":
    main()

线程与协程

在我问这个问题的时候,chat又了我例子
在这里插入图片描述

线程

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    print(f"Task {n} start")
    time.sleep(2)
    print(f"Task {n} end")
    return n

def main():
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        for future in futures:
            print(f"Result: {future.result()}")

if __name__ == "__main__":
    main()

协程

import asyncio

async def task(n):
    print(f"Task {n} start")
    await asyncio.sleep(2)
    print(f"Task {n} end")
    return n

async def main():
    tasks = [task(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

然后还给了一个混合版,我就不知道是不是它有点幻觉+过敏了。

import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_io(n):
    print(f"Blocking IO {n} start")
    time.sleep(2)
    print(f"Blocking IO {n} end")
    return n

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        results = await asyncio.gather(
            loop.run_in_executor(pool, blocking_io, 1),
            loop.run_in_executor(pool, blocking_io, 2),
            loop.run_in_executor(pool, blocking_io, 3),
        )
    for result in results:
        print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

最后再附一个我自己的协程版,在eventloop方面我有点没搞明白,不过反正不是get loop就是new loop,是在不行再叠一个nest_asyncio,反正只要有那么一个协调组织者在就行(loop)。

import nest_asyncio 
nest_asyncio.apply()

import json 
import asyncio, aiohttp
async def json_query_worker(task_id = None , url = None , json_params = None,time_out = 60, semaphore = None):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json = {**json_params},timeout=aiohttp.ClientTimeout(total=time_out)) as response:
                res = await response.text()
                return {task_id: json.loads(res)}
async def json_player(task_list , concurrent = 3):
    semaphore = asyncio.Semaphore(concurrent) # 并发限制
    tasks = [asyncio.ensure_future(json_query_worker(**x,  semaphore = semaphore)) for x in task_list]
    return await asyncio.gather(*tasks)
loop = asyncio.new_event_loop()
# loop = asyncio.get_event_loop()
tick1 = time.time()
res = loop.run_until_complete(json_player(para_dict['task_rec_list'], concurrent=10))
tick2 = time.time()
print(tick2- tick1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/682772.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

什么是通配符SSL证书?要怎么申请?

通配符SSL证书的作用主要是为了方便管理和加密具有多个子域名的网站。它能够保护一个主域名及其所有的同级子域名,无论子域名的数量多少或名称如何变化。使用一个通配符证书,你可以为像 *.example.com 这样的设置加密,这样不论是 blog.exampl…

Mysql:通过一张表里的父子级,递归查询并且分组分级

表:gc_jzst_single_base 需求:要求返回这张表里符合条件的数据,且有父子级关系的,展示为同一组且分级,给后续业务调用 代码 WITH RECURSIVE t1 AS (SELECTsingle_id,old_build_single_id,single_name,bulid_code,1 A…

运维开发介绍

目录 1.什么是运维开发 2.作用 3.优点 4.缺点 5.应用场景 5.1.十个应用场景 5.2.网站和Web应用程序 6.案例 7.小结 1.什么是运维开发 运维开发(DevOps)是一种结合软件开发(Development)与信息技术运维(Opera…

从分布式训练到大模型训练

要了解大模型训练难,我们得先看看从传统的分布式训练,到大模型的出现,需要大规模分布式训练的原因。接着第二点去了解下大规模训练的挑战。 从分布式训练到大规模训练 常见的训练方式是单机单卡,也就是一台服务器配置1块AI芯片&a…

地球科学SCI期刊,中科院2区,IF=14.6,自引率低,无预警风险!

一、期刊名称 IEEE Geoscience and Remote Sensing Magazine 二、期刊简介概况 期刊类型:SCI 学科领域:地球科学 影响因子:14.6 中科院分区:2区 三、期刊征稿范围 IEEE地球科学和遥感杂志向读者介绍IEEE GRS协会及其技术委员…

谢宁DOE培训的奇妙之旅:从陌生到熟练

在充满挑战与机遇的现代社会,不断提升自我,掌握新的技能和知识,成为了我们追求进步的重要途径。而对于我来说,参加谢宁DOE培训,无疑是我职业生涯中的一次重要抉择。这次培训让我从对谢宁DOE陌生到熟练,经历…

618电视盒子哪个好?经销商总结热销电视盒子品牌排行榜

电视盒子是目前热度最高的数码产品,大家都在讨论电视盒子的资源问题,究竟电视盒子还值不值得入手?电视盒子哪个好?电视盒子的功能并没有受到影响,依然是不可缺少的,本期我要给大家盘点实体店销量最好的电视…

雅欣控制HALL IC 产品选型手册,选择您的专属霍尔芯片(霍尔产品主要包括远翔FD,FS全系列,MST全系列霍尔)

HALLICs 应用领域 Applications 应用案例 雅欣为各个应用场景匹配专属HALL元器件 合作伙伴 Partners

Servlet详解(下)

目录 一、Servlet详解1.1、核心接口和类1.1.1、Servlet接口1.1.2、GenericServlet1.1.3、HttpServlet(推荐) 1.2、两种配置方法1.2.1、使用web.xml1.2.2、使用注解 二、Servlet应用2.1、request对象2.2、request主要方法2.2、response对象2.3、response主要方法 三、转发与重定…

linuxDNS域名解析

文章目录 DNS 是域名系统的简称正向解析反向解析主从服务器解析bond网卡 DNS 是域名系统的简称 域名和IP地址之间的映射关系 互联网中,IP地址是通信的唯一标识,逻辑地址 访问网站 域名解析的目的就是为了实现,访问域名就等于访问IP地址 …

作业-day-240605

思维导图 C编程 设计一个Per类&#xff0c;类中包含私有成员:姓名、年龄、指针成员身高、体重 再设计一个Stu类&#xff0c;类中包含私有成员:成绩、Per类对象p1 设计这两个类的构造函数、析构函数。 #include <iostream>using namespace std;class Per{ private:str…

mkfs.ubifs -c 计算工具

mkfs.ubifs -c 输入的logical erase block count,是指除掉ubi文件系统自身管理和坏块处理后的逻辑可擦除最大块数。 参考信息如下&#xff1a; logical erase block count 计算工具&#xff0c;实现代码如下&#xff1a; #!/usr/bin/python # -*- coding: UTF-8 -*-import os…

postman教程-15-前置脚本

上一小节我们学习了Postman生成随机数的方法&#xff0c;本小节我们讲解一下Postman前置脚本的使用方法。 Postman中的前置脚本&#xff08;Pre-request Script&#xff09;允许你在发送请求之前运行JavaScript代码。这可以用于修改请求头、查询参数、请求体等&#xff0c;或者…

bison flex 实现tiny语言的编译器

bison flex 实现tiny语言的编译器 项目地址&#xff1a;tiny-compiler 完成了词法分析&#xff0c;语法分析&#xff0c;中间代码生成&#xff0c;虚拟机执行&#xff0c;没有进行类型检查、错误处理和中间代码优化。 词法分析 %{ #include <iostream> #include "…

RocketMq源码解析五:生产者Producer发送消息

上一章我们把生产者启动的流程和大家一起跟着源码走了一遍,现在我们来看发送消息的流程。上一章我们已经把核心接口和类关系梳理了一遍。如下图 我们今天重点看MQProducer中的send方法最终的实现。DefaultMQProducer中,send的实现最终还是调用了 defaultMQProducerIm…

ingress规则

一 k8s 对外服务之 Ingress LB ingress 1 Ingress 简介 service的作用体现在两个方面 ? ① 对集群内部&#xff0c;它不断跟踪pod的变化&#xff0c;更新endpoint中对应pod的对象&#xff0c;提供了ip不断变化的pod的服务发现机制&#xff1b; ② 对集群外部&#xff0c…

MySQL—多表查询—自连接

一、引言 自连接&#xff0c;顾名思义就是自己连接自己。 自连接的语法结构&#xff1a; 表 A 别名 A join 表 A 别名 B ON 条件 ...; 注意&#xff1a; 1、这种语法有一个关键字&#xff1a;join 2、自连接查询可以是内连接的语法&#xff0c;可以是外连接的语法&#xff08…

embedding层的理解

一文读懂Embedding的概念&#xff0c;以及它和深度学习的关系 - 知乎 (zhihu.com) 感觉这篇知乎真的大道至简。个人感觉embedding层和普通的线性层没有什么区别~就是为了降维和升维用的。也就是向量的维度变化&#xff01;

hutool工具实践-验证码

简介 验证码功能位于cn.hutool.captcha包中&#xff0c;核心接口为ICaptcha&#xff0c;此接口定义了以下方法&#xff1a; createCode 创建验证码&#xff0c;实现类需同时生成随机验证码字符串和验证码图片getCode 获取验证码的文字内容verify 验证验证码是否正确&#x…

客户端被攻击怎么办,为什么应用加速这么适合

随着科技的进步和互联网的普及&#xff0c;游戏行业也正在经历前所未有的变革。玩家们不再满足于传统的线下游戏&#xff0c;而是转向了线上游戏。然而&#xff0c;随着游戏的线上化&#xff0c;游戏安全问题也日益凸显。游戏受到攻击是游戏开发者永远的痛点&#xff0c;谈“D“…