用于服务端和客户端通信,服务端主动给客户端发送消息
前提:
确保安装了socket库:
pip install flask-socketio python-socketio
服务端代码
from flask import Flask
from flask_socketio import SocketIO
import threading
import time
app = Flask(__name__)
socketio = SocketIO(app)
# 全局变量
t = 0
list230 = []
def read230():
global t
global list230
while True:
try:
list230 = [1, 1, 1, 0, 0]
socketio.emit("update_data", {"data": list230})
except Exception as exc:
list230 = [0, 0, 0, 100, 100]
time.sleep(1)
time.sleep(0.1)
if __name__ == "__main__":
t1 = threading.Thread(target=read230, name="read230")
t1.start()
socketio.run(app, host="0.0.0.0", port=5000)
客户端是另一个程序,你可以使用 SocketIO 客户端库来接收数据
import socketio
# 创建一个 SocketIO 客户端
sio = socketio.Client()
@sio.event
def connect():
print("连接成功")
@sio.event
def update_data(msg):
print("接收到数据:", msg['data'])
@sio.event
def disconnect():
print("断开连接")
if __name__ == '__main__':
# 连接到服务器
sio.connect('http://127.0.0.1:5000')
try:
# 保持程序运行
while True:
pass
except KeyboardInterrupt:
sio.disconnect()
结果