Python Tornado 实现SSE服务端主动推送方案

一、SSE 服务端消息推送

SSEServer-Sent Events 的简称, 是一种服务器端到客户端(浏览器)的单项消息推送。对应的浏览器端实现 Event Source 接口被制定为HTML5 的一部分。相比于 WebSocket,服务器端和客户端工作量都要小很多、简单很多,而 Tornado 又是Python中的一款优秀的高性能web框架,本文带领大家一起实践下 Tornado SSE 的实现。

本文主要探索两个方面的实践:一个是客户端发送请求,服务端的返回是分多次进行传输的,直到传输完成,这种情况下请求结束后,就可以考虑关闭 SSE了,所以这种连接可以认为是暂时的。另一种是由服务端在特定的时机下主动推送消息给到客户端,推送的时机具有不确定性,随时性,所以这种情况下需要客户端和服务端保持长久连接。

本次使用的 Tornado 版本:

tornado==6.3.2

二、短暂性场景下的 SSE 实现

短暂性场景下就是对应上面的第一点,客户端主动发送请求后,服务端分多次传输,直到完成,数据获取完成后连接就可以断开了,适用于一些接口复杂,操作步骤多的场景,可以提前告诉客户端现在进行到了哪一步了,并且这种方式也有利于服务端的横向扩展。

Tornado 中实现,需要注意的是要关闭 _auto_finish ,这样的话就不会被框架自己主动停止连接了,下面是一个实现的案例:

import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutor

class SSE(RequestHandler):

    def initialize(self):
        # 关闭自动结束
        self._auto_finish = False
        print("initialize")

    def set_default_headers(self):
        # 设置为事件驱动模式
        self.set_header('Content-Type', "text/event-stream")
        # 不使用缓存
        self.set_header('Content-Control', "no-cache")
        # 保持长连接
        self.set_header('Connection', "keep-alive")
        # 允许跨域
        self.set_header('Access-Control-Allow-Origin', "*")

    def prepare(self):
        # 准备线程池
        self.executor = self.application.pool

    @tornado.gen.coroutine
    def get(self):
        result = yield self.doHandle()
        self.write(result)
        # 结束
        self.finish()

    @run_on_executor
    def doHandle(self):
        tornado.ioloop.IOLoop.current()
        # 分十次推送信息
        for i in range(10):
            time.sleep(1)
            self.flush()
            self.callback(f"current: {i}")
        return f"data: end\n\n"

    def callback(self, message):
        # 事件推送
        message = f"data: {message}\n\n"
        self.write(message)
        self.flush()


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ("/sse", SSE),
            ("/(.*)$", tornado.web.StaticFileHandler, {
                "path": "resources/static",
                "default_filename": "index.html"
            })
        ]
        super(Application, self).__init__(handlers)
        self.pool = ThreadPoolExecutor(200)


def startServer(port):
    app = Application()
    httpserver = tornado.httpserver.HTTPServer(app)
    httpserver.listen(port)

    print(f"Start server success", f"The prot = {port}")

    tornado.ioloop.IOLoop.current().start()


if __name__ == '__main__':
    startServer(8020)

运行后可以到浏览器访问:http://localhost:8020/sse,此时就可以看到服务端在不断地推送数据过来了:

在这里插入图片描述

那如何在前端用 JS 获取数据呢,前面提到在 JS 层面,有封装好的 Event Source 组件可以直接拿来使用,例如:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>测试服务器推送技术</title>
</head>
<body>
    <div id="messages"></div>
</body>
<script>
    const eventSource = new EventSource('http://localhost:8020/sse');

    // 事件回调
    eventSource.onmessage = (event) => {
      console.log(event.data)
      const messagesDiv = document.getElementById('messages');
      messagesDiv.innerHTML += '<p>' + event.data + '</p>';
    };

    // 异常
    eventSource.onerror = (error) => {
      console.error('EventSource failed:', error);
      eventSource.close();
    };

    eventSource.onopen = ()=>{
        console.log("开启")
    }
  </script>
</html>

运行后可以看到服务端分阶段推送过来的数据:

在这里插入图片描述

三、长连接场景下的 SSE 实现

上面实现了客户端请求后,分批次返回,但是有些情况下是客户端连接后没有东西返回,而是在某个特定的时机下返回给某几个客户端,所以这种情况,我们需要和客户端保持长久的连接,同时进行客户端连接的缓存,因为同时有可能有 100 个用户,但是推送时可能只需要给 10 个用户推送,这种方式相当于将一个客户端和一个服务端进行了绑定,一定程度上不利于服务端的横向扩展,但也可以通过一些消息订阅的方式解决类似问题。

下面是一个实现案例:

import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutor


# 单例
def singleton(cls):
    instances = {}
    def wrapper(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    return wrapper


# 订阅推送工具类
@singleton
class Pusher():

    def __init__(self):
        self.clients = {}

    def add_client(self, client_id, callback):
        if client_id not in self.clients:
            self.clients[client_id] = callback
            print(f"{client_id} 连接")

    def send_all(self, message):
        for client_id in self.clients:
            callback = self.clients[client_id]
            print("发送消息给:", client_id)
            callback(message)

    def send(self, client_id, message):
        callback = self.clients[client_id]
        print("发送消息给:", client_id)
        callback(message)


class SSE(RequestHandler):
    # 定义推送者
    pusher = Pusher()

    def initialize(self):
        # 关闭自动结束
        self._auto_finish = False
        print("initialize")

    def set_default_headers(self):
        # 设置为事件驱动模式
        self.set_header('Content-Type', "text/event-stream")
        # 不使用缓存
        self.set_header('Content-Control', "no-cache")
        # 保持长连接
        self.set_header('Connection', "keep-alive")
        # 允许跨域
        self.set_header('Access-Control-Allow-Origin', "*")

    @tornado.gen.coroutine
    def get(self):
        # 客户端唯一标识
        client_id = self.get_argument("client_id")
        self.pusher.add_client(client_id, self.callback)

    def callback(self, message):
        # 事件推送
        message = f"data: {message}\n\n"
        self.write(message)
        self.flush()


# 定义推送接口,模拟推送
class Push(RequestHandler):
    # 定义推送者
    pusher = Pusher()

    def prepare(self):
        # 准备线程池
        self.executor = self.application.pool

    @tornado.gen.coroutine
    def get(self):
        # 客户端标识
        client_id = self.get_argument("client_id")
        # 推送的消息
        message = self.get_argument("message")
        result = yield self.doHandle(client_id, message)
        self.write(result)

    @run_on_executor
    def doHandle(self, client_id, message):
        tornado.ioloop.IOLoop.current()
        self.pusher.send(client_id, message)
        return "success"


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ("/sse", SSE),
            ("/push", Push),
            ("/(.*)$", tornado.web.StaticFileHandler, {
                "path": "resources/static",
                "default_filename": "index.html"
            })
        ]
        super(Application, self).__init__(handlers)
        self.pool = ThreadPoolExecutor(200)

def startServer(port):
    app = Application()
    httpserver = tornado.httpserver.HTTPServer(app)
    httpserver.listen(port)

    print(f"Start server success", f"The prot = {port}")

    tornado.ioloop.IOLoop.current().start()


if __name__ == '__main__':
    startServer(8020)

这里我定义了一个 Pusher 订阅推送工具类,用来存储客户端的连接,以及给指定客户端或全部客户端发送消息,然后我又定义 Push 接口,模拟不定时的指定客户端发送信息的场景。

同样前端也要修改,需要给自己定义 client_id ,例如:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>测试服务器推送技术</title>
</head>
<body>
    <div id="client"></div>
    <div id="messages"></div>
</body>
<script>
    function generateUUID() {
      let uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
        const r = Math.random() * 16 | 0;
        const v = c === 'x' ? r : (r & 0x3 | 0x8);
        return v.toString(16);
      });

      return uuid;
    }
    // 利用uuid 模拟生成唯一的客户端ID
    let client_id = generateUUID();
    document.getElementById('client').innerHTML = "当前 client_id = "+client_id;
    const eventSource = new EventSource('http://localhost:8020/sse?client_id='+client_id);

    // 事件回调
    eventSource.onmessage = (event) => {
      console.log(event.data)
      const messagesDiv = document.getElementById('messages');
      messagesDiv.innerHTML += '<p>' + event.data + '</p>';
    };

    // 异常
    eventSource.onerror = (error) => {
      console.error('EventSource failed:', error);
      eventSource.close();
    };

    eventSource.onopen = ()=>{
        console.log("开启")
    }
  </script>
</html>

这里我用 uuid 模拟客户端的唯一ID,在真实使用时可不要这么做。

下面使用浏览器打开三个页面,可以看到三个不同的 client_id :

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在服务端的日志中也能看到这三个客户端的连接:

在这里插入图片描述

下面调用 push 接口来给任意一个客户端发送消息,例如这里发给client_id = 2493045e-84dd-4118-8d96-0735c4ac186b 的用户 :

在这里插入图片描述

下面看到 client_id2493045e-84dd-4118-8d96-0735c4ac186b的页面:

在这里插入图片描述
已经成功收到推送的消息,反之看另外两个:

在这里插入图片描述
在这里插入图片描述
都没有消息,到这里就实现了长连接下不定时的服务端消息推送方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/356469.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深度强化学习(王树森)笔记05

深度强化学习&#xff08;DRL&#xff09; 本文是学习笔记&#xff0c;如有侵权&#xff0c;请联系删除。本文在ChatGPT辅助下完成。 参考链接 Deep Reinforcement Learning官方链接&#xff1a;https://github.com/wangshusen/DRL 源代码链接&#xff1a;https://github.c…

政安晨的机器学习笔记——基于Ubuntu系统的Miniconda安装Jupyter Notebook

一、准备工作 Miniconda的安装请参考我的另一篇博客文章&#xff1a; 实例讲解深度学习工具PyTorch在Ubuntu系统上的安装入门&#xff08;基于Miniconda&#xff09;&#xff08;非常详细&#xff09;https://blog.csdn.net/snowdenkeke/article/details/135887509 这里…

༺༽༾ཊ—Unity之-05-抽象工厂模式—ཏ༿༼༻

首先创建一个项目&#xff0c; 在这个初始界面我们需要做一些准备工作&#xff0c; 建基础通用文件夹&#xff0c; 创建一个Plane 重置后 缩放100倍 加一个颜色&#xff0c; 任务&#xff1a;使用 抽象工厂模式 创建 人物与宠物 模型&#xff0c; 首先资源商店下载 人物与宠物…

C++STL之map、set的使用和模拟实现

绪论​&#xff1a; “我这个人走得很慢&#xff0c;但是我从不后退。——亚伯拉罕林肯”&#xff0c;本章是接上一章搜索二叉树中红黑树的后续文章&#xff0c;若没有看过强烈建议观看&#xff0c;否则后面模拟实现部分很看懂其代码原理。本章主要讲了map、set是如何使用的&am…

torch与cuda\cudnn和torchvision的对应

以上图片来源于这篇博客 于是&#xff0c;我需要手动下载0.9.0torchvision 直接在网站https://pypi.tuna.tsinghua.edu.cn/simple/后面加上torchvision&#xff0c;就不用ctrlF搜torchvision了&#xff0c;即进入下面这个网站&#xff0c;找到对应版本的包下载安装即可 https…

html页面练习——公司发展流程图

1.效果图 2.html <div class"center"><header><h1>发展历程</h1><h3>CONMPANY HISTORY</h3></header><main><div class"left"><div class"time1">2012.12</div><div cla…

C/C++编码问题研究

文章目录 一、Unicode字符集与U8/U16/U32编码二、编码1. 占字节数2. ASCII、GB2312、GBK、GB18030 以及 UTF8 的关系3. BOM4. UTF-8的存储实现 三、编译器字符集设置1. GCC语法Example 2. MSVC语法Example 三、wchar_t五、编码转换函数六、代码 & 实践1. UTF8与UTF16、UTF3…

opencv#35 连通域分析

连通域分割原理 像素领域介绍: 4邻域是指中心的像素与它邻近的上下左右一共有4个像素&#xff0c;那么称这4个像素为中心像素的4邻域。 8邻域是以中心像素周围的8个像素分别是上下左右和对角线上的4个像素。 连通域的定义(分割)分为两种:以4邻域为相邻判定条件的连通域分割和…

老司机用脚本批量巧删恶意文件

作者&#xff1a;田逸&#xff08;formyz&#xff09; 一个NFS服务器&#xff0c;为多个Web项目所共享。这些目录包括PHP程序、图片、HTML页面和用户上传的文档和附件等。因为某些Web框架古老&#xff0c;存在诸如不对上传文件做严格的安全性检查&#xff0c;虽然此NFS服务器位…

腾讯发表多模态大模型最新综述,从26个主流大模型看多模态效果提升关键方法

在大规模语言模型&#xff08;LLMs&#xff09;通往通用人工智能&#xff08;AGI&#xff09;的道路中&#xff0c;从传统的单一的“语言模态”扩展到“图像”、“语音”等等的“多模态”必然是大模型进化的必经之路。 在过去的 2023 年&#xff0c;多模态大规模语言模型&…

建筑效果图渲染制作周期是多久

建筑效果图的渲染制作周期会根据多种因素而变化&#xff0c;包括项目的复杂性、渲染的详细程度、分辨率要求、场景中的元素数量和复杂度、以及项目所需的修改和迭代次数等。 通常&#xff0c;简单的建筑效果图可能在几个工作日内完成&#xff0c;而大型或高度复杂的项目可能需要…

合并两个排序的链表

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 学习必须往深处挖&…

Cantor表(刷题)(C语言)

个人博客主页&#xff1a;https://blog.csdn.net/2301_79293429?typeblog 专栏&#xff1a;https://blog.csdn.net/2301_79293429/category_12545690.html 题目描述 现代数学的著名证明之一是 Georg Cantor 证明了有理数是可枚举的。他是用下面这一张表来证明这一命题的&…

【JLU】校园网linux客户端运行方法

终于给这输入法整好了&#xff0c;就像上面图里那样执行命令就行 写一个开机自启的脚本会更方便&#xff0c;每次都运行也挺烦的 补充了一键运行脚本&#xff0c;文件路径需要自己修改 #!/bin/bashrun_per_prog"sudo /home/d0/ubuntu-drclient-64/DrClient/privillege.s…

Java项目:基于SSM框架实现的高校毕业生就业管理系统(ssm+B/S架构+源码+数据库+毕业论文)

一、项目简介 本项目是一套ssm817基于SSM框架实现的高校毕业生就业管理系统&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调…

【linux】-centos7版本前后-变化篇

1.centos7版本前后区别 首先文件系统变化&#xff0c;由EXT4&#xff0c;变为XFS格式。可支持容量500TB的文件&#xff0c;而6代仅能支持16TB。首个进程变为systemd, 替换了熟悉的init进程。它的特点是功能强大&#xff0c;体积也很强大。 systemd给我们带来了一个全家桶命令&…

Java基础数据结构之反射

一.定义 Java的反射机制是在运行状态中的&#xff0c;对于任意一个类都能知道这个类的所有属性和方法&#xff1b;对于任意一个对象&#xff0c;都能够调用它的任意方法及属性。既然能拿到&#xff0c;我们就可以修改部分类型信息。这种动态获取信息以及动态调用对象方法的功能…

3d合并模型是重名材质---模大狮模型网

当合并3d模型时&#xff0c;如果存在重名的材质&#xff0c;可能会导致加载问题。这是因为3D软件在处理重名材质时可能会出现冲突。你可以尝试以下方法解决这个问题&#xff1a; 重命名材质&#xff1a;检查合并的模型中的材质&#xff0c;确保它们具有唯一的命名。修改重名的材…

【学网攻】 第(15)节 -- 标准ACL访问控制列表

系列文章目录 目录 系列文章目录 文章目录 前言 一、ACL(访问控制列表)是什么? 二、实验 1.引入 实验拓扑图 实验配置 测试PC2能否Ping通PC3 配置ACL访问控制 实验验证 PC1 Ping PC3 总结 文章目录 【学网攻】 第(1)节 -- 认识网络【学网攻】 第(2)节 -- 交换机认…

Android T 远程动画显示流程(更新中)

序 本地动画和远程动画区别是什么? 本地动画&#xff1a;自给自足。对自身SurfaceControl矢量动画进行控制。 远程动画&#xff1a;拿来吧你&#xff01;一个app A对另一个app B通过binder跨进程通信&#xff0c;控制app B的SurfaceControl矢量动画。 无论是本地动画还是远程…