关于SSE的基本概念可以看一下阮一峰老师的这篇文章:Server-Sent Events教程。
现在比较常见的场景是gpt回答的时候类似下图这种打字机的情况,因为AI一般响应时间会比较长,使用这种方式能让人别等那么久,是一个相对比较良好的用户体验。
这里简单说一下具体这个场景下的实现思路。
客户端(前端)
SSE 使用 HTTP 协议,客户端需要向服务端发起一次请求,需要定义一个fetchEventSource
方法,大概这样:
const fetchEventSource = (url, options) => {
fetch(url, options).then(resp => {
if (resp.status === 200) {
options.onopen && options.onopen()
return resp.body
}
}).then(rb => {
const reader = rb.getReader()
const push = () => {
// done 为数据是否接收完成 boolean 值
// value 为接收到的数据, Uint8Array 格式
return reader.read().then(({done, value}) => {
if (done) {
options.onclose && options.onclose()
return
}
options.onmessage && options.onmessage(new TextDecoder().decode(value))
return push()
});
}
// 开始读取流信息
return push()
}).catch((e) => {
options.error && options.error(e)
})
}
定义url和options参数,url传请求的地址,options可以定义更多内容,大概像这样调用fetchEventSource
方法:
fetchEventSource('http://localhost:8000/gpt/summary', {
method: 'POST',
body: JSON.stringify(data),
headers: {
"Content-Type": "application/json",
"Authorization": "此处可以带接口鉴权token"
},
onopen: () => {
console.log('连接sse成功')
},
onclose: () => {
console.log('sse连接关闭')
},
onmessage: (delta) => {
let prefix = 'data: '
if (!delta.startsWith(prefix)) {
return;
}
let delta_array = delta.split(prefix);
delta_array.forEach(function(element) {
element = element.replace(/\n$/, '')
if (element === '[DONE]\n') {
return;
}
if (element != '') {
// 可能有多个回答
let choices = JSON.parse(element).choices;
let content = choices[0].delta.content;
console.log(content);
$('#mean').append(content);
}
});
}
})
可以看到option中比实际http请求多了几个函数:onopen
onclose
onmessage
,onopen
代表和服务端建立连接成功,onclose
则是关闭服务端连接的时候会被调用,onmessage
则是处理接口返回的数据,一般是需要处理多份数据,这里详细讲一下onmessage
的处理方式。
onmessage
可以看到参数是delta,假如说GPT场景下返回了“你好,我是GPT的文字”,可能第一个delta里就返回包含”你好“,第二个delta里返回“,”,第三个delta返回“我是”等等以此类推直到数据返回完毕。数据返回的格式因业务不同也会不同,但基本要做的事情就是处理并解析里面的数据,然后把这些文字拼接到页面上做显示,实现效果。
服务端(后端)
服务端的任务通常是调用gpt的接口,然后流式读取,读到什么马上返回给前端。
这里给一个python的demo:
from sse_starlette.sse import ServerSentEvent, EventSourceResponse
def create_chat_completion(messages):
# 发起请求
response = openai.ChatCompletion.create(
engine="gpt-35-turbo",
messages=messages,
stream=True, # 该参数需要设置为true
max_tokens=1000
)
# 循环处理stream结果
for item in response:
# yield ServerSentEvent(json.dumps(item, ensure_ascii=False), event='delta')
result = json.dumps(item)
print(result)
yield ServerSentEvent(json.dumps(item))
@router.post("/summary")
async def summary(contentNeedSummary: posts.ContentNeedSummary, user = Depends(get_token_header)):
messages = [
{"role": "system", "content": "可以预置一些系统的设置"},
{"role": "user", "content": contentNeedSummary.content}
]
return EventSourceResponse(create_chat_completion(messages))
可以看到上面的方法,前端调用/summary
接口后,进入summary
函数,做一些这个方法特有的设置之后,return了EventSourceResponse(create_chat_completion(messages))。create_chat_completion再实现通用的调用gpt的方法,每得到一个响应,马上用ServerSentEvent包裹返回给前端。
也就是说,整个过程,是前端发起SSE请求服务端,服务端(可以做一些别的处理,比如鉴权和预置系统数据)再发起SSE请求GPT接口;在返回结果的过程中,GPT接口流式输出给服务端,服务端(可以做一些别的处理,比如处理好返回结果的格式,计算token成本)马上返回给前端,比如GPT接口流式输出了“你好”,服务端马上返回了“你好”给前端显示在网页上,再返回“,我是GPT”,服务端接着返回给前端显示在网页上。
这样的处理方式可以让结果尽快显示出来,也可以借由服务端的处理,保证安全和计算成本,以及处理好数据格式。