Spring boot 集成netty实现websocket通信

一、netty介绍

Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性

二、工程搭建

实验目标:实现推送消息给指定的用户

pom.xml

 
 
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>


    <artifactId>netty</artifactId>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.87.Final</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.6.1</version>
        </dependency>
    </dependencies>
</project>

属性文件

 
 
server:
  port: 8088

netty server

 
 
package com.et.netty.server;


import com.et.netty.config.ProjectInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;


import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;


/**
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName NettyServer.java
 * @description: TODO
 * @createTime 2023年02月06日 16:41:00
 */
@Component
public class NettyServer {
    static final Logger log = LoggerFactory.getLogger(NettyServer.class);


    /**
     * 端口号
     */
    @Value("${webSocket.netty.port:8889}")
    int port;


    EventLoopGroup bossGroup;
    EventLoopGroup workGroup;


    @Autowired
    ProjectInitializer nettyInitializer;


    @PostConstruct
    public void start() throws InterruptedException {
        new Thread(() -> {
            bossGroup = new NioEventLoopGroup();
            workGroup = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();
            // bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
            bootstrap.group(bossGroup, workGroup);
            // 设置NIO类型的channel
            bootstrap.channel(NioServerSocketChannel.class);
            // 设置监听端口
            bootstrap.localAddress(new InetSocketAddress(port));
            // 设置管道
            bootstrap.childHandler(nettyInitializer);


            // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
            ChannelFuture channelFuture = null;
            try {
                channelFuture = bootstrap.bind().sync();
                log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
                // 对关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }


    /**
     * 释放资源
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
    }
}

ProjectInitializer

初始化,设置websocket handler

 
 
package com.et.netty.config;


import com.et.netty.handler.WebSocketHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


/**
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName ProjectInitializer.java
 * @description: 管道配置
 * @createTime 2023年02月06日 16:43:00
 */
@Component
public class ProjectInitializer extends ChannelInitializer<SocketChannel> {


    /**
     * webSocket协议名
     */
    static final String WEBSOCKET_PROTOCOL = "WebSocket";


    /**
     * webSocket路径
     */
    @Value("${webSocket.netty.path:/webSocket}")
    String webSocketPath;
    @Autowired
    WebSocketHandler webSocketHandler;


    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 设置管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 流水线管理通道中的处理程序(Handler),用来处理业务
        // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ObjectEncoder());
        // 以块的方式来写的处理器
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
        // 自定义的handler,处理业务逻辑
        pipeline.addLast(webSocketHandler);
    }
}

WebSocketHandler

 
 
package com.et.netty.handler;


import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.et.netty.config.NettyConfig;
import com.et.netty.server.NettyServer;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;


/**
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName WebSocketHandler.java
 * @description: TODO
 * @createTime 2023年02月06日 16:44:00
 */
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private static final Logger log = LoggerFactory.getLogger(NettyServer.class);


    /**
     * 一旦连接,第一个被执行
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());
        // 添加到channelGroup 通道组
        NettyConfig.getChannelGroup().add(ctx.channel());
    }


    /**
     * 读取数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("服务器收到消息:{}", msg.text());


        // 获取用户ID,关联channel
        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        String uid = jsonObject.getStr("uid");
        NettyConfig.getChannelMap().put(uid, ctx.channel());


        // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        ctx.channel().attr(key).setIfAbsent(uid);


        // 回复消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦"));
    }


    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("用户下线了:{}", ctx.channel().id().asLongText());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("异常:{}", cause.getMessage());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }


    /**
     * 删除用户与channel的对应关系
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        NettyConfig.getChannelMap().remove(userId);
    }
}

NettyConfig

 
 
package com.et.netty.config;


import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;


import java.util.concurrent.ConcurrentHashMap;


/**
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName NettyConfig.java
 * @description: 管理全局Channel以及用户对应的channel(推送消息)
 * @createTime 2023年02月06日 16:43:00
 */
public class NettyConfig {
    /**
     * 定义全局单利channel组 管理所有channel
     */
    private static volatile ChannelGroup channelGroup = null;


    /**
     * 存放请求ID与channel的对应关系
     */
    private static volatile ConcurrentHashMap<String, Channel> channelMap = null;


    /**
     * 定义两把锁
     */
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();




    public static ChannelGroup getChannelGroup() {
        if (null == channelGroup) {
            synchronized (lock1) {
                if (null == channelGroup) {
                    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
            }
        }
        return channelGroup;
    }


    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        if (null == channelMap) {
            synchronized (lock2) {
                if (null == channelMap) {
                    channelMap = new ConcurrentHashMap<>();
                }
            }
        }
        return channelMap;
    }


    public static Channel getChannel(String userId) {
        if (null == channelMap) {
            return getChannelMap().get(userId);
        }
        return channelMap.get(userId);
    }
}

controller

 
 
package com.et.netty.controller;


import com.et.netty.service.PushMsgService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;




/**
 * @author dongliang7
 * @projectName
 * @ClassName TestController.java
 * @description: TODO
 * @createTime 2023年02月06日 17:48:00
 */
@RestController
@RequestMapping("/push")
public class TestController {
    @Autowired
    PushMsgService pushMsgService;


    /**
     * 推送消息到具体客户端
     * @param uid
     */
    @GetMapping("/{uid}")
    public void pushOne(@PathVariable String uid) {
        pushMsgService.pushMsgToOne(uid, "hello-------------------------");
    }


    /**
     * 推送消息到所有客户端
     */
    @GetMapping("/pushAll")
    public void pushAll() {
        pushMsgService.pushMsgToAll("hello all-------------------------");
    }
}

PushMsgService

package com.et.netty.service;


/**
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName PushMsgService.java
 * @description: 推送消息接口
 * @createTime 2023年02月06日 16:44:00
 */
public interface PushMsgService {
    /**
     * 推送给指定用户
     */
    void pushMsgToOne(String userId, String msg);


    /**
     * 推送给所有用户
     */
    void pushMsgToAll(String msg);
}

PushMsgServiceImpl

package com.et.netty.service;


import com.et.netty.config.NettyConfig;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Service;




import java.util.Objects;


/**
 * @author dongliang7
 * @projectName websocket-parent
 * @ClassName PushMsgServiceImpl.java
 * @description: 推送消息实现类
 * @createTime 2023年02月06日 16:45:00
 */
@Service
public class PushMsgServiceImpl implements PushMsgService {


    @Override
    public void pushMsgToOne(String userId, String msg) {
        Channel channel = NettyConfig.getChannel(userId);
        if (Objects.isNull(channel)) {
            throw new RuntimeException("未连接socket服务器");
        }


        channel.writeAndFlush(new TextWebSocketFrame(msg));
    }


    @Override
    public void pushMsgToAll(String msg) {
        NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
    }
}

文章值贴出部分关键代码,具体的详情代码参加代码仓库的netty模块

代码仓库

  • https://github.com/Harries/springboot-demo

三、测试

启动springboot工程

2024-03-08 11:21:32.975  INFO 10348 --- [       Thread-2] com.et.netty.server.NettyServer          : Server started and listen on:/0:0:0:0:0:0:0:0:8889

postman创建websocket连接 ws://127.0.0.1:8889/webSocket,并发送消息{'uid':'sss'}给服务端95ed8191ebb1c8c9f021df580ed604bf.png打开浏览器,给用户sss推送消息 http://127.0.0.1:8088/push/sssbb8db40adbcff28d239bc1495dff7c36.png 

四、引用

  • https://www.cnblogs.com/dongl961230/p/17099057.html

  • http://www.liuhaihua.cn/archives/710299.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/454347.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

力扣-[700. 二叉搜索树中的搜索]

递归法 确定递归函数的参数和返回值 递归函数的参数传入的就是根节点和要搜索的数值&#xff0c;返回的就是以这个搜索数值所在的节点。 代码如下&#xff1a; public TreeNode searchBST(TreeNode root, int val) 确定终止条件 如果root为空&#xff0c;返回null&#xff0c…

【前端】HTML常用标签

因为想当个全栈&#xff0c;所以巩固了一下HTML与CSS和JS基础&#xff0c;这一篇博客是HTML部分 文章目录 HTML 基础标签 1HTML 基础框架HTML 基础标签语义标签文本格式化标签div 与 span 标签图像标签超链接特殊字符 基础标签 2 | 表格表格的使用表格标签表格属性表格的头部与…

JavaEE:网络编程

网络编程&#xff1a;通过代码完成基于网络的跨主机通信 跨主机通信方式&#xff1a; 1.TCP/IP网络 2.蓝牙通信 3.近场通信NFC 4.毫米波通信&#xff1a;功率高&#xff0c;带宽高&#xff0c;抗干扰能力差 其中TCP/IP网络是日常编程中最常涉及到的&#xff0c;最通用的跨主机通…

蓝桥杯 2022 dp 背包

蓝桥杯 2022 dp 背包 题目链接&#xff1a; https://www.lanqiao.cn/problems/2186/learning/?subject_code1&group_code4&match_num13&match_flow2&origincup 题目&#xff1a; 代码&#xff1a; #include<bits/stdc.h> using namespace std;#defi…

代码随想录算法训练营第七天| 454.四数相加II、383.赎金信、15.三数之和、18.四数之和

系列文章目录 目录 系列文章目录454.四数相加II使用HashMap法 383.赎金信哈希解法&#xff08;数组&#xff09; 15.三数之和双指针法 18.四数之和双指针法 454.四数相加II 题解&#xff1a;该题和1.两数之和的方法是一样的&#xff0c;这个题的难点在于key和value分别是什么。…

网络建设与运维培训介绍和能力介绍

1.开过的发票 3.培训获奖的证书 4合同签署 5.实训设备

垃圾回收器介绍

java堆内存结构包括&#xff1a;新生代和老年代&#xff0c;其中新生代由一个伊甸区和2个幸存区组成&#xff0c;2个幸存区是大小相同&#xff0c;完全对称的&#xff0c;没有任何差别。我们把它们称为S0区和S1区&#xff0c;也可以称为from区和to区。 JVM的垃圾回收主要是针对…

Spring具体拓展点:后置处理器

一图胜千言 mermaid示例图&#xff1a; #mermaid-svg-YEqFb5JcEk5FWkwO {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-YEqFb5JcEk5FWkwO .error-icon{fill:#552222;}#mermaid-svg-YEqFb5JcEk5FWkwO .error-text{fi…

详细分析Mysql中的LOCATE函数(附Demo)

目录 1. 基本概念2. Demo3. 实战 1. 基本概念 LOCATE()函数在SQL中用于在字符串中查找子字符串的位置 它的一般语法如下&#xff1a; LOCATE(substring, string, start)LOCATE()函数返回子字符串在主字符串中第一次出现的位置 如果未找到子字符串&#xff0c;则返回0 具体的…

stm32学习笔记:SPI通信协议原理(未完)

一、SPI简介(serial Peripheral Interface&#xff08;串行 外设 接口&#xff09;) 1、电路模式&#xff08;采用一主多从的模式&#xff09;、同步&#xff0c;全双工 1 所有SPI设备的SCK、MOSI、MISO分别连在一起 2 主机另外引出多条SS控制线&#xff0c;分别接到各从机的S…

数据集成工具 ---- datax 3.0

1、datax: 是一个异构数据源离线同步工具&#xff0c;致力于实现关系型数据库&#xff08;mysql、oracle等&#xff09;hdfs、hive、hbase等各种异构数据源之间的数据同步 2、参考网址文献&#xff1a; https://github.com/alibaba/DataX/blob/master/introduction.mdhttps:/…

代码随想录算法训练营Day46 ||leetCode 139.单词拆分 || 322. 零钱兑换 || 279.完全平方数

139.单词拆分 class Solution { public:bool wordBreak(string s, vector<string>& wordDict) {unordered_set<string> wordSet(wordDict.begin(), wordDict.end());vector<bool> dp(s.size() 1, false);dp[0] true;for (int i 1; i < s.size(); …

【Linux】-Linux下的软件商店yum工具介绍(linux和windows互传文件仅仅一个拖拽搞定!!!!)

目录 1.Linux 软件包管理器yum 1.1快速认识yum 1.2 yumz下载方式&#xff08;如何使用yum进行下载&#xff0c;注意下载一定要是root用户或者白名单用户&#xff08;可提权&#xff09;&#xff09; 1.2.1下载小工具rzsz 1.2.2 rzsz使用 1.2.2查看软件包 1.3软件的卸载 2.yum生…

三、HarmonyOS 应用开发入门之运行Hello World

目录 1、课程对象 1.1、有移动端开发经验 1.2、无移动端开发经验 1.3、对 HarmonyOS 感兴趣 2、DevEco Studio 的使用 2.1、DevEco Studio 的关键特性 智能代码编辑 低代码开发 多段双向实时预览 多端模拟仿真 2.2、安装配置 DevEco Studio 2.2.1、官网开发工具下载地…

蓝桥杯真题讲解:三国游戏(贪心)

蓝桥杯真题讲解&#xff1a;三国游戏&#xff08;贪心&#xff09; 一、视频讲解二、正解代码 一、视频讲解 蓝桥杯真题讲解&#xff1a;三国游戏&#xff08;贪心&#xff09; 二、正解代码 //三国游戏&#xff1a;贪心 #include<bits/stdc.h> #define int long lon…

哪些订单预计会亏?一张报表告诉你

各位数据的朋友&#xff0c;大家好&#xff0c;我是老周道数据&#xff0c;和你一起&#xff0c;用常人思维数据分析&#xff0c;通过数据讲故事。 销售订单一般是企业在销售活动中重要的单据&#xff0c;当我们接到一个客户的订单时&#xff0c;就需要在系统中录入一个销售订…

jQuery模态框弹窗提示代码

jQuery模态框弹窗提示代码 下载地址 jQuery模态框弹窗提示代码

Volatile与JMM

被Volatile修饰的变量有两大特点 可见性 有序性&#xff08;禁重排&#xff09; 如何保证的&#xff1f;内存屏障 Volatile的内存语义 当写一个Volatile变量的时候&#xff0c;JMM会把该线程对应的本地内存共享变量值立即刷新回主内存。 当读一个Volatile变量的时候&…

【Java语言】遍历List元素时删除集合中的元素

目录 前言 实现方式 1.普通实现 1.1 使用【for循环】 方式 1.2 使用【迭代器】方式 2.jdk1.8新增功能实现 2.1 使用【lambda表达式】方式 2.2 使用【stream流】方式 注意事项 1. 使用【for循环】 方式 2. 不能使用增强for遍历修改元素 总结 前言 分享几种从List中移…

程序语言设计

一、程序设计语言及其构成 1.程序设计语言 2.高级程序设计语言划分 3.常见的高级程序语言 4.标记语言 5.程序设计语言的构成 二、表达式 表达式的类型及转换规则 三、传值和传址调用 1.数据类型 2.传值和传址调用 四、语言处理程序 1.语言处理程序 语言处理程序&#xff1…