基于WebSocket实现简易即时通讯功能

代码实现

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.15.0</version>
</dependency>

配置信息

部分内容非必须,按自身需求处理即可

  • WebSocketConfig
package com.example.im.config;

import com.example.im.infra.handle.ImRejectExecutionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

import javax.annotation.Resource;

/**
 * @author PC
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {
    @Resource
    private WebSocketProperties webSocketProperties;

    @Bean
    public ServerEndpointExporter serverEndpoint() {
        return new ServerEndpointExporter();
    }

    /***
     * 配置线程池
     * @return 线程池
     */
    @Bean
    public TaskExecutor taskExecutor() {
        WebSocketProperties.ExecutorProperties executorProperties = webSocketProperties.getExecutorProperties();
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(executorProperties.getCorePoolSize());
        // 设置最大线程数
        executor.setMaxPoolSize(executorProperties.getMaxPoolSize());
        // 设置队列容量
        executor.setQueueCapacity(executorProperties.getQueueCapacity());
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(executorProperties.getKeepAliveSeconds());
        // 设置默认线程名称
        executor.setThreadNamePrefix("im-");
        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new ImRejectExecutionHandler());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}
  • WebSocketProperties
package com.example.im.config;

import com.example.im.infra.constant.ImConstants;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @author PC
 */
@Configuration
@ConfigurationProperties(prefix = "cus.ws")
public class WebSocketProperties {

    /**
     * 接收人是否排除自身,默认排除
     */
    private Boolean receiverExcludesHimselfFlag = true;

    /**
     * 消息是否排除接收人信息,默认不排除
     */
    private Boolean excludeReceiverInfoFlag = false;

    /**
     * 线程池信息
     */
    private ExecutorProperties executorProperties = new ExecutorProperties();

    /**
     * 发送消息给指定人的分隔符,默认为@
     */
    private String receiverSeparator = ImConstants.Symbol.AT;

    public Boolean getReceiverExcludesHimselfFlag() {
        return receiverExcludesHimselfFlag;
    }

    public void setReceiverExcludesHimselfFlag(Boolean receiverExcludesHimselfFlag) {
        this.receiverExcludesHimselfFlag = receiverExcludesHimselfFlag;
    }

    public Boolean getExcludeReceiverInfoFlag() {
        return excludeReceiverInfoFlag;
    }

    public void setExcludeReceiverInfoFlag(Boolean excludeReceiverInfoFlag) {
        this.excludeReceiverInfoFlag = excludeReceiverInfoFlag;
    }

    public String getReceiverSeparator() {
        return receiverSeparator;
    }

    public void setReceiverSeparator(String receiverSeparator) {
        this.receiverSeparator = receiverSeparator;
    }

    public ExecutorProperties getExecutorProperties() {
        return executorProperties;
    }

    public void setExecutorProperties(ExecutorProperties executorProperties) {
        this.executorProperties = executorProperties;
    }

    /**
     * 线程池信息
     */
    public static class ExecutorProperties {
        /**
         * 核心线程数
         */
        private int corePoolSize = 10;
        /**
         * 最大线程数
         */
        private int maxPoolSize = 20;
        /**
         * 队列容量
         */
        private int queueCapacity = 50;
        /**
         * 线程活跃时间(秒)
         */
        private int keepAliveSeconds = 60;

        public int getCorePoolSize() {
            return corePoolSize;
        }

        public void setCorePoolSize(int corePoolSize) {
            this.corePoolSize = corePoolSize;
        }

        public int getMaxPoolSize() {
            return maxPoolSize;
        }

        public void setMaxPoolSize(int maxPoolSize) {
            this.maxPoolSize = maxPoolSize;
        }

        public int getQueueCapacity() {
            return queueCapacity;
        }

        public void setQueueCapacity(int queueCapacity) {
            this.queueCapacity = queueCapacity;
        }

        public int getKeepAliveSeconds() {
            return keepAliveSeconds;
        }

        public void setKeepAliveSeconds(int keepAliveSeconds) {
            this.keepAliveSeconds = keepAliveSeconds;
        }
    }
}

application.yml

server:
  port: 18080
cus:
  ws:
    exclude-receiver-info-flag: true
    receiver-excludes-himself-flag: true

ws端口

  • WebSocketEndpoint

注意:若按常规注入方式(非static修饰),在项目启动时setWebSocketMessageService是有值的,但是发送消息时WebSocketMessageService会变为null,需要用static修饰。

其原因为Spring的bean管理是单例的,但是WebSocket是多对象的,当新用户进入系统时,会创建一个新的WebSocketEndpoint对象,但是不会再注入WebSocketMessageService,这样就会导致其为null。若想解决该问题,可以使用static修饰WebSocketMessageService,static修饰的对象属于类,而非实例,其在类加载时即可进行初始化。

package com.example.im.endpoint;

import com.example.im.app.service.WebSocketMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author PC
 */
@Component
@ServerEndpoint("/ws")
public class WebSocketEndpoint {

    private final static Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);

    public static final ConcurrentHashMap<String, WebSocketEndpoint> WEB_SOCKET_ENDPOINT_MAP = new ConcurrentHashMap<>();

    private Session session;
    private static WebSocketMessageService webSocketMessageService;

    @Autowired
    public void setWebSocketMessageService(WebSocketMessageService webSocketMessageService) {
        WebSocketEndpoint.webSocketMessageService = webSocketMessageService;
    }

    /**
     * 打开ws连接
     *
     * @param session 会话
     */
    @OnOpen
    public void onOpen(Session session) {
        //连接成功
        logger.info("The connection is successful:" + getUserName(session));
        this.session = session;
        WEB_SOCKET_ENDPOINT_MAP.put(getUserName(session), this);
    }

    /**
     * 断开ws连接
     *
     * @param session 会话
     */
    @OnClose
    public void onClose(Session session) {
        WEB_SOCKET_ENDPOINT_MAP.remove(getUserName(session));
        //断开连接
        logger.info("Disconnect:" + getUserName(session));
    }

    /**
     * 接收到的消息
     *
     * @param message 消息内容
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        //接收消息
        String sendUserName = getUserName(session);
        logger.info(sendUserName + " send message: " + message);
        webSocketMessageService.sendMessage(sendUserName, message);
    }

    private String getUserName(Session session) {
        return Optional.ofNullable(session.getRequestParameterMap().get("userName")).orElse(new ArrayList<>())
                .stream().findFirst().orElse("anonymous_users");
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }
}

实现类

WebSocketMessageServiceImpl

package com.example.im.app.service.impl;

import com.example.im.app.service.WebSocketMessageService;
import com.example.im.config.WebSocketProperties;
import com.example.im.endpoint.WebSocketEndpoint;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * @author PC
 */
@Service
public class WebSocketMessageServiceImpl implements WebSocketMessageService {

    private final static Logger logger = LoggerFactory.getLogger(WebSocketMessageServiceImpl.class);

    private WebSocketProperties webSocketProperties;

    @Autowired
    public void setWebSocketProperties(WebSocketProperties webSocketProperties) {
        this.webSocketProperties = webSocketProperties;
    }

    private TaskExecutor taskExecutor;

    @Autowired
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    @Override
    public void sendMessage(String sendUserName, String message) {
        //包含@发给指定人,否则发给全部人
        if (StringUtils.contains(message, webSocketProperties.getReceiverSeparator())) {
            this.sendToUser(sendUserName, message);
        } else {
            this.sendToAll(sendUserName, message);
        }
    }

    private void sendToUser(String sendUserName, String message) {
        getReceiverName(sendUserName, message).forEach(receiverName -> taskExecutor.execute(() -> {
                            try {
                                if (WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.containsKey(receiverName)) {
                                    WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.get(receiverName).getSession().getBasicRemote()
                                            .sendText(generatorMessage(message));
                                }
                            } catch (IOException ioException) {
                                logger.error("send error:" + ioException);
                            }
                        }
                )
        );
    }

    private void sendToAll(String sendUserName, String message) {
        for (Map.Entry<String, WebSocketEndpoint> webSocketEndpointEntry : WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.entrySet()) {
            taskExecutor.execute(() -> {
                        if (webSocketProperties.getReceiverExcludesHimselfFlag() && StringUtils.equals(sendUserName, webSocketEndpointEntry.getKey())) {
                            return;
                        }
                        try {
                            webSocketEndpointEntry.getValue().getSession().getBasicRemote()
                                    .sendText(generatorMessage(message));
                        } catch (IOException ioException) {
                            logger.error("send error:" + ioException);
                        }
                    }
            );
        }
    }

    private List<String> getReceiverName(String sendUserName, String message) {
        if (!StringUtils.contains(message, webSocketProperties.getReceiverSeparator())) {
            return new ArrayList<>();
        }
        String[] names = StringUtils.split(message, webSocketProperties.getReceiverSeparator());
        return Stream.of(names).skip(1).filter(receiver ->
                        !(webSocketProperties.getReceiverExcludesHimselfFlag() && StringUtils.equals(sendUserName, receiver)))
                .collect(Collectors.toList());
    }

    /**
     * 根据配置处理发送的信息
     *
     * @param message 原消息
     * @return 被处理后的消息
     */
    private String generatorMessage(String message) {
        return BooleanUtils.isTrue(webSocketProperties.getExcludeReceiverInfoFlag()) ?
                StringUtils.substringBefore(message, webSocketProperties.getReceiverSeparator()) : message;
    }
}

测试

Postman访问WebSocket

点击new,新建WebSocket连接

创建ws连接

连接格式:ws://ip:port/endpoint

例如,本次实例demo的ws连接如下,userName为自定义参数,测试使用,非必须,根据自身需求调整即可

ws://127.0.0.1:18080/ws?userName=test1

点击Connect进行连接

为了方便测试,再创建三个ws连接,也进行Connect

ws://127.0.0.1:18080/ws?userName=test2

ws://127.0.0.1:18080/ws?userName=test3

ws://127.0.0.1:18080/ws?userName=test4

测试

连接后,在test1所在页面发送消息

  • 首先测试@用户的情况

test2、test3可接收消息,test4无消息

  • 而后测试发送给所有人的情况

test2、test3、test4均接收到消息

参考资料

[1].即时通讯demo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/890906.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SQL 干货 | 使用 Having 子句筛选聚合字段

如果你编写 SQL 查询已有一段时间&#xff0c;那么你可能对 WHERE 子句非常熟悉。虽然它对聚合字段没有影响&#xff0c;但有一种方法可以根据聚合值过滤记录&#xff0c;那就是使用 HAVING 子句。本博客将介绍它的工作原理&#xff0c;并提供几个在 SELECT 查询中使用它的示例…

Redis-缓存一致性

缓存双写一致性 更新策略探讨 面试题 缓存设计要求 缓存分类&#xff1a; 只读缓存&#xff1a;&#xff08;脚本批量写入&#xff0c;canal 等&#xff09;读写缓存 同步直写&#xff1a;vip数据等即时数据异步缓写&#xff1a;允许延时&#xff08;仓库&#xff0c;物流&a…

C语言练习

题目&#xff1a; 1.编写一段C语言&#xff0c;向下边这样输入2个整数&#xff0c;如果他们的差值小于等于10&#xff0c;则显示“它们的差值小于等于10”&#xff0c;否则显示“它们的差大于等于11”. 请输入两个整数&#xff1a; 整数A&#xff1a;12 整数B&#xff1a;7…

SQL分类中的DDL

DDL&#xff08;Data Definition Language):数据定义语言&#xff0c;用来定义数据库对象&#xff08;数据库&#xff0c;表&#xff0c;字段&#xff09;。 一、DDL语句操作数据库 1、查询所有数据库&#xff1a;show databases&#xff1b;&#xff08;一般用大写&#xff…

spring |Spring Security安全框架 —— 认证流程实现

文章目录 开头简介环境搭建入门使用1、认证1、实体类2、Controller层3、Service层3.1、接口3.2、实现类3.3、实现类&#xff1a;UserDetailsServiceImpl 4、Mapper层3、自定义token认证filter 注意事项小结 开头 Spring Security 官方网址&#xff1a;Spring Security官网 开…

React路由 基本使用 嵌套路由 动态路由 获取路由参数 异步路由 根据配置文件来生成路由

文章目录 React-router的三个版本react-router使用嵌套路由动态路由 获取路由参数Params参数Query参数Location信息 控制跳转地址异步路由根据配置文件生成路由 React-router的三个版本 React-router 服务端渲染使用React-router-dom 浏览器端渲染使用React-router-native Rea…

API项目3:API签名认证

问题引入 我们为开发者提供了接口&#xff0c;却对调用者一无所知 假设我们的服务器只能允许 100 个人同时调用接口。如果有攻击者疯狂地请求这个接口&#xff0c;那是很危险的。一方面这可能会损害安全性&#xff0c;另一方面耗尽服务器性能&#xff0c;影响正常用户的使用。…

若依前后端分离版本el-select下拉框字典如何设置默认值。

在若依前后端分离框架中&#xff0c;如何给下拉框设置默认值&#xff0c;刚入门的小伙伴&#xff0c;可能会不知道如何去做。 本章教程&#xff0c;主要以用户管理模块中的添加用户举例说明如何设置用户性别默认值为男。 解决思路 首先&#xff0c;我们需要找到打开新增页面的方…

【工具】前端js数字金额转中文大写金额

【工具】前端js数字金额转中文大写金额 代码 <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>金额转…

多功能点击器(文末附Gitee源码)——光遇自动弹奏

之前提到的多功能点击器&#xff0c;使用场景比较多&#xff0c;之前玩光遇喜欢在里面弹琴&#xff0c;想到用这个点击器也能自动弹琴&#xff0c;跟别的自动弹琴脚本不一样&#xff0c;这个比较简单容易操作。 借这个光遇自动弹琴使用教程再讲解一下这个多功能点击头的使用方法…

ModelMapper的常见用法 ,号称是beanUtils.copyProp....的升级版??,代码复制粘贴即可复现效果,so convenient

官网案例 以下将官网案例做一个解释 1&#xff09;快速入门 递归遍历源对象的属性拷贝给目标对象 拷贝对象下对象的属性值 Data class Order {private Customer customer;private Address billingAddress; }Data class Customer {private Name name; }Data class Name {pr…

在三维可视化项目中,B/S和C/S架构该如何选择?

一、什么是B/S和C/S 在3D数据可视化中&#xff0c;有两种常见的架构模式&#xff1a;BS&#xff08;Browser/Server&#xff09;和CS&#xff08;Client/Server&#xff09; B/S模式 B/S模式是指将3D数据可视化的逻辑和处理放在服务器端&#xff0c;而在客户端使用浏览器进行…

智能汽车智能网联

我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 屏蔽力是信息过载时代一个人的特殊竞争力&#xff0c;任何消耗你的人和事&#xff0c;多看一眼都是你的不…

rom定制系列------小米6x_MIUI14_安卓13刷机包修改写入以及功能定制 界面预览

在接待一些定制化系统中。有很多工作室或者一些特殊行业的友友需要在已有固件基础上简略修改其中的功能。方便使用。例如usb调试默认开启。usb安装设置以及usb安装与内置删减一些app的定制服务。今天给友友预览其中小米6X此款机型定制相关的一些界面与功能演示。 定制机型以及…

公司新来的00后测试开发,让我对“跨界”二字有了全新认识

最近&#xff0c;我们部门迎来了一位新面孔——一个00后的年轻人&#xff0c;阿沅。初见他时&#xff0c;我以为他只是众多新入职员工中的普通一员&#xff0c;毕竟他的专业背景与我们的IT行业似乎相去甚远——广告学。然而&#xff0c;他的到来&#xff0c;却如同一阵清风&…

IDEA中的Postfix Completion与Live Templates功能详解

目录 前言1. Postfix Completion&#xff08;后缀补全&#xff09;1.1 什么是Postfix Completion1.2 使用Postfix Completion的步骤1.3 Postfix Completion的具体应用1.4 自定义Postfix Completion 2. Live Templates&#xff08;实时模板&#xff09;2.1 什么是Live Templates…

聊聊 Facebook Audience Network 绑定收款账号的问题

大家好&#xff0c;我是牢鹅&#xff01;本篇是Facebook开发者系列的第五篇&#xff0c;最近看见好多群友在群里问这个&#xff0c;说Facebook的变现账户在绑定国内的银行账户时&#xff08;有些用户反馈就算不是国内的卡也会出现该问题&#xff09;&#xff0c;显示“无法绑定…

【WRF工具】QGis插件GIS4WRF:根据嵌套网格生成namelist.wps文件

【WRF工具】QGis插件GIS4WRF:根据嵌套网格生成namelist.wps文件 准备:WRF嵌套网格QGis根据嵌套网格生成namelist.wps文件检查:根据namelist.wps绘制模拟区域参考GIS4WRF 是一个免费且开源的 QGIS 插件,旨在帮助研究人员和从业者进行高级研究天气研究与预报(WRF)模型的建模…

利用可解释性技术增强制造质量预测模型

概述 论文地址&#xff1a;https://arxiv.org/abs/2403.18731 本研究提出了一种利用可解释性技术提高机器学习&#xff08;ML&#xff09;模型性能的方法。该方法已用于铣削质量预测&#xff0c;这一过程首先训练 ML 模型&#xff0c;然后使用可解释性技术识别不需要的特征并去…

Lucene 倒排索引

倒排索引是什么&#xff1f; 【定义】倒排索引&#xff08;Inverted Index&#xff09;是一种用于信息检索的数据结构&#xff0c;尤其适用于文本搜索。它与传统索引的主要区别在于&#xff0c;传统索引是根据文档来查找词语的位置&#xff0c;而倒排索引则是根据词语来查找文…