基于Netty构建Websocket服务端

除了构建TCP和UDP服务器和客户端,Netty还可以用于构建WebSocket服务器。WebSocket是一种基于TCP协议的双向通信协议,可以在Web浏览器和Web服务器之间建立实时通信通道。下面是一个简单的示例,演示如何使用Netty构建一个WebSocket服务器。
项目目录:
在这里插入图片描述
引入pom依赖:

 <dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>4.1.69.Final</version>
 </dependency>
 <dependency>
     <groupId>org.projectlombok</groupId>
     <artifactId>lombok</artifactId>
     <optional>true</optional>
 </dependency>

编写SocketServer:

package com.lzq.websocket.config;

import com.lzq.websocket.handlers.WebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

@Slf4j
@Configuration
public class WebSocketConfig implements CommandLineRunner {

    private static final Integer PORT = 8888;

    @Override
    public void run(String... args) throws Exception {
        new WebSocketConfig().start();
    }

    public void start() {
        // 创建EventLoopGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new HttpServerCodec());
                            // 最大数据长度
                            pipeline.addLast(new HttpObjectAggregator(65536));
                            // 添加接收websocket请求的url匹配路径
                            pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));
                            // 10秒内收不到消息强制断开连接
                            // pipeline.addLast(new ReadTimeoutHandler(10, TimeUnit.SECONDS));
                            pipeline.addLast(new WebSocketHandler());
                        }
                    });

            ChannelFuture future = serverBootstrap.bind(PORT).sync();
            log.info("websocket server started, port={}", PORT);
            // 处理 channel 的关闭,sync 方法作用是同步等待 channel 关闭
            // 阻塞
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("websocket server exception", e);
            throw new RuntimeException(e);
        } finally {
            log.info("websocket server close");
            // 关闭EventLoopGroup
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

编写WebSocketHandler:

package com.lzq.websocket.handlers;

import com.lzq.websocket.config.NettyConfig;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {

    private WebSocketServerHandshaker webSocketServerHandshaker;
    private static final String WEB_SOCKET_URL = "ws://127.0.0.1:8888/websocket";

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 创建连接时执行
        NettyConfig.group.add(ctx.channel());
        log.info("client channel active, id={}", ctx.channel().id().toString());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 关闭连接时执行
        NettyConfig.group.remove(ctx.channel());
        log.info("client channel disconnected, id={}", ctx.channel().id().toString());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 服务端接收客户端发送过来的数据结束之后调用
        ctx.flush();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            WebSocketServerProtocolHandler.HandshakeComplete handshake = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            log.info("client channel connected, id={}, url={}", ctx.channel().id().toString(), handshake.requestUri());
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            // 处理客户端http握手请求
            handlerHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            // 处理websocket连接业务
            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    /**
     * 处理websocket连接业务
     *
     * @param ctx
     * @param frame
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        log.info("handlerWebSocketFrame>>>>class={}", frame.getClass().getName());
        // 判断是否是关闭websocket的指令
        if (frame instanceof CloseWebSocketFrame) {
            webSocketServerHandshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain());
            return;
        }
        // 判断是否是ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new RuntimeException("不支持消息类型:" + frame.getClass().getName());
        }
        String text = ((TextWebSocketFrame) frame).text();
        if ("ping".equals(text)) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        log.info("WebSocket message received: {}", text);
        /**
         * 可通过客户传输的text,设计处理策略:
         * 如:text={"type": "messageHandler", "userId": "111"}
         * 服务端根据type,采用策略模式,自行派发处理
         *
         * 注意:这里不需要使用线程池,因为Netty 采用 Reactor线程模型(目前使用的是主从Reactor模型),
         * Handler已经是线程处理,每个用户的请求是线程隔离的
         */
        // 返回WebSocket响应
        ctx.writeAndFlush(new TextWebSocketFrame("server return:" + text));
        /*// 群发
        TextWebSocketFrame twsf = new TextWebSocketFrame(new Date().toString()
                + ctx.channel().id()
                + " : "
                + text);
        NettyConfig.group.writeAndFlush(twsf);*/
    }

    /**
     * 处理客户端http握手请求
     *
     * @param ctx
     * @param request
     */
    private void handlerHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
        log.info("handlerHttpRequest>>>>class={}", request.getClass().getName());
        // 判断是否采用WebSocket协议
        if (!request.getDecoderResult().isSuccess() || !("websocket".equals(request.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);
        webSocketServerHandshaker = wsFactory.newHandshaker(request);
        if (webSocketServerHandshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            webSocketServerHandshaker.handshake(ctx.channel(), request);
        }
    }

    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, DefaultFullHttpResponse response) {
        if (response.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), StandardCharsets.UTF_8);
            response.content().writeBytes(buf);
            buf.release();
        }
        // 服务端向客户端发送数据
        ChannelFuture f = ctx.channel().writeAndFlush(response);
        if (response.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 非正常断开时调用
        log.error("client channel execute exception, id={}", ctx.channel().id().toString(), cause);
        ctx.close();
    }
}

NettyConfig:

package com.lzq.websocket.config;

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class NettyConfig {
    /**
     * 存储接入的客户端的channel对象
     */
    public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}

使用Apifox测试:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/264540.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

构建数字化金融生态系统:云原生的创新方法

内容来自演讲&#xff1a;曾祥龙 | DaoCloud | 解决方案架构师 摘要 本文探讨了金融企业在实施云原生体系时面临的挑战&#xff0c;包括复杂性、安全、数据持久化、服务网格使用和高可用容灾架构等。针对网络管理复杂性&#xff0c;文章提出了Spiderpool开源项目&#xff0c;…

csrf自动化检测调研

https://github.com/pillarjs/understanding-csrf/blob/master/README_zh.md CSRF 攻击者在钓鱼站点&#xff0c;可以通过创建一个AJAX按钮或者表单来针对你的网站创建一个请求&#xff1a; <form action"https://my.site.com/me/something-destructive" metho…

VM进行TCP/IP通信

OK就变成这样 vm充当服务端的话也是差不多的操作 点击连接 这里我把端口号换掉了因为可能被占用报错了&#xff0c;如果有报错可以尝试尝试换个端口号 注&#xff1a; 还有一个点在工作中要是充当服务器&#xff0c;要去网络这边看下他的ip地址 拉到最后面

HarmonyOS - macOS 上搭建 鸿蒙开发环境

文章目录 安装 DevEco第一个 App1、工程基本信息设置2、安装设备3、运行工程 安装 DevEco 软件下载地址&#xff1a; https://developer.harmonyos.com/cn/develop/deveco-studio 今天我下载 DevEco Studio 3.1.1 Release - Mac 版本 解压后是一个 dmg 文件&#xff08;也不必…

Mac电脑上soucetree账户更改

在开发公司项目的时候遇到一个问题。soucetree提示需要输入已离职员工-张三的密码。 问题&#xff1a;Mac电脑使用souetree&#xff0c;拉取仓库代码提示需要输入其他员工密码。 解决&#xff1a; Mac电脑 SourceTree去掉之前的账户 1、前往文件路径 /Library/Application Su…

03-基于GEC6818开发板实现BMP图片的加载——实例分析

03-基于GEC6818开发板实现加载一张图片 实现基于GEC6818开发板实现加载一张BMP文件。其中详细解析了一张BMP格式图的内容。 其他相关GEC6818开发板的内容可以参考 01-基于粤嵌GEC6818实现屏幕的显示固定颜色进行自动切换 02-基于GEC6818开发板的画正方形、画圆的操作——使用mm…

LTO-3 磁带机种草终于是用上了

跑来跑去&#xff0c;买了不少配件&#xff0c;终于是把这磁带机给用上了&#xff0c;已经备份好了300 多 GB 的数据。 我们用了 NAS 的数据压缩功能&#xff0c;把需要备份的文件用 NAS 压缩成一个 Zip 文件&#xff0c;如果你可以 tar 的话也行。 这样传输速度更快&#xf…

【即插即用篇】YOLOv8改进实战 | 引入 Involution(内卷),用于视觉识别的新一代神经网络!涨点神器!

YOLOv8专栏导航:点击此处跳转 前言 YOLOv8 是由 YOLOv5 的发布者 Ultralytics 发布的最新版本的 YOLO。它可用于对象检测、分割、分类任务以及大型数据集的学习,并且可以在包括 CPU 和 GPU 在内的各种硬件上执行。 YOLOv8是一种尖端的、最先进的 (SOTA) 模型,它建立在以前成…

金蝶Apusic应用服务器 loadTree JNDI注入漏洞复现(QVD-2023-48297)

0x01 产品简介 金蝶Apusic应用服务器是一款企业级应用服务器,支持Java EE技术,适用于各种商业环境。 0x02 漏洞概述 由于金蝶Apusic应用服务器权限验证不当,导致攻击者可以向loadTree接口执行JNDI注入,造成远程代码执行漏洞。利用该漏洞需低版本JDK。(漏洞比较旧,8月份…

Linux ContOS7 日志管理(rsyslog)

目录 01. rsyslog 记录日志程序 02.日志文件 03.日志等级 Linux 日志文件是记录 Linux 系统运行信息的文件。它们类似于人类的日记&#xff0c;记录了系统的各种活动&#xff0c;如用户登录、进程启动、错误消息等。 Linux 日志文件通常存储在 /var/log/ 目录中。该目录包含…

STM32G4x FLASH 读写配置结构体(LL库下使用)

主要工作就是把HAL的超时用LL库延时替代&#xff0c;保留了中断擦写模式、轮询等待擦写&#xff0c;我已经验证了部分。 笔者用的芯片为STM32G473CBT6 128KB Flash&#xff0c;开环环境为CUBEMXMDK5.32&#xff0c;因为G4已经没有标准库了&#xff0c;笔者还是习惯使用标准库的…

3.[BUUCTF HCTF 2018]WarmUp1

1.看题目提示分析题目内容 盲猜一波~ &#xff1a; 是关于PHP代码审计的 2.打开链接&#xff0c;分析题目 给你提示了我们访问source.php来看一下 大boss出现&#xff0c;开始详细手撕~ 3.手撕PHP代码&#xff08;代码审计&#xff09; 本人是小白&#xff0c;所以第一步&…

Python 将RTF文件转为Word 、PDF、HTML

RTF也称富文本格式&#xff0c;是一种具有良好兼容性的文档格式&#xff0c;可以在不同的操作系统和应用程序之间进行交换和共享。有时出于不同项目的需求&#xff0c;我们可能需要将RTF文件转为其他格式。本文将介如何通过简单的Python代码将RTF文件转换为Word Doc/Docx、PDF、…

基于多反应堆的高并发服务器【C/C++/Reactor】(中)

在这篇文章中虽然实现了能够和多客户端建立连接&#xff0c;并且同时和多个客户端进行通信。 基于多反应堆的高并发服务器【C/C/Reactor】&#xff08;上&#xff09;-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/135141316?spm1001.2014.3001.5501但是有…

XML简介 (EXtensible Markup Language)

XML简介 (EXtensible Markup Language) 可扩展标记语言 特点 XML与操作系统、编程语言的开发平台无关实现不同系统之间的数据交换 作用 数据交互配置应用程序和网站Ajax基石 XML标签 XML文档内容由一系列标签元素组成 <元素名 属性名"属性值">元素内容&l…

Echarts饼图tooltip渐变色,内部legend百分比保留整数方法

业务场景&#xff1a;1、tooltip的背景需要渐变色&#xff0c;写 html 标签&#xff0c; 2、饼图内部的百分比需要保留整数 &#xff0c;使用formatter&#xff0c; export function genChartPieOption(pieData) {const res {replaceMerge: [series,], // 解决刷新之后y轴丢失…

P1883 函数

题目链接 P1883 函数 思路 举例 题目中的 F ( x ) F(x) F(x) 看起来很复杂&#xff0c;但由于每个 f ( x ) f(x) f(x) 的二次项系数 a a a 都不是负数&#xff0c;故 F ( x ) F(x) F(x) 是一个单谷函数。直接说出结论可能有些令人难以接受&#xff0c;不妨举出两个例子…

动物分类识别教程+分类释义+界面展示

1.项目简介 动物分类教程分类释义界面展示 动物分类是生物学中的一个基础知识&#xff0c;它是对动物进行分类、命名和描述的科学方法。本教程将向您介绍动物分类的基本原则和方法&#xff0c;并提供一些常见的动物分类释义。 动物分类的基本原则 动物分类根据动物的形态、…

Linux系统中的地址映射

一. 简介 在前面的裸机开发实验 LED灯实验中 &#xff0c;其实就是操作 IMX6ULL芯片的寄存器。 Linux 驱动开发也可以操作寄存器&#xff0c;但是&#xff0c;Linux不能直接对寄存器物理地址进行读写操作&#xff0c;例如&#xff0c;寄存器 A的物理地址为 0X01010101。 裸机…

2023亚马逊云科技re:Invent用Amazon Q打造你的知识库

随着ChatGPT的问世&#xff0c;我们迎来了许多创新和变革的机会。一年一度的亚马逊云科技大会re:Invent也带来了许多前言的技术&#xff0c;其中亚马逊云科技CEO Adam Selipsky在2023亚马逊云科技re:Invent大会中重磅推出Amazon Q&#xff0c;这预示着生成式AI的又一个里程碑。…