WebSocket解决方案(springboot 基于Redis发布订阅)

WebSocket        

        因为一般的请求都是HTTP请求(单向通信),HTTP是一个短连接(非持久化),且通信只能由客户端发起,HTTP协议做不到服务器主动向客户端推送消息。WebSocket确能很好的解决这个问题,服务端可以主动向客户端推送消息,客户端也可以主动向服务端发送消息,实现了服务端和客户端真正的平等

特点

1.全双工通信:允许服务器和客户端在同一连接上同时进行双向通信

2.持久连接:连接一旦建立,会一直保持打开状态,减少了每次连接建立和关闭的开销,使通信更加高效

3.低延迟:由于连接保持打开状态,WebSocket 通信具有较低的延迟,适用于实时性要求较高的应用

4.兼容性:代浏览器和大多数服务器支持 WebSocket

5.安全性:与其他网络通信协议一样,WebSocket 通信也需要一些安全性的考虑。可以使用加密协议(如 TLS)来保护数据在网络传输中的安全性

实战

1.添加依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.创建配置类

创建配置类,并将其注入到Bean容器中

@Configuration
public class WebSocketConfig {
    /**
     * 注入ServerEndpointExporter,
     * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
3.创建WebSocketServer类

创建WebSocketHandler类,并将其注入到Bean容器中

@ServerEndpoint("/websocket/{equipmentId}"),该注解用于配置建立WebSocket连接的路径,可以按需修改

@Component
@Slf4j
@ServerEndpoint("/websocket/{equipmentId}")
public class WebSocketHandler {
    private Session session;

    //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
    private static CopyOnWriteArraySet<WebSocketHandler> webSocketUtils = new CopyOnWriteArraySet<>();
    // 用来存在线连接数
    private static Map<String, Session> sessionPool = new HashMap<>();

    private static EquipmentService equipmentService = SpringContextHolder.getBean(EquipmentService.class);

    /**
     * 链接成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "equipmentId") String equipmentId) {
        try {
            this.session = session;
            webSocketUtils.add(this);
            sessionPool.put(equipmentId, session);
            sendOneMessage(equipmentId, "");

            equipmentService.onlineRecord(equipmentId,0);

            log.info("【websocket消息】有新的连接,总数为:" + webSocketUtils.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 链接关闭调用的方法
     */
    @OnClose
    public void onClose(@PathParam(value = "equipmentId") String equipmentId) {
        try {
            webSocketUtils.remove(this);

            equipmentService.onlineRecord(equipmentId,1);

            log.info("【websocket消息】连接断开,总数为:" + webSocketUtils.size());

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message
     * @param
     */
    @OnMessage
    public void onMessage(@PathParam(value = "equipmentId") String equipmentId, String message) {

        log.info("【websocket消息】收到客户端消息:" + message);

        sendOneMessage(equipmentId, message);
    }

    /**
     * 发送错误时的处理
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {

        log.error("用户错误,原因:" + error.getMessage());
        error.printStackTrace();
    }


    /**
     * 推消息给前端
     *
     * @param equipmentId
     * @param message
     * @return
     */
    public static Runnable sendOneMessage(String equipmentId, Object message) {
        Session session = sessionPool.get(equipmentId);
        if (session != null && session.isOpen()) {
            try {
                log.info("【推给前端消息】 :" + message);

                //高并发下,防止session占用期间,被其他线程调用
                synchronized (session) {
                    session.getBasicRemote().sendText(Objects.toString(message));
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return null;
    }

}

功能点:

1.处理异常: 与任何网络通信一样,WebSocket 连接可能会面临各种异常情况,如断开连接、网络问题等。WebSocket 服务器需要能够处理这些异常情况,进行适当的清理和处理。

2.消息处理: 一旦客户端连接成功,WebSocket 服务器需要处理客户端发送过来的消息。这可以在 WebSocket 端点中的方法上定义处理逻辑。服务器可以根据不同的业务需求处理不同类型的消息

3.WebSocket 服务器负责监听客户端的连接请求,一旦有客户端连接,服务器会创建一个 WebSocket 会话(Session)来管理这个连接。服务器需要能够维护这些连接,包括打开、关闭、保持心跳等操作。

4.WebSocket 服务器需要注册一个或多个 WebSocket 端点。每个端点对应一种处理逻辑,可以处理客户端发送过来的消息,以及向客户端发送消息。这些端点通过注解或配置来定义

因业务需求,常需要对获取的消息进行处理,websocket 不能注入( @Autowired ) service,解决办法:

private static EquipmentService equipmentService = SpringContextHolder.getBean(EquipmentService.class);

@Component
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {
    private static ApplicationContext applicationContext = null;


    /**
     * 取得存储在静态变量中的ApplicationContext.
     */
    public static ApplicationContext getApplicationContext() {
        assertContextInjected();
        return applicationContext;
    }

    /**
     * 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) {
        assertContextInjected();
        return (T) applicationContext.getBean(name);
    }

    /**
     * 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
     */
    public static <T> T getBean(Class<T> requiredType) {
        assertContextInjected();
        return applicationContext.getBean(requiredType);
    }

    /**
     * 清除SpringContextHolder中的ApplicationContext为Null.
     */
    public static void clearHolder() {
        applicationContext = null;
    }

    /**
     * 实现ApplicationContextAware接口, 注入Context到静态变量中.
     */
    @Override
    public void setApplicationContext(ApplicationContext appContext) {
        applicationContext = appContext;
    }

    /**
     * 实现DisposableBean接口, 在Context关闭时清理静态变量.
     */
    @Override
    public void destroy() throws Exception {
        SpringContextHolder.clearHolder();
    }

    /**
     * 检查ApplicationContext不为空.
     */
    private static void assertContextInjected() {
        Validate.validState(applicationContext != null, "applicaitonContext属性未注入, 请在applicationContext.xml中定义SpringContextHolder.");
    }
}
4.测试

Redis 发布/订阅

特点

        发布/订阅是一种消息通信模式,其中发送者(发布者)发布消息,多个接收者(订阅者)订阅并接收这些消息。发布者和订阅者之间没有直接联系,消息由消息中间件(如 Redis)传递。

优点

        高性能:Redis 作为内存存储,具备极高的读写性能,能够快速处理发布和订阅消息

        简单易用:Redis 的发布/订阅接口简单,易于集成和使用

        实时性强:发布的消息会立即传递给所有订阅者,具备高实时性

缺点

        消息丢失:由于 Redis 是内存存储,如果 Redis 实例宕机,未处理的消息可能会丢失
        无法持久化:Redis 的发布/订阅模式不支持消息持久化,无法存储和检索历史消息
        订阅者不可控:发布者无法控制订阅者的数量和状态,无法保证所有订阅者都能接收到消息
        无确认机制:发布者无法确认消息是否被订阅者接收和处理

        Redis 的发布订阅功能并不可靠,如果我们需要保证消息的可靠性、包括确认、重试等要求,我们还是要选择MQ实现发布订阅

运用场景

        对于消息处理可靠性要求不强

        消息无需持久化

        消费能力无需通过增加消费方进行增强

        架构简单 中小型系统不希望应用过多中间件

发布订阅命令

SpringBoot整合

1.添加依赖

<dependency>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-data-redis</artifactId>

</dependency>

2.配置redis

# application.yml

spring:

        redis:

                host: localhost

                port: 6379

3.创建redis配置类
public void sendMsg(String key,Object msg){
    redisTemplate.convertAndSend(key,msg);
}

注意:

当发布消息时,订阅着输出消息,可能会出现乱码情况:

设置实例化对象

@Bean
public RedisTemplate redisTemplateInit() {
    //设置序列化Key的实例化对象
    redisTemplate.setKeySerializer(new StringRedisSerializer());
    //设置序列化Value的实例化对象
    redisTemplate.setValueSerializer(new StringRedisSerializer());

    return redisTemplate;
}
4.创建消息监听器
@Component
public class RedisMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String messageStr = new String(message.getBody(),StandardCharsets.UTF_8);

        /**
         * 根据实际情况处理消息
         */
        List<WebsocketRes> websocketRes = JSONArray.parseArray(messageStr, WebsocketRes.class);
        String equipmentId = "";
        List<WebsocketRes> websocketResList = new ArrayList<>();

        for(WebsocketRes res : websocketRes){
            equipmentId = res.getEquipmentId();
            res.setEquipmentId(null);

            websocketResList.add(res);
        }

        Gson gson = new Gson();
        String jsonString = gson.toJson(websocketResList);

        WebSocketHandler.sendOneMessage(equipmentId,jsonString);
    }
}
5.配置消息监听容器
@Configuration
public class RedisConfig {

    @Autowired
    private RedisMessageListener redisMessageListener;

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // 订阅一个或多个频道
        container.addMessageListener(listenerAdapter, new PatternTopic("my-topic"));

        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(RedisMessageListener redisMessageListener) {
        return new MessageListenerAdapter(redisMessageListener);
    }
}
6.发布消息
redisUtils.sendMsg("my-topic",jsonString);

websocket与发布/订阅结合

        并发过高时,websocket连接需单独部署,减缓压力;websocket将业务信息实时推送给前端,就用到了redis 发布订阅功能。

使用

socket消息推送时,把信息发布到redis中。socket服务订阅redis的消息,订阅成功后进行推送

1.在websocket服务中创建消息监听器(处理消息)

2.在websocket服务中创建消息监听容器

3.在业务服务中发布消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/767205.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

携手共筑爱的桥梁:引导接纳自闭症同学

在孩子的班级中&#xff0c;当自闭症儿童成为我们共同的一员时&#xff0c;作为老师和家长&#xff0c;我们肩负着特别的责任——引导孩子们以开放的心态接纳、善待并关爱他们。 首先&#xff0c;我们要以身作则&#xff0c;展现接纳与尊重。无论是老师还是家长&#xff0c;都…

【计算机网络】计算机网络的分类

计算机网络的分类 导读一、按分布范围分类1.1 广域网&#xff08;WAN&#xff09;。1.2 城域网&#xff08;MAN&#xff09;1.3 局域网&#xff08;LAN&#xff09;1.4 个人区域网&#xff08;PAN&#xff09;1.5 多处理器系统 二、按传输技术分类2.1 广播式网络2.2 点对点网络…

Ajax异步请求 axios

Ajax异步请求 axios 1 axios介绍 原生ajax请求的代码编写太过繁琐,我们可以使用axios这个库来简化操作&#xff01; 在后续学习的Vue(前端框架)中发送异步请求,使用的就是axios. 需要注意的是axios不是vue的插件,它可以独立使用. axios说明网站&#xff1a;(https://www.kancl…

【数据结构】04.双向链表

一、双向链表的结构 注意&#xff1a;这里的“带头”跟前面我们说的“头节点”是两个概念&#xff0c;带头链表里的头节点&#xff0c;实际为“哨兵位”&#xff0c;哨兵位节点不存储任何有效元素&#xff0c;只是站在这里“放哨的”。 “哨兵位”存在的意义&#xff1a;遍历循…

揭秘,PyArmor库让你的Python代码更安全

PyArmor 概述: PyArmor 是一个用于加密和保护 Python 源代码的工具,旨在防止代码被逆向工程和未经授权的使用.通过将 Python 源代码编译为加密的字节码,PyArmor 提供了一种有效的方法来保护知识产权和敏感算法. 安装 pip install pyarmor安装完成后,可以通过以下命令验证安装…

LLM端侧部署系列 | 手机上运行47B大模型?上交推理框架PowerInfer-2助力AI手机端侧部署

0. 引言 黄梅时节家家雨&#xff0c;青草池塘处处蛙。 有约不来过夜半&#xff0c;闲敲棋子落灯花。 当下&#xff0c;在移动设备上部署大型模型的趋势是愈演愈烈。Google推出了AI Core&#xff0c;使得Gemini Nano可以在智能手机上部署。此外&#xff0c;近期传闻苹果在iOS …

SQL语句(DQL)

Data Query Language&#xff08;数据查询语言&#xff09;&#xff0c;用来查询数据库中表的记录 DQL-基本查询 DQL-条件查询&#xff08;WHERE&#xff09; -- 查询姓名为2个字的员工信息 select * from emp where name like __;-- 查询身份证号最后一位是X的员工信息 selec…

uni-app打包小程序的一些趣事~

前言 Huilderx版本&#xff1a;4.15 uni-app Web端版本&#xff1a;3.4.21 问题1 Web端/APP端样式好好的&#xff0c;打包微信小程序就乱了咋整&#xff1f; 使用::v-deep/::deep/deep(){}都是没用滴~~ 原因&#xff1f; 解决&#xff1a; <script lang"ts"…

c语言回顾-数组(全网最详细,哈哈哈)

目录 前言&#xff0c;和小编一起感受数组的魅力&#xff01;&#xff01;&#xff01; 1.数组的概念 2.一维数组的创建和初始化 2.1数组创建 2.2数组的初始化 2.3数组的类型 3.一维数组的使用 3.1数组下标 3.2数组元素的输入输出 小结&#xff1a; 4.一维数组在内存…

Python中的__init__方法:为何它如此重要

目录 一、__init__方法的基本概念 1.1 定义与作用 1.2 调用时机 1.3 参数传递 二、__init__方法的工作原理 2.1 初始化属性 2.2 执行其他操作 2.3 继承与多态 三、__init__方法的使用技巧 3.1 参数传递与默认值 3.2 链式初始化 3.3 继承与超类初始化 3.4 初始化方…

常见锁策略之可重入锁VS不可重入锁

可重入锁VS不可重入锁 有一个线程,针对同一把锁,连续加锁两次,如果产生了死锁,那就是不可重入锁,如果没有产生死锁,那就是可重入锁. 死锁 我们之前引入多线程的时候不是讲了一个加数字的案例么,我们今天以它来举例 当我们这样写的时候会出现什么问题? 分析:第一个synchron…

Zookeeper:Zookeeper集群角色

文章目录 一、Leader选举二、Zookeeper集群角色 一、Leader选举 Serverid&#xff1a;服务器ID&#xff1b;比如有三台服务器&#xff0c;编号越大在选择算法中的权重越大。Zxid&#xff1a;数据ID&#xff1b;服务器中存放的最大数据ID&#xff0c;值越大说明数据越新&#x…

【创作纪念日】我的CSDN1024创作纪念

机缘 注册CSDN是很长时间了&#xff0c;但是上学时因为专业是电气工程&#xff0c;与编程打交道比较少&#xff0c;一直都是寻求帮助&#xff0c;而非内容输出。直到考研后专业改变&#xff0c;成为了主要跟软件编程、计算机知识相关的研究后&#xff0c;才逐步开启自己的CSDN…

模拟布局:为什么井、抽头和保护环至关重要

其中的关键示例是井、抽头和保护环。这些结构对于任何 MOSFET 电路的工作都至关重要。 这就是为什么了解衬底在 MOSFET 电路中的作用对于创建有效的模拟设计至关重要。要做到这一点&#xff0c;首先必须了解 MOSFET 晶体管的工作原理。 让我们来看看一种类型的 MOSFET&#x…

归并排序-MergeSort (C语言详解)

目录 前言归并排序的思想归并排序的递归法归并排序的非递归法归并排序的时间复杂度与适用场景总结 前言 好久不见, 前面我们了解到了快速排序, 那么本篇旨在介绍另外一种排序, 它和快速排序的思想雷同, 但又有区别, 这就是归并排序, 如下图, 我们对比快速排序与归并排序. 本…

编译器的控制流图分析

1&#xff0c;建立感性认识 1.1 源码 hello.c int x 10; int y 11; int main(){int z 12;for (int i 0;i < 10;i){z * x * y;}if(z>7.0)z1.0f;elsez 2.0f;return 0; }1.2 编译 2005 sudo apt-get install -y graphviz-doc libgraphviz-dev graphviz2034 ../ex_…

Java学习高级一

修饰符 static 类变量的应用场景 成员方法的分类 成员变量的执行原理 成员方法的执行原理 Java之 main 方法 类方法的常见应用场景 代码块 设计模式 单例设计模式 饿汉式单例设计模式 懒汉式单例设计模式 继承 权限修饰符

LeetCode题练习与总结:二叉树的后序遍历--145

一、题目描述 给你一棵二叉树的根节点 root &#xff0c;返回其节点值的 后序遍历 。 示例 1&#xff1a; 输入&#xff1a;root [1,null,2,3] 输出&#xff1a;[3,2,1]示例 2&#xff1a; 输入&#xff1a;root [] 输出&#xff1a;[]示例 3&#xff1a; 输入&#xff1a…

以太坊DApp交易量激增83%的背后原因解析

引言 最近&#xff0c;以太坊网络上的去中心化应用程序&#xff08;DApp&#xff09;交易量激增83%&#xff0c;引发了广泛关注和讨论。尽管交易费用高达2.4美元&#xff0c;但以太坊仍在DApp交易量方面遥遥领先于其他区块链网络。本文将深入探讨导致这一现象的主要原因&#…

颅内感染性疾病患者就诊指南

颅内感染性疾病&#xff0c;即病原体侵入中枢神经系统&#xff0c;导致脑部或脑膜发生炎症的疾病。这些病原体可能是细菌、病毒、真菌或寄生虫等。颅内感染不仅会对脑组织造成损害&#xff0c;还可能引发一系列严重的并发症&#xff0c;如癫痫发作、意识障碍等 颅内感染性疾病的…