SpringBoot使用WebSocket收发实时离线消息

引入maven依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

 WebScoket配置处理器


import org.springframework.boot.web.servlet.ServletContextInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import javax.servlet.ServletContext;


/**
 * WebScoket配置处理器
 */
@Configuration
public class WebSocketConfig implements ServletContextInitializer {
	 /**
     * ServerEndpointExporter 作用
     *
     * 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
     *
     * @return
     */
	@Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    //设置websocket发送内容长度
    @Override
    public void onStartup(ServletContext servletContext)  {
        servletContext.setInitParameter("org.apache.tomcat.websocket.textBufferSize","22428800");
    }
}

webScoket消息对象

import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.util.Date;

/**
* @author: ws
* @date: 20223/10/26 15:59
* @Description: WebSocketMessage
*/
@Data
public class WebSocketMessage {

/**
* 用户ID
*/
private String fromId;

/**
* 对方ID
*/
private String toOtherId;
//消息内容
private String message;

//发送时间
@JSONField(format="yyyy-MM-dd HH:mm:ss")
public Date date;

}

WebSocket操作类

import cn.hutool.core.collection.ListUtil;
import com.alibaba.fastjson.JSON;
import com.ws.wxyinghang.entity.WebSocketMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @author: ws
 * @date: 20223/10/26 15:59
 * @Description: WebSocket操作类
 */
@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketSever {

    // 与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    private String userId;


    // session集合,存放对应的session
    private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();

    // concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
    private static CopyOnWriteArraySet<WebSocketSever> webSocketSet = new CopyOnWriteArraySet<>();
    // 用于存放离线消息
    private static ConcurrentHashMap<String, List<WebSocketMessage>> offlineMessageMap = new ConcurrentHashMap();

    /**
     * 建立WebSocket连接
     *
     * @param session
     * @param userId  用户ID
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
        log.info("WebSocket建立连接中,连接用户ID:{}", userId);
        try {
            Session historySession = sessionPool.get(userId);
            // historySession不为空,说明已经有人登陆账号,应该删除登陆的WebSocket对象
            if (historySession != null) {
                webSocketSet.remove(historySession);
                historySession.close();
            }
        } catch (IOException e) {
            log.error("重复登录异常,错误信息:" + e.getMessage(), e);
        }
        // 建立连接
        this.session = session;
        this.userId = userId;
        webSocketSet.add(this);
        sessionPool.put(userId, session);
        //从离线消息队列里面获取消息
        if (offlineMessageMap.containsKey(userId)) {
            List<WebSocketMessage> list = offlineMessageMap.get(userId);
            Iterator it = list.iterator();
            while (it.hasNext()) {
                Object x = it.next();
                //离线消息接收成功后删除消息
                Boolean bb = sendOfflineMessageByUser(JSON.toJSONString(x));
                if (bb) {
                    System.out.println("从队列中删除离线消息" + x);
                    it.remove();
                }
            }
            offlineMessageMap.remove(userId);
        }
        log.info("建立连接完成,当前在线人数为:{}", webSocketSet.size());
    }

    /**
     * 发生错误
     *
     * @param throwable e
     */
    @OnError
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    /**
     * 连接关闭
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);
        sessionPool.remove(this.userId);
        log.info("连接断开,当前在线人数为:{}", webSocketSet.size());
    }

    /**
     * 接收客户端消息
     *
     * @param message 接收的消息
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("收到客户端发来的消息:{}", message);
        sendMessageByUser(message);
    }

    /**
     * 推送消息到指定用户
     *
     * @param message 发送的消息
     */
    public static Boolean sendMessageByUser(String message) {
        WebSocketMessage msg = JSON.parseObject(message, WebSocketMessage.class);
        log.info("用户ID:" + msg.getToOtherId() + ",推送内容:" + message);
        Session session = sessionPool.get(msg.getToOtherId());
        //判断session是否正常
        if (session == null || !session.isOpen()) {
            log.info("用户ID:" + msg.getToOtherId() + ",离线,放入离线消息队列中");
            if (offlineMessageMap.containsKey(msg.getToOtherId())) {
                List<WebSocketMessage> list = offlineMessageMap.get(msg.getToOtherId());
                list.add(msg);
                offlineMessageMap.put(msg.getToOtherId(), list);
            } else {
                offlineMessageMap.put(msg.getToOtherId(), ListUtil.toList(msg));
            }
        }//发送消息
        else {
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                log.error("推送消息到指定用户发生错误:" + e.getMessage(), e);
                return false;
            }
        }
        return true;
    }

    //发送离线消息
    public static Boolean sendOfflineMessageByUser(String message) {
        WebSocketMessage msg = JSON.parseObject(message, WebSocketMessage.class);
        log.info("用户ID:" + msg.getToOtherId() + ",推送内容:" + message);
        Session session = sessionPool.get(msg.getToOtherId());
        try {
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            log.error("推送消息到指定用户发生错误:" + e.getMessage(), e);
            return false;
        }
        return true;
    }

    /**
     * 群发消息
     *
     * @param message 发送的消息
     */
    public static void sendAllMessage(String message) {
        log.info("发送消息:{}", message);
        for (WebSocketSever webSocket : webSocketSet) {
            try {
                webSocket.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                log.error("群发消息发生错误:" + e.getMessage(), e);
            }
        }
    }

}

启动项目,使用apiFox测试,新建webScoket接口

新建websocket1,连接后发送消息 

 

新建webScoket2 ,可以看到连接后接收到了消息 

 

如果webScoket2断开连接后, webScoket1继续发送消息,等webScoket2连接后就会收到离线的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/107115.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JVM面试知识点整理

文章目录 (一) JVM组成JVM组成部分和运行流程从图中可以看出 JVM 的主要组成部分运行流程&#xff1a;程序计数器Java堆虚拟机栈方法区堆栈的区别是什么&#xff1f; (二) 类加载器双亲委派模型类装载的执行过程 (三) 垃圾回收对象什么时候可以被垃圾回收哪些可以作为根对象 垃…

浅谈安科瑞EMS能源管控平台建设的意义-安科瑞 蒋静

摘 要&#xff1a;能源消耗量大、能源运输供给不足、环境压力日趋增加、能耗双控等一系列问题一直困扰着钢铁冶金行业&#xff0c;制约着企业快速稳定健康发展。本文介绍的安科瑞EMS能源管控平台&#xff0c;采用自动化、信息化技术&#xff0c;实现从能源数据采集、过程监控、…

Spring Boot简介

Spring Boot帮助你创建可以运行的独立的、基于Spring的生产级应用程序。 我们对Spring平台和第三方库采取了有主见的观点&#xff0c;这样你就能以最少的麻烦开始工作。 大多数Spring Boot应用程序只需要很少的Spring配置。 你可以使用Spring Boot来创建Java应用程序&#xff…

【Python3】【力扣题】202. 快乐数

【力扣题】题目描述&#xff1a; 【Python3】代码&#xff1a; 1、解题思路&#xff1a;用哈希集合检测循环。设置集合记录每次结果&#xff0c;判断结果是否为1。若计算结果已在集合中则进入循环&#xff0c;结果一定不为1。 &#xff08;1-1&#xff09;知识点&#xff1a;…

基于SSM和VUE的留守儿童信息管理系统

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

关键词搜索1688商品数据接口(标题|主图|SKU|价格|优惠价|掌柜昵称|店铺链接|店铺所在地)

1688商品列表接口是一个用于获取1688网站上商品列表信息的接口。通过该接口&#xff0c;您可以获取到1688网站上不同类别的商品列表&#xff0c;包括商品的名称、价格、图片等信息。 要使用1688商品列表接口&#xff0c;您需要按照以下步骤进行操作&#xff1a; 登录1688网站…

听力检测为什么要在标准化的隔声屏蔽系统中进行?

作者兰明&#xff0c;医学硕士&#xff0c;听力学博士&#xff0c;听觉健康门诊主任 美国国家研究委员会;;行为、认知和感官科学委员会联合出版的听力损失确定社会保障福利的资格一书中关于测试环境的要求如下&#xff1a; 行动建议4-4 测试环境 听力学评估是在受控的声学环境中…

接口返回响应,统一封装(ResponseBodyAdvice + Result)(SpringBoot)

需求 接口的返回响应&#xff0c;封装成统一的数据格式&#xff0c;再返回给前端。 依赖 对于SpringBoot项目&#xff0c;接口层基于 SpringWeb&#xff0c;也就是 SpringMVC。 <dependency><groupId>org.springframework.boot</groupId><artifactId&g…

使用WebStorm创建和配置TypeScript项目

创建 这里我用的是WebStorm 2019.2.2版本 首先&#xff0c;创建一个空项目 File -> New -> Project->Empty Project生成配置文件 自动配置&#xff1a; 打开终端输入tsc --init&#xff0c;即可自动生成tsconfig.json文件 手动配置&#xff1a; 在项目根目录下新建一…

数据结构与算法之矩阵: Leetcode 48. 旋转矩阵 (Typescript版)

旋转图像 https://leetcode.cn/problems/rotate-image/ 描述 给定一个 n n 的二维矩阵 matrix 表示一个图像。请你将图像顺时针旋转 90 度。你必须在 原地 旋转图像&#xff0c;这意味着你需要直接修改输入的二维矩阵。请不要 使用另一个矩阵来旋转图像。 示例 1 输入&…

【Docker】Linux网桥连接多个命名空间

veth实现了点对点的虚拟连接&#xff0c;可以通过veth连接两个namespace&#xff0c;如果我们需要将3个或者多个namespace接入同一个二层网络时&#xff0c;就不能只使用veth了。 在物理网络中&#xff0c;如果需要连接多个主机&#xff0c;我们会使用bridge&#xff08;网桥&…

增强常见问题解答搜索引擎:在 Elasticsearch 中利用 KNN 的力量

在快速准确的信息检索至关重要的时代&#xff0c;开发强大的搜索引擎至关重要。 随着大型语言模型和信息检索架构&#xff08;如 RAG&#xff09;的出现&#xff0c;在现代软件系统中利用文本表示&#xff08;向量/嵌入&#xff09;和向量数据库已变得越来越流行。 在本文中&am…

javaweb+mysql的电子书查阅和下载系统

图书分类查看、热门下载、最新上传、站内数据统计。 登陆注册、图书查询、图书详情、图书下载。 身份分为管理员和用户。 源码下载地址 支持&#xff1a;远程部署/安装/调试、讲解、二次开发/修改/定制

串口占用检测工具

串口占用检测工具 平时需要检测哪个程序占用了串口&#xff0c;下面介绍一款非常方便的工具&#xff0c;它的工具箱里包含一个串口占用检测工具&#xff0c;可以非常方便的检测出来哪个程序占用了串口&#xff0c;并给出程序名和PID。 官网下载地址&#xff1a;http://www.red…

安装 tensorflow==1.15.2 遇见的问题

一、直接安装 命令&#xff1a;pip install tensorflow1.15.2 二、换 阿里云 镜像源 命令&#xff1a;pip install -i http://mirrors.aliyun.com/pypi/simple tensorflow1.15.2 三、换 豆瓣 镜像源 命令&#xff1a;pip install http://pypi.douban.com/simple tensorflow1…

UWB室内定位系统全套源码 高精度人员定位系统源码

UWB室内定位系统全套源码 高精度人员定位系统源码 UWB室内定位系统是一种高精度的室内定位技术&#xff0c;它可以实现对室内人员和物品的实时精确定位&#xff0c;具有重要的应用意义和社会价值。 UWB定位精度在厘米级内&#xff0c;其精度远远高于WIFI和蓝牙定位。精度、安全…

华为eNSP配置专题-策略路由的配置

文章目录 华为eNSP配置专题-策略路由的配置0、概要介绍1、前置环境1.1、宿主机1.2、eNSP模拟器 2、基本环境搭建2.1、终端构成和连接2.2、终端的基本配置 3、配置接入交换机上的VLAN4、配置核心交换机为网关和DHCP服务器5、配置核心交换机和出口路由器互通6、配置PC和出口路由器…

ubuntu安装nps客户端

Ubuntu安装nps客户端 1.什么是nps内网穿透&#xff1f;2.设备情况3.下载客户端3.链接服务端3.1、无配置文件模式3.2、注册到系统服务(启动启动、监控进程) 1.什么是nps内网穿透&#xff1f; nps是一款轻量级、高性能、功能强大的内网穿透代理服务器。目前支持tcp、udp流量转发…

单片机为什么一直用C语言,不用其他编程语言?

单片机为什么一直用C语言&#xff0c;不用其他编程语言&#xff1f; 51 单片机规模小得拮据&#xff0c;C 的优势几乎看不到。放个类型信息进去都费劲&#xff0c;你还想用虚函数&#xff1f;还想模板展开&#xff1f;程序轻松破 10k。最近很多小伙伴找我&#xff0c;说想要一些…

uview 1 uni-app表单 number digit 的输入框有初始化赋值后,但是校验失败

背景&#xff1a; 在onReady初始化规则 onReady() { this.$refs.uForm.setRules(this.rules); }, 同时&#xff1a;ref,model,rules,props都要配置好。 报错 当input框限定type为number&#xff0c;digit类型有初始值不做修改动作,直接提交会报错&#xff0c;验…