SpringBoot+Netty+Websocket实现消息推送

这样一个需求:把设备异常的状态每10秒推送到页面并且以弹窗弹出来,这个时候用Websocket最为合适,今天主要是后端代码展示。

添加依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.36.Final</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

定义netty端口号

websocket:
  netty:
    port: 8888
  path: /websocket

netty服务器

@Slf4j
@Component
public class NettyServer {
    /**
     * netty服务端口号
     */
    @Value("${websocket.netty.port}")
    private int port;
    /**
     * netty事件辅助组
     */
    private EventLoopGroup bossGroup;
    /**
     * netty事件工作组
     */
    private EventLoopGroup workGroup;

    /**
     * 管道配置
     */
    private final CustomChannelInitializer channelInitializer;

    public NettyServer(CustomChannelInitializer channelInitializer) {
        this.channelInitializer = channelInitializer;
    }


    /**
     * netty服务初始化
     */
    @PostConstruct
    public void start() {
        new Thread(() -> {

            bossGroup = new NioEventLoopGroup();

            workGroup = new NioEventLoopGroup();

            ServerBootstrap bootstrap = new ServerBootstrap();
            //bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
            bootstrap.group(bossGroup, workGroup);
            //设置NIO类型的channel
            bootstrap.channel(NioServerSocketChannel.class);
            //设置监听端口
            bootstrap.localAddress(new InetSocketAddress(port));
            //设置管道
            bootstrap.childHandler(channelInitializer);
            try {
                ChannelFuture channelFuture = bootstrap.bind().sync();
                log.info("Netty服务启动成功,开启监听:{}", channelFuture.channel().localAddress());
                //对关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("Netty服务启动失败!", e);
                throw new RuntimeException(e);
            }

        }).start();
    }

}

Netty配置

管理全局Channel以及用户对应的channel(推送消息)

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @version 1.0.0
 * @description 业务类
 */
public class NettyConfig {
    /**
     * 定义全局单利channel组 管理所有channel
     */
    private static volatile ChannelGroup channelGroup = null;

    /**
     * 存放请求ID与channel的对应关系
     */
    private static volatile ConcurrentHashMap<String, Channel> channelMap = null;

    /**
     * 定义两把锁
     */
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();


    public static ChannelGroup getChannelGroup() {
        if (null == channelGroup) {
            synchronized (lock1) {
                if (null == channelGroup) {
                    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
            }
        }
        return channelGroup;
    }

    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        if (null == channelMap) {
            synchronized (lock2) {
                if (null == channelMap) {
                    channelMap = new ConcurrentHashMap<>();
                }
            }
        }
        return channelMap;
    }

    public static Channel getChannel(String userId) {
        if (null == channelMap) {
            return getChannelMap().get(userId);
        }
        return channelMap.get(userId);
    }
}

管道配置

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


/**
 * @version 1.0.0
 * @description Netty管道配置类
 */
@Component
public class CustomChannelInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * webSocket协议名
     */
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";
    /**
     * websocket服务地址
     */
    @Value("${websocket.path:/websocket}")
    private String websocketPath;
    private final CustomChannelHandler channelHandler;

    public CustomChannelInitializer(CustomChannelHandler channelHandler) {
        this.channelHandler = channelHandler;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 设置管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 流水线管理通道中的处理程序(Handler),用来处理业务
        // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ObjectEncoder());
        // 以块的方式来写的处理器
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new WebSocketServerProtocolHandler(websocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
        // 自定义的handler,处理业务逻辑
        pipeline.addLast(channelHandler);
    }
}

自定义CustomChannelHandler

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.ruoyi.common.utils.StringUtils;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @version 1.0.0
 * @description Netty管道handler类
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class CustomChannelHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    /**
     * 一旦连接,第一个被执行
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());
        // 添加到channelGroup 通道组
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * 读取数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("服务器收到消息:{}", msg.text());

        // 获取用户ID,关联channel
        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        String uid = jsonObject.getStr("uid");
        if(StringUtils.isNotEmpty(uid)){
            NettyConfig.getChannelMap().put(uid, ctx.channel());
            // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
            AttributeKey<String> key = AttributeKey.valueOf("userId");
            ctx.channel().attr(key).setIfAbsent(uid);
            // 回复消息
            ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦"));
        }
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("用户下线了:{}", ctx.channel().id().asLongText());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("异常:{}", cause.getMessage());
        super.exceptionCaught(ctx,cause);
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * 删除用户与channel的对应关系
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        if(StringUtils.isNotEmpty(userId)){
            NettyConfig.getChannelMap().remove(userId);
        }
    }
}

推送消息接口及实现类

public interface PushMsgService {
    /**
     * 推送给指定用户
     */
    void pushMsgToOne(String group, String msg);

    /**
     * 推送给所有用户
     */
    void pushMsgToAll(String msg);
}

实现接口

@Service
public class PushMsgServiceImpl implements PushMsgService {

    @Override
    public void pushMsgToOne(String group, String msg) {
        Channel channel = NettyConfig.getChannel(group);
        if (Objects.isNull(channel)) {
            throw new RuntimeException("未连接socket服务器");
        }

        channel.writeAndFlush(new TextWebSocketFrame(msg));
    }
    @Override
    public void pushMsgToAll(String msg) {
        NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
    }
}

具体的controller层接口

   /**
     * 获取弹框网关状态
     */
    @GetMapping("/upKnxNetworkLink/{uid}")
    public void upKnxNetworkLink(@PathVariable String uid){
        KnxNetworkLinkInfo knxNetworkLinkInfo =new KnxNetworkLinkInfo();
        knxNetworkLinkInfo.setStatus("0");
       List<KnxNetworkLinkInfo>knxNetworkLinkInfoList=knxNetworkLinkInfoService.queryList(knxNetworkLinkInfo);
        JSONArray array= JSONArray.parseArray(JSON.toJSONString(knxNetworkLinkInfoList));
        pushMsgService.pushMsgToOne(uid,array.toJSONString());
    }

使用postman测试Websocket推送
在这里插入图片描述
连接Websocket
在这里插入图片描述
在开一个窗口测试发送消息的接口
在这里插入图片描述
发送过后在回到连接Websocket窗口
在这里插入图片描述
前端需要做一个定时访问发送消息的接口,每发一次就会往前端推送一次数据。
参考:Springboot + netty +websocket 实现推送消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/236019.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【SpringSecurity】-- 认证、授权

文章目录 SpringSecurity简介快速入门1.准备工作1.2引入SpringSecurity 认证1.登录校验流程2.原理2.1SpringSecurity完整流程2.2认证流程详解 3.解决问题3.1思路分析3.2准备工作3.3.实现3.3.1数据库校验用户3.3.2密码加密存储3.3.3登录接口3.3.4认证过滤器3.3.5退出登录 授权1.…

YOLOv8改进 | Neck篇 | Slim-Neck替换特征融合层实现超级涨点 (又轻量又超级涨点)

一、本文介绍 本文给大家带来的改进机制是Slim-neck提出的Neck部分&#xff0c;Slim-neck是一种设计用于优化卷积神经网络中neck部分的结构。在我们YOLOv8中&#xff0c;neck是连接主干网络&#xff08;backbone&#xff09;和头部网络&#xff08;head&#xff09;的部分&…

Tair(2):Tair安装部署

1 安装相关依赖库 yum install -y gcc gcc-c make m4 libtool boost-devel zlib-devel openssl-devel libcurl-devel yum&#xff1a;是yellowdog updater modified 的缩写&#xff0c;Linux中的包管理工具gcc&#xff1a;一开始称为GNU C Compiler&#xff0c;也就是一个C编…

本地如何使用PHP搭建简单Imagewheel云图床,结合内网穿透实现在外远程访问?

文章目录 1.前言2. Imagewheel网站搭建2.1. Imagewheel下载和安装2.2. Imagewheel网页测试2.3.cpolar的安装和注册 3.本地网页发布3.1.Cpolar临时数据隧道3.2.Cpolar稳定隧道&#xff08;云端设置&#xff09;3.3.Cpolar稳定隧道&#xff08;本地设置&#xff09; 4.公网访问测…

vue3使用Mars3D写区块地图

效果图 引入相关文件 因为我也是第一次使用&#xff0c;所以我是把插件和源文件都引入了&#xff0c;能使用启动 源文件 下载地址&#xff1a; http://mars3d.cn/download.html 放入位置 在index.html中引入 <!--引入cesium基础lib--><link href"/static/C…

Stable diffusion 简介

Stable diffusion 是 CompVis、Stability AI、LAION、Runway 等公司研发的一个文生图模型&#xff0c;将 AI 图像生成提高到了全新高度&#xff0c;其效果和影响不亚于 Open AI 发布 ChatGPT。Stable diffusion 没有单独发布论文&#xff0c;而是基于 CVPR 2022 Oral —— 潜扩…

爆肝整理,Java接口自动化测试实战-rest-assured(详细总结)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、关于rest-assu…

聊个开心的敏捷话题——40小时工作制

近年来&#xff0c;加班现象在很多行业已经普遍制度化&#xff0c;甚至“996”已成为一些行业标签。企业高强度的压榨让员工不堪重负&#xff0c;且时常由此引发的各种悲剧也并不鲜见。 所以&#xff0c;今天我们一起来聊一个开心轻松的话题——极限编程的40h工作制原则。 40…

本科毕业论文查重率高吗【一文读懂】

大家好&#xff0c;今天来聊聊本科毕业论文查重率高吗&#xff0c;希望能给大家提供一点参考。 以下是针对论文重复率高的情况&#xff0c;提供一些修改建议和技巧&#xff1a; 本科毕业论文查重率高吗&#xff1f;重要性与应对策略 摘要&#xff1a;对于本科毕业生来说&#…

PyQt6 QDateEdit日期控件

​锋哥原创的PyQt6视频教程&#xff1a; 2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~共计39条视频&#xff0c;包括&#xff1a;2024版 PyQt6 Python桌面开发 视频教程(无废话…

java--正则表达式书写规则

1.正在表达式 ①就是由一些特定的字符组成&#xff0c;代表的是一个规则。 ②作用一&#xff1a;用来校验数据格式是否合法。 ③作用二&#xff1a;在一段文本中查找满足要求的内容 2.String提供了一个匹配正则表达式的方法 3.正则表达式的书写规则

Uniapp安卓原生插件开发Demo

文章目录 前言一、安装开发工具二、导入uni插件原生项目三、开发Module四、开发Component五、合并原生代码到uniapp项目中总结 前言 当HBuilderX中提供的能力无法满足App功能需求&#xff0c;需要通过使用Andorid/iOS原生开发实现时&#xff0c;可使用App离线SDK开发原生插件来…

第二十一章,网络通信

网络协议 IP协议 IP是Internet Protocol的简称&#xff0c;是一种网络协议。Internet 网络采用的协议是TCP/IP协议&#xff0c;其全称是Transmission Control Protocol/Internet Protocol。Internet 依靠TCP/IP协议&#xff0c;在全球范围内实现了不同硬件结构、不同操作系统…

排序算法之三:希尔排序

希尔排序基本思想 希尔排序法又称缩小增量法 希尔排序法的基本思想是&#xff1a;先选定一个整数&#xff0c;把待排序文件中所有记录分成个组&#xff0c;所有距离为的记录分在同一组内&#xff0c;并对每一组内的记录进行排序。然后&#xff0c;取&#xff0c;重复上述分组…

Vue3+ElementPlus:icon图标不显示(给表格字段里添加图标)

一、背景 在Vue3项目中&#xff0c;想在表格的字段中引入图标因为给字段做了触发提示&#xff0c;希望用户能够注意到这个功能&#xff0c;因此想加个图标提示一下用户&#xff0c;效果如下&#xff1a; 触发提示效果如下&#xff1a; &#xff08;样式这里就不进行优化了&am…

【VS Code开发】使用Live Server搭建MENJA小游戏并发布至公网远程访问

文章目录 前言1. 编写MENJA小游戏2. 安装cpolar内网穿透3. 配置MENJA小游戏公网访问地址4. 实现公网访问MENJA小游戏5. 固定MENJA小游戏公网地址 前言 本篇教程&#xff0c;我们将通过VS Code实现远程开发MENJA小游戏&#xff0c;并通过cpolar内网穿透发布到公网&#xff0c;分…

【nodejs升级版本】win10 nodejs版本低升级版本流程

首先 网上说的n模块不支持window系统&#xff01;&#xff01;&#xff01; window系统升级node只能到node官网下载window安装包来覆盖之前的node 升级步骤如下&#xff1a; 1&#xff0c;找到你node的安装路径&#xff0c;不知道的可以cmd命令行中输入这个命令就可以看到了…

Python:核心知识点整理大全13-笔记

目录 6.4.3 在字典中存储字典 6.5 小结 第7章 用户输入和while循环 7.1 函数 input()的工作原理 7.1.1 编写清晰的程序 7.1.2 使用 int()来获取数值输入 7.1.3 求模运算符 7.1.4 在 Python 2.7 中获取输入 7.2 while 循环简介 7.2.1 使用 while 循环 往期快速传送门…

jsonpath:使用Python处理JSON数据

使用Python处理JSON数据 25.1 JSON简介 25.1.1 什么是JSON JSON全称为JavaScript Object Notation&#xff0c;一般翻译为JS标记&#xff0c;是一种轻量级的数据交换格式。是基于ECMAScript的一个子集&#xff0c;采用完全独立于编程语言的文本格式来存储和表示数据。简洁和清…

中国教师未来发展趋势

随着科技的进步和社会的发展&#xff0c;教师的发展趋势尤其引人关注。那么&#xff0c;教师未来的发展趋势又将是什么呢&#xff1f;” 引领未来的教育变革 在快速发展的信息化社会&#xff0c;教育行业正经历着前所未有的变革。中国教师将扮演着引领这场变革的重要角色。未来…