说明
这个坑花了10个小时才爬出来
碰到一个现象:将微服务改造为并发后,请求最初很快,然后就出现大量的失败,然后过一会又能用。
过去从来没有碰到这个问题,要么是一些比较明显的资源,或者逻辑bug,总之跑断了就明白,非常显眼。然后从接口传入的并发量并不是很大,而且即使初期传入的量大也没见崩。
内容
1 分析
我把日志贴给了chatgpt,然后得到了一些信息。
在压测过程中,出现 “Cannot assign requested address” 错误,通常是由于系统的网络端口资源耗尽或请求连接过于频繁,导致无法分配新的端口。这类问题可能与以下几个因素有关:
-
端口耗尽:系统的可用端口数有限,尤其是在并发请求非常高时,操作系统可能耗尽可用的端口资源。这通常会导致类似 “Cannot assign requested address” 的错误。
-
TIME_WAIT 状态:HTTP 请求结束后,系统会将使用的端口暂时保留在 TIME_WAIT 状态,导致端口无法立即释放和重复使用。如果并发请求较多,TIME_WAIT 状态的端口过多,会导致端口耗尽。
-
系统参数限制:一些系统网络参数设置可能过低,例如
net.ipv4.ip_local_port_range
(本地可用端口范围)或net.ipv4.tcp_tw_reuse
(TIME_WAIT 状态端口重用),可能会影响并发连接的处理能力。
解决方案建议:
-
调整系统端口范围:
你可以通过sysctl
修改系统可用的端口范围:sysctl -w net.ipv4.ip_local_port_range="1024 65535"
-
启用 TIME_WAIT 重用:
开启tcp_tw_reuse
允许系统重用 TIME_WAIT 状态的端口:sysctl -w net.ipv4.tcp_tw_reuse=1
-
增加文件句柄和连接数:
提升系统最大文件句柄和连接数限制:ulimit -n 65536 sysctl -w fs.file-max=1000000
-
合理配置负载均衡:
如果你使用了 Nginx 作为负载均衡器,确保 Nginx 配置了足够的连接处理能力,例如增加worker_connections
。
这些设置可以帮助你在高并发压测环境中优化连接资源,减少端口耗尽的情况。
文件句柄数我先看了,非常大100多万。然后我看到fastapi在疯狂的刷端口,一个容器内所有的端口都给这个服务,所以说端口不够用也不对。
看到TIME_WAIT我突然有感觉了,应该是这个问题。
由于这个微服务实际上是第二级的,负责处理大量的规则。每一个entity都会执行一次规则。没执行一次都发起一次调用。然后按照TCP的规范,握手释放的时间至少要10秒,保守是30秒。默认情况下是2Min。
从这里也可以推算,假设有5万个端口分配给微服务,每120秒的冷却时间,那么TPS大约是400,如果改为30秒,那么是1200。还有就是通过长连接 + 批量发送来实质性的扩大TPS。
最初我的主web框架是flask和tornado。
本来这次没有意外的话,我应该使用Tornado搭建服务的,也许也就不会有这个问题。最初是看到FastAPI的一些优点:例如更高的json序列化效率,结合pydantic的写法,的确是更好的,所以也冒了一点险来使用。
然后在优化的过程中了解到了Gunicorn和Uvicorn的差别,也更明确了FastAPI适用的场景。例如,使用FastAPI进行异步数据库查询应该会比Tornado更合适 。反正想要做的更好就必须去冒险,踩坑可能是一种无法避免的事,当我再从坑里爬出来,就离目标更近了一点。
我也和Chat确认了一下:
是的,Tornado 和 FastAPI 确实有不同之处,尤其是在长连接(如 WebSocket、长轮询)方面。
1. Tornado 的长连接能力
Tornado 是一个异步的 Python web 框架,最早是为实时 Web 应用设计的。它天然支持长连接,适合处理WebSocket、长轮询、**服务器推送(Server-Sent Events, SSE)**等应用场景。它的异步 IO 模型基于 tornado.ioloop.IOLoop
,让 Tornado 可以高效地处理并发连接和长时间保持的连接。
2. FastAPI 与长连接
FastAPI 虽然不是为长连接特别设计的,但它基于 Starlette 框架,同样支持 WebSocket、SSE 等长连接类型的通信。FastAPI 的优势在于它的性能和现代设计,包括使用异步支持(async/await
),并且在许多情况下性能可以媲美 Tornado。
FastAPI 使用 WebSocket 的例子:
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message received: {data}")
3. Tornado 和 FastAPI 的区别
-
长连接支持:Tornado 原生支持长连接,适合实时通讯类应用,例如聊天服务器、实时数据流等。FastAPI 也支持长连接,但需要通过 WebSocket 等机制来实现,底层是通过 Starlette 提供的异步支持。
-
并发模型:Tornado 使用自己的事件循环 (
IOLoop
) 和异步编程模型,比较轻量且高效。而 FastAPI 基于 Python 的asyncio
事件循环,允许更现代的异步编程(async/await
),并且和 Starlette、Uvicorn 结合提供高性能的异步 API。 -
实时性和延迟:Tornado 被设计为应对高并发的长连接场景,在处理实时应用时,性能和低延迟方面表现突出。FastAPI 也表现不错,但对于长连接,可能在某些特定情况下性能不如 Tornado 稳定。
4. 长连接适用场景
-
Tornado:适合构建实时交互应用,如聊天系统、股票行情推送系统、游戏服务器等需要保持长时间的连接或高频率通讯的场景。
-
FastAPI:适合 RESTful API 服务,同时也可以处理长连接需求,如 WebSocket,但它的设计初衷是高性能的 API,而非长连接为主的应用场景。
总结:
- Tornado:更适合专注于长连接和实时通信的场景,原生支持这些特性。
- FastAPI:更适合快速构建高性能API,也可以支持长连接,但需要依赖底层的异步框架。
小结论
到这里可以有一些小结论:
- 1 在大量的密集处理任务上,tornado可能会更好一些;而fastapi更适合零散的异步任务。400TPS-1200TPS是一个天然限制。
- 2 大量的规则不可以作为API,而是应该封装为函数式。事实上,我估算了一下产生的二级请求是一个比较夸张的量。能勉强顶住,FastAPI已经算不错的了。
- 3 度量是否要API也要看计算/传输比。在简单规则中,这个值太低了,完全划不来。
2 解决
既然是“端口相对不足”的问题,那么就做相应的调整。
the_port=34009
docker run -d \
--name=short_name_query_server_${the_port} \
-v /etc/localtime:/etc/localtime \
-v /etc/timezone:/etc/timezone \
-v /etc/hostname:/etc/hostname \
-p ${the_port}:8000 \
-e "LANG=C.UTF-8" \
--sysctl net.ipv4.tcp_fin_timeout=30 \
--sysctl net.ipv4.tcp_tw_reuse=1 \
--sysctl net.ipv4.ip_local_port_range="10000 65535" \
-w /workspace \
IMAGE \
sh -c "uvicorn fast_server:app --host 0.0.0.0 --port 8000 --workers=5
在docker启动时增加配置项,反正顾名思义吧
然后可以切入容器检查
cat /proc/sys/net/ipv4/tcp_fin_timeout
cat /proc/sys/net/ipv4/tcp_tw_reuse
cat /proc/sys/net/ipv4/ip_local_port_range
改完后实测下来是在足够大的批量里跑数都是0错误了,当然只能是可怜的并发2,而且服务内部我还不敢去并发执行规则。
3 Next
规则执行只会有 get、pass、reject、error四个状态
目前规则的样式
# reject
@app.post("/r000/")
async def r000(justent:JustEnt):
the_ent = justent.some_ent
the_result = RuleResult()
try:
if judge_existence(the_ent, word_list=r0_exe_clude_list):
the_result.status = 'reject'
else:
the_result.status = 'pass'
return the_result.dict()
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
在上层调用的样式
# 接口返回数据模型 v {status: pass/reject/get , data:None 或者匹配全称}
# mapping_list 仅用于本次,不是通用设计
# raw 也是如此
import time
def waterfall_api_mode(last_fall, rule_name ,reject_list = None, get_list = None, mappling_list = None, raw = None , base_url = None):
next_fall = []
last_ent_list = last_fall
pure_rule_url = rule_name + '/'
if len(last_ent_list):
rule_url = base_url + pure_rule_url
# api mode
tick1 = time.time()
task_list = []
for ent in last_ent_list:
tem_dict = {}
tem_dict['task_id'] = ent
tem_dict['url'] = rule_url
if raw is None :
tem_dict['json_params'] = {'some_ent':ent}
else:
tem_dict['json_params'] = {'some_ent':ent,'raw':raw}
task_list.append(tem_dict)
rule_res = asyncio.run(json_player(task_list, concurrent = 10))
# 解析结果,保留pass
for tem_res in rule_res:
for k,v in tem_res.items():
# print(k,v)
if v['status'] == 'pass':
next_fall.append(k)
elif v['status'] == 'get':
if get_list is not None :
get_list.append(v['data'])
if mappling_list is not None :
mappling_list.append({'ent':k,'mapping_ent': v['data']})
elif v['status'] == 'reject':
if reject_list is not None :
reject_list.append(k)
tick2 = time.time()
print('takes %.2f ' %(tick2-tick1))
return next_fall
可以把输入的实体列表作为一个series,然后去apply就好了。根据每次apply的结果,分为四个类型:
- 1 get : 附加到返回部分
- 2 reject : 目前可以直接扔掉(如果是学习和分析)
- 3 pass : 没有获取也没有抛弃,传入下一步处理。如果没有pass,那么处理结束。
- 4 error:发生错误的部分,可以发往kafka
然后做一个简单的程序流就可以取代目前的微服务了。