Python实现半双工的实时通信SSE(Server-Sent Events)
1 简介
实现实时通信一般有WebSocket、Socket.IO和SSE(Server-Sent Events)三种方法。WebSocket和Socket.IO是全双工的实时双向通信技术,适合用于聊天和会话等,但相对于SSE比较笨重,SSE适合用于服务器主动向客户端实时推送数据,例如:用于大模型实时对话。
WebSocket是一种HTML5提供的全双工通信协议,它基于TCP在客户端和服务器之间建立持久性的连接,实现两者之间实时双向数据通信。
Socket.IO是一个封装了 Websocket 的实时双向数据通信库,它封装了自动重连、自动检测网络状况和自动跨浏览器兼容性等。
SSE(Server-Sent Events)是一种利用 HTTP 协议长连接特性,在服务器与客户端之间建立持久化连接,实现服务器主动向客户端推送数据的半双工实时数据通信技术,也被称为“事件流”(Event Stream)。
本文使用Python和Vue3实现SSE的实时通信,现在浏览器支持EventSource,不需要额外安装依赖包。
2 前端Vue3代码
<script setup lang="ts">
import { onBeforeUnmount} from 'vue'
defineProps<{ msg: string }>()
// 定义EventSource
let eventSource: any = null
// 建立连接
function createSseConnect(dataId: string) {
if (window.EventSource) {
// 创建连接
eventSource = new EventSource('http://127.0.0.1:5000/sse?data_id=' + dataId);
// 接收消息
eventSource.onmessage = (event: MessageEvent) => {
console.log("onmessage:" + dataId + ": " + event.data)
};
// // 也可以使用addEventListener实现自定义事件和默认message事件
// eventSource.addEventListener('message', (event: MessageEvent)=> {
// console.log("message" + dataId + ": " + event.data);
// }, false);
// 打开连接
eventSource.onopen = (event: Event) => {
console.log("onopen:" + dataId + ": " + event)
};
// 连接出错时
eventSource.onerror = (event: Event) => {
console.log("onerror :" + dataId + ": " + event)
};
} else {
console.log("浏览器不支持SSE")
}
}
// 组件销毁
onBeforeUnmount(() => {
// 关闭EventSource
if(eventSource != null){
eventSource.close()
}
})
</script>
<template>
<h1>{{ msg }}</h1>
<input type="button" value="发送消息" v-on:click="createSseConnect('1234')" />
</template>
<style scoped>
.read-the-docs {
color: #888;
}
</style>
3 后端Python代码
# 导入所需的模块
import json
import time
import datetime
from flask_cors import CORS
from flask import Flask, request, Response
app = Flask(__name__)
# 解决跨域问题
CORS(app, supports_credentials=True)
def get_data():
# 获取当前时间,并转换为 JSON 格式
dt_ms = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
return json.dumps({'time': dt_ms}, ensure_ascii=False)
@app.route('/sse')
def stream():
data_id = request.args.get('data_id')
print(data_id)
return Response(eventStream(), mimetype="text/event-stream")
def build_message(message: str, event="message"):
"""
构建消息
:param message: 数据消息
:param event: 事件,默认事件是“message”,可以根据自己的需求定制事件,对应前端的eventSource.addEventListener('message',()=>{}, false)中的message。
:return:
"""
head = "event:" + event + "\n" + "data:"
tail = "\n\n"
return head + message + tail
def eventStream():
id = 0
while True:
id += 1
# 睡眠
time.sleep(1)
str_out = build_message(get_data())
print(str_out)
# 构建迭代器
yield str_out
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)