Java Springboot SSE如何判断客户端能否正常接收消息

目录

  • 背景
  • 解决方案
    • 思路
    • 代码
    • 代码解释
  • Java反射知识点补充

背景

当新建一个 emitter 对象的时候, 它的默认超时时间是 30s.

SseEmitter emitter = new SseEmitter(); 

但是很多情况下, 默认30s的时间太短, 需要把 emitter 对象的超时时间设置成不超时, 也就是永久有效.

private static long DEFAULT_TIMEOUT = 0L;

......

SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT); 

这样也会带来一个问题, 就是永久有效的 emitter 对象如果没有调用关闭连接的接口的话 (比如用户直接关闭浏览器了) , 这个 emitter 对象就会一直存在.

解决方案

思路

sseEmitter 有下面的几个属性:

在这里插入图片描述

注意一下 sendFailed 这个属性, 我们可以利用这个属性来判断客户端能否正常接到消息.

当客户端无法接受消息时,SseEmitter对象在send一次之后sendFailed状态会变为True,这时候就可以剔除。同时在订阅时用此判断可以减少重复创建的机会

还有一个 complete 属性, 这个属性是与 sendFailed 有关的, 也就是消息发送成功的时候 complete 为 true, 失败的时候 complete 为 false. 我们可以用这个属性当做一个辅助.

请添加图片描述

拿到客户端是否能够正常接收消息这个状态以后, 我们就可以建立一个定时器,固定时间发送消息用来检测客户端是否离线.

代码

package com.example.demo.utils;

import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class SSEUtils {
    public static Map<String, SseEmitter> subscribeMap = new ConcurrentHashMap<>();

    /***
     * 添加订阅
     * @param id 客户id
     * @return
     */
    public static SseEmitter addSubscribe(String id) {
        SseEmitter sseEmitter = subscribeMap.get(id);
        if (sseEmitter == null) {
            sseEmitter = new SseEmitter(0L); // 永久有效
            sseEmitter.onTimeout(() -> {
                subscribeMap.remove(id);
            });
            sseEmitter.onError(throwable -> {
                subscribeMap.remove(id);
            });
            SseEmitter finalSseEmitter = sseEmitter;
            sseEmitter.onCompletion(() -> {
                subscribeMap.put(id, finalSseEmitter);
            });
        }
        return sseEmitter;
    }

    /***
     * 给单个用户发消息
     * @param id
     * @param msg
     * @return
     */
    public static boolean sendSingleClientMsg(String id,Object msg) {
        SseEmitter sseEmitter = subscribeMap.get(id);
        if (sseEmitter == null) {
            return false;
        }
        try {
            sseEmitter.send(msg, MediaType.APPLICATION_JSON);
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }


    /***
     * 关闭订阅
     * @param id
     * @return
     */
    public static boolean closeSubscribe(String id) {
        SseEmitter sseEmitter = subscribeMap.get(id);
        if (sseEmitter == null) {
            return true;
        }
        try {
            sseEmitter.complete();
            subscribeMap.remove(id);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /***
     * 检测客户端连接状态
     * @param sseEmitter
     * @return true代表还连接, false代表失去连接
     */
    public static boolean checkSseConnectAlive(SseEmitter sseEmitter) {
        if (sseEmitter == null) {
            return false;
        }
        // 返回true代表还连接, 返回false代表失去连接
        return !(Boolean) getField(sseEmitter,sseEmitter.getClass(), "sendFailed") &&
                !(Boolean) getField(sseEmitter,sseEmitter.getClass(), "complete");
    }

    public static Object getField(Object obj, Class<?> clazz, String fieldName) {
        for (; clazz != Object.class; clazz = clazz.getSuperclass()) {
            try {
                Field field;
                field = clazz.getDeclaredField(fieldName);
                field.setAccessible(true);
                return field.get(obj);
            } catch (Exception e) {
            }
        }

        return null;
    }

    /***
     * 给所有客户端发消息
     * @param msg
     */
    public void sendAllClientMsg(Object msg) {
        if (subscribeMap != null && !subscribeMap.isEmpty()) {
            for (String key : subscribeMap.keySet()) {
                // 发送检测消息
                sendSingleClientMsg(key,msg);
                // 判断客户端是否能接收到消息
                boolean isAlive = checkSseConnectAlive(subscribeMap.get(key));
                if (!isAlive) {
                    // 断开连接的业务代码
                }
            }
        }
    }

    /***
     * 定时判断所有客户端状态
     */
    @Async("threadPoolTaskExecutor")
    @Scheduled(fixedDelay = 1000*60*10) // 10min
    public void checkAlive() {
        sendAllClientMsg("CHECK_ALIVE");
    }
}

使用 @Scheduled 定时器, 不要忘记在启动类上面加这两个注解:

@SpringBootApplication
@EnableAsync
@EnableScheduling
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

代码解释

重点部分是下面这段代码:

    /***
     * 检测客户端连接状态
     * @param sseEmitter
     * @return
     */
    public static boolean checkSseConnectAlive(SseEmitter sseEmitter) {
        if (sseEmitter == null) {
            return false;
        }
        // 返回true代表还连接, 返回false代表失去连接
        return !(Boolean) getField(sseEmitter,sseEmitter.getClass(), "sendFailed") &&
                !(Boolean) getField(sseEmitter,sseEmitter.getClass(), "complete");
    }

    public static Object getField(Object obj, Class<?> clazz, String fieldName) {
        for (; clazz != Object.class; clazz = clazz.getSuperclass()) {
            try {
                Field field;
                field = clazz.getDeclaredField(fieldName);
                field.setAccessible(true);
                return field.get(obj);
            } catch (Exception e) {
            }
        }

        return null;
    }

1. 循环找 SseEmitter 和它的父类中是否存在 sendFailed 这个属性, 直到找到.

这是因为 sendFailed 这个属性是私有的, 不供外部访问, 这属性还正好在父类里, 所以要循环父类.

在这里插入图片描述

在这里插入图片描述

2. 通过 getDeclaredField() 方法拿到传入的 fieldName 的属性 (也就是 "sendFailed""complete" ), 接着使用 setAccessible(true) 把这个值设置为可访问的.

3. 最后通过 field.get(obj) 拿到这个属性的值, 也就是"sendFailed""complete" 的值是 true/false

思路和代码参考: Java Springboot SSE 解决永久存活 判断客户端离线问题. 关于 SSE utils的一些工具类的方法在这个博客里面也有.

Java反射知识点补充

Java 反射是指在运行时动态地获取一个类的信息,并且可以操作它的属性、方法和构造方法等。Java 反射机制提供了一种在运行时检查、创建和操作对象的能力,这使得 Java 程序可以实现动态性和灵活性。

Java 反射机制主要包括以下三个类:

  • java.lang.Class 类:代表一个类,在运行时动态获取一个类的信息。
  • java.lang.reflect.Method 类:代表类的方法,在运行时可以使用 Method.invoke() 方法调用一个方法。
  • java.lang.reflect.Field 类:代表类的属性,在运行时可以使用 Field.get() 和 Field.set() 方法获取或设置一个属性的值。

以下是一个简单的 Java 反射示例,演示如何使用反射获取一个类的信息:

import java.lang.reflect.*;

public class MyClass {
    private String name;
    private int age;

    public MyClass(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public void sayHello() {
        System.out.println("Hello, " + name + "!");
    }

    public static void main(String[] args) throws Exception {
        // 获取 MyClass 类的 Class 对象
        Class<?> myClass = MyClass.class;

        // 创建一个 MyClass 对象
        MyClass obj = new MyClass("Bob", 20);

        // 获取 MyClass 类的构造方法,并使用它创建一个新的 MyClass 对象
        Constructor<?> constructor = myClass.getConstructor(String.class, int.class);
        MyClass newObj = (MyClass) constructor.newInstance("Alice", 30);

        // 获取 MyClass 类的属性,并使用它获取 obj 对象的 name 属性值
        Field field = myClass.getDeclaredField("name");
        field.setAccessible(true);
        String name = (String) field.get(obj);

        // 获取 MyClass 类的方法,并使用它调用 obj 对象的 sayHello 方法
        Method method = myClass.getMethod("sayHello");
        method.invoke(obj);

        System.out.println(name);         // 输出:Bob
        System.out.println(newObj.name);  // 输出:Alice
    }
}

在上述示例中,我们首先获取了 MyClass 类的 Class 对象。然后,我们创建了一个 MyClass 对象,并使用 getConstructor() 方法获取了 MyClass 类的构造方法,并使用 newInstance() 方法创建了一个新的 MyClass 对象。

接着,我们使用 getDeclaredField() 方法获取了 MyClass 类的 name 属性,并使用 setAccessible() 方法设置该属性可访问性为 true,然后使用 get() 方法获取了 obj 对象中 name 属性的值。

最后,我们使用 getMethod() 方法获取了 MyClass 类的 sayHello() 方法,并使用 invoke() 方法调用了 obj 对象的 sayHello() 方法。

需要注意的是,在使用反射机制时,应该尽量避免使用硬编码的字符串来表示类名、方法名和属性名等信息,这样会使代码更加灵活和可维护。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/333920.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Proxifier海外動態IP代理工具使用教程

Proxifier是一款多平臺代理客戶端&#xff0c;能讓不支持代理伺服器的程式正常運行。它支持各種操作系統和代理協議&#xff0c;並允許自定義端口和應用程式代理設置。用戶可以將其與代理伺服器集成&#xff0c;從而最大程度釋放性能效果。 本文將對其進行全面的概述&#xff…

Docker 安装 MySQ

Docker 安装 MySQL MySQL 是世界上最受欢迎的开源数据库。凭借其可靠性、易用性和性能&#xff0c;MySQL 已成为 Web 应用程序的数据库优先选择。 1、查看可用的 MySQL 版本 访问 MySQL 镜像库地址&#xff1a;https://hub.docker.com/_/mysql?tabtags 。 可以通过 Sort b…

上海亚商投顾:沪指冲高回落 旅游板块全天强势

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 沪指昨日冲高回落&#xff0c;创业板指跌近1%&#xff0c;北证50指数跌超3%。旅游、零售板块全天强势&#xf…

Red Hat Enterprise Linux 7.9 安装图解

引导和开始安装 选择倒计时结束前&#xff0c;通过键盘上下键选择下图框选项&#xff0c;启动图形化安装过程。需要注意的不同主板默认或者自行配置的固件类型不一致&#xff0c;引导界面有所不同。也就是说使用UEFI和BIOS的安装引导界面是不同的&#xff0c;如图所示。若手动调…

29、WEB攻防——通用漏洞SQL注入增删改查盲注延迟布尔报错

文章目录 盲注增删改查 盲注 概念&#xff1a;在注入过程中&#xff0c;获取的数据不能回显至前端页面&#xff0c;此时我们需要利用一些方法进行判断或尝试&#xff0c;这个过程被称为盲注。 解决&#xff1a;常规的联合查询注入不行的情况。 分类&#xff1a; 基于布尔的SQ…

【51单片机系列】proteus中的LCD12864液晶屏

文章来源&#xff1a;《单片机C语言编程与Proteus仿真技术》。 点阵字符型LCD显示模块只能显示英文字符和简单的汉字&#xff0c;要想显示较为复杂的汉字或图形&#xff0c;就必须采用点阵图型LCD显示模块&#xff0c;比如12864点阵图型LCD显示模块。 文章目录 一、 LCD12864点…

【算法】串联所有单词的子串【滑动窗口】

题目 给定一个字符串 s 和一个字符串数组 words。 words 中所有字符串 长度相同。s 中的 串联子串 是指一个包含 words 中所有字符串以任意顺序排列连接起来的子串。例如&#xff0c;如果 words ["ab","cd","ef"]&#xff0c; 那么 "abcd…

milkv-duo cvi-mmf 硬件加速 JPG 解码性能测试

前言 本文是基于 nihui 老师的 opencv-mobile 对其支持 milkv-duo cvi-mmf 硬件加速 JPG 解码的测试。 nihui 老师原文章如下&#xff1a;opencv-mobile 现已支持 milkv-duo cvi-mmf 硬件加速 JPG 解码 opencv-mobile 仓库地址如下&#xff1a;nihui/opencv-mobile: The minim…

响应式Web开发项目教程(HTML5+CSS3+Bootstrap)第2版 例4-3 textarea

代码 <!doctype html> <html> <head> <meta charset"utf-8"> <title>textarea</title> </head><body> <h2>多行文本框:</h2> <!--textarea&#xff08;文本域&#xff09;cols(列) rows(行)--> …

Spring Web文件上传功能简述

文章目录 正文简单文件上传文件写入 总结 正文 在日常项目开发过程中&#xff0c;文件上传是一个非常常见的功能&#xff0c;当然正规项目都有专门的文件服务器保存上传的文件&#xff0c;实际只需要保存文件路径链接到数据库中即可&#xff0c;但在小型项目中可能没有专门的文…

微信这个费用,终于降低了

大家好&#xff0c;我是小悟 这个费用降低了&#xff0c;这对于广大小程序开发者来说无疑是一个好消息。这一举措不仅可以降低开发者的成本&#xff0c;还有助于激发更多的创新和创业激情。 对于广大小程序开发者来说&#xff0c;这也是一个福音&#xff0c;因为他们可以降低开…

Pypputeer自动化

Pyppeteer简介 pyppeteer 是 Python 语言的一个库&#xff0c;它是对 Puppeteer 的一个非官方端口&#xff0c;Puppeteer 是一个 Node 库&#xff0c;Puppeteer是Google基于Node.js开发的一个工具&#xff0c;它提供了一种高层次的 API 来通过 DevTools 协议控制 Chrome 或 Ch…

#Pytorch 使用DDP训练第一轮,验证后第二轮卡住

问题&#xff1a;在使用DDP分布式训练的时候&#xff0c;在第一轮训练后验证结果&#xff0c;在第二轮开始时就卡住了。因为设置了dist.barrier()&#xff0c;所以只有第一个GPU跑了验证&#xff0c;在第二轮时只有第一个GPU的模型在&#xff0c;其他卡的模型都被阻塞住了。 解…

Linux下使用Docker部署MinIO实现远程上传

&#x1f4d1;前言 本文主要是Linux下通过Docker部署MinIO存储服务实现远程上传的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;CSDN主页放风讲故事 &#…

在vue中使用echarts渲染地图,geo点击某个区域可高亮,取消

一、安装echarts npm install echarts --save二、main.js引入注册 import Vue from "vue";import * as echarts from "echarts";Vue.prototype.$echarts echarts;三、vue文件中使用echarts <template><div class"page-warp"><…

mysql原理--锁

1.解决并发事务带来问题的两种基本方式 上一章唠叨了事务并发执行时可能带来的各种问题&#xff0c;并发事务访问相同记录的情况大致可以划分为3种&#xff1a; (1). 读-读 情况&#xff1a;即并发事务相继读取相同的记录。 读取操作本身不会对记录有一毛钱影响&#xff0c;并不…

聚铭入选“2023中国数字安全能力图谱(精选版)”安全运营领域

近日&#xff0c;国内权威数字安全领域第三方调研机构数世咨询正式发布《2023年度中国数字安全能力图谱&#xff08;精选版&#xff09;》。聚铭网络作为国内领先的安全运营商&#xff0c;凭借在细分领域突出优势&#xff0c;成功入选该图谱“安全运营”领域代表厂商。 据悉&a…

6.4.2转换文件

6.4.2转换文件 利用Swf2VideoConverter2可以很方便地将Flash动画(*.swf)转换为其它的视频格式。 1&#xff0e;单击“添加”按钮&#xff0c;在弹出的下拉菜单中选择“添加文件”&#xff0c;在弹出的“Open Swf Files(打开Swf文件)”窗口中选择swf文件(如&#xff1a;那些花…

使用nginx搭建网页

一、实验要求 网站需求&#xff1a; 1.基于域名www.openlab.com可以访问网站内容为 welcome to openlab!!! 2.给该公司创建三个子界面分别显示学生信息&#xff0c;教学资料和缴费网站&#xff0c;基于www.openlab.com/student 网站访问学生信息&#xff0c;www.openlab.com…

element中Table表格控件单选、多选功能进一步优化

目录 一、代码实现1、 父组件2、子组件&#xff08;弹框&#xff09; 二、效果图 一、代码实现 1、 父组件 <template><div><!-- 用户选择嵌套弹框 --><el-dialog :close-on-click-modal"false" :close-on-press-escape"false" tit…