说明
最近在大模型调用上,为了尽快的进行大量的数据处理,需要采用并发进行处理。
Before: 以前主要是自己利用CPU和GPU来搭建数据处理程序或者服务,资源受限于所用的硬件,并不那么考虑并发问题。在处理程序中,并发主要利用pandas的apply方法,以及在模型处理时采用矩阵解决。
Now: 当需要大量调用外部资源时,主要的负担在于IO。同步方式下,CPU的资源会被每一个连接抢占。所以如果不使用并发方法,性能会大幅下降。前后差距可能上千倍。
After: 除了用于大规模的外部资源请求之外,还可以用于算网微服务体系间的交互。可以实现并发查询等。
内容
1 协程并发与线程并发
资源的基本分配单元是进程,一般使用ps aux
查看。
简单的理解进程:一个核对应一个进程。特别当一个进程是CPU密集型任务时,这个会非常明显。
当我们有大量的IO以同步方式调度时,资源的抢占就会非常严重。这也是是谈到线程并发和协程并发的原因。
以下是一个实例:在执行20个左右的进程(IO密集)时CPU的实际状态。
线程是比进程低一个级别的概念。一个进程下的多个线程可以共享内存资源,其通信可以直接在内存级别,而不必像进程一样,要通过消息管道。
协程是更加轻量级的概念。
总体感觉上,协程并发的实现比线程并发更负责一些。
1.1 协程并发
使用deepseek测试
串行时
from langchain.chains import LLMChain
from langchain_community.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
def generate_serially(key ='YOURKEY'):
# load model
llm = ChatOpenAI(
model='deepseek-chat',
openai_api_key=key,
openai_api_base='https://api.deepseek.com',
temperature=0
)
# setting prompt
prompt = PromptTemplate(
input_variables=["product"],
template="What is one good name for a company that makes {product}? Give name only.",
)
chain = LLMChain(llm=llm, prompt=prompt)
for _ in range(5):
resp = chain.run(product="toothpaste")
print(resp)
generate_serially()
BrightSmile
BrightSmile
BrightSmile
BrightSmile
BrightSmile
4.27S
协程: 一般分为两级,worker和player。
async def generate_concurrently(key ='YOURKEY'):
llm = ChatOpenAI(
model='deepseek-chat',
openai_api_key=key,
openai_api_base='https://api.deepseek.com',
temperature=1.0
)
prompt = PromptTemplate(
input_variables=["product"],
template="What is one good name for a company that makes {product}? Give name only.",
)
chain = LLMChain(llm=llm, prompt=prompt)
task = chain.arun(product="toothpaste")
return task
async def generate_all(count=10):
tasks = [generate_concurrently() for _ in range(count)]
res = await asyncio.gather(*tasks)
return res
import time
import asyncio
s = time.perf_counter()
# 如果在Jupyter之外运行此代码,请使用asyncio.run(generate_concurrently())
res = await generate_all()
# await generate_concurrently()
elapsed = time.perf_counter() - s
print("\033[1m" + f"Concurrent executed in {elapsed:0.2f} seconds." + "\033[0m")
# print (res)
s = time.perf_counter()
generate_serially()
elapsed = time.perf_counter() - s
print("\033[1m" + f"Serial executed in {elapsed:0.2f} seconds." + "\033[0m")
Concurrent executed in 0.21 seconds.
所以从效果上,速度提升了10倍。本次,针对单个api-key的并发问题,用
1.2 线程并发
一种是无返回的并发方式,其实比较适合我目前的case
总执行时间为2.01秒(单个任务需要执行2秒)
import threading
import time
def task(n):
print(f"Thread {n} starting")
time.sleep(2)
print(f"Thread {n} finished")
return 'ok'
def main():
threads = []
for i in range(5): # 创建5个线程
t = threading.Thread(target=task, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join() # 等待所有线程完成
return t
t = main()
Thread 0 starting
Thread 1 starting
Thread 2 starting
Thread 3 starting
Thread 4 starting
Thread 4 finished
Thread 1 finished
Thread 2 finished
Thread 0 finished
Thread 3 finished
如果需要收集每个worker的返回数据
import concurrent.futures
import time
# 定义一个需要并发执行的函数,并返回结果
def worker(thread_id):
print(f"Thread {thread_id} started")
time.sleep(2) # 模拟耗时操作
result = f"Result from thread {thread_id}"
print(f"Thread {thread_id} finished")
return result
# 使用 ThreadPoolExecutor 来管理线程并获取结果
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 提交任务并获取 Future 对象
futures = [executor.submit(worker, i) for i in range(5)]
# 获取结果
results = [future.result() for future in concurrent.futures.as_completed(futures)]
print("All threads finished")
print("Results:", results)
Thread 0 started
Thread 1 started
Thread 2 started
Thread 3 started
Thread 4 started
Thread 0 finished
Thread 1 finished
Thread 2 finished
Thread 3 finished
Thread 4 finished
All threads finished
Results: ['Result from thread 0', 'Result from thread 1', 'Result from thread 2', 'Result from thread 3', 'Result from thread 4']
2.03S
3 应用
先完成一个单次worker调用
python3 async_caller.py APIKEY
使用多线程调用(player级别)
import os
import time
def worker(api_key):
print('started ',api_key)
tick1 = time.time()
os.system('python3 async_caller.py %s' % api_key)
tick2 = time.time()
print('ended %.2f' %(tick2 -tick1) ,api_key )
import threading
keys = [
【Many Keys】
]
# 创建多个线程
threads = []
for api_key in keys:
thread = threading.Thread(target=worker, args=(api_key,))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
print("All threads finished")
run_for_loops.py
用于调用n次player,也可以改为run until 。
# nohup python3 run_for_loops.py 10 >/dev/null 2>&1 &
import os
import sys
loops = int(sys.argv[1])
for i in range(loops):
os.system('python3 thread_player.py')
后台执行
nohup python3 run_for_loops.py 10 >/dev/null 2>&1 &
发现仍然是满核执行。我看了一下各步骤的时间,发现有80%的时间是在等待服务器返回,但是有20%时间是在本地处理。所以,这仍然是一个小部分CPU密集型的任务。
结论:至少在调度上看起来简洁多了,不必启动n个进程,而是一个进程下面n个线程
4 总结
- 1 协程用于worker级别,务求在单核上达到最高的IO并行
- 2 线程用于player级别,确保多核并发worker
- 3 除了主要的等待,开头和结尾可能还是有CPU开销。(至少json序列化也是常见的)