SpringBoot集成netty实现websocket通信

实现推送消息给指定的用户

一、依赖


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>netty</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.87.Final</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.6.1</version>
        </dependency>
    </dependencies>
</project>

二、属性文件和启动类

server:
  port: 8088

三、Controller接口

@RestController
@RequestMapping("/push")
public class TestController{
	
	@Autowired
	PushMsgService pushMsgService
}

四、PushMsgService

public interface PushMsgService{
	
	//推送给指定用户
	void pushMsgToOne(String userId,String msg);

	//推送给所有用户
	void pushMsgToAll(String msg);
}
@Service
public class PushMsgServiceImpl implements PushMsgService{
	
	@Override
	public void pushMsgToOne(String userId, String msg){
		Channel channel = NettyConfig.getChannel(userId);
        if (Objects.isNull(channel)) {
            throw new RuntimeException("未连接socket服务器");
        }

        channel.writeAndFlush(new TextWebSocketFrame(msg));
	}

	@Override
	public void pushMsgToAll(String msg){
		NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
	}
}

五、NettyConfig

public class NettyConfig{
	
	//定义全局channel,管理所有的channel
	private static volatile ChannelGroup channelGroup = null;

	//存放请求ID与channel的对应关系
	private static volatile ConcurrentHashMap<String, Channel> channelMap = null;

	//定义两把锁
	private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

	public static ChannelGroup getChannelGroup() {
        if (null == channelGroup) {
            synchronized (lock1) {
                if (null == channelGroup) {
                    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
            }
        }
        return channelGroup;
    }

    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        if (null == channelMap) {
            synchronized (lock2) {
                if (null == channelMap) {
                    channelMap = new ConcurrentHashMap<>();
                }
            }
        }
        return channelMap;
    }

    public static Channel getChannel(String userId) {
        if (null == channelMap) {
            return getChannelMap().get(userId);
        }
        return channelMap.get(userId);
    }
}

六、netty server

@Component
public class NettyServer {
    static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    /**
     * 端口号
     */
    @Value("${webSocket.netty.port:8889}")
    int port;

    EventLoopGroup bossGroup;
    EventLoopGroup workGroup;

    @Autowired
    ProjectInitializer nettyInitializer;

    @PostConstruct
    public void start() throws InterruptedException {
        new Thread(() -> {
            bossGroup = new NioEventLoopGroup();
            workGroup = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();
            // bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
            bootstrap.group(bossGroup, workGroup);
            // 设置NIO类型的channel
            bootstrap.channel(NioServerSocketChannel.class);
            // 设置监听端口
            bootstrap.localAddress(new InetSocketAddress(port));
            // 设置管道
            bootstrap.childHandler(nettyInitializer);

            // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
            ChannelFuture channelFuture = null;
            try {
                channelFuture = bootstrap.bind().sync();
                log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
                // 对关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

    /**
     * 释放资源
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
    }
}

七、ProjectInitializer初始化,设置websocket handler

@Component
public class ProjectInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * webSocket协议名
     */
    static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * webSocket路径
     */
    @Value("${webSocket.netty.path:/webSocket}")
    String webSocketPath;
    @Autowired
    WebSocketHandler webSocketHandler;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 设置管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 流水线管理通道中的处理程序(Handler),用来处理业务
        // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ObjectEncoder());
        // 以块的方式来写的处理器
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
        // 自定义的handler,处理业务逻辑
        pipeline.addLast(webSocketHandler);
    }
}

八、WebSocketHandler

@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    /**
     * 一旦连接,第一个被执行
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());
        // 添加到channelGroup 通道组
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * 读取数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("服务器收到消息:{}", msg.text());

        // 获取用户ID,关联channel
        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        String uid = jsonObject.getStr("uid");
        NettyConfig.getChannelMap().put(uid, ctx.channel());

        // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        ctx.channel().attr(key).setIfAbsent(uid);

        // 回复消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦"));
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("用户下线了:{}", ctx.channel().id().asLongText());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("异常:{}", cause.getMessage());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * 删除用户与channel的对应关系
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        NettyConfig.getChannelMap().remove(userId);
    }
}

测试:

postman创建websocket连接 ws://127.0.0.1:8889/webSocket,并发送消息{‘uid’:‘sss’}给服务端
在这里插入图片描述
打开浏览器,给用户sss推送消息 http://127.0.0.1:8088/push/sss
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/446537.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

加密流量分类torch实践4:TrafficClassificationPandemonium项目更新

加密流量分类torch实践4&#xff1a;TrafficClassificationPandemonium项目更新 更新日志 代码已经推送开源至露露云的github&#xff0c;如果能帮助你&#xff0c;就给鼠鼠点一个star吧&#xff01;&#xff01;&#xff01; 3/10号更新 流量预处理更新 增加了基于splitCa…

加快代码审查的 7 个最佳实践

目录 前言 1-保持小的拉取请求 2-使用拉取请求模板 3-实施响应时间 SLA 4-培训初级和中级工程师 5-设置持续集成管道 6-使用拉取请求审查应用程序 7-生成图表以可视化您的代码更改 前言 代码审查可能会很痛苦软件工程师经常抱怨审查过程缓慢&#xff0c;延迟下游任务&…

Spring boot2.7整合jetcache 本地linkedhashmap缓存方案

好 上文 Spring boot2.7整合jetcache 远程redis缓存方案 我们讲完了 远程实现方案 本文 我们来说说 本地 jetcache解决方案 首先是 application.yml 在jetcache下加上 local:default:type: linkedhashmapkeyConvertor: fastjson我们技术用的 本地缓存 linkedhashmap 这里 我们…

conda 设置国内源 windows+linux

默认的conda源连接不好&#xff0c;时好时坏&#xff0c;而且速度很慢&#xff0c;可以使用国内的源 如果没有安装conda&#xff0c;可以参考&#xff1a; miniconda安装&#xff1a;链接 anaconda安装winlinux&#xff1a;链接 windows使用命令提示符&#xff0c;linux使用…

【C语言】glibc

一、获取源码 apt install glibc-source 在Debian系统中&#xff0c;通过apt install glibc-source命令安装的glibc源码通常会被放置在/usr/src/glibc目录下。安装完成后&#xff0c;可能需要解压缩该源码包。以下是解压缩源码包的步骤&#xff1a; 1. 打开终端。 2. 切换到源…

2024年【P气瓶充装】考试报名及P气瓶充装复审考试

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 P气瓶充装考试报名是安全生产模拟考试一点通总题库中生成的一套P气瓶充装复审考试&#xff0c;安全生产模拟考试一点通上P气瓶充装作业手机同步练习。2024年【P气瓶充装】考试报名及P气瓶充装复审考试 1、【多选题】《…

docker学习笔记——Dockerfile

Dockerfile是一个镜像描述文件&#xff0c;通过Dockerfile文件可以构建一个属于自己的镜像。 如何通过Dockerfile构建自己的镜像&#xff1a; 在指定位置创建一个Dockerfile文件&#xff0c;在文件中编写Dockerfile相关语法。 构建镜像&#xff0c;docker build -t aa:1.0 .(指…

esp32 idf.py cmd powershell 环境

esp32 idf.py cmd powershell 命令行 环境 win10 推荐使用 Windows Terminal 替换自己路径 设置–>添加新配置文件–>选择cmd 或者 powershell -->保存–> 去修改命令行 启动目录&#xff0c;推荐使用父进程目录 powershell C:\WINDOWS/System32/WindowsPowe…

【STA】SRAM / DDR SDRAM 接口时序约束学习记录

1. SRAM接口 相比于DDR SDRAM&#xff0c;SRAM接口数据与控制信号共享同一时钟。在用户逻辑&#xff08;这里记作DUA&#xff08;Design Under Analysis&#xff09;&#xff09;将数据写到SRAM中去的写周期中&#xff0c;数据和地址从DUA传送到SRAM中&#xff0c;并都在有效时…

关于查看 CentOS7虚拟机的 ip地址

1. 启动网卡 1.1 打开网卡配置文件。 vi /etc/sysconfig/network-scripts/ifcfg-eth01.2 启动网卡 修改为下图中的ONBOOTyes 2. 重启网络服务 sudo service network restart3. 查看ip地址 ip addr

NBlog整合OSS图库

NBlog部署维护流程记录&#xff08;持续更新&#xff09;&#xff1a;https://blog.csdn.net/qq_43349112/article/details/136129806 由于项目是fork的&#xff0c;所以我本身并不清楚哪里使用了图床&#xff0c;因此下面就是我熟悉项目期间边做边调整的。 目前已经调整的功能…

http协议分析

目录 一、实验目的 二、实验环境 三、实验步骤 四、实验数据记录和结果分析 五、实验体会、质疑和建议 一、实验目的 通过在真实网络环境访问HTTP服务器上网的过程中,捕获HTTP数据报文,分析报文的内容,掌握HTTP报文的构成,理解HTTP协议的工作过程&#xff0c; 二、实验环境…

02-在 ESP-IDF 项目中添加 .c 和 .h 文件的

在 ESP-IDF 项目中添加 .c 和 .h ESP-IDF&#xff08;Espressif IoT Development Framework&#xff09;是一个用于开发基于 ESP32 和 ESP8266 微控制器的嵌入式应用程序的框架。在 ESP-IDF 项目中添加新的 .c 和 .h 文件是很常见的&#xff0c;但要确保这些文件能够正确地被编…

C++类和对象(下篇)

目录 一.再谈构造函数 二.static成员 三.友元 四.内部类 五. 再次理解类和对象 一.再谈构造函数 1.构造函数体赋值 在创建对象时&#xff0c;编译器通过调用构造函数&#xff0c;给对象中各个成员变量一个合适的初始值。 class Date { public:Date(int year, int month…

js 获取浏览器相关的宽高尺寸

window 屏幕 屏幕分辨率的高&#xff1a; window.screen.height 屏幕分辨率的宽&#xff1a; window.screen.width 屏幕可用工作区高度&#xff1a; window.screen.availHeight 屏幕可用工作区宽度&#xff1a; window.screen.availWidth document 网页 网页可见区域宽&#xf…

JavaEE+springboot教学仪器设备管理系统o9b00-springmvc

本文旨在设计一款基于Java技术的教学仪器设备销售网站&#xff0c;以提高网站性能、功能完善、用户体验等方面的优势&#xff0c;解决现有教学仪器设备销售网站的问题&#xff0c;并为广大教育工作者和学生提供便捷的教学仪器设备销售渠道。本文首先介绍了Java技术的相关基础知…

三维不同坐标系下点位姿态旋转平移变换

文章目录 前言正文计算方法思路Python实现总结前言 本文主要说明以下几种场景3D变换的应用: 3D相机坐标系下长方体物体,有本身坐标系,沿该物体长边方向移动一段距离,并绕长边轴正旋转方向转90度,求解当前物体中心点在相机坐标系下的位置和姿态多关节机器人末端沿工具坐标…

Redis基础篇:初识Redis(认识NoSQL,单机安装Redis,配置Redis自启动,Redis客户端的基本使用)

目录 1.认识NoSQL2.认识Redis3.安装Redis1.单机安装Redis2.配置redis后台启动3.设置redis开机自启 4.Redis客户端1.Redis命令行客户端2.图形化桌面客户端 1.认识NoSQL NoSQL&#xff08;Not Only SQL&#xff09;数据库是一种非关系型数据库&#xff0c;它不使用传统的关系型数…

我们的一生都是在挤火车。

哈喽&#xff0c;你好啊&#xff0c;我是雷工&#xff01; 昨天从燕郊坐火车回石家庄&#xff0c;由于赶上元旦假期&#xff0c;所有高铁票都售罄&#xff0c;一张普通火车票&#xff0c;还是一周前就买才买到的。 从燕郊站&#xff0c;到北京站&#xff0c;然后地铁去北京西站…

C语言动态内存管理面(下)常⻅的动态内存的错误

我们接着C语言动态内存管理&#xff08;上&#xff09;没讲完整的继续来深度讲解。、 4. 常⻅的动态内存的错误 4.1 对NULL指针的解引⽤操作 主要的原因还是自己的粗心大意没有对malloc的值进行判断 void test() { int *p (int *)malloc(INT_MAX/4); *p 20; //如果p的值是…