springboot整合SSE技术开发经验总结及心得

springboot整合SSE技术开发经验总结及心得

  • 一、开发背景
  • 二、快速了解SSE
    • 1、概念
    • 2、特性
  • 三、开发思路
  • 四、代码演示
    • 1、引入依赖
    • 2、服务端代码
    • 3、后端定时任务代码
  • 4、解决乱码的实体类
    • 4、前端代码
  • 五、核心代码分析

一、开发背景

公司需要开发一个大屏界面,大屏页面的数据是实时更新的,由后端主动实时推送数据给大屏页面。此时会立刻联想到:websocket 技术。当然使用websocket,确实可以解决这个场景。但是今天本文的主角是 :SSE,他和websocket略有不同,SSE只能由服务端主动发消息,而websocket前后端都可以推送消息。

二、快速了解SSE

1、概念

SSE全称 Server Sent Event,顾名思义,就是服务器发送事件,所以也就注定了他 只能由服务端发送信息。

2、特性

  • 主动从服务端推送消息的技术
  • 本质是一个HTTP的长连接
  • 发送的是一个stream流,格式为text/event-stream

三、开发思路

要实现后端的实时推送消息,前台实时更新数据,思路如下:

  • 1、前后端需要建立连接
  • 2、后端如何做到实时推送信息呢?可以采用定时调度

四、代码演示

1、引入依赖

原则上是不需要引入的,因为springboot底层已经整合了SSE

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

2、服务端代码

controller层


@RestController
@CrossOrigin
@RequestMapping("/sse")
public class SseEmitterController extends BaseController {

    @Autowired
    private SseEmitterService sseEmitterService;

    /**
     * 创建SSE连接
     *
     * @return
     */
    @GetMapping("/connect/{type}")
    public SseEmitter connect(@PathVariable("type") String type) {
        return sseEmitterService.connect(type);
    }
}

service层

public interface SseEmitterService {

    SseEmitter connect(String type);

    void volumeOverview();

    void sysOperation();

    void monitor();
    ........
}

service实现层


@Service
public class SseEmitterServiceImpl implements SseEmitterService {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static Map<String, SseEmitterUTF8> sseCache = new ConcurrentHashMap<>();


    /**
     * 创建连接sse
     * @param type
     * @return
     */
    @Override
    public SseEmitter connect(String type) {

        if (sseCache.containsKey(type)){
            return sseCache.get(type);
        }
        SseEmitterUTF8 sseEmitter = new SseEmitterUTF8(0L);
        try {
            sseEmitter.send(SseEmitter.event().comment("创建连接成功 !!!"));
        } catch (IOException e) {
            logger.error("创建连接失败 , {} " , e.getMessage());
        }
        sseEmitter.onCompletion(() -> {
            logger.info("connect onCompletion , {} 结束连接 ..." , type);
            removeUser(type);
        });
        sseEmitter.onTimeout(() -> {
            logger.info("connect onTimeout , {} 连接超时 ..." , type);
            removeUser(type);
        });
        sseEmitter.onError((throwable) -> {
            logger.error("connect onError , {} 连接异常 ..." , type);
            removeUser(type);
        });
        sseCache.put(type, sseEmitter);

        //立即推送
        volumeOverview();
        dealResp();
        monitor();
        if (type.equals(SseEmitterConstant.OVER_VIEW)){
            sysOperation();
            mileStone();
        }
        logger.info("当前用户总连接数 : {} " , sseCache.size());
        return sseEmitter;
    }

    /**
     * 交易量概览
     */
    @Override
    public void volumeOverview() {

        Map<String,Object> map = new HashMap<>();
        map.put("latest_tps",440.3);
        map.put("total_cics_trans",341656001);
        map.put("total_zjcx_trans",391656001);
        map.put("zjcx_tps",23657);
        map.put("day10",48388352);
        map.put("history",105013985);

        SseEmitter.SseEventBuilder data = SseEmitter.event().name(SseEmitterConstant.VOLUME_OVERVIEW).data(map, MediaType.APPLICATION_JSON);

        for (Map.Entry<String, SseEmitterUTF8> entry : sseCache.entrySet()) {
            SseEmitterUTF8 sseEmitter = entry.getValue();
            if (sseEmitter == null) {
                continue;
            }
            try {
                sseEmitter.send(data);
            } catch (IOException e) {
                String body = "SseEmitterServiceImpl[volumeOverview  ]";
                logger.error(body + ": 向客户端 {} 推送消息失败 , 尝试进行重推 : {}", entry.getKey() ,e.getMessage());
                messageRepush(entry.getKey(),data,body);
            }

        }
    }
		private void messageRepush(String type, SseEmitter.SseEventBuilder data,String body){
        for (int i = 0; i < 3; i++) {
            try {
                Thread.sleep(2000);
                SseEmitterUTF8 sseEmitter = sseCache.get(type);
                if (sseEmitter == null) {
                    logger.error(body + " :向客户端{} 第{}次消息重推失败,未创建长链接", type, i + 1);
                    continue;
                }
                sseEmitter.send(data);
            } catch (Exception ex) {
                logger.error(body + " :向客户端{} 第{}次消息重推失败", type, i + 1, ex);
                continue;
            }
            logger.info(body + " :向客户端{} 第{}次消息重推成功", type, i + 1);
            return;
        }
    }

常量类

public class SseEmitterConstant {

    /**
     * 创建连接的客户端类型
     */
    public static final String OVER_VIEW = "overview";


    /**
     * even 数据类型
     */
    public static final String VOLUME_OVERVIEW = "vw";



    public SseEmitterConstant(){}
}

3、后端定时任务代码

采用注解的方式实现:@Scheduled,使用该注解时,需要增加这个注解@EnableScheduling,相当于来开启定时调度功能,如果不加@EnableScheduling注解,那么定时调度会不生效的。

启动类增加注解@EnableScheduling

package com.hidata;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
@EnableScheduling
public class HidataApplication {

    public static void main(String[] args)
    {
        SpringApplication.run(HidataApplication.class, args);
        System.out.println("[HiUrlShorter platform startup!]");
    }
}

创建 定时任务调度类,在该类上加上@Scheduled注解,


@Configuration
public class SendMessageTask{

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private SseEmitterService sseEmitterService;

    @Scheduled(cron = "0/40 * * * * ?}")
    public void volumeOverviewTask() {

        try {
            sseEmitterService.volumeOverview();
        } catch (Exception e) {
            logger.error("SendMessageTask [volumeOverviewTask]: {} ",e.getMessage());
        }
    }
.......
}


4、解决乱码的实体类

如果发送中文数据的时候,会出现乱码的现象。此时需要做对应的处理

package com.hidata.devops.lagrescreen.domain;

import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.nio.charset.StandardCharsets;

public class SseEmitterUTF8 extends SseEmitter {

    public SseEmitterUTF8(Long timeout) {
        super(timeout);
    }

    @Override
    protected void extendResponse(ServerHttpResponse outputMessage) {
        super.extendResponse(outputMessage);

        HttpHeaders headers = outputMessage.getHeaders();
        headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));
    }
}


4、前端代码

    // 连接服务器
    var sseSource = new EventSource("http://localhost:8080/sse/connect");
    // 连接打开
    sseSource.onopen = function () {
        console.log("连接打开");
    }

    // 连接错误
    sseSource.onerror = function (err) {
        console.log("连接错误:", err);
    }
    
	//接收信息
    eventSource.addEventListener("vw", function (event) {
    console.log(event.data);
    .....
  });

五、核心代码分析

先看代码片段

SseEmitter.event().name("vw").data(map, MediaType.APPLICATION_JSON);

分析:
后端不会把所有数据一起发送给前端,而是会把页面分成多个模块,然后发给前端,此时前端需要区分哪一块数据对应哪一块页面。所以我们可以给各个模块的数据起个名字。也就是上述的代码

SseEmitter.event().name("vw")

这样,前端就知道怎么渲染页面了,类似于这样
在这里插入图片描述
关于even()的属性,可以查看源码,

public interface SseEventBuilder {
        SseEmitter.SseEventBuilder id(String var1);

        SseEmitter.SseEventBuilder name(String var1);

        SseEmitter.SseEventBuilder reconnectTime(long var1);

        SseEmitter.SseEventBuilder comment(String var1);

        SseEmitter.SseEventBuilder data(Object var1);

        SseEmitter.SseEventBuilder data(Object var1, @Nullable MediaType var2);

        Set<DataWithMediaType> build();
    }

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/141364.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用内网穿透实现U8用友ERP本地部署与异地访问

文章目录 前言1. 服务器本机安装U8并调试设置2. 用友U8借助cpolar实现企业远程办公2.1 在被控端电脑上&#xff0c;点击开始菜单栏&#xff0c;打开设置——系统2.2 找到远程桌面2.3 启用远程桌面 3. 安装cpolar内网穿透3.1 注册cpolar账号3.2 下载cpolar客户端 4. 获取远程桌面…

【C++笔记】AVL树的模拟实现

【C笔记】AVL树的模拟实现 一、AVL树的概念二、AVL树的模拟实现2.1、定义节点2.2、插入2.3、旋转2.3.1、左单旋2.3.2、右单旋2.3.3、左右双旋2.3.4、右左双旋2.3.5、插入接口的整体代码实现 三、验证AVL树3.1、验证 一、AVL树的概念 二叉搜索树虽然在一般情况下可以提高查找的…

生成式AI以及当前趋势

ChatGPT 激发了人们的想象力和好奇心。自 2022 年 11 月推出后&#xff0c;短短两个月内其月活用户便达到 1 亿&#xff0c;成为有史以来增长速度最快的消费类应用和第一个杀手级的生成式 AI 应用。随着创新节奏的加快&#xff0c;想要紧跟生成式 AI 的发展速度&#xff0c;难度…

web前端-Gulp入门

web前端-Gulp入门 gulp的概述使用gulp准备工作gulp的常用APIgulp的常用插件gulpfile.js的初体验打包css文件打包scss文件打包js打包html打包images创建一个默认任务创建一个删除任务gulp启动服务创建一个监控任务 gulp的概述 gulp&#xff1a; 前端自动化打包固件工具&#xf…

Ansible playbook详解

playbook是ansible用于配置&#xff0c;部署&#xff0c;和被管理被控节点的剧本 playbook常用的YMAL格式&#xff1a;&#xff08;文件名称以 .yml结尾&#xff09; 1、文件的第一行应该以 "---" (三个连字符)开始&#xff0c;表明YMAL文件的开始。    2、在同一…

IIC子系统测温湿度

采用stm32MP157AAA芯片&#xff0c;温度传感器 si7006 0x40 1、在内核空间不支持浮点数进行打印&#xff0c;所以需要将读取到的数据拷贝到用户空间&#xff0c;执行用户程序打印 2、在probe函数中 分步注册字符设备驱动自动创建设备节点 3、在i2c驱动代码中&#xff0c;需要自…

通用的链栈实现(C++)

template<class T> class MyStack//链栈 { private:struct StackNode{T data;StackNode* next;StackNode(const T& val T(), StackNode* p nullptr) :data(val), next(p) {}//};StackNode* top;int cursize;void clone(const MyStack& s){Clear();cursize s.c…

cgo与调用c的回调函数指针

cgo直接调用函数&#xff0c;使用基本数据类型非常简单&#xff0c;包括一些结构体也比较简单&#xff0c;嵌套的稍微复杂些&#xff0c;但也可以&#xff0c;但有的时候&#xff0c;cgo调用c函数&#xff0c;会需要传递一个回调函数的指针&#xff0c;这时候就比较复杂了&…

office365 outlook邮件无法删除

是否遇到过&#xff0c;office365邮件存储满了&#xff0c;删除邮件无法删除&#xff0c;即便用web方式登录到outlook&#xff0c;删除邮件当时是成功的&#xff0c;但一会儿就回滚回来了&#xff0c;已删除的邮件&#xff0c;你想清空&#xff0c;最后清理后还是回到原样。 请…

YTM32的循环冗余校验CRC外设模块详解

YTM32的循环冗余校验CRC外设模块详解 文章目录 YTM32的循环冗余校验CRC外设模块详解引言原理与机制CRC算法简介从CRC算法到CRC硬件外设 应用要点&#xff08;软件&#xff09;CRC16 用例CRC32 用例 总结参考文献 引言 在串行通信帧中&#xff0c;为了保证数据在传输过程中的完…

基于Python优化图片亮度与噪点

支持添加噪点类型包括&#xff1a;添加高斯噪点、添加椒盐噪点、添加波动噪点、添加泊松噪点、添加周期性噪点、添加斑点噪点、添加相位噪点&#xff0c;还提供清除噪点的功能。 我们先看一下实测效果&#xff1a;&#xff08;test.jpg为原图&#xff0c;new.jpg为添加后的图片…

自动化测试的成本高效果差,那么自动化测试的意义在哪呢?

一、自动化测试的成本高效果差&#xff0c;那么自动化测试的意义在哪呢&#xff1f; 我觉得这个问题带有很强的误导性&#xff0c;是典型的逻辑陷阱之一。“自动化测试的成本高效果差”是真的吗&#xff1f;当然不是。而且我始终相信&#xff0c;回答问题的最好方式是把问题本身…

达索系统3DEXPERIENCE WORKS 2024流体仿真功能增强

设计工作中&#xff0c;网格划分和设计验证十分重要&#xff0c;它可以方便我们把复杂组件简单化处理&#xff0c;从而提升工作效率。 轻松应对&#xff0c;精准划分 在优化设计以获得更好的空气动力学性能时&#xff0c;需要了解空气在其周围产生的流动方式。达索系统3DEXPE…

(论文阅读29/100 人体姿态估计)

29.文献阅读笔记 简介 题目 DeepCut: Joint Subset Partition and Labeling for Multi Person Pose Estimation 作者 Leonid Pishchulin, Eldar Insafutdinov, Siyu Tang, Bjoern Andres, Mykhaylo Andriluka, Peter Gehler, and Bernt Schiele, CVPR, 2016. 原文链接 h…

STM32 X-CUBE-AI:Pytorch模型部署全流程

文章目录 概要版本&#xff1a;参考资料STM32CUBEAI安装CUBEAI模型支持LSTM模型转换注意事项模型转换模型应用1 错误类型及代码2 模型创建和初始化3 获取输入输出数据变量4 获取模型前馈输出模型应用小结 小结 概要 STM32 CUBE MX扩展包&#xff1a;X-CUBE-AI部署流程&#xf…

Accelerate 0.24.0文档 一:两万字极速入门

文章目录 一、概述1.1 PyTorch DDP1.2 Accelerate 分布式训练简介1.2.1 实例化Accelerator类1.2.2 将所有训练相关 PyTorch 对象传递给 prepare()方法1.2.3 启用 accelerator.backward(loss) 1.3 Accelerate 分布式评估1.4 accelerate launch1.4.1 使用accelerate launch启动训…

k8s集群搭建(ubuntu 20.04 + k8s 1.28.3 + calico + containerd1.7.8)

环境&需求 服务器&#xff1a; 10.235.165.21 k8s-master 10.235.165.22 k8s-slave1 10.235.165.23 k8s-slave2OS版本&#xff1a; rootvms131:~# lsb_release -a No LSB modules are available. Distributor ID: Ubuntu Description: Ubuntu 20.04.5 LTS Release: …

Java自学第11课:电商项目(4)重新建立项目

经过前几节的学习&#xff0c;我们已经找到之前碰到的问题的原因了。那么下面接着做项目学习。 1 新建dynamic web project 建立时把web.xml也生成下&#xff0c;省的右面再添加。 会询问是否改为java ee环境&#xff1f;no就行&#xff0c;其实改过来也是可以的。这个不重要。…

css3 初步了解

1、css3的含义及简介 简而言之&#xff0c;css3 就是 css的最新标准&#xff0c;使用css3都要遵循这个标准&#xff0c;CSS3 已完全向后兼容&#xff0c;所以你就不必改变现有的设计&#xff0c; 2、一些比较重要的css3 模块 选择器 1、标签选择器&#xff0c;也称为元素选择…