SpringBoot 搭建 SSE

参考链接

https://www.51cto.com/article/798001.html

了解一下SseEmitter(一)-CSDN博客

依赖

有默认的 springboot-web 依赖即可

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

SSEmitter API

方法

说明

onCompletion()

回调方法,连接完成(正常关闭)时触发(超时,异常,complete() 之后触发)

onTimeout()

回调方法,当连接超时时触发

onError()

指定当发生错误时执行的回调方法。这个错误可能是由于网络连接问题等原因。

completeWithError(e)

在发生错误时关闭连接,并以错误的形式告知客户端

complete()

表示数据发送完成后关闭连接

event()

创建一个事件对象,设置事件名称和数据

使用示例emitter.send(SseEmitter.event().name("message").data("数据!"));

onCompletion() 回调函数触发条件说明:他只会在连接关闭(正常关闭)后调用,可以用于执行一些清理工作。

触发条件:

  1. 调用 complete()
  2. 调用 completeWithError(e)
  3. 超时断开连接(例如:new SseEmitter(1000L),那么在 1 秒后就会调用 onCompletion() 的回调函数);回调调用顺序:onTimeout() -> onCompletion()

服务端

controller

/***
 * @author feiXun
 * @create 2025/1/13 14:09
 **/
@RestController
@RequestMapping("/sse")
@CrossOrigin(origins = "*")
public class SseController {

    @Autowired
    private SseService sseService;

    @GetMapping("/connect")
    @ApiOperation(value = "SSE 客户端连接")
    public SseEmitter connect() throws IOException {
        return sseService.connect();
    }


    @GetMapping("/batchSend")
    @ApiOperation(value = "SSE 群发消息", notes = "SSE 群发消息, 目前用于测试,后期可以删除")
    public void batchSend(@RequestParam("message") Object message){
        sseService.batchSend(message);
    }

}

Service 接口

/***
 * @author feiXun
 * @create 2025/1/13 14:52
 **/
public interface SseService {

    /**
     * 连接
     */
    SseEmitter connect() throws IOException;

    /**
     * 批量发送消息
     */
    void batchSend(Object message);

    /**
     * 发送心跳包
     */
    void sendHeartBeat();


}

Service 实现类

/***
 * @author feiXun
 * @create 2025/1/13 14:52
 **/
@Slf4j
@Service
public class SseServiceImpl implements SseService {

    /**
     * 用于存储 sseEmitterList
     */
    private final List<SseEmitter> sseEmitterList = new CopyOnWriteArrayList<>();

    /**
     * 消息队列
     * 用于将错误的信息,没有 SSE 连接时发送的信息 保存起来
     * 在重新连接的时候推送给 sse 客户端
     */
    private final Queue<SseEmitter.SseEventBuilder> messages = new ConcurrentLinkedQueue<>();

    @Override
    public SseEmitter connect() throws IOException {
        // 0 表示无限长连接;其他:毫秒数,表示连接时长,比如 1000L,就是 1秒后断开连接
        SseEmitter emitter = new SseEmitter(0L);

        // sse 连接完成,准备释放
        emitter.onCompletion(completionCallBack(emitter));

        // 指定当发生错误时执行的回调方法。这个错误可能是由于网络连接问题等原因。
        emitter.onError(errorCallBack(emitter));

        // 添加 list,用于发送给多个 sse 客户端
        sseEmitterList.add(emitter);

        // 这里是将 发生错误时,没有 SSE 客户端时 的消息在发一遍
        while (!messages.isEmpty()){
            SseEmitter.SseEventBuilder poll = messages.poll();
            try {
                // 发送信息
                emitter.send(poll);
            } catch (IOException e) {
                // 将信息加入队列,下次连接的时候推送给客户端
                messages.add(poll);
                // 关闭连接并抛出异常给客户端
                emitter.completeWithError(e);
                break;
            }
        }

        // 发送一条 心跳包信息
        emitter.send(MrSseVo.buildHeartBeat());
        return emitter;
    }

    /**
     * 群发消息
     */
    @Override
    public void batchSend(Object message) {
        // 将消息加入消息队列,当有 SSE 连接时,从队列中取出信息推送给 sse 客户端
        if (sseEmitterList.isEmpty()){
            SseEmitter.SseEventBuilder data = builderMessage(message);
            messages.add(data);
            return;
        }
        sseEmitterList.forEach(sseEmitter -> {
            send(sseEmitter, message, false);
        });
    }

    @Override
    public void sendHeartBeat() {
        if (!sseEmitterList.isEmpty()){
            // 批量发送心跳包
            sseEmitterList.forEach(sseEmitter -> {
                send(sseEmitter, "ping", true);
            });
        }
    }

    /**
     * 发送消息
     * @param isPing 是否是心跳包
     */
    private void send(SseEmitter sseEmitter, Object message, boolean isPing){
        SseEmitter.SseEventBuilder data = builderMessage(message);
        try {
            // 发送信息
            sseEmitter.send(message);
        } catch (IOException e) {
            if (!isPing){
                // 将信息加入队列,下次连接的时候推送给客户端
                messages.add(data);
            }
            // 关闭连接并抛出异常给客户端
            sseEmitter.completeWithError(e);
        }
    }

    /**
     * SSE 连接完成 回调(连接已关闭,正准备释放)
     * 触发条件(前提: 客户端的连接没断开)
     * 1. 调用 complete()
     * 2. 调用 completeWithError(e)
     * 3. 超时断开连接(例如:new SseEmitter(1000L),那么在 1 秒后就会调用 onCompletion() 的回调函数);回调调用顺序:onTimeout() -> onCompletion()
     */
    private Runnable completionCallBack(SseEmitter emitter){
        return ()->{
            log.info("连接已关闭,准备释放!");
            // 将 sse 连接 移除
            sseEmitterList.remove(emitter);
        };
    }

    /**
     * SSE 异常回调
     * 指定当发生错误时执行的回调方法。这个错误可能是由于网络连接问题等原因。
     */
    private Consumer<Throwable> errorCallBack(SseEmitter sseEmitter){
        return throwable -> {
            log.error("SSE 异常 {}", throwable.getMessage());
            // 关闭连接并抛出异常给客户端
            sseEmitter.completeWithError(throwable);
        };
    }

    /**
     * 构造消息
     */
    private SseEmitter.SseEventBuilder builderMessage(Object message){
        return SseEmitter.event().
                id(UUID.randomUUID().toString())
                .data(message, MediaType.APPLICATION_JSON);
    }

}

客户端

http://127.0.0.1:8000/sse/connect:这个接口用于跟 sse 服务端建立连接

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>SSE Client</title>
</head>
<body>
    SSE Client
    <div id="events"></div>

    <script>
        const eventSource = new EventSource('http://127.0.0.1:8000/sse/connect');

        eventSource.onmessage = function(event) {
            const eventsDiv = document.getElementById('events');
            eventsDiv.innerHTML += `<p>${event.data}</p>`;
        };

        eventSource.onerror = function(err) {
            console.error("EventSource failed:", err);
        };
    </script>
</body>
</html>

测试

  1. 打开客户端 或者 浏览器访问 连接 接口(/sse/connect)
  2. 调用发送接口 (/sse/send)
  3. 查看网页等有没有出现相应的信息

例如:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/957045.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python_在钉钉群@人员发送消息

python_在钉钉群人员发送消息 1、第一种 企业内部机器人群聊实现人接入指南&#xff0c;适用于群机器人接收消息&#xff0c;处理完一系列的动作之后&#xff0c;将消息返回给发消息的人员&#xff0c;同时该人员。 需要在企微后台新建一个自建应用&#xff0c;在自建应用里…

macOS安装Gradle环境

文章目录 说明安装JDK安装Gradle 说明 gradle8.5最高支持jdk21&#xff0c;如果使用jdk22建议使用gradle8.8以上版本 安装JDK mac系统安装最新&#xff08;截止2024.9.13&#xff09;Oracle JDK操作记录 安装Gradle 下载Gradle&#xff0c;解压将其存放到资源java/env目录…

HTML之拜年/跨年APP(改进版)

目录&#xff1a; 一&#xff1a;目录 二&#xff1a;效果 三&#xff1a;页面分析/开发逻辑 1.页面详细分析&#xff1a; 2.开发逻辑&#xff1a; 四&#xff1a;完整代码&#xff08;不多废话&#xff09; index.html部分 app.json部分 二&#xff1a;效果 三&#xff1a;页面…

PostgreSQL的学习心得和知识总结(一百六十六)|深入理解PostgreSQL数据库之\watch元命令的实现原理

目录结构 注&#xff1a;提前言明 本文借鉴了以下博主、书籍或网站的内容&#xff0c;其列表如下&#xff1a; 1、参考书籍&#xff1a;《PostgreSQL数据库内核分析》 2、参考书籍&#xff1a;《数据库事务处理的艺术&#xff1a;事务管理与并发控制》 3、PostgreSQL数据库仓库…

使用 Parcel 和 NPM 脚本进行打包

使用 Parcel 和 NPM 脚本进行打包 Parcel Parcel 是一个零配置的网页应用程序打包工具&#xff0c;主要用于快速构建现代 JavaScript 应用。 我们可以使用npm直接安装它 npm install --save-dev parcel //这将把 Parcel 添加到 devDependencies 中&#xff0c;表明它是一个…

项目实战--网页五子棋(游戏大厅)(3)

我们的游戏大厅界面主要需要包含两个功能&#xff0c;一是显示用户信息&#xff0c;二是匹配游戏按钮 1. 页面实现 hall.html <!DOCTYPE html> <html lang"ch"> <head><meta charset"UTF-8"><meta name"viewport"…

网络安全VS数据安全

关于网络安全和数据安全&#xff0c;我们常听到如下两种不同声音&#xff1a; 观点一&#xff1a;网络安全是数据安全的基础&#xff0c;把当年做网络安全的那一套用数据安全再做一遍。 观点二&#xff1a;数据安全如今普遍以为是网络安全的延伸&#xff0c;实际情况是忽略数据…

React 中hooks之useDeferredValue用法总结

目录 概述基本用法与防抖节流的区别使用场景区分过时内容最佳实践 概述 什么是 useDeferredValue? useDeferredValue 是 React 18 引入的新 Hook&#xff0c;用于延迟更新某个不那么重要的部分。它接收一个值并返回该值的新副本&#xff0c;新副本会延迟更新。这种延迟是有…

浅谈 JVM

JVM 内存划分 JVM 内存划分为 四个区域&#xff0c;分别为 程序计数器、元数据区、栈、堆 程序计数器是记录当前指令执行到哪个地址 元数据区存储存储的是当前类加载好的数据&#xff0c;包括常量池和类对象的信息&#xff0c;.java 编译之后产生 .class 文件&#xff0c;运…

HTTP / 2

序言 在之前的文章中我们介绍过了 HTTP/1.1 协议&#xff0c;现在再来认识一下迭代版本 2。了解比起 1.1 版本&#xff0c;后面的版本改进在哪里&#xff0c;特点在哪里&#xff1f;话不多说&#xff0c;开始吧⭐️&#xff01; 一、 HTTP / 1.1 存在的问题 很多时候新的版本的…

使用vscode在本地和远程服务器端运行和调试Python程序的方法总结

1 官网下载 下载网址&#xff1a;https://code.visualstudio.com/Download 如下图所示&#xff0c;可以分别下载Windows,Linux,macOS版本 历史版本下载链接: https://code.visualstudio.com/updates 2 安装Python扩展工具 打开 VS Code&#xff0c;安装 Microsoft 提供的官…

免费为企业IT规划WSUS:Windows Server 更新服务 (WSUS) 之快速入门教程(一)

哈喽大家好&#xff0c;欢迎来到虚拟化时代君&#xff08;XNHCYL&#xff09;&#xff0c;收不到通知请将我点击星标&#xff01;“ 大家好&#xff0c;我是虚拟化时代君&#xff0c;一位潜心于互联网的技术宅男。这里每天为你分享各种你感兴趣的技术、教程、软件、资源、福利…

Ubuntu 24.04 LTS 安装 tailscale 并访问 SMB共享文件夹

Ubuntu 24.04 LTS 安装 tailscale 安装 Tailscale 官方仓库 首先&#xff0c;确保系统包列表是最新的&#xff1a; sudo apt update接下来&#xff0c;安装 Tailscale 所需的仓库和密钥&#xff1a; curl -fsSL https://tailscale.com/install.sh | sh这会自动下载并安装 …

基于python+Django+mysql鲜花水果销售商城网站系统设计与实现

博主介绍&#xff1a;黄菊华老师《Vue.js入门与商城开发实战》《微信小程序商城开发》图书作者&#xff0c;CSDN博客专家&#xff0c;在线教育专家&#xff0c;CSDN钻石讲师&#xff1b;专注大学生毕业设计教育、辅导。 所有项目都配有从入门到精通的基础知识视频课程&#xff…

“AI人工智能内容辅助创作平台:让创意不再“卡壳”

在如今这个信息爆炸的时代&#xff0c;内容创作成了每个人的“必修课”。无论是自媒体大V、文案策划&#xff0c;还是普通学生写作文&#xff0c;大家都会遇到一个让人抓狂的问题——“创意枯竭”。有时候&#xff0c;脑袋里空空如也&#xff0c;一个字都写不出来&#xff0c;那…

【计算机网络】传输层协议TCP与UDP

传输层 传输层位于OSI七层网络模型的第四层&#xff0c;主要负责端到端通信&#xff0c;可靠性保障&#xff08;TCP&#xff09;&#xff0c;流量控制(TCP)&#xff0c;拥塞控制(TCP)&#xff0c;数据分段与分组&#xff0c;多路复用与解复用等&#xff0c;通过TCP与UDP协议实现…

基础入门-传输加密数据格式编码算法密文存储代码混淆逆向保护安全影响

知识点&#xff1a; 1、传输格式&传输数据-类型&编码&算法 2、密码存储&代码混淆-不可逆&非对称性 一、演示案例-传输格式&传输数据-类型&编码&算法 传输格式 JSON XML WebSockets HTML 二进制 自定义 WebSockets&#xff1a;聊天交互较常…

DenseNet-密集连接卷积网络

DenseNet&#xff08;Densely Connected Convolutional Network&#xff09;是近年来图像识别领域中一种创新且高效的深度卷积神经网络架构。它通过引入密集连接的设计&#xff0c;极大地提高了特征传递效率&#xff0c;减缓了梯度消失问题&#xff0c;促进了特征重用&#xff…

记一次数据库连接 bug

整个的报错如下&#xff1a; com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server. Attempted reconnect 3 times. Giving up. at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Metho…

Docker:基于自制openjdk8镜像 or 官方openjdk8镜像,制作tomcat镜像

一、制作openjdk8基础镜像【基于自定义alpine-3.18.0:v1 】 docker pull maven:3.5.0-jdk-8-alpine 78.56 MB https://hub.docker.com/_/maven/tagspage8&namealpine openjdk二进制下载地址 https://blog.csdn.net/fenglllle/article/details/124786948 https://adoptope…