Springboot+Netty搭建基于TCP协议的服务端

文章目录

    • 概要
    • pom依赖
    • Netty的server服务端类
    • Netty通道初始化
    • I/O数据读写处理
    • 测试发送消息 并 接收服务端回复
    • 异步启动Netty
    • 运行截图

概要

Netty是业界最流行的nio框架之一,它具有功能强大、性能优异、可定制性和可扩展性的优点
Netty的优点:
1.API使用简单,开发入门门槛低。
2.功能十分强大,预置多种编码解码功能,支持多种主流协议。
3.可定制、可扩展能力强,可以通过其提供的ChannelHandler进行灵活的扩展。
4.性能优异,特别在综合性能上的优异性。
5.成熟,稳定,适用范围广。
6.可用于智能GSM/GPRS模块的通讯服务端开发,使用它进行MQTT协议的开发。

好了,废话不多说了,上代码

pom依赖

        <!-- netty依赖 springboot2.x自动导入版本 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>

Netty的server服务端类

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Netty服务器(端口自行更换,默认端口10100)
 * @author wusiwee
 */
@Service
@Slf4j
public class NettyServer {

    /**
     * 注入Netty通道初始化处理器
     */
    private final NettyChannelInboundHandlerAdapter handlerAdapter;

    /**
     * 通过构造函数注入依赖
     * @param handlerAdapter 处理器
     */
    @Autowired
    public NettyServer(NettyChannelInboundHandlerAdapter handlerAdapter) {
        this.handlerAdapter = handlerAdapter;
    }

    /**
     * 启动Netty服务器
     * @throws Exception 如果启动过程中发生异常
     */
    public void bind() throws Exception {
        // 定义bossGroup和workerGroup来处理网络事件
        // 用于接受客户端连接
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 用于实际的业务处理操作
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 创建ServerBootstrap实例来引导绑定和启动服务器
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    // 指定使用NIO的传输Channel
                    .channel(NioServerSocketChannel.class)
                    // 设置TCP接收缓冲区大小
                    .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576))
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576))
                    // 设置自定义的Channel初始化器
                    .childHandler(new NettyChannelInitializer(handlerAdapter));

            log.info("netty server start success!");
            // 绑定端口,并同步等待成功,即启动Netty服务
            ChannelFuture f = serverBootstrap.bind(10100).sync();
            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("Netty server startup interrupted", e);
            Thread.currentThread().interrupt();
        } finally {
            // 优雅关闭事件循环组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

Netty通道初始化

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Component;

/**
 * 通道初始化
 * @author wusiwee
 */
@Component
public class NettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {

    /**
     * 注入,目的是在该 HandlerAdapter 可以正确的注入业务Service
     */
    private final NettyChannelInboundHandlerAdapter handlerAdapter;

    public NettyChannelInitializer(NettyChannelInboundHandlerAdapter handlerAdapter) {
        this.handlerAdapter = handlerAdapter;
    }

    @Override
    protected void initChannel(Channel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // 响应字符串
        pipeline.addLast(new StringEncoder());
        // 自定义ChannelInboundHandlerAdapter
        pipeline.addLast(handlerAdapter);
    }

I/O数据读写处理

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.time.LocalDateTime;
import java.util.Date;

/**
 * I/O数据读写处理类
 *  客户端发送的消息 以及 回复客户端消息 均在此处
 *  @ChannelHandler.Sharable 此注解用于在多个 Channel 中重复使用同一个 Handler 实例
 * @author wusiwee
 */
@Slf4j
@ChannelHandler.Sharable
@Component
public class NettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{

    /**
     * 这里可以注入自己的service
     */
    @Autowired
    private IUserService iUserService;

    /**
     * 从客户端收到新的数据时,这个方法会在收到消息时被调用
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        // 确保接收的数据长度足够,minimumLength 是所有字段长度的总和
        if (in.readableBytes() < MINIMUM_LENGTH) {
            ctx.writeAndFlush("报文长度过低,数据不完整"+"\n");
            return;
        }
        // 1,读取固定长度字符
        byte[] frameStart = new byte[4];
        in.readBytes(frameStart);
        String frameStartStr = new String(frameStart, java.nio.charset.StandardCharsets.UTF_8);
        log.info("1.解析:"+frameStartStr);
        ctx.writeAndFlush("I got it\n");
    }

    /**
     * 从客户端收到新的数据、读取完成时调用
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        log.info("读取完成 channelReadComplete");
        ctx.flush();
    }

    /**
     * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.info("exceptionCaught");
        cause.printStackTrace();
        //抛出异常,断开与客户端的连接
        ctx.close();
    }

    /**
     * 客户端与服务端第一次建立连接时 执行
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ctx.channel().read();
        InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = inSocket.getAddress().getHostAddress();
        //此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接
        log.info("客户端连接 channelActive{}", clientIp+" "+ctx.name());
    }

    /**
     * 客户端与服务端 断连时 执行
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = inSocket.getAddress().getHostAddress();
        //断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
        ctx.close();
        log.info("channelInactive{}", clientIp);
    }

    /**
     * 服务端当read超时, 会调用这个方法
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
        InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = inSocket.getAddress().getHostAddress();
        //超时 断开连接
        ctx.close();
        log.info("userEventTriggered{}", clientIp);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        log.info("注册 channelRegistered");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        log.info("channelUnregistered");
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        log.info("channelWritabilityChanged");
    }
}

测试发送消息 并 接收服务端回复


    @Test
    void contextLoads() {

        try {
            // 服务器地址
            String host = "127.0.0.1";
            // 服务器端口
            int port = 10100;
            // 要发送的消息
            String message = "7E7E010038401010123433004D02000B22";
            Socket socket = new Socket(host, port);

            // 获取输出流
            OutputStream outputStream = socket.getOutputStream();

            // 将字符串转换为字节数组
            byte[] data = message.getBytes();

            // 写入数据到输出流
            outputStream.write(data);
            // 刷新输出流,确保数据发送
            outputStream.flush();

            InputStream input = socket.getInputStream();
            //读取服务器返回的消息
            BufferedReader br = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
            String mess = br.readLine();
            System.out.println("服务器回复:" + mess);
            input.close();
            outputStream.close();
            socket.close();
        }catch (Exception e){
            System.out.println("出现异常");
        }
    }

异步启动Netty

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * 启动类
 */
@SpringBootApplication
@EnableAsync
public class NettyApplication implements ApplicationRunner{
    
    /**
     * 启动springboot
     */
    public static void main( String[] args ) {
        SpringApplication.run(NettyApplication.class, args);
    }

    /**
     * 创建独立线程池
     */
    private final ExecutorService executorService = new ThreadPoolExecutor(
            1, 1, 30, TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(2),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.DiscardOldestPolicy());
    /**
     * 注入Netty消息处理器
     */
    @Resource
    private NettyChannelInboundHandlerAdapter handlerAdapter;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 使用线程池 异步启动Netty服务器
        executorService.submit(() -> {
            try {
                // 启动netty,绑定端口号
                new NettyServer(handlerAdapter).bind();
            } catch (Exception e) {
                // 异常处理
                System.out.println("启动netty出现异常:"+e.getMessage());
            }
        });
    }
}

运行截图

启动服务
回复客户端消息的代码片段
消息回复
测试发送
发送测试
客户端收到回复,断开连接
在这里插入图片描述

攀峰之高险,岂有崖颠;搏海之明辉,何来彼岸?前进不止,奋斗不息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/352641.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

高中数学常识

一、大小关系 |x| > |sinx| 理由&#xff1a; 很明显&#xff0c;在圆内&#xff0c;弧长x>垂线sinx 3x、2x 、 1 2 \frac{1}{2} 21​x 理由&#xff1a; log 1 2 _\frac{1}{2} 21​​x、log 2 _2 2​x、 log 3 _3 3​x 二、(xy)? 的求法 利用二项式定理 三、平…

GitHub国内打不开(解决办法有效)

最近国内访问github.com经常打不开&#xff0c;无法访问。 github网站打不开的解决方法 1.打开网站http://tool.chinaz.com/dns/ &#xff0c;在A类型的查询中输入 github.com&#xff0c;找出最快的IP地址。 2.修改hosts文件。 在hosts文件中添加&#xff1a; # localhost n…

超简单的正则表达式从入门到精通

正则表达式&#xff0c;又称规则表达式&#xff08;英语&#xff1a;Regular Expression&#xff0c;在代码中常简写为regex、regexp或RE&#xff09;&#xff0c;计算机科学的一个概念。正则表达式通常被用来检索、替换那些符合某个模式(规则)的文本。 概念 正则表达式是对字…

【Linux】从C语言文件操作 到Linux文件IO | 文件系统调用

文章目录 前言一、C语言文件I/O复习文件操作&#xff1a;打开和关闭文件操作&#xff1a;顺序读写文件操作&#xff1a;随机读写stdin、stdout、stderr 二、承上启下三、Linux系统的文件I/O系统调用接口介绍open()close()read()write()lseek() Linux文件相关重点 复习C文件IO相…

Leetcode541反转字符串Ⅱ(java实现)

我们今天分享的题目是字符串反转的进阶版反转字符串Ⅱ。 我们首先来看题目描述&#xff1a; 乍一看题目&#xff0c;有种懵逼的感觉&#xff0c;不要慌&#xff0c;博主来带着你分析题目&#xff0c;题目要求&#xff1a; 1. 每隔2k个字符&#xff0c;就对2k字符中的前k个字符…

SVN安装使用

​ 1.下载 TortoiseSVN-1.14.6.29673-x64-svn-1.14.3.msi 2.安装 ​ ​ ​ 可以修改安装目录 ​ 修改命令行工具&#xff0c;否则idea无法配置svn可执行文件 ​ ​ ​ ​ ​ ​ ​ 可以选择no&#xff0c;先不重启电脑 3.拉取代码 ​ ​ 4.Idea配置svn ​…

代码随想录算法训练营day4 | 链表(2)

一、LeetCode 24 两两交换链表中的节点 题目链接&#xff1a;24.两两交换链表中的节点https://leetcode.cn/problems/swap-nodes-in-pairs/ 思路&#xff1a;设置快慢指针&#xff0c;暂存节点逐对进行交换。 代码优化前&#xff1a; /*** Definition for singly-linked list…

435. 无重叠区间 - 力扣(LeetCode)

题目描述 给定一个区间的集合 intervals &#xff0c;其中 intervals[i] [starti, endi] 。返回 需要移除区间的最小数量&#xff0c;使剩余区间互不重叠 。 题目示例 输入: intervals [[1,2],[2,3],[3,4],[1,3]] 输出: 1 解释: 移除 [1,3] 后&#xff0c;剩下的区间没有重…

Django模型(一)

一、介绍 模型,就是python中的类对应数据库中的表 1.1、ORM ORM 就是通过实例对象的语法,完成关系型数据库的操作的技术,是"对象-关系映射"(Object/Relational Mapping) 的缩写 ORM 把数据库映射成对象 1.2、示例 1.2.1、模型 from django.db import models…

海康实时监控预览视频流接入web

我们采取的方案是后端获取视频流返回给前端&#xff0c;然后前端播放 海康开放平台海康威视合作生态致力打造一个能力开放体系、两个生态圈&#xff0c;Hikvision AI Cloud开放平台是能力开放体系的核心内容。它是海康威视基于多年在视频及物联网核心技术积累之上&#xff0c;…

我们应该怎样定义 BTC Layer2?

撰文&#xff1a;Jademont&#xff0c;水滴资本创始人 原文来自Techub News&#xff1a;我们应该怎样定义 BTC Layer2&#xff1f; 广义的 BTC Layer2&#xff1a; 只要消耗 BTC 作为 gas&#xff0c;以 BTC 为底层资产&#xff0c;可以做为 dapp 平台&#xff0c;性能又远优…

【2024】Docker部署Redis

1.说明&#xff1a; 因为容器实例的运行是有生命周期的&#xff0c;一些redis的备份、日志和配置文件什么的最好还是放在服务器本地。这样当容器删除时&#xff0c;我们也可以保留备份和日志文件。所以先在本地服务器安装redis并配置文件设置。下面是安装步骤: 2.安装步骤 1…

人脸识别 FaceNet人脸识别(一种人脸识别与聚类的统一嵌入表示)

人脸识别 FaceNet人脸识别&#xff08;一种人脸识别与聚类的统一嵌入表示&#xff09; FaceNet的简介Facenet的实现思路训练部分 FaceNet的简介 Facenet的实现思路 import torch.nn as nndef conv_bn(inp, oup, stride 1):return nn.Sequential(nn.Conv2d(inp, oup, 3, stride…

什么是RBAC

什么是RBAC 概述&#xff1a;RBAC&#xff1a;Role-Based Access Control详解&#xff1a;什么是基于⻆⾊的访问控制具体实现&#xff1a;如何设计RABC模型其他介绍&#xff1a;RBAC支持三个著名的安全原则 概述&#xff1a;RBAC&#xff1a;Role-Based Access Control RBAC&a…

【网站项目】基于SSM的228图书商城网站

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

数据监控-Prometheus/Grafana

一、数据监控Prometheus 1、什么是Prometheus Prometheus是由SoundCloud开源监控告警解决方案,从2012年开始编写代码,到2015年github上开源以来,吸引不少用户以及公司的使用。Prometheus作为新一代的开源解决方案,很多理念与Google SRE的运维之道不谋而合。 2、Promet…

YOLO自制数据集及训练

使用 Make Sense 网站进行标注 https://www.makesense.ai/可以让AI帮你先标一下 一定要点一下 + ,不然不会加进去 导出标签

【第五天】蓝桥杯备战

1、金币 https://www.lanqiao.cn/problems/357/learning/ 解法&#xff1a;暴力 import java.util.Scanner; // 1:无需package // 2: 类名必须Main, 不可修改public class Main {public static void main(String[] args) {Scanner scan new Scanner(System.in);//在此输入…

蓝牙----蓝牙GAP层

蓝牙协议栈----GAP GAP的角色连接过程连接参数 GAP&#xff1a;通用访问配置协议层 gap的角色发现的模式与过程连接模式与过程安全模式与过程 CC2640R2F的GAP层抽象 GAP的角色 Broadcaster 广播电台 -不可连接的广播者。Observer 观察者 -扫描广播者但无法启动连接。Periphe…

SpringBoot系列之JPA实现按年月日查询

SpringBoot系列之JPA实现按年月日查询 通过例子的方式介绍Springboot集成Spring Data JPA的方法&#xff0c;进行实验&#xff0c;要先创建一个Initializer工程&#xff0c;如图&#xff1a; 选择&#xff0c;需要的jdk版本&#xff0c;maven项目 选择需要的maven配置&#x…