Spring Boot与Netty打造TCP服务端(解决粘包问题)

欢迎来到我的博客,代码的世界里,每一行都是一个故事


在这里插入图片描述

Spring Boot与Netty打造TCP服务端

    • 前言
    • 功能目标
    • 项目实现
      • maven坐标
      • 构建自定义Handler
      • ChannelInitializer实现
      • server实现

前言

在物联网时代,设备之间的通信变得愈发重要。本文将带你踏上一场关于如何用Spring Boot和Netty搭建TCP服务端的冒险之旅。无论是智能家居、工业自动化还是其他物联网应用,构建一个稳健的通信桥梁将成为连接未来的关键。

功能目标

  • 实现springboot+netty整合TCP服务端(基础)
  • 实现消息回复功能
  • 实现消息太长导致的粘包问题(比如发送一个base64的图片信息)
  • 实现在自定义Handler中注入spring的bean
  • 保证完成任务,哈哈哈哈哈

项目实现

maven坐标

<!-- netty 这里你也可以引入全部-->
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-common</artifactId>
  <version>4.1.79.Final</version>
</dependency>

构建自定义Handler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;

/**
 * I/O数据读写处理类
 *
 * @author xiaobo
 */
@Slf4j
public class CarTcpNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter {


    /**
     * 从客户端收到新的数据时,这个方法会在收到消息时被调用
     *
     * @param ctx
     * @param msg
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
        // 这里是在前面的DelimiterBasedFrameDecoder转为了ByteBuf,验证是否是ByteBuf
        if (msg instanceof ByteBuf) {
            ByteBuf byteBuf = (ByteBuf) msg;
            try {
                String receivedData = byteBuf.toString(CharsetUtil.UTF_8);
                // 接收完整数据
                handleReceivedData(receivedData);
            } finally {
                // 释放 ByteBuf 占用的资源
                byteBuf.release();
                // 回复消息
                ctx.writeAndFlush(Unpooled.copiedBuffer("收到over", CharsetUtil.UTF_8));
            }
        }
    }

    private void handleReceivedData(String receivedData) {
        // 数据处理
        // 这里如果想实现spring中bean的注入,可以用geBean的方式获取
    }

    /**
     * 从客户端收到新的数据、读取完成时调用
     *
     * @param ctx
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
        log.info("channelReadComplete");
        ctx.flush();
    }

    /**
     * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     *
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {
        cause.printStackTrace();
        ctx.close();// 抛出异常,断开与客户端的连接
    }

    /**
     * 客户端与服务端第一次建立连接时 执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {
        super.channelActive(ctx);
        ctx.channel().read();
        InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = socket.getAddress().getHostAddress();
        // 此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接
        System.out.println("channelActive:" + clientIp + ctx.name());
      	// 这里是向客户端发送回应
        ctx.writeAndFlush(Unpooled.copiedBuffer("收到over", CharsetUtil.UTF_8));
    }

    /**
     * 客户端与服务端 断连时 执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
        super.channelInactive(ctx);
        InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = socket.getAddress().getHostAddress();
        // 断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
        ctx.close();
        log.info("channelInactive:{}", clientIp);
    }

    /**
     * 服务端当read超时, 会调用这个方法
     *
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
        super.userEventTriggered(ctx, evt);
        InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = socket.getAddress().getHostAddress();
        ctx.close();// 超时时断开连接
        log.info("userEventTriggered:" + clientIp);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered");
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        log.info("channelWritabilityChanged");
    }

}

ChannelInitializer实现

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

/**
 * description: <h1>通道初始化</h1>
 *
 * @author bo
 * @version 1.0
 * @date 2024/2/27 16:13
 */
public class CarTcpNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {

        ByteBuf delemiter = Unpooled.buffer();
        delemiter.writeBytes("$".getBytes());
        // 这里就是解决数据过长问题,而且数据是以$结尾的
        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(907200, true, true, delemiter));

        // 自定义ChannelInboundHandlerAdapter
        ch.pipeline().addLast(new CarTcpNettyChannelInboundHandlerAdapter());

    }

}

server实现

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * description: <h1>netty创建的TCP</h1>
 *
 * @author bo
 * @version 1.0
 * @date 2024/2/27 16:25
 */
@Slf4j
public class CarTcpNettyServer {

    public void bind(int port) throws Exception {

        // 配置服务端的NIO线程组
        // NioEventLoopGroup 是用来处理I/O操作的Reactor线程组
        // bossGroup:用来接收进来的连接,workerGroup:用来处理已经被接收的连接,进行socketChannel的网络读写,
        // bossGroup接收到连接后就会把连接信息注册到workerGroup
        // workerGroup的EventLoopGroup默认的线程数是CPU核数的二倍

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // netty 默认数据包传输大小为1024字节, 设置它可以自动调整下一次缓冲区建立时分配的空间大小,避免内存的浪费    最小  初始化  最大 (根据生产环境实际情况来定)
                    // 使用对象池,重用缓冲区
                    .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576))
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576))
                    // 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息
                    .childHandler(new CarTcpNettyChannelInitializer<SocketChannel>());
            log.info("<===========netty server start success!==============>");
            // 绑定端口,同步等待成功
            ChannelFuture f = serverBootstrap.bind(port).sync();
            // 等待服务器监听端口关闭
            f.channel().closeFuture().sync();

        } finally {
            // 退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/435958.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

第三百八十五回

文章目录 1.概念介绍2.使用方法3.示例代码 我们在上一章回中介绍了Snackbar Widget相关的内容,本章回中将介绍TimePickerDialog Widget.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1.概念介绍 我们在这里说的TimePickerDialog是一种弹出窗口&#xff0c;只不过窗口的内容…

【LeetCode】升级打怪之路 Day 15:二叉树解题的思维模式 —— 遍历、分解问题

今日题目&#xff1a; 226. 翻转二叉树101. 对称二叉树114. 二叉树展开为链表 目录 LC 226. 翻转二叉树 【easy】LC 101. 对称二叉树 ⭐⭐⭐LC 114. 二叉树展开为链表 ⭐⭐⭐ 今天的题目主要是对二叉树递归遍历的应用&#xff0c;东哥带你刷二叉树&#xff08;思路篇&#xff0…

设计模式—适配器模式

概念: 适配器模式&#xff08;有时候也称包装样式或者包装&#xff09;将一个类的接口适配成用户所期待的。一个适配允许通常因为接口不兼容而不能在一起工作的类工作在一起&#xff0c;做法是将类自己的接口包裹在一个已存在的类中。 本章代码: 小麻雀icknn/设计模式练习 - Gi…

微服务---Eureka注册中心

目录 一、服务中的提供者与消费者 二、Eureka工作流程 三、搭建Eureka服务 四、服务拉取 五、总结 1.搭建EurekaServer 2.服务注册 3.服务发现 一、服务中的提供者与消费者 服务提供者&#xff1a;一次业务中&#xff0c;被其他微服务调用的服务。即提供接口给其他微服务。…

SpringBoot整合【RocketMQ】

目录 1.POM文件添加依赖及yml配置 2.RocketmqUtil 3.生产者&#xff08;异步发送示例&#xff09; 4.消费者 5.测试 1.POM文件添加依赖及yml配置 <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter&l…

Android使用Sensor.TYPE_STEP_COUNTER计步器传感器进行步数统计

1、首先&#xff0c;申请权限 必须声明 ACTIVITY_RECOGNITION 权限&#xff0c;以便您的应用在运行 Android 10 (API 级别 29) 或更高版本的设备上使用此传感器。 Manifest.xml也记得声明 if (Build.VERSION.SDK_INT > Build.VERSION_CODES.P) {Log.d(TAG, "[权限]&quo…

Elasticsearch:从 ES|QL 到 Python 数据帧

在我之前的文章 “Elasticsearch&#xff1a;ES|QL 查询展示”&#xff0c;我展示了如何在 Kibana 中使用 ES|QL 对索引来进行查询及统计。在很多的情况下&#xff0c;我们需要在客户端中来对数据进行查询&#xff0c;那么我们该怎么办呢&#xff1f;我们需要使用到 Elasticsea…

挖掘NCDA设计大赛获奖作品的成功之道:探讨表情包设计竞争力的关键因素

第12届大赛简介 - 未来设计师全国高校数字艺术设计大赛&#xff08;NCDA&#xff09;开始啦&#xff01;今天我们就特地来说说它的虚拟IP及表情包设计的命题之一的表情包设计选项&#xff0c;为了使大家更好地参加本次比赛&#xff0c;本文特别整理了往届NCDA的表情包设计获奖作…

(每日持续更新)信息系统项目管理(第四版)(高级项目管理)考试重点整理 第13章 项目资源管理(五)

项目建议与立项申请、初步可行性研究、详细可行性研究、评估与决策是项目投资前使其的四个阶段。在实际工作中&#xff0c;初步可行性研究和详细可行性研究可以依据项目的规模和繁简程度合二为一&#xff0c;但详细可行性研究是不可缺少的。升级改造项目制作初步和详细研究&…

基于Python实现银行卡识别

在本文中将介绍如何使用Python和深度学习技术来实现银行卡识别功能。银行卡识别是一个在金融、安全等领域具有重要应用的问题&#xff0c;将使用深度学习模型来实现银行卡图像的识别和分类。 目录 引言数据集准备预处理和特征提取模型选择与训练模型评估与性能优化部署与应用 引…

【Leetcode 438】找到字符串中所有字母异位词 —— 滑动窗口

438. 找到字符串中所有字母异位词 给定两个字符串s和p&#xff0c;找到s中所有p的 异位词 的子串&#xff0c;返回这些子串的起始索引。不考虑答案输出的顺序。 异位词 指由相同字母重排列形成的字符串&#xff08;包括相同的字符串&#xff09;。 示例 1: 输入: s “cbaeb…

DRAM 是什么?一文看懂DRAM 产业完整介绍!

全球经济前景不乐观&#xff0c;导致 DRAM 需求下滑&#xff0c;随着 DRAM 价格的连续下跌&#xff0c;三星、海力士等相关大厂的业绩前景都不被看好。那究竟 DRAM 到底是什么产品&#xff1f; DRAM 是什么&#xff1f; DRAM 其实就是我们一般生活中常常在讲的“存储”&#x…

回溯算法05-分割回文子串(Java)

5.分割回文子串 题目描述 给你一个字符串 s&#xff0c;请你将 s 分割成一些子串&#xff0c;使每个子串都是 回文串 。返回 s 所有可能的分割方案。 回文串 是正着读和反着读都一样的字符串。 示例 1&#xff1a; 输入&#xff1a;s "aab" 输出&#xff1a;[[…

云服务器99元一年阿里云和腾讯云对比,明智选择!

腾讯云服务器99元一年是真的吗&#xff1f;真的&#xff0c;只是又降价了&#xff0c;现在只要61元一年&#xff0c;配置为2核2G3M轻量应用服务器&#xff0c;40GB SSD盘&#xff0c;腾讯云百科txybk.com分享腾讯云官方活动购买链接 https://curl.qcloud.com/oRMoSucP 活动打开…

Clion调试QT程序qDebug()、cout控制台无输出的可能解决方法

qDebug()不输出 在当前项目配置中添加一个环境变量 方法一、单独为配置 QT_ASSUME_STDERR_HAS_CONSOLE1 方法二、全局配置&#xff08;系统变量&#xff09; 一劳永逸 效果 cout不输出 Clion在debug调试C/C的时候&#xff0c;printf/cout不会实时输出情况 结果同上~ 谢阅…

【视觉AIGC识别】误差特征、人脸伪造检测、其他类型假图检测

视觉AIGC识别——人脸伪造检测、误差特征 不可见水印 前言视觉AIGC识别【误差特征】DIRE for Diffusion-Generated Image Detection方法扩散模型的角色DIRE作为检测指标 实验结果泛化能力和抗扰动 人脸伪造监测&#xff08;Face Forgery Detection&#xff09;人脸伪造图生成 …

进程间通信之信号灯 || 网络协议UDP/TCP || 三次握手四次挥手

在线程通信中由于数据段等内存空间的共用性&#xff0c;导致同时访问时资源竞争的问题&#xff0c;在线程中我们使用信号量的申请和释放&#xff0c;在防止资源竞争的产生。在进程间的通信中&#xff0c;有信号灯的概念。搭配共享内存实现进程同步。 有名信号量: 1.创建 …

请你简单说一下 Mysql 的事务隔离级别

什么情况&#xff0c;写了 5 年的 CRUD&#xff0c;还搞不清楚 Mysql 的事务隔离级别&#xff0c;难怪第一面就被刷下来。 一个 5 年经验的粉丝&#xff0c;在一个公司干了 5 年&#xff0c;觉得自己特厉害&#xff0c;什么都能搞定&#xff0c;结果每次一到技术面就被刷。问我…

滴滴基于 Clickhouse 构建新一代日志存储系统

ClickHouse 是2016年开源的用于实时数据分析的一款高性能列式分布式数据库&#xff0c;支持向量化计算引擎、多核并行计算、高压缩比等功能&#xff0c;在分析型数据库中单表查询速度是最快的。2020年开始在滴滴内部大规模地推广和应用&#xff0c;服务网约车和日志检索等核心平…

Unity UGUI之InputField(TMP)基本了解

Unity的InputField组件是用于在Unity中创建可供用户输入文本的输入框的UI组件。通过InputField组件&#xff0c;可以让用户在运行时输入文本&#xff0c;比如用户名、密码、搜索关键字等。其中TMP版本的InputField是基于TextMeshPro的InputField组件&#xff0c;提供了更多的文…