Table of Contents
I. Overview of the Frameworks
1. vLLM
PagedAttention
Continuous batching
2. TensorRT-LLM
WOQ: W4A16 / W8A16
SQ: SmoothQuant
AWQ: Activation-aware Weight Quantization
II. Web Inference Services
vllm_service
tensorrtllm_service
III. Inference Speed Comparison
1. Non-business data
torch generation
trtllm generation
fp16 weights
wo_int8 weights
wo_int4
sq_W8A8
vllm inference
2. Business data
In my previous post, "基于torch.compile和gptfast代码风格实现ChatGLM模型推理加速", I mentioned that I would eventually compare the vLLM and TensorRT-LLM inference frameworks on ChatGLM2-6B. This post does exactly that: it compares the two frameworks on ChatGLM2-6B in terms of inference speed and output quality. The content is organized into three parts. The first part briefly introduces the highlights and key techniques of vLLM and TensorRT-LLM; for reasons of length I will not go into the underlying principles (each technique could easily fill a post of its own). The second part covers environment setup and the important APIs, and provides a web inference service. The last part presents concrete inference examples, comparing speed and quality, with the Hugging Face transformers / native torch implementation as a baseline.
I. Overview of the Frameworks
1. vLLM
From the project description on the vLLM GitHub page (figure above), the reasons vLLM is fast include: state-of-the-art serving throughput, efficient management of attention key/value memory (i.e., GPU memory) with PagedAttention, continuous batching of incoming requests, fast model execution with CUDA graphs, quantization support (AWQ/FP8), and optimized CUDA kernels. The most distinctive of these are PagedAttention and continuous batching.
PagedAttention
The core idea of PagedAttention is borrowed from operating-system memory paging: partition GPU memory into blocks to reduce fragmentation and raise utilization, so that with a fixed amount of GPU memory and a fixed model the inference batch size, and therefore GPU throughput, can grow. The PagedAttention paper, "Efficient Memory Management for Large Language Model Serving with PagedAttention", points out that existing inference systems waste GPU memory in how they manage the attention KV cache, as shown below.
These systems allocate each request a contiguous region of GPU memory of length prompt + output. That inevitably produces internal fragmentation (space already allocated to a request but not yet used) and external fragmentation (gaps too small to be allocated to any other request).
To solve this fragmentation problem, PagedAttention stores the KV vectors in non-contiguous blocks of GPU memory. This cuts wasted memory and the KV-cache footprint, which in turn allows a larger batch size and higher GPU throughput.
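To make the idea concrete, here is a tiny bookkeeping sketch (my own illustration, not vLLM's actual code): each request owns a block table mapping logical token positions to fixed-size physical blocks, and blocks go back to a shared pool the moment a request finishes. The block size and pool size below are made-up numbers.

BLOCK_SIZE = 16          # tokens per physical KV block (made-up value)
NUM_BLOCKS = 1024        # physical blocks in the shared KV-cache pool (made-up value)

free_blocks = list(range(NUM_BLOCKS))   # pool of free physical block ids
block_tables = {}                       # request_id -> list of physical block ids
seq_lens = {}                           # request_id -> number of tokens cached so far

def append_token(request_id):
    """Reserve KV-cache room for one more token of this request."""
    table = block_tables.setdefault(request_id, [])
    n = seq_lens.get(request_id, 0)
    if n % BLOCK_SIZE == 0:             # current block is full -> grab a new one
        table.append(free_blocks.pop())
    seq_lens[request_id] = n + 1

def free_request(request_id):
    """Return all blocks of a finished request to the pool immediately."""
    free_blocks.extend(block_tables.pop(request_id, []))
    seq_lens.pop(request_id, None)

# logical token i of a request lives at block block_tables[rid][i // BLOCK_SIZE],
# offset i % BLOCK_SIZE, so each request wastes at most one partly filled block.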
Continuous batching
The idea behind batching is not to run each incoming request through the model immediately, but to accumulate a number of requests and run them through the model together, exploiting the GPU's massive parallelism to raise overall system throughput. According to the blog post "How continuous batching enables 23x throughput in LLM inference while reducing p50 latency", vLLM gains up to 23x throughput with continuous batching, which performs far better than naive batching:
With naive batching, illustrated below, every request in the batch must wait for the request that generates the longest sequence before the batch can finish. That idle waiting wastes GPU memory, lowers system throughput and increases latency.
To fix this, vLLM builds on the iteration-level batching of the paper "Orca: A Distributed Serving System for Transformer-Based Generative Models" and implements continuous batching (the core idea is similar; the engineering implementation differs).
As soon as request S3 finishes generating, its GPU memory is released immediately and a new request S5 is folded into the batch, instead of waiting for S2 to finish as naive batching would. This is only a simplified sketch of the principle; the engineering is hard. A scheduler must decide which requests can run, which must wait, and which running requests should be paused. To let requests leave the batch early, the attention CUDA kernels also have to be rewritten so attention can be computed without a batch dimension (the batch is flattened into a single row). For the details, read the vLLM source code.
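The scheduling idea can be illustrated with a toy simulation (again just an illustration, not vLLM's real scheduler): after every decoding iteration, finished sequences leave the batch and waiting requests are admitted immediately.

import collections
import random

waiting = collections.deque(f"S{i}" for i in range(1, 9))  # pending requests
running = {}                 # request_id -> tokens still to generate (fake lengths)
MAX_BATCH = 4                # assumed batch-size limit

step = 0
while waiting or running:
    # admit new requests whenever there is a free slot in the batch
    while waiting and len(running) < MAX_BATCH:
        running[waiting.popleft()] = random.randint(2, 6)
    # one decoding iteration for every running request (the current "batch")
    for rid in list(running):
        running[rid] -= 1
        if running[rid] == 0:          # finished: leaves the batch right away,
            del running[rid]           # its KV-cache blocks would be freed here
    step += 1
    print(f"step {step:2d}: running={sorted(running)} waiting={list(waiting)}")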
2. TensorRT-LLM
Anything NVIDIA ships tends to be top-notch. On October 19, 2023 NVIDIA released an open-source library dedicated to LLM inference whose efficiency is even better than vLLM's. It is designed specifically for large language models, supports the mainstream models on the market, and also implements the acceleration techniques found in other inference frameworks such as vLLM. Its main features are:
various attention implementations, in-flight batching (i.e., continuous batching), a paged KV cache (PagedAttention), tensor parallelism, pipeline parallelism, quantization techniques such as SQ and AWQ, and support for the new FP8 data type. Combined with TensorRT's efficient CUDA kernels, layer fusion, precision calibration and kernel selection, inference becomes even more efficient. The quantization techniques are briefly introduced below.
WOQ: W4A16 / W8A16
The first benefit of weight-only quantization (WOQ) is a lower barrier to entry: it reduces the GPU memory needed just to load the model weights. Whether WOQ also speeds things up depends heavily on the underlying CUDA kernels. At run time the weights are stored as int8 or int4 and converted back to fp16 before being multiplied with the activations; without kernel-level optimization this extra dequantization step obviously slows inference down. With a dedicated kernel that fuses the whole sequence, and a fast dequantization routine inside it, the number of trips data makes in and out of GPU memory drops, so memory usage and inference time both go down.
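The unfused flow described above looks roughly like the following PyTorch sketch (W8A16: per-channel int8 weights, higher-precision activations). TensorRT-LLM's gain comes from doing the dequantization and the GEMM inside one custom kernel rather than the two separate steps shown here.

import torch

def quantize_weight_int8(w):
    """Per-output-channel symmetric int8 quantization of a weight matrix."""
    scale = w.abs().amax(dim=1, keepdim=True) / 127.0      # one scale per output row
    q = torch.clamp((w / scale).round(), -128, 127).to(torch.int8)
    return q, scale

def woq_linear(x, q, scale):
    """Unfused reference: dequantize the weight back to floating point, then do the GEMM."""
    w_deq = q.float() * scale
    return x @ w_deq.t()

# float32 stands in for fp16 here so the sketch also runs on CPU
w = torch.randn(4096, 4096)
x = torch.randn(1, 4096)
q, s = quantize_weight_int8(w)
print((woq_linear(x, q, s) - x @ w.t()).abs().max())       # quantization error stays small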
SQ: SmoothQuant
When both the weights and the activations of an LLM are quantized, the activations are the hard part: their dynamic range is large, their distribution is uneven and messy, and they contain outliers, so quantizing them introduces large errors and costs a lot of model quality. SmoothQuant tackles this by migrating part of the quantization difficulty from the activations onto the weights, preserving model quality and inference speed while further reducing GPU memory usage.
The figure in the paper "SmoothQuant: Accurate and Efficient Post-Training Quantization for Large Language Models" shows this clearly: originally the activations are hard to quantize because of their uneven distribution and outliers, while the weights are uniform and very easy to quantize. After SmoothQuant smooths the activations and correspondingly scales up the weights, the activations become much tamer and the weights slightly less uniform, but both are now easy to quantize. A smoothing factor α controls how much of the quantization difficulty is shifted from the activations to the weights:
What value α should take has to be determined experimentally for each model. For SmoothQuant the authors also implemented an int8 GEMM CUDA kernel, so inference is even faster than with WOQ; the accuracy still has to be verified in practice, although the numbers reported in the paper look very good. Note that this scheme requires calibration on training data to obtain suitable quantization scales.
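The migration itself is just a per-channel rescaling. A minimal toy sketch of the smoothing transform, following the formula in the paper: for input channel j, s_j = max|X_j|^α / max|W_j|^(1-α), then X' = X / diag(s) and W' = diag(s)·W, so the product is mathematically unchanged while the activation outliers are absorbed into the weights.

import torch

def smooth(x, w, alpha=0.5):
    # x: [tokens, in_features] calibration activations, w: [in_features, out_features]
    act_max = x.abs().amax(dim=0)                    # per-channel activation range
    w_max = w.abs().amax(dim=1)                      # per-channel weight range
    s = act_max.pow(alpha) / w_max.pow(1 - alpha)    # smoothing scales, one per input channel
    return x / s, w * s.unsqueeze(1)                 # X' = X / diag(s), W' = diag(s) @ W

x = torch.randn(128, 64)
x[:, 3] *= 50                                        # channel 3 carries outliers
w = torch.randn(64, 32)
x_s, w_s = smooth(x, w, alpha=0.5)
print(torch.allclose(x @ w, x_s @ w_s, atol=1e-3))   # the math is unchanged
print(x.abs().max().item(), x_s.abs().max().item())  # activations are much tamer now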
AWQ: Activation-aware Weight Quantization
This is another piece of quantization work from the SmoothQuant authors: a low-bit weight-only quantization method for large models that supports W4A16, i.e., it accelerates the model using weight quantization alone. The speedup has two sources. The first is that with a small batch size and a moderately sized model the system is memory-bound (the dominant cost is moving the weight matrices from global GPU memory into shared memory, which takes longer than the actual computation on such small matrices) rather than compute-bound (dominated by large matrix multiplications).
The figure above, from the AutoAWQ feature description, shows the fp16 weights shrunk about 3x into int4, loaded from global GPU memory into shared memory, and then quickly dequantized back to fp16 by a custom kernel before the computation; the overall balance is still positive, so there is a speedup. The biggest gains, however, come from fused modules (module fusion).
Fusing several modules into one and writing a dedicated kernel for the fused module reduces the number of round trips to GPU memory, which speeds up inference.
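As a minimal illustration of what module fusion means (my own example, not AutoAWQ's code): three separate Q/K/V projections become one larger GEMM, so the input activations are read from GPU memory once and there is a single kernel launch instead of three.

import torch
import torch.nn as nn

d = 1024
q_proj, k_proj, v_proj = (nn.Linear(d, d, bias=False) for _ in range(3))

fused = nn.Linear(d, 3 * d, bias=False)
with torch.no_grad():
    # stack the three weight matrices row-wise into one big projection
    fused.weight.copy_(torch.cat([q_proj.weight, k_proj.weight, v_proj.weight], dim=0))

x = torch.randn(2, d)
q_out, k_out, v_out = fused(x).split(d, dim=-1)      # one GEMM, three outputs
print(torch.allclose(q_out, q_proj(x), atol=1e-4),
      torch.allclose(v_out, v_proj(x), atol=1e-4))   # same results as the unfused modules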
On the accuracy side, AWQ starts from the observation that only about 1% of an LLM's weights are salient, i.e., largely determine model quality. The authors ran experiments keeping different fractions of the weights in fp16, with the following results:
The conclusion is that keeping the 0.1-1% of weights selected by the activation distribution in fp16 gives the best post-quantization quality. However, mixed precision (most weights in int8/int4 plus a small number in fp16) is unfriendly to efficient hardware implementations, so, borrowing the SmoothQuant idea, AWQ scales and then quantizes those salient weights to low precision as well, improving overall performance without hurting inference quality much.
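A rough sketch of that idea (a simplification of AWQ, with a made-up scaling rule rather than the paper's search): instead of keeping salient weights in fp16, scale up the weight channels whose calibration activations are largest before int4 quantization and divide the scale back out afterwards, which shrinks their relative quantization error.

import torch

def int4_fake_quant(w):
    scale = w.abs().amax(dim=1, keepdim=True) / 7.0            # symmetric int4 range [-8, 7]
    return torch.clamp((w / scale).round(), -8, 7) * scale      # quantize then dequantize

def awq_like_quant(x_calib, w, s_max=2.0):
    # x_calib: [tokens, in], w: [out, in]; scaling chosen from activation magnitude (made-up rule)
    act_mag = x_calib.abs().amax(dim=0)                         # per-input-channel salience proxy
    s = 1.0 + (s_max - 1.0) * act_mag / act_mag.max()
    return int4_fake_quant(w * s) / s                           # quantize the scaled weights

x = torch.randn(256, 64)
x[:, :4] *= 20                                                  # a few salient input channels
w = torch.randn(128, 64)
err_plain = (x @ int4_fake_quant(w).t() - x @ w.t()).abs().mean()
err_awq = (x @ awq_like_quant(x, w).t() - x @ w.t()).abs().mean()
print(err_plain.item(), err_awq.item())                         # the scaled version errs less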
II. Web Inference Services
I originally planned to cover environment installation and configuration here, but that part is straightforward, so I will skip it and instead present the asynchronous web services implemented with aiohttp.
vllm_service
import aiohttp
import json
import traceback
import asyncio
import os
import configparser
from aiohttp import web
import socket
from tools.log import Logger
import logging
import logging.handlers
import time
import re
import json
from typing import AsyncGenerator
from utils.utils import merge
import pytomlpp as toml
from vllm.utils import random_uuid
from transformers.generation.logits_process import LogitsProcessor
import torch
config = toml.load('config.toml')
log_level_dict = {'CRITICAL': 50, 'FATAL': 50, 'ERROR': 40, 'WARNING': 30, 'WARN': 30, 'INFO': 20, 'DEBUG': 10,
'NOTSET': 0, 'critical': 50, 'fatal': 50, 'error': 40, 'warning': 30, 'warn': 30, 'info': 20,
'debug': 10, 'notset': 0}
def get_local_ip(ip, port):
try:
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.connect((ip, port))
ip = conn.getsockname()[0]
except Exception:
raise
conn.close()
return ip
async def heart_beat(ip, port):
beat_logger = Logger(log_name='beat', log_level=10,
log_file='./logs/beat.log').logger
# interval in seconds
interval = config["common"]["hb_interval"]
send_data = {
'method': 'heartbeat',
'params': {
'data': [
]
}
}
......
send_data = json.dumps(send_data)
client = aiohttp.ClientSession()
while True:
try:
await client.post(url=hb_url, data=send_data)
except Exception as e:
beat_logger.error(f'send heartbeat fail: {e}')
beat_logger.info(send_data)
await asyncio.sleep(interval)
class VLLMServer():
def __init__(self):
self.config = config
os.environ['CUDA_VISIBLE_DEVICES'] = config['common']['device']
self.logger = self.create_logger()
self.max_input_len = int(self.config['chatplusplus']['max_input_len'])
self.max_new_token = int(self.config['chatplusplus']['max_new_token'])
self.gpu_memory_utilization = float(self.config['chatplusplus']['gpu_memory_utilization'])
# temperature 0 means greedy search; > 0 means sampling (the larger, the more random)
if self.config['chatplusplus']['do_sample']:
self.temperature = 1.0
self.top_p = 1.0
else:
self.temperature = 0
self.top_p = 1.0
# this import must come after setting os.environ['CUDA_VISIBLE_DEVICES']; placed at the top of the .py file the env var would not take effect
from transformers.generation.utils import LogitsProcessorList
logits_processor = LogitsProcessorList()
logits_processor.append(InvalidScoreLogitsProcessor())
request_dict = {
"n": 1,
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_new_token,
"logits_processors": logits_processor
}
from vllm.sampling_params import SamplingParams
self.sampling_params = SamplingParams(**request_dict)
self.engine = self.init_engine()
def build_rsp(self, answer, req_id, tokens, cost_t, speed):
elements = []
for a in answer.split('\n\n'):
a = a.split(":")
elements.append({"tag":a[0],"value":a[1]})
resp = {
"id": req_id,
"jsonrpc": "2.0",
"ret": 0,
"result": {
"chatInfo": {
"answer": answer,
"elements": elements
},
"tokens": tokens,
"cost_time": str(cost_t) + " ms",
"speed": str(speed) + " tokens/s"
}
}
return resp
async def inference(self, request: web.Request):
req = await request.json()
self.logger.info(f"receive request: {json.dumps(req, ensure_ascii=False)}")
session_id = req['id']
data = req['params']['data']
query = data['content']
# truncate the input to the last max_input_len characters
query = query[-self.max_input_len:]
query = "[Round {}]\n\n问:{}\n\n答:".format(1, query)
start = time.time()
request_id = random_uuid()
results_generator = self.engine.generate(prompt=query,sampling_params=self.sampling_params, request_id=request_id)
async for result in results_generator:
end = time.time()
output = result.outputs[0]
token_ids = output.token_ids
answer = output.text.strip()
cost_t = (end-start)*1000
speed = round(len(token_ids) / (end - start), 2)
resp = self.build_rsp(answer,session_id, len(token_ids), round(cost_t, 4), speed)
self.logger.info(f"model inference session_id:{session_id} send resp:{json.dumps(resp, ensure_ascii= False)}")
self.logger.info(f"model inference session_id:{session_id}, len of answer:{len(answer)}, len of token_ids:{len(token_ids)}, cost:{cost_t} ms speed:{speed} tokens/s")
return web.json_response(resp)
def init_engine(self):
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
# self.model_dir = self.config['chatplusplus']['base_model_v2']
# if self.config['peft']['use_lora']:
# merge(self.config)
# self.model_dir = self.config['peft']['merge_dir']
# self.logger.info("merge lora to base model")
self.model_dir = self.config['peft']['merge_dir']
engine_args = AsyncEngineArgs(model=self.model_dir, trust_remote_code=True, disable_log_requests=True)
engine_args.gpu_memory_utilization = self.gpu_memory_utilization
engine = AsyncLLMEngine.from_engine_args(engine_args)
self.logger.info("init_engine finished!")
return engine
def create_logger(self):
log_level = config["log"]["log_level"]
log_level = log_level_dict[log_level]
log_path = "./logs/server.log"
logger = logging.getLogger(__name__)
logger.setLevel(level=log_level)
formatter = logging.Formatter("%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s")
# file handler, rotated by size
# file_handler = logging.handlers.RotatingFileHandler(filename=log_path, maxBytes=838860800, backupCount=20, encoding='utf-8')
# rotated by date instead
file_handler = logging.handlers.TimedRotatingFileHandler(filename=log_path, when='D', interval=1,
encoding='utf-8')
file_handler.setFormatter(formatter)
file_handler.setLevel(level=log_level)
logger.addHandler(file_handler)
# console handler so logs also go to stdout
console = logging.StreamHandler()
console.setLevel(level=log_level)
console.setFormatter(formatter)
logger.addHandler(console)
return logger
class InvalidScoreLogitsProcessor(LogitsProcessor):
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
if torch.isnan(scores).any() or torch.isinf(scores).any():
scores.zero_()
scores[..., 5] = 5e4
return scores
async def main(ip, port):
vllm_server = VLLMServer()
app = web.Application()
# direct route
app.add_routes([
web.post('/nlp', vllm_server.inference)
])
# start the heartbeat task as a coroutine
asyncio.create_task(heart_beat(ip, port))
return app
if __name__ == '__main__':
if not os.path.exists("./logs"):
os.makedirs("./logs")
# target IP and port
ip = ""
port = xxxx
bind_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0)
# automatically obtain the local IP and port
local_ip = get_local_ip(ip , port )
bind_socket.bind(('0.0.0.0', 0))
web.run_app(main(local_ip, bind_socket.getsockname()[1]), sock=bind_socket)
The core code for vLLM asynchronous inference is as follows:
Engine creation
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
self.model_dir = "./path"
engine_args = AsyncEngineArgs(model=self.model_dir, trust_remote_code=True, disable_log_requests=True)
# gpu_memory_utilization caps the fraction of GPU memory vLLM may use (and hence the KV-cache size)
engine_args.gpu_memory_utilization = self.gpu_memory_utilization
engine = AsyncLLMEngine.from_engine_args(engine_args)
Running inference
request_dict = {
"n": 1,
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_new_token,
"logits_processors": logits_processor
}
from vllm.sampling_params import SamplingParams
self.sampling_params = SamplingParams(**request_dict)
# note: each request must get a unique request_id, otherwise continuous batching cannot take effect and speed things up
request_id = random_uuid()
results_generator = self.engine.generate(prompt=query,sampling_params=self.sampling_params, request_id=request_id)
async for result in results_generator:
end = time.time()
output = result.outputs[0]
token_ids = output.token_ids
answer = output.text.strip()
Note that the request_id passed to engine.generate must be unique for each request; otherwise continuous batching will not take effect and GPU throughput will not improve. Also, the return value is an async generator, so the results have to be consumed asynchronously. The rest of the code needs no further comment.
tensorrtllm_service
First, the complete web service code:
import aiohttp
import json
import traceback
import asyncio
import os
import configparser
from aiohttp import web
import socket
from tools.log import Logger
import logging
import logging.handlers
import time
import re
import json
from typing import AsyncGenerator, Optional
from utils.utils import merge
import pytomlpp as toml
from tensorrt_llm.executor import GenerationExecutor
from tensorrt_llm.executor import GenerationExecutorWorker
import tensorrt_llm.bindings as tllm
import tensorrt_llm.bindings.executor as trtllm
from tensorrt_llm import LLM, ModelConfig
from transformers import AutoTokenizer
from tensorrt_llm.hlapi.utils import SamplingConfig
from transformers.generation.logits_process import LogitsProcessor
import torch
config = toml.load('config.toml')
log_level_dict = {'CRITICAL': 50, 'FATAL': 50, 'ERROR': 40, 'WARNING': 30, 'WARN': 30, 'INFO': 20, 'DEBUG': 10,
'NOTSET': 0, 'critical': 50, 'fatal': 50, 'error': 40, 'warning': 30, 'warn': 30, 'info': 20,
'debug': 10, 'notset': 0}
def get_local_ip(ip, port):
try:
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.connect((ip, port))
ip = conn.getsockname()[0]
except Exception:
raise
conn.close()
return ip
async def heart_beat(ip, port):
beat_logger = Logger(log_name='beat', log_level=30, log_file='./logs/beat.log').logger
# interval in seconds
interval = config["common"]["hb_interval"]
send_data = {
'method': 'heartbeat',
'params': {
'data': [
]
}
}
******
send_data = json.dumps(send_data)
client = aiohttp.ClientSession()
while True:
try:
await client.post(url=hb_url, data=send_data)
except Exception as e:
beat_logger.error(f'send heartbeat fail: {e}')
beat_logger.info(send_data)
await asyncio.sleep(interval)
class TrtLLMServer():
def __init__(self):
self.config = config
os.environ['CUDA_VISIBLE_DEVICES'] = config['common']['device']
self.logger = self.create_logger()
self.max_input_len = int(self.config['chatplusplus']['max_input_len'])
self.max_new_tokens = int(self.config['chatplusplus']['max_new_tokens'])
self.free_gpu_memory_fraction = float(self.config['chatplusplus']['free_gpu_memory_fraction'])
self.repetition_penalty = float(self.config['chatplusplus']['repetition_penalty'])
self.engine_dir = self.config['chatplusplus']['engine_dir']
self.tokenizer_dir = self.config['chatplusplus']['tokenizer_dir']
self.top_p = float(self.config['chatplusplus']['top_p'])
self.temperature = float(self.config['chatplusplus']['temperature'])
self.generation_kwargs = {
"max_new_tokens": self.max_new_tokens,
"repetition_penalty": self.repetition_penalty,
"top_p":self.top_p,
"temperature" : self.temperature
}
self.smaple_config = SamplingConfig(beam_width=1, max_new_tokens= self.max_new_tokens)
self.executor = self.init_trtllmexecutor()
def init_trtllmexecutor(self):
kv_cache_config = tllm.KvCacheConfig(free_gpu_memory_fraction=self.free_gpu_memory_fraction)
executor_config = tllm.TrtGptModelOptionalParams()
executor_config.kv_cache_config = kv_cache_config
self.tokenizer = AutoTokenizer.from_pretrained(self.tokenizer_dir, trust_remote_code=True, use_fast=True)
self.logger.info(f"tokenizer.fast {self.tokenizer.is_fast}")
executor = GenerationExecutor.create(engine_dir=self.engine_dir, tokenizer=self.tokenizer, max_beam_width=1, executor_config=executor_config)
query = "你好呀"
prompt = "[Round {}]\n\n问:{}\n\n答:".format(1, query)
result = executor.generate(prompt=prompt, streaming=False, sampling_config=self.smaple_config)
self.logger.info(f"query:{query}----response:{result.text}")
self.logger.info("init_trtllmexecutor finished!")
return executor
def build_rsp(self, answer, req_id, tokens, cost_t, speed):
elements = []
for a in answer.split('\n\n'):
a = a.split(":")
elements.append({"tag":a[0],"value":a[1]})
resp = {
"id": req_id,
"jsonrpc": "2.0",
"ret": 0,
"result": {
"chatInfo": {
"answer": answer,
"elements": elements
},
"tokens": tokens,
"cost_time": str(cost_t) +" ms",
"speed": str(speed)+" tokens/s"
}
}
return resp
async def inference(self, request: web.Request):
req = await request.json()
self.logger.info(f"receive request: {json.dumps(req, ensure_ascii=False)}")
session_id = req['id']
data = req['params']['data']
query = data['content']
if "do_sample" in data:
do_smaple = data['do_sample']
else:
do_smaple = False
if not do_smaple:
self.smaple_config.top_p = None
self.smaple_config.top_k = None
# self.generation_kwargs['top_k'] = 1
# self.generation_kwargs['top_p'] = 0
# truncate the input to the last max_input_len characters
query = query[-self.max_input_len:]
query = "[Round {}]\n\n问:{}\n\n答:".format(1, query)
start = time.time()
t1 = time.time()
input_ids = self.tokenizer.encode(query, return_tensors="pt", return_attention_mask=False)
t2 = time.time()
self.logger.info(f"tokenizer time cost {round((t2-t1)*1000, 4)} ms")
results_generator = self.executor.generate_async(prompt=query, streaming=False,
sampling_config=self.smaple_config)
async for result in results_generator:
end = time.time()
token_ids = result.token_ids[input_ids.shape[1]:]
answer = self.tokenizer.decode(token_ids)
cost_t = (end - start) * 1000
speed = round(len(token_ids) / (end - start), 2)
resp = self.build_rsp(answer, session_id, len(token_ids), round(cost_t, 4), speed)
self.logger.info(f"model inference session_id:{session_id} send resp:{json.dumps(resp, ensure_ascii=False)}")
self.logger.info(
f"model inference session_id:{session_id}, len of answer:{len(answer)}, len of token_ids:{len(token_ids)}, cost:{cost_t} ms speed:{speed} tokens/s")
return web.json_response(resp)
def create_logger(self):
log_level = config["log"]["log_level"]
log_level = log_level_dict[log_level]
log_path = "./logs/server.log"
logger = logging.getLogger(__name__)
logger.setLevel(level=log_level)
formatter = logging.Formatter("%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s")
# file handler, rotated by size
# file_handler = logging.handlers.RotatingFileHandler(filename=log_path, maxBytes=838860800, backupCount=20, encoding='utf-8')
# rotated by date instead
file_handler = logging.handlers.TimedRotatingFileHandler(filename=log_path, when='D', interval=1,
encoding='utf-8')
file_handler.setFormatter(formatter)
file_handler.setLevel(level=log_level)
logger.addHandler(file_handler)
# console handler so logs also go to stdout
console = logging.StreamHandler()
console.setLevel(level=log_level)
console.setFormatter(formatter)
logger.addHandler(console)
return logger
class InvalidScoreLogitsProcessor(LogitsProcessor):
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
if torch.isnan(scores).any() or torch.isinf(scores).any():
scores.zero_()
scores[..., 5] = 5e4
return scores
async def main(ip, port):
trtllm_server = TrtLLMServer()
app = web.Application()
# direct route
app.add_routes([
web.post('/nlp', trtllm_server.inference)
])
# start the heartbeat task as a coroutine
asyncio.create_task(heart_beat(ip, port))
return app
if __name__ == '__main__':
if not os.path.exists("./logs"):
os.makedirs("./logs")
# target IP and port
ip = ""
port = xxxx
bind_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0)
# automatically obtain the local IP and port
local_ip = get_local_ip(ip , port )
bind_socket.bind(('0.0.0.0', 0))
web.run_app(main(local_ip, bind_socket.getsockname()[1]), sock=bind_socket)
The core code for TensorRT-LLM asynchronous inference is as follows:
kv_cache_config = tllm.KvCacheConfig(free_gpu_memory_fraction=self.free_gpu_memory_fraction)
executor_config = tllm.TrtGptModelOptionalParams()
executor_config.kv_cache_config = kv_cache_config
self.tokenizer = AutoTokenizer.from_pretrained(self.tokenizer_dir, trust_remote_code=True, use_fast=True)
self.logger.info(f"tokenizer.fast {self.tokenizer.is_fast}")
executor = GenerationExecutor.create(engine_dir=self.engine_dir, tokenizer=self.tokenizer, max_beam_width=1, executor_config=executor_config)
TensorRT-LLM actually exposes quite a few APIs; the version I use here only ran after repeated trial and error. The main pain points are that the various initialization settings after the engine is built are not clearly documented, and that setting parameters such as top_k / top_p in the sampling config at inference time causes problems, again with no detailed documentation on how they should be set.
results_generator = self.executor.generate_async(prompt=query, streaming=False,
sampling_config=self.smaple_config)
async for result in results_generator:
end = time.time()
token_ids = result.token_ids[input_ids.shape[1]:]
answer = self.tokenizer.decode(token_ids)
The inference result is again an async generator and has to be consumed asynchronously. Note also that the prompt argument of generate_async accepts several forms: str, tensor, list, numpy. In my experiments the outputs for str and tensor inputs were not identical; constrained by NVIDIA's implementation, I stick with the str form.
Building the engines
Following the GitHub tutorial, run the commands below under TensorRT-LLM/examples/chatglm to build engines with different precisions.
float16
CUDA_VISIBLE_DEVICES=0 python3 convert_checkpoint.py --model_dir /llm_fast_service/chatglm2-6b-merge --output_dir /llm_fast_service/trt_ckpt/chatglm2-6b/fp16/1-gpu
CUDA_VISIBLE_DEVICES=0 trtllm-build --checkpoint_dir /llm_fast_service/trt_ckpt/chatglm2-6b/fp16/1-gpu --gemm_plugin float16 --output_dir /llm_fast_service/trt_engines/chatglm2_6b/fp16/1-gpu
CUDA_VISIBLE_DEVICES=0 python3 ../chatglm_run.py --engine_dir /llm_fast_service/trt_engines/chatglm2_6b/fp16/1-gpu
Weight-only quantization int8
CUDA_VISIBLE_DEVICES=0 python3 convert_checkpoint.py --use_weight_only --weight_only_precision int8 --model_dir /llm_fast_service/chatglm2-6b-merge --output_dir /llm_fast_service/trt_ckpt/chatglm2-6b/int8_wo/1-gpu
CUDA_VISIBLE_DEVICES=0 trtllm-build --checkpoint_dir /llm_fast_service/trt_ckpt/chatglm2-6b/int8_wo/1-gpu --gemm_plugin float16 --output_dir /llm_fast_service/trt_engines/chatglm2_6b/int8_wo/1-gpu
CUDA_VISIBLE_DEVICES=0 python3 ../chatglm_run.py --engine_dir /llm_fast_service/trt_engines/chatglm2_6b/int8_wo/1-gpu
smoothquant (did not work for me: the output is completely wrong; a larger --smoothquant value migrates more of the quantization difficulty onto the weights)
CUDA_VISIBLE_DEVICES=0 python3 convert_checkpoint.py --model_dir /llm_fast_service/chatglm2-6b-merge --smoothquant 0 --per_token --output_dir /llm_fast_service/trt_ckpt/chatglm2-6b/sq/1-gpu
CUDA_VISIBLE_DEVICES=0 trtllm-build --checkpoint_dir /llm_fast_service/trt_ckpt/chatglm2-6b/sq/1-gpu --gemm_plugin float16 --output_dir /llm_fast_service/trt_engines/chatglm2_6b/sq/1-gpu
CUDA_VISIBLE_DEVICES=0 python3 ../chatglm_run.py --engine_dir /llm_fast_service/trt_engines/chatglm2_6b/sq/1-gpu
AWQ
CUDA_VISIBLE_DEVICES=0 python3 ../quantization/quantize.py --model_dir /llm_fast_service/chatglm2-6b-merge --dtype float16 --qformat int4_awq --output_dir /llm_fast_service/trt_ckpt/chatglm2-6b/int4_awq/1-gpu
CUDA_VISIBLE_DEVICES=0 trtllm-build --checkpoint_dir /llm_fast_service/trt_ckpt/chatglm2-6b/int4_awq/1-gpu --gemm_plugin float16 --output_dir /llm_fast_service/trt_engines/chatglm2_6b/int4_awq/1-gpu
CUDA_VISIBLE_DEVICES=0 python3 ../chatglm_run.py --engine_dir /llm_fast_service/trt_engines/chatglm2_6b/int4_awq/1-gpu
Weight-only quantization int8, in-flight batching
CUDA_VISIBLE_DEVICES=0 python3 convert_checkpoint.py --use_weight_only --weight_only_precision int8 --model_dir /llm_fast_service/chatglm2-6b-merge --output_dir /llm_fast_service/trt_ckpt/chatglm2-6b/int8_wo/1-gpu
CUDA_VISIBLE_DEVICES=0 trtllm-build --checkpoint_dir /llm_fast_service/trt_ckpt/chatglm2-6b/int8_wo/1-gpu --max_batch_size 8 --gemm_plugin float16 --gpt_attention_plugin float16 --paged_kv_cache enable --remove_input_padding enable --output_dir /llm_fast_service/trt_engines/chatglm2_6b/int8_wo_In-flight/1-gpu
CUDA_VISIBLE_DEVICES=0 python3 ../chatglm_run.py --engine_dir /llm_fast_service/trt_engines/chatglm2_6b/int8_wo_In-flight/1-gpu
AWQ, in-flight batching
CUDA_VISIBLE_DEVICES=0 python3 ../quantization/quantize.py --model_dir /llm_fast_service/chatglm2-6b-merge --dtype float16 --qformat int4_awq --output_dir /llm_fast_service/trt_ckpt/chatglm2-6b/int4_awq/1-gpu
CUDA_VISIBLE_DEVICES=0 trtllm-build --checkpoint_dir /llm_fast_service/trt_ckpt/chatglm2-6b/int4_awq/1-gpu --max_batch_size 8 --gemm_plugin float16 --gpt_attention_plugin float16 --paged_kv_cache enable --remove_input_padding enable --output_dir /llm_fast_service/trt_engines/chatglm2_6b/int4_awq_In-flight/1-gpu
CUDA_VISIBLE_DEVICES=0 python3 ../chatglm_run.py --engine_dir /llm_fast_service/trt_engines/chatglm2_6b/int4_awq_In-flight/1-gpu
A point worth noting:
Quantization only yields a speedup with small batch sizes and not-too-large models. The quantized weights are still dequantized back to fp16 during computation, so once the model or batch gets large, the relief on the memory-bound side no longer outweighs the compute-bound cost of the large matrix multiplications, and there is no speedup.
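A back-of-the-envelope calculation makes this concrete. At bs=1 every generated token has to stream all the weights from GPU memory once, so decode speed is roughly bandwidth divided by weight bytes; the bandwidth below is an assumed round number, not a measurement of my card.

PARAMS = 6.2e9               # roughly 6B parameters (ChatGLM2-6B)
BANDWIDTH = 1.0e12           # assumed ~1 TB/s effective memory bandwidth

for name, bytes_per_param in [("fp16", 2.0), ("int8", 1.0), ("int4", 0.5)]:
    ms_per_token = PARAMS * bytes_per_param / BANDWIDTH * 1000
    print(f"{name}: ~{ms_per_token:.1f} ms/token, ~{1000 / ms_per_token:.0f} tokens/s")

# prints roughly 80 / 160 / 320 tokens/s -- the same ordering (though not the exact
# numbers) as the 69.5 / 129.7 / 207.7 tokens/s measured below; with a large batch
# the GEMMs become compute-bound and this advantage disappears.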
III. Inference Speed Comparison
Model: the 6B version of ChatGLM2. First, let's look at synchronous bs=1 inference with torch, vLLM and TensorRT-LLM.
1. Non-business data
"你好", "你是谁呀?", "你能做什么呀?", "你真厉害", "真棒呀", "再见了", "给我推荐一部电影", "你知道明天天气怎么样吗?"
torch generation
The code is straightforward:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '1'
from transformers import AutoTokenizer,AutoModel
import time
if __name__ == '__main__':
device = "cuda"
gen_kwargs = {"max_length": 8192, "num_beams": 1,
"do_sample": False, "top_p": 0.8,
"temperature": 0.95
}
model = AutoModel.from_pretrained(pretrained_model_name_or_path="./chatglm2-6b-merge", trust_remote_code=True).to(device)
tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path="./chatglm2-6b-merge", trust_remote_code=True)
query = "截止2024年,中国有多少人口?"
prompt = "[Round {}]\n\n问:{}\n\n答:".format(1, query)
start = time.time()
inputs = tokenizer(prompt, return_tensors="pt", padding=True)
inputs = inputs.to(model.device)
outputs = model.generate(**inputs, **gen_kwargs)
outputs_i = outputs.tolist()[0][len(inputs["input_ids"][0]):]
response = tokenizer.decode(outputs_i)
print(f"query: {query}---response: {[response]}")
print('\n\n')
querys = [
"你好",
"你是谁呀?",
"你能做什么呀?",
"你真厉害",
"真棒呀",
"再见了",
"给我推荐一部电影",
"你知道明天天气怎么样吗?",
]
total_tokens = 0
total_times = 0
for index, query in enumerate(querys):
prompt = "[Round {}]\n\n问:{}\n\n答:".format(1, query)
start = time.time()
inputs = tokenizer(prompt, return_tensors="pt", padding=True)
inputs = inputs.to(model.device)
outputs = model.generate(**inputs, **gen_kwargs)
end = time.time()
outputs_i = outputs.tolist()[0][len(inputs["input_ids"][0]):]
response = tokenizer.decode(outputs_i)
total_times += (end - start)
total_tokens += len(outputs_i)
print("*" * 100)
print(f'Output: "{[response]}"')
print(
f"tokens count:{len(outputs_i)} --time cost:{(end - start) * 1000} ms----->speed {len(outputs_i) / (end - start)} tokens/s")
print('*' * 100)
print(f"average speed: {round((total_tokens / total_times), 2)} tokes/s")
The result:
average speed: 41.78 tokens/s, GPU memory usage 16 GB
trtllm generation
Convert the Hugging Face weights to TensorRT-LLM checkpoints, build the engines as described above, and run inference. The core of the script is as follows:
from tensorrt_llm.runtime import ModelRunnerCpp
import torch
from utils import (DEFAULT_HF_MODEL_DIRS, DEFAULT_PROMPT_TEMPLATES,
load_tokenizer, read_model_name, throttle_generator)
import tensorrt_llm
import tensorrt_llm.profiler
from tensorrt_llm.logger import logger
from tensorrt_llm.runtime import PYTHON_BINDINGS, ModelRunner
# excerpt adapted from the TensorRT-LLM examples run script: args, runtime_rank,
# stop_words_list and bad_words_list come from its argparse / setup code
args.tokenizer_dir = "/llm_fast_service/chatglm2-6b-merge"
model_name, model_version = read_model_name(args.engine_dir)
if args.tokenizer_dir is None:
logger.warning(
"tokenizer_dir is not specified. Try to infer from model_name, but this may be incorrect."
)
args.tokenizer_dir = DEFAULT_HF_MODEL_DIRS[model_name]
tokenizer, pad_id, end_id = load_tokenizer(
tokenizer_dir=args.tokenizer_dir,
vocab_file=args.vocab_file,
model_name=model_name,
model_version=model_version,
tokenizer_type=args.tokenizer_type,
)
# build the model inference runner
runner_cls = ModelRunner if args.use_py_session else ModelRunnerCpp
runner_kwargs = dict(engine_dir=args.engine_dir,
lora_dir=args.lora_dir,
rank=runtime_rank,
debug_mode=args.debug_mode,
lora_ckpt_source=args.lora_ckpt_source,
max_batch_size=1,
max_input_len=1024,
max_output_len=1024,
max_beam_width=1,
free_gpu_memory_fraction=0.15)
runner = runner_cls.from_dir(**runner_kwargs)
querys = [
"你好",
"你是谁呀?",
"你能做什么呀?",
"你真厉害",
"真棒呀",
"再见了",
"给我推荐一部电影",
"你知道明天天气怎么样吗?",
]
total_tokens = 0
total_times = 0
with torch.no_grad():
for query in querys:
# query = json.loads(query)
t1 = time.time()
batch_input_ids = []
curr_text = "[Round {}]\n\n问:{}\n\n答:".format(1, query)
input_ids = tokenizer.encode(curr_text,
add_special_tokens=True,
truncation=True,
max_length=1000)
batch_input_ids.append(input_ids)
batch_input_ids = [
torch.tensor(x, dtype=torch.int32) for x in batch_input_ids
]
input_lengths = [x.size(0) for x in batch_input_ids]
outputs = runner.generate(
batch_input_ids,
max_new_tokens=args.max_output_len,
max_attention_window_size=args.max_attention_window_size,
sink_token_length=args.sink_token_length,
end_id=end_id,
pad_id=pad_id,
temperature=args.temperature,
top_k=args.top_k,
top_p=args.top_p,
num_beams=args.num_beams,
length_penalty=args.length_penalty,
early_stopping=args.early_stopping,
repetition_penalty=args.repetition_penalty,
presence_penalty=args.presence_penalty,
frequency_penalty=args.frequency_penalty,
stop_words_list=stop_words_list,
bad_words_list=bad_words_list,
output_cum_log_probs=(args.output_cum_log_probs_npy != None),
output_log_probs=(args.output_log_probs_npy != None),
lora_uids=args.lora_task_uids,
prompt_table_path=args.prompt_table_path,
prompt_tasks=args.prompt_tasks,
streaming=args.streaming,
output_sequence_lengths=True,
return_dict=True,
medusa_choices=args.medusa_choices)
torch.cuda.synchronize()
output_ids = outputs['output_ids']
sequence_lengths = outputs['sequence_lengths']
batch_size, num_beams, _ = output_ids.size()
for batch_idx in range(batch_size):
inputs = output_ids[batch_idx][0][:input_lengths[batch_idx]].tolist()
input_text = tokenizer.decode(inputs)
print(f'Input [Text {batch_idx}]: \"{[input_text]}\"')
for beam in range(num_beams):
output_begin = input_lengths[batch_idx]
output_end = sequence_lengths[batch_idx][beam]
outputs = output_ids[batch_idx][beam][output_begin:output_end].tolist()
output_text = tokenizer.decode(outputs)
total_tokens += (output_end - output_begin)
t2 = time.time()
total_times += (t2 - t1)
print(f'Output [Text {batch_idx} Beam {beam}]: \"{[output_text]}\"')
print(f"tokens count:{output_end - output_begin} --time cost:{(t2 - t1) * 1000} ms----->speed {(output_end - output_begin) / (t2 - t1)} tokens/s")
print('*'*100)
print(f"average speed: {round((total_tokens/total_times).tolist(),2)} tokes/s")
Results for the different precisions:
fp16 weights
free_gpu_memory_fraction=0.15, GPU memory usage 15 GB, average speed: 69.5 tokens/s
wo_int8 weights
free_gpu_memory_fraction=0.15, GPU memory usage 10 GB, average speed: 129.7 tokens/s
wo_int4
free_gpu_memory_fraction=0.15, GPU memory usage 7.4 GB, average speed: 207.73 tokens/s
sq_W8A8
Text generation is broken with this engine; something presumably went wrong during quantization or calibration, and I have not figured out where.
Inference itself is very fast, average speed: 122.28 tokens/s, but the generated text is all wrong and meaningless.
vllm inference
The code, again very simple:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = "0"
from vllm import LLM, SamplingParams
import torch
import time
from transformers.generation.logits_process import LogitsProcessor
from transformers.generation.utils import LogitsProcessorList
class InvalidScoreLogitsProcessor(LogitsProcessor):
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
if torch.isnan(scores).any() or torch.isinf(scores).any():
scores.zero_()
scores[..., 5] = 5e4
return scores
logits_processor = LogitsProcessorList()
logits_processor.append(InvalidScoreLogitsProcessor())
if __name__ == '__main__':
request_dict = {
"n": 1,
"temperature": 0,
"top_p": 1.0,
"max_tokens": 200,
"logits_processors": logits_processor
}
sampling_params = SamplingParams(**request_dict)
# Create an LLM.
llm = LLM(model="./chatglm2-6b-merge", gpu_memory_utilization=0.8, trust_remote_code=True)
querys = [
"你好",
"你是谁呀?",
"你能做什么呀?",
"你真厉害",
"真棒呀",
"再见了",
"给我推荐一部电影",
"你知道明天天气怎么样吗?",
]
total_tokens = 0
total_times = 0
with torch.no_grad():
for query in querys:
# query = json.loads(query)
batch_input_ids = []
curr_text = "[Round {}]\n\n问:{}\n\n答:".format(1, query)
start = time.time()
outputs = llm.generate(curr_text, sampling_params)
end = time.time()
# Print the outputs.
for output in outputs:
output = output.outputs[0]
token_ids = output.token_ids
answer = output.text.strip()
total_times += (end - start)
total_tokens += len(token_ids)
print(f'Output: "{[answer]}"')
print(
f"tokens count:{len(token_ids)} --time cost:{(end - start) * 1000} ms----->speed { len(token_ids) / (end - start)} tokens/s")
print('*' * 100)
print(f"average speed: {round((total_tokens / total_times), 2)} tokes/s")
Only FP16 inference is tested here; quantization with vLLM is a bit more involved, so I skip that comparison.
average speed: 68.96 tokens/s, gpu_memory_utilization=0.8 (required for the model to start successfully), GPU memory usage 15 GB
2. Business data
Business data mainly concerns calls to the online web service, where high throughput and low latency matter. Here I simply compare the latency of asynchronous (concurrent) calls against serial calls on business data; for confidentiality reasons the data itself is not shown.
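For reference, the serial-vs-concurrent comparison can be driven from the client side roughly like this (the URL is a placeholder and the payload mirrors the fields the services above parse; only the structure matters):

import asyncio
import json
import time
import aiohttp

URL = "http://127.0.0.1:8080/nlp"                    # placeholder service address
QUERIES = ["你好", "你是谁呀?", "给我推荐一部电影", "你知道明天天气怎么样吗?"]

async def call(session, query, req_id):
    payload = {"id": req_id, "params": {"data": {"content": query}}}
    async with session.post(URL, data=json.dumps(payload)) as resp:
        return await resp.json()

async def run():
    async with aiohttp.ClientSession() as session:
        t0 = time.time()
        for i, q in enumerate(QUERIES):              # serial: one request at a time
            await call(session, q, f"serial-{i}")
        t1 = time.time()
        await asyncio.gather(*(call(session, q, f"conc-{i}")
                               for i, q in enumerate(QUERIES)))
        t2 = time.time()                             # concurrent: lets continuous batching kick in
        print(f"serial: {t1 - t0:.2f}s, concurrent: {t2 - t1:.2f}s")

asyncio.run(run())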
Compared with vLLM, TensorRT-LLM still holds a certain advantage and is quite feature-complete, but it currently has some shortcomings: when building an engine you have to specify max_batch_size / max_input / max_output and the like up front and cannot change them dynamically, which is probably a necessary part of why its inference is optimized so aggressively, at some cost in flexibility. Also, its asynchronous inference does not support settings such as top_k / top_p, so text generation is limited to greedy-search decoding.
Finally, I have at last written the post I planned long ago. AI is moving incredibly fast these days: new base models, application patterns and acceleration techniques appear faster than anyone can follow, and for a while I was genuinely anxious about falling behind. On reflection, as an ordinary person I do not have the time or energy to learn everything; as long as I stay humble, keep learning step by step, and focus on what interests me, on the mainstream directions, and on what my company needs, I will not be left behind.
References
vLLM(一)PagedAttention 算法
大模型推理核心技术之Continuous Batching和我的WXG往事
How continuous batching enables 23x throughput in LLM inference while reducing p50 latency
SmoothQuant: Accurate and Efficient Post-Training Quantization for Large Language Models
W4A16模型量化大法 AWQ