websocket前后端长连接之java部分

一共有4个类,第一个WebSocketConfig 配置类

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Autowired
    private WebSocketHandler webSocketHandler;

    @Autowired
    private WebSocketInterceptor webSocketInterceptor;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(webSocketHandler, "/ws")
                .addInterceptors(webSocketInterceptor)
                .setAllowedOrigins("*");
    }
}

第二个,拦截器,这里我区分了pc和app,因为代码需求是同一个id登录的用户要在pc端和app端同时连接websocket,为做区分,在pc的userid后面加了pc两个字母.

@Component
public class WebSocketInterceptor implements HandshakeInterceptor {
    private final Logger logger = LoggerFactory.getLogger(WebSocketInterceptor.class);

    @Resource
    private ISysUserService userService;
    /**
     * 握手前
     * @param request    请求对象
     * @param response   响应对象
     * @param wsHandler  请求处理器
     * @param attributes 属性域
     * @return true放行,false拒绝
     * @throws Exception 可能抛出的异常
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, org.springframework.web.socket.WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        // 获得请求参数
        Map<String, String> paramMap = HttpUtil.decodeParamMap(request.getURI().getQuery(), Charset.defaultCharset());
        String userId = paramMap.get("userId");
        if (CharSequenceUtil.isNotBlank(userId)) {
            if (userId.endsWith("pc")){
//                String substring = userId.substring(0, userId.length() - 2);
//                // 校验连接人在系统是否存在
//                SysUser user = userService.selectUserById(Long.valueOf(substring));
//                if (user == null) {
//                    response.setStatusCode(HttpStatus.UNAUTHORIZED);
//                    return false;
//                }
            }else {
                // 校验连接人在系统是否存在
                SysUser user = userService.selectUserById(Long.valueOf(userId));
                if (user == null) {
                    response.setStatusCode(HttpStatus.UNAUTHORIZED);
                    return false;
                }
            }
            // 放入属性域
            attributes.put("userId", userId);
            logger.info("用户:{}握手成功!", userId);
            return true;
        } else {
            logger.info("接受到一个websocket连接请求但是没有参数!");
        }
        return false;
    }

    /**
     * 握手后
     *
     * @param request   请求独享
     * @param response  响应对象
     * @param wsHandler 处理器
     * @param exception 抛出的异常
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, org.springframework.web.socket.WebSocketHandler wsHandler, Exception exception) {
        logger.info("握手结束!");
    }
}

第三个是管理器,其中的add方法,本身是有一个判重机制,如果该连接已存在就把原来的踢下线,重新连接新的,防止出现多个同样的id的问题.但是这又导致了新的频繁关闭重连的问题,所以后来改成了如果已经存在就直接return

@Slf4j
public class WsSessionManager {
    private WsSessionManager() {
    }

    private static final Logger logger = LoggerFactory.getLogger(WsSessionManager.class);

    /**
     * 记录当前在线连接数
     */
    private static AtomicInteger onlineCount = new AtomicInteger(0);

    /**
     * 保存连接 session 的地方
     */
    private static final ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>(99999);

    /**
     * 添加 session
     *
     * @param key     键
     * @param session 值
     */
    public static synchronized void add(String key, WebSocketSession session) {
        WebSocketSession existingSession = SESSION_POOL.get(key);
        if (existingSession != null) {
            if (existingSession.equals(session)) {
                logger.info("用户 {} 的 WebSocket 已存在,无需重复添加", key);
                return;
            }
//            if (existingSession.isOpen()) {
//                try {
//                    existingSession.close();
//                    logger.info("关闭旧的连接, userId: {}", key);
//                } catch (IOException e) {
//                    logger.error("关闭旧的连接时出现异常, userId: {}, 异常: {}", key, e.getMessage());
//                }
//            }
            if (existingSession.isOpen()) return;

        }
        SESSION_POOL.put(key, session);
        onlineCount.incrementAndGet();
        logger.info("新连接已添加, userId: {}, 当前在线人数: {}", key, getOnlineCount());
    }


    /**
     * 删除 session, 会返回删除的 session
     *
     * @param key 键
     * @return 值
     */
    public static synchronized WebSocketSession remove(String key) {
        WebSocketSession session = SESSION_POOL.remove(key);
        if (session != null) {
            onlineCount.decrementAndGet();
            logger.info("连接已移除, userId: {}, 当前在线人数: {}", key, getOnlineCount());
        }
        return session;
    }

    /**
     * 删除并同步关闭连接
     *
     * @param key 键
     */
    public static synchronized void removeAndClose(String key) {
        WebSocketSession session = remove(key);
        if (session != null) {
            try {
                session.close();
                logger.warn("关闭WebSocket会话, userId: {}", key);
            } catch (IOException e) {
                logger.error("关闭会话时出现异常, userId: {}, 异常: {}, {}", key, e.getMessage(), e);
            }
        }
    }

    /**
     * 获得 session
     *
     * @param key 键
     * @return 值
     */
    public static WebSocketSession get(String key) {
        return SESSION_POOL.get(key);
    }

    /**
     * 获取当前在线连接数
     *
     * @return 在线连接数
     */
    public static int getOnlineCount() {
        return onlineCount.get();
    }

    /**
     * 获得 Map
     *
     * @return 值
     */
    public static ConcurrentMap<String, WebSocketSession> getMap() {
        return SESSION_POOL;
    }
}

第四个是真正发送消息的处理器

@Component
public class WebSocketHandler extends TextWebSocketHandler {
    private final Logger logger = LoggerFactory.getLogger(WebSocketHandler.class);

    private static final String KEY = "userId";


    /**
     * socket 建立成功事件
     * @param session session对象
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        Object userId = session.getAttributes().get(KEY);
        if (userId != null) {
            // 将用户的连接放入 WsSessionManager,会自动关闭之前的旧连接
            WsSessionManager.add(userId.toString(), session);
            logger.info("用户连接成功, userId: {}", userId);
        } else {
            logger.warn("未能在连接中找到 userId 属性");
        }
        logger.info("建立连接了, 当前在线人数: {}, session: {}, 当前map: {}", WsSessionManager.getOnlineCount(), session, WsSessionManager.getMap());
    }


    /**
     * 接收消息事件
     *
     * @param session session对象
     * @param message 接收到的消息
     * @throws Exception 可能抛出的异常
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        // 获得客户端传来的消息
//        String payload = message.getPayload();
        logger.info("收到ws消息: {}", message);
        // 返回一条确认消息给发消息的用户
        TextMessage responseMessage = new TextMessage("pong");
        session.sendMessage(responseMessage);
    }

    /**
     * socket 断开连接时
     *
     * @param session session对象
     * @param status  断开状态
     * @throws Exception 可能抛出的异常
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        logger.info("断开连接了,session为{}", session == null ? "" : session);
        Object token = session.getAttributes().get(KEY);
        if (token != null) {
            // 用户退出,移除缓存
            WsSessionManager.removeAndClose(token.toString());
        }
    }

    /**
     * 发送消息给指定设备
     *
     * @param serialNumber 序列号
     * @param message      消息内容
     * @param type         1跳通知 2跳客户 3手机打电话 前端用  8 pc用未读消息数量   9 当前app是否在线,true或者false
     * @param noticeId     通知id,已读用
     */
    public void sendMessage(String serialNumber, String message, Integer type, Long noticeId) {
        WebSocketSession webSocketSession = WsSessionManager.get(serialNumber);
        try {
            if (webSocketSession != null && webSocketSession.isOpen()) {
                JSONObject jsonObject;
                jsonObject = JSONObject.of("type", type, "value", message, "noticeId", noticeId);
                webSocketSession.sendMessage(new TextMessage(jsonObject.toString()));
                logger.info("发送消息给{},消息内容为{}", serialNumber, message);
            }
        } catch (Exception e) {
            logger.error("消息发送失败,设备{},失败原因{}{}", webSocketSession.getAttributes().get(KEY), e.getMessage(), e);
        }
    }

    /**
     * 发送消息给指定设备
     *
     * @param serialNumber 序列号
     * @param message      消息内容
     * @param type         1跳通知 2跳客户 3手机打电话 前端用  8 pc用未读消息数量   9 当前app是否在线,true或者false
     * @param notice     通知整个对象
     */
    public void sendMessage(String serialNumber, String message, Integer type, ClientNoticeDO notice, Integer other) {
        WebSocketSession webSocketSession = WsSessionManager.get(serialNumber);
        try {
            if (webSocketSession != null && webSocketSession.isOpen()) {
                JSONObject jsonObject = JSONObject.of("type", type, "value", message, "notice", notice);
                webSocketSession.sendMessage(new TextMessage(jsonObject.toString()));
                logger.info("发送消息给{},消息内容为{}", serialNumber, message);
            } else {
                logger.warn("WebSocket 会话不可用, userId: {}", serialNumber);
            }
        } catch (IOException e) {
            logger.error("WebSocket 消息发送失败, userId: {}, 原因: {}", serialNumber, e.getMessage(), e);
            WsSessionManager.remove(serialNumber); // 自动移除无效会话
        } catch (Exception e) {
            logger.error("消息发送时发生未知错误, userId: {}, 原因: {}", serialNumber, e.getMessage(), e);
        }
    }


    /**
     * 广播消息
     *
     * @param message 消息
     */
    public void sendMessageAll(String message) {
        WsSessionManager.getMap().keySet().forEach(e -> sendMessage(e, message, 2, (Long) null));
    }
}

其中的sendMessage方法根据自己的业务需求有一个重载方法,正常一个sendMessage就足够了.日志相关的酌情增减.

心跳:在handleTextMessage方法中,接收到前端任何消息都返回一个pong,前端如果一段时间未收到pong就会发起重连,以此保证连接不中断.如果业务有前端发来的其他消息则加个if判断即可.

最终使用的时候注入

@Autowired
private WebSocketHandler webSocketHandler;
  //然后调用
  webSocketHandler.sendMessage(XXX,XXX,XXX)
  //即可.

连接的地址:ws://IP:端口/?userId=1
其中/ws是在WebSocketConfig配置的,
userId是在WebSocketHandler配置的KEY

最后附上在线连接websocket测试的网站:http://www.websocket-test.com/
以及相关可以直接测试的idea插件:CoolRequest
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/924876.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

微软企业邮箱:安全可靠的企业级邮件服务!

微软企业邮箱的设置步骤&#xff1f;如何注册使用烽火域名邮箱&#xff1f; 微软企业邮箱作为一款专为企业设计的邮件服务&#xff0c;不仅提供了高效便捷的通信工具&#xff0c;更在安全性、可靠性和功能性方面树立了行业标杆。烽火将深入探讨微软企业邮箱的多重优势。 微软…

【R安装】VSCODE安装及R语言环境配置

目录 VSCODE下载及安装VSCODE上配置R语言环境参考 Visual Studio Code&#xff08;简称“VSCode” &#xff09;是Microsoft在2015年4月30日Build开发者大会上正式宣布一个运行于 Mac OS X、Windows和 Linux 之上的&#xff0c;针对于编写现代Web和云应用的跨平台源代码编辑器&…

3D技术如何应用到汽车营销中?

3D线上展示技术在汽车营销中的应用&#xff0c;为传统汽车销售模式带来了革命性的变化。以下详细阐述这一技术如何被应用于汽车营销中&#xff1a; 一、提供沉浸式体验 3D全景看车&#xff1a;通过高清晰度的图像和全景展示&#xff0c;3D技术能够创造出身临其境的沉浸感&…

【小白学机器学习38】用np.random 生成各种随机数,随机数数组/序列

目录 0 总结 np.random() 的一些点 1 用np.random.random() 生成[0,1) 区间内的随机数 2 生成指定范围内的随机整数/数组 np.random.randint() 3 用np.random.choice()生成指定数组范围内的随机数 3.1 np.random.choice(array6) 3.2 np.random.choice(array6) &#xff0…

本地局域 基于ip地址生成证书

将这个脚本直接执行&#xff0c;然后输入需要绑定的ip和输入证书年限就可以了&#xff0c;然后配置nginx。 脚本&#xff1a;sslzhegnhsu.sh #! /bin/bashecho "请输入服务器IP地址" read ip echo "请输入证书年限" read yearyear$(expr $year 0)days…

影响因子和期刊质量之间有什么关系?

影响因子与期刊质量之间存在一定的联系&#xff0c;但这种关系并非绝对。以下是对两者关系的详细分析&#xff1a; 影响因子的定义&#xff1a;影响因子&#xff08;Impact Factor&#xff0c;简称IF&#xff09;是汤森路透出品的期刊引证报告&#xff08;Journal Citation Rep…

Js-对象-04-Array

重点关注&#xff1a;Array String JSON BOM DOM Array Array对象时用来定义数组的。常用语法格式有如下2种&#xff1a; 方式1&#xff1a; var 变量名 new Array(元素列表); 例如&#xff1a; var arr new Array(1,2,3,4); //1,2,3,4 是存储在数组中的数据&#xff0…

《Unity Shader 入门精要》高级纹理

立方体纹理 图形学中&#xff0c;立方体纹理&#xff08;Cubemap&#xff09;是环境映射&#xff08;Environment Mapping&#xff09;的一种实现方法。环境映射可以模拟物体周围的环境&#xff0c;而使用了环境映射的物体可以看起来像镀了层金属一样反射出周围的环境。 对立…

算法编程题-合并石头的最低成本

算法编程题-合并石头的最低成本 原题描述方法一、动态规划思路简述代码实现复杂度分析 方法二、动态规划&#xff08;状态优化&#xff09;思路简述代码实现复杂度分析 参考 摘要&#xff1a;本文将对LeetCode原题合并石头的最低成本进行介绍&#xff0c;思路基于动态规划&…

VirtualBox中设置共享文件夹(ubuntu系统)

一、在自己的电脑&#xff08;宿主机&#xff09;上新建一个文件夹 文件夹名字任意&#xff08;我的文件夹的名称为vgateshare&#xff09;&#xff0c;建完之后记住自己建的文件夹的路径&#xff0c;后续需要使用 二、在 VirtualBox进行设置 打开对应虚拟机界面&#xff0c…

【Flink】快速理解 FlinkCDC 2.0 原理

快速理解 FlinkCDC 2.0 原理 要详细理解 Flink CDC 原理可以看看这篇文章&#xff0c;讲得很详细&#xff1a;深入解析 Flink CDC 增量快照读取机制 (https://juejin.cn/post/7325370003192578075)。 FlnkCDC 2.0&#xff1a; Flink 2.x 引入了增量快照读取机制&#xff0c;…

华为悦盒【EC6108V9】通用刷机固件及详细教程

固件特点&#xff1a; 高度精简&#xff0c;删除原机 IPTV 等 APP。删除在线升级功能。支持多屏互动功能。内置 U 盘自动安装程序功能&#xff0c;或用当贝市场-文件管理安装程序。 自动安装说明&#xff1a;在 U盘 根目录新建名为“YueMe_BOX”的文件夹&#xff0c;把需要安装…

算法的NPU终端移植:深入探讨与实践指南

目录 ​编辑 引言 算法选择 模型压缩 权重剪枝 量化 知识蒸馏 硬件适配 指令集适配 内存管理 并行计算 性能测试 速度测试 精度测试 功耗测试 案例分析 图像识别算法的NPU移植案例 结论 引言 在人工智能技术的浪潮中&#xff0c;神经网络处理器&#xff08;…

尚硅谷学习笔记——Java设计模式(一)设计模式七大原则

一、介绍 在软件工程中&#xff0c;设计模式&#xff08;design pattern&#xff09;是对软件设计中普遍存在&#xff08;反复出现&#xff09;的各种问题&#xff0c;提出的解决方案。我们希望我们的软件能够实现复用性、高稳定性、扩展性、维护性、代码重用性&#xff0c;所以…

CentOS7如何同时安装Java8和Java17

Java17是长期支持版本&#xff0c;升级到Java17的公司越来越多&#xff0c;特别是Spring Boot3.0最低要求Java17&#xff0c;放弃了对Java8的支持。 但是在升级的时候&#xff0c;还不能保证服务器上的所有Java进程都同步升级&#xff0c;所以系统需要同时安装Java8和Java17。 …

Python Turtle召唤童年:《哆啦A梦的奇妙世界》

Python Turtle召唤童年&#xff1a;《哆啦A梦的奇妙世界》 &#x1f438; 前言 &#x1f438;&#x1f40b; 效果图 &#x1f40b;&#x1f409; 代码 &#x1f409; &#x1f438; 前言 &#x1f438; 欢迎来到 《哆啦A梦的奇妙世界》&#xff0c;在这个博客里&#xff0c;我们…

Java接收LocalDateTime、LocalDatee参数

文章目录 引言I java服务端的实现1.1 基于注解规范日期格式1.2 json序列化和反序列化全局配置自动处理日期格式化II 知识扩展: 枚举的转换和序列化III 签名注意事项引言 应用场景举例:根据时间段进行分页查询数据 前后端交互日期字符串统一是yyyy-MM-dd HH:mm:ss 或者yyyy-M…

LayaBox1.8.4实现战争迷雾效果

实现思路&#xff1a; 和Unity实现思路一样&#xff0c;可看我写的下面的一篇文章 战争迷雾FogOfWar---Unity中实现-CSDN博客 根据碰撞点可以计算出需要透明的位置&#xff0c;怎样计算如下&#xff1a; 根据迷雾mesh的长宽和纵向横向的的像素数可以得出&#xff0c;每个小方…

linux安装部署mysql资料

安装虚拟机 等待检查完成 选择中文 软件选择 网络和主机名 开始安装 设置root密码 ADH-password 创建用户 等待安装完成 重启 接受许可证 Centos 7 64安装完成 安装mysql开始 Putty连接指定服务器 在 opt目录下新建download目录 将mysql文件传到该目录下 查看linux服务器的…

【计算机系统基础】程序数据与ELF数据节

目录 1. 任务描述 2. 实验阶段 2.1 反汇编获取重定位记录 2.2 分析 2.3 查看节头表&#xff0c;确定偏移量 2.4 使用hexedit工具修改指定内容 1. 任务描述 修改二进制可重定位目标文件“phase1.o”的数据&#xff08;.data&#xff09;节内容&#xff08;不允许修改其他节…