Python 全栈系列258 线程并发与协程并发

说明

最近在大模型调用上,为了尽快的进行大量的数据处理,需要采用并发进行处理。

Before: 以前主要是自己利用CPU和GPU来搭建数据处理程序或者服务,资源受限于所用的硬件,并不那么考虑并发问题。在处理程序中,并发主要利用pandas的apply方法,以及在模型处理时采用矩阵解决。

Now: 当需要大量调用外部资源时,主要的负担在于IO。同步方式下,CPU的资源会被每一个连接抢占。所以如果不使用并发方法,性能会大幅下降。前后差距可能上千倍。

After: 除了用于大规模的外部资源请求之外,还可以用于算网微服务体系间的交互。可以实现并发查询等。

内容

1 协程并发与线程并发

资源的基本分配单元是进程,一般使用ps aux查看。

在这里插入图片描述
简单的理解进程:一个核对应一个进程。特别当一个进程是CPU密集型任务时,这个会非常明显。
在这里插入图片描述

当我们有大量的IO以同步方式调度时,资源的抢占就会非常严重。这也是是谈到线程并发和协程并发的原因。
在这里插入图片描述
以下是一个实例:在执行20个左右的进程(IO密集)时CPU的实际状态。
在这里插入图片描述

线程是比进程低一个级别的概念。一个进程下的多个线程可以共享内存资源,其通信可以直接在内存级别,而不必像进程一样,要通过消息管道。

在这里插入图片描述
协程是更加轻量级的概念。
在这里插入图片描述

总体感觉上,协程并发的实现比线程并发更负责一些。

1.1 协程并发

使用deepseek测试

串行时

from langchain.chains import LLMChain
from langchain_community.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate

def generate_serially(key ='YOURKEY'):
    # load model 
    llm = ChatOpenAI(
        model='deepseek-chat', 
        openai_api_key=key, 
        openai_api_base='https://api.deepseek.com',
        temperature=0
    )
    # setting prompt
    prompt = PromptTemplate(
        input_variables=["product"],
        template="What is one good name for a company that makes {product}? Give name only.",
    )
    
    chain = LLMChain(llm=llm, prompt=prompt)
    for _ in range(5):
        resp = chain.run(product="toothpaste")
        print(resp)

generate_serially()
BrightSmile
BrightSmile
BrightSmile
BrightSmile
BrightSmile

4.27S

协程: 一般分为两级,worker和player。

async def generate_concurrently(key ='YOURKEY'):
    llm = ChatOpenAI(
        model='deepseek-chat', 
        openai_api_key=key, 
        openai_api_base='https://api.deepseek.com',
        temperature=1.0
    )
    prompt = PromptTemplate(
        input_variables=["product"],
        template="What is one good name for a company that makes {product}? Give name only.",
    )
    chain = LLMChain(llm=llm, prompt=prompt)
    task = chain.arun(product="toothpaste")
    return task

async def generate_all(count=10): 
    tasks = [generate_concurrently() for _ in range(count)]
    res = await asyncio.gather(*tasks)
    return res


import time 
import asyncio

s = time.perf_counter()
# 如果在Jupyter之外运行此代码,请使用asyncio.run(generate_concurrently())
res = await generate_all()
# await generate_concurrently()
elapsed = time.perf_counter() - s
print("\033[1m" + f"Concurrent executed in {elapsed:0.2f} seconds." + "\033[0m")
# print (res) 

s = time.perf_counter()
generate_serially()
elapsed = time.perf_counter() - s
print("\033[1m" + f"Serial executed in {elapsed:0.2f} seconds." + "\033[0m")

Concurrent executed in 0.21 seconds.

所以从效果上,速度提升了10倍。本次,针对单个api-key的并发问题,用

1.2 线程并发

一种是无返回的并发方式,其实比较适合我目前的case

总执行时间为2.01秒(单个任务需要执行2秒)

import threading
import time
 
def task(n):
    print(f"Thread {n} starting")
    time.sleep(2)
    print(f"Thread {n} finished")
    return 'ok'
 
def main():
    threads = []
    for i in range(5):  # 创建5个线程
        t = threading.Thread(target=task, args=(i,))
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()  # 等待所有线程完成

    return t
t = main()

Thread 0 starting
Thread 1 starting
Thread 2 starting
Thread 3 starting
Thread 4 starting
Thread 4 finished
Thread 1 finished
Thread 2 finished
Thread 0 finished
Thread 3 finished

如果需要收集每个worker的返回数据

import concurrent.futures
import time

# 定义一个需要并发执行的函数,并返回结果
def worker(thread_id):
    print(f"Thread {thread_id} started")
    time.sleep(2)  # 模拟耗时操作
    result = f"Result from thread {thread_id}"
    print(f"Thread {thread_id} finished")
    return result

# 使用 ThreadPoolExecutor 来管理线程并获取结果
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # 提交任务并获取 Future 对象
    futures = [executor.submit(worker, i) for i in range(5)]
    
    # 获取结果
    results = [future.result() for future in concurrent.futures.as_completed(futures)]

print("All threads finished")
print("Results:", results)

Thread 0 started
Thread 1 started
Thread 2 started
Thread 3 started
Thread 4 started
Thread 0 finished
Thread 1 finished
Thread 2 finished
Thread 3 finished
Thread 4 finished
All threads finished
Results: ['Result from thread 0', 'Result from thread 1', 'Result from thread 2', 'Result from thread 3', 'Result from thread 4']

2.03S

3 应用

先完成一个单次worker调用

python3 async_caller.py APIKEY

使用多线程调用(player级别)

import os 
import time 
def worker(api_key):
    print('started ',api_key)
    tick1 = time.time()
    os.system('python3 async_caller.py %s' % api_key)
    tick2 = time.time()
    print('ended %.2f' %(tick2 -tick1) ,api_key )


import threading
keys = [
    【Many Keys】
    ]
# 创建多个线程
threads = []
for api_key in keys:
    thread = threading.Thread(target=worker, args=(api_key,))
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

print("All threads finished")

run_for_loops.py 用于调用n次player,也可以改为run until 。

# nohup python3 run_for_loops.py 10 >/dev/null 2>&1 &
import os 
import sys 

loops = int(sys.argv[1])
for i in range(loops):
    os.system('python3 thread_player.py')

后台执行
nohup python3 run_for_loops.py 10 >/dev/null 2>&1 &
在这里插入图片描述
发现仍然是满核执行。我看了一下各步骤的时间,发现有80%的时间是在等待服务器返回,但是有20%时间是在本地处理。所以,这仍然是一个小部分CPU密集型的任务。

结论:至少在调度上看起来简洁多了,不必启动n个进程,而是一个进程下面n个线程

4 总结

  • 1 协程用于worker级别,务求在单核上达到最高的IO并行
  • 2 线程用于player级别,确保多核并发worker
  • 3 除了主要的等待,开头和结尾可能还是有CPU开销。(至少json序列化也是常见的)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/782278.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

互联网十万个为什么之什么是数据备份?

数据备份是按照一定的备份频率创建数据副本的过程,将重要的数据复制到其它位置或者存储介质,并对生成的副本保留一定的时长。备份通常储存在不同的物理介质或云端,以确保数据的连续性和完整性。有效的备份策略至关重要,以防止数据…

ESP32-C3-Arduino-uart

引脚图 2实现串口发送接收 1默认值初始化串口(默认是uart0) Serial.begin(UART_BAUD); 参数是波特率 2自定义其他串口 2-1创建实例 HardwareSerial SerialUART(0); //数值指的是uart0 1为uart1.。。。。 2-2初始化 SerialUART.begin(UART_BAU…

LabVIEW的Actor Framework (AF) 结构介绍

LabVIEW的Actor Framework (AF) 是一种高级架构,用于开发并发、可扩展和模块化的应用程序。通过面向对象编程(OOP)和消息传递机制,AF结构实现了高效的任务管理和数据处理。其主要特点包括并发执行、动态可扩展性和强大的错误处理能…

不是哥们?你怎么抖成这样了?求你进来学学防抖吧!全方位深入剖析防抖的奥秘

前言 古有猴哥三打白骨精,白骨精 > 噶 今有用户疯狂点请求,服务器 > 噶 所以这防抖咱必须得学会!!! 本文就来讲解一下Web前端中防抖的奥秘吧!!!! 为什么要做防…

适用于 Windows 11/10/8/7/Vista/XP 的最佳免费分区软件

无论您使用的是 SSD、机械磁盘还是任何类型的 RAID 阵列,硬盘驱动器都是 Windows 计算机中不可或缺的组件。在将文件保存到全新磁盘之前,您应该初始化它,创建分区并使用文件系统格式化。在运行计算机一段时间后,您需要收缩、扩展、…

14-25 剑和侠客 – 预训练模型三部曲2 – 视觉

概述 在第 1 部分中,我们讨论了适用于文本的预训练模型的重要性及其在当今世界的相关性。大型语言模型 (LLM),尤其是 GPT-3 和随后的 GPT-3.5,已经获得了极大的欢迎,从而在 AI 讨论中引起了越来越多的关注。我们已经看到了用于构…

everything高级搜索-cnblog

everything高级搜索用法 基础4选项验证 总结搜索方式 高级搜索搜指定路径文件名: 文件名 路径不含文件名: !文件名包含单词 路径包含指定内容: 路径 content:内容 大小写 区分大小写搜索搜指定路径文件名: case:文件名 路径全字匹配 全字搜指定路径文件名: wholewo…

网络安全基础-2

知识点 1.网站搭建前置知识 域名,子域名,DNS,HTTP/HTTPS,证书等 注册购买域名:阿里云企航_万网域名_商标注册_资质备案_软件著作权_网站建设-阿里云 2.web应用环境架构类 理解不同WEB应用组成角色功能架构: 开发语…

四、(1)网络爬虫入门及准备工作(爬虫及数据可视化)

四、(1)网络爬虫入门及准备工作(爬虫及数据可视化) 1,网络爬虫入门1.1 百度指数1.2 天眼查1.3 爬虫原理1.4 搜索引擎原理 2,准备工作2.1 分析爬取页面2.2 爬虫拿到的不仅是网页还是网页的源代码2.3 爬虫就是…

Golang | Leetcode Golang题解之第213题打家劫舍II

题目: 题解: func _rob(nums []int) int {first, second : nums[0], max(nums[0], nums[1])for _, v : range nums[2:] {first, second second, max(firstv, second)}return second }func rob(nums []int) int {n : len(nums)if n 1 {return nums[0]}…

7.pwn 工具安装和使用

关闭保护的方法 pie: -no-pie Canary:-fno-stack-protector aslr:查看:cat /proc/sys/kernel/randomize_va_space 2表示打开 关闭:echo 0>/proc/sys/kernel/randomize_va_space NX:-z execstack gdb使用以及插件安装 是GNU软件系统中的标准调试工具,此外GD…

【计组OS】I/O方式笔记总结

苏泽 “弃工从研”的路上很孤独,于是我记下了些许笔记相伴,希望能够帮助到大家 目录 IO方式:程序查询方式 工作原理 程序查询方式的详细流程: 1. 初始化阶段 2. 发送I/O命令 3. 循环检查状态 4. 数据传输 5. 继续查询 6…

reactor和proactor模型

Reactor模型是非阻塞的同步IO模型。在主线程中也就是IO处理单元中,只负责监听文件描述符上是否有事件发生,有的话就立即将事件通知工作线程,将socket可读可写事件放入请求队列,交给工作线程处理。 总而言之就是主线程监听有事件发…

期末考试结束,老师该如何私发成绩?

随着期末考试的落幕,校园里又恢复了往日的宁静。然而,对于老师们来说,这并不意味着工作的结束,相反,一系列繁琐的任务才刚刚开始。 成绩单的发放,就是其中一项让人头疼的工作。家长们焦急地等待着孩子的考试…

可视化作品集(08):能源电力领域

能源电力领域的可视化大屏,有着巨大的用武之地,不要小看它。 监控能源生产和消耗情况: 通过可视化大屏,可以实时监控能源生产和消耗情况,包括发电量、能源供应情况、能源消耗情况等,帮助管理者及时了解能…

14-39 剑和诗人13 - 顶级大模型测试分析和建议

​​​​​ 随着对高级语言功能的需求不断飙升,市场上涌现出大量语言模型,每种模型都拥有独特的优势和功能。然而,驾驭这个错综复杂的生态系统可能是一项艰巨的任务,开发人员和研究人员经常面临选择最适合其特定需求的模型的挑战。…

React中的useMemo和memo

引言 React是一个声明式的JavaScript库,用于构建用户界面。在开发过程中,性能优化是一个重要的方面。useMemo和memo是React提供的工具,用于帮助开发者避免不必要的渲染和计算,从而提升应用性能。 问题背景 在React应用中&#…

Golang | Leetcode Golang题解之第214题最短回文串

题目&#xff1a; 题解&#xff1a; func shortestPalindrome(s string) string {n : len(s)fail : make([]int, n)for i : 0; i < n; i {fail[i] -1}for i : 1; i < n; i {j : fail[i - 1]for j ! -1 && s[j 1] ! s[i] {j fail[j]}if s[j 1] s[i] {fail[i…

【密码学】密码学中的四种攻击方式和两种攻击手段

在密码学中&#xff0c;攻击方式通常指的是密码分析者试图破解加密信息或绕过安全机制的各种策略。根据密码分析者对明文、密文以及加密算法的知识程度&#xff0c;攻击可以分为以下四种基本类型&#xff1a; 一、四种攻击的定义 &#xff08;1&#xff09;唯密文攻击(COA, C…

MySQL学习(7):4种常用函数

1.字符串函数 mysql中内置了很多字符串函数&#xff0c;常用的几种如下&#xff1a; concat(s1,s2,s3...)字符串拼接&#xff0c;将s1,s2,s3...拼接成一个字符串 lower(s1) 将字符串s1全部转为小写upper(s1)将字符串s1全部转为大写lpad(s1,5,*) 如果字符串s1不足5位&#xff…