Spring Boot+Netty

因工作中需要给第三方屏幕厂家下发广告,音频,图片等内容,对方提供TCP接口于是我使用Netty长链接进行数据传输

1.添加依赖

 <!--  netty依赖-->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>   

2.创建Netty服务

@Slf4j
@Component
public class NettyServer {
    public void start(InetSocketAddress address) {
        //配置服务端的NIO线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 绑定线程池,编码解码
            //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    // 指定Channel
                    .channel(NioServerSocketChannel.class)
                    //使用指定的端口设置套接字地址
                    .localAddress(address)
                    //使用自定义处理类
                    .childHandler(new NettyServerChannelInitializer())
                    //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //保持长连接,2小时无数据激活心跳机制
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //将小的数据包包装成更大的帧进行传送,提高网络的负载
                    .childOption(ChannelOption.TCP_NODELAY, true);
            // 绑定端口,开始接收进来的连接
            ChannelFuture future = bootstrap.bind(address).sync();
            if (future.isSuccess()) {
                log.info("netty服务器开始监听端口:{}",address.getPort());
            }
            //关闭channel和块,直到它被关闭
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

3.创建Socket配置类(也可以直接在步骤2中写死)

1.在配置文件中

socket:
  # 监听端口 8090
  port: 8090
  #ip地址
  host: 0.0.0.0
#  host: 192.168.31.2

@Setter
@Getter
@ToString
@Component
@Configuration
@PropertySource("classpath:application.yml")
@ConfigurationProperties(prefix = "socket")
public class SocketProperties {
    private Integer port;
    private String host;

}

4.在springboot 启动类中启用Netty服务

@SpringBootApplication
public class Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(Application.class);
        application.setApplicationStartup(new BufferingApplicationStartup(2048));
        application.run(args);
    }
    @Resource
    private NettyServer nettyServer;

    @Resource
    private SocketProperties socketProperties;
    @Override
    public void run(String... args) {
        InetSocketAddress address = new InetSocketAddress(socketProperties.getHost(),socketProperties.getPort());
        nettyServer.start(address);
    }

}

5.创建字符解析器,解析收到的消息

/**
 * 功能描述: 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器
 *
 */
public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        //接收消息格式,使用自定义解析数据格式
//        pipeline.addLast("decoder",new MyDecoder());
        //发送消息格式,使用自定义解析数据格式
//        pipeline.addLast("encoder",new MyEncoder());
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        //针对客户端,如果在1分钟时没有想服务端发送写心跳(ALL),则主动断开
        //如果是读空闲或者写空闲,不处理,这里根据自己业务考虑使用
        pipeline.addLast(new IdleStateHandler(0,0,90, TimeUnit.SECONDS));
        //自定义的空闲检测
        pipeline.addLast(new NettyServerHandler());
    }
}

6.创建Handler 类处理消息

/**
 * 功能描述: netty服务端处理类
 */

@Slf4j
@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {


    /**
     * 功能描述: 有客户端连接服务器会触发此函数
     *
     * @param ctx 通道
     * @return void
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        int clientPort = insocket.getPort();
        //获取连接通道唯一标识
        ChannelId channelId = ctx.channel().id();
        //如果map中不包含此连接,就保存连接
        if (ChannelMap.getChannelMap().containsKey(channelId)) {
            log.info("客户端:{},是连接状态,连接通道数量:{} ", channelId, ChannelMap.getChannelMap().size());
        } else {
            //保存连接
            ChannelMap.addChannel(channelId, ctx.channel());
            log.info("客户端:{},连接netty服务器[IP:{}-->PORT:{}]", channelId, clientIp, clientPort);
            log.info("连接通道数量: {}", ChannelMap.getChannelMap().size());
        }
    }

    /**
     * 功能描述: 有客户端终止连接服务器会触发此函数
     * @param ctx 通道处理程序上下文
     * @return void
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = inSocket.getAddress().getHostAddress();
        ChannelId channelId = ctx.channel().id();
        //包含此客户端才去删除
        if (ChannelMap.getChannelMap().containsKey(channelId)) {
            //删除连接
            ChannelMap.getChannelMap().remove(channelId);
            log.info("客户端:{},断开netty服务器[IP:{}-->PORT:{}]", channelId, clientIp, inSocket.getPort());
            log.info("连接通道数量: " + ChannelMap.getChannelMap().size());
        }
    }

   
    @Transactional
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf buf = (ByteBuf) msg;
        ByteBuf rebuf = Unpooled.buffer();

        RedisUtils.setChannelId(ctx.channel().id().toString(), ctx.channel().id());
        // 读取帧头标识
        byte frameHeader = buf.readByte();
        if (frameHeader != 0x7E) {
            byte[] data = ByteBufUtil.getBytes(buf);
            String hex = bytesToHex(data);
            buf.release();
            String content = ((ByteBuf) msg).toString(Charset.defaultCharset());
        }  // 读取消息帧类型
        else {
            byte messageType = buf.readByte();
             // 读取帧尾标识
            if (buf.isReadable()) {
                // 读取校验值
                byte checksum = buf.readByte();
                byte frameTail = buf.readByte();
               
            }
        }
        buf.release();
    }

    /**
     * 功能描述: 服务端给客户端发送消息
     *
     * @param channelId 连接通道唯一id
     * @param msg       需要发送的消息内容
     * @return void
     */
    public void channelWrite(ChannelId channelId, Object msg) throws Exception {
        Channel channel = ChannelMap.getChannelMap().get(channelId);
        if (channel == null) {
            log.info("通道:{},不存在", channelId);
            return;
        }
        if (msg == null || msg == "") {
            log.info("服务端响应空的消息");
            return;
        }
        //将客户端的信息直接返回写入ctx
        channel.write(msg);
        //刷新缓存区
        channel.flush();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        String socketString = ctx.channel().remoteAddress().toString();
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                log.info("Client:{},READER_IDLE 读超时", socketString);
                Channel channel = ctx.channel();
                ChannelId id = channel.id();
                // 超时未收到心跳包,更新设备状态为离线
                // todo 更新设备状态
                ctx.disconnect();
                ChannelMap.removeChannelByName(id);
            } else if (event.state() == IdleState.WRITER_IDLE) {
                log.info("Client:{}, WRITER_IDLE 写超时", socketString);
                ctx.disconnect();
                Channel channel = ctx.channel();
                ChannelId id = channel.id();
                ChannelMap.removeChannelByName(id);
            } else if (event.state() == IdleState.ALL_IDLE) {
                log.info("Client:{},ALL_IDLE 总超时", socketString);
                Channel channel = ctx.channel();
                ChannelId id = channel.id();
                // 超时未收到心跳包,更新设备状态为离线
                // todo 更新设备状态
                ctx.disconnect();
                ChannelMap.removeChannelByName(id);
            }
        }
    }

    /**
     * 功能描述: 发生异常会触发此函数
     *
     * @param ctx   通道处理程序上下文
     * @param cause 异常
     * @return void
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        log.info("{}:发生了错误,此连接被关闭。此时连通数量:{}", ctx.channel().id(), ChannelMap.getChannelMap().size());
    }


}

ChannelMap类

/**
 * 功能描述: 管理通道Map类
 *
 */
public class ChannelMap {

    /**
     * 管理一个全局map,保存连接进服务端的通道数量
     */
    private static final ConcurrentHashMap<ChannelId, Channel> CHANNEL_MAP = new ConcurrentHashMap<>(128);

    public static ConcurrentHashMap<ChannelId, Channel> getChannelMap() {
        return CHANNEL_MAP;
    }

    /**
     *  获取指定name的channel
     */
    public static Channel getChannelByName(ChannelId channelId){
        if(CollectionUtils.isEmpty(CHANNEL_MAP)){
            return null;
        }
        return CHANNEL_MAP.get(channelId);
    }

    /**
     *  将通道中的消息推送到每一个客户端
     */
    public static boolean pushNewsToAllClient(String obj){
        if(CollectionUtils.isEmpty(CHANNEL_MAP)){
            return false;
        }
        for(ChannelId channelId: CHANNEL_MAP.keySet()) {
            Channel channel = CHANNEL_MAP.get(channelId);
            channel.writeAndFlush(new TextWebSocketFrame(obj));
        }
        return true;
    }

    /**
     *  将channel和对应的name添加到ConcurrentHashMap
     */
    public static void addChannel(ChannelId channelId,Channel channel){
        CHANNEL_MAP.put(channelId,channel);
    }

    /**
     *  移除掉name对应的channel
     */
    public static boolean removeChannelByName(ChannelId channelId){
        if(CHANNEL_MAP.containsKey(channelId)){
            CHANNEL_MAP.remove(channelId);
            return true;
        }
        return false;
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/947505.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

从0到机器视觉工程师(二):封装调用静态库和动态库

目录 静态库 编写静态库 使用静态库 方案一 方案二 动态库 编写动态库 使用动态库 方案一 方案二 方案三 总结 静态库 静态库是在编译时将库的代码合并到最终可执行程序中的库。静态库的优势是在编译时将所有代码包含在程序中&#xff0c;可以使程序独立运行&…

低代码开发:开启企业数智化转型“快捷键”

一、低代码开发浪潮来袭&#xff0c;企业转型正当时 在当今数字化飞速发展的时代&#xff0c;低代码开发已如汹涌浪潮&#xff0c;席卷全球。从国际市场来看&#xff0c;诸多企业巨头纷纷布局低代码领域&#xff0c;像微软的 PowerApps、OutSystems 等平台&#xff0c;凭借强大…

UE5动画蓝图

动画蓝图&#xff0c;混合空间&#xff0c;状态机&#xff0c;瞄准偏移&#xff0c;动画蒙太奇&#xff0c;动画混合&#xff0c;骨骼绑定&#xff0c;动画重定向&#xff0c;动画通知&#xff0c;Control Rig…… 虚幻动画模块是一个庞大的系统&#xff0c;大模块里又包含很多…

[redux] useDispatch的两种用法

先重写2个方法先, 方便ts类型推导,如果你看不懂为什么这么写, 先看我这篇 [redux] ts声明useSelector和useDispatch-CSDN博客 export type RootState ReturnType<typeof store.getState>; export type AppDispatch typeof store.dispatch; export const useAppDispat…

javaEE-网络原理-1初识

目录 一.网络发展史 1.独立模式 2.网络互联 二.局域网LAN 1.基于网线直连&#xff1a; 2.基于集线器组件&#xff1a; 3.基于交换机组件&#xff1a; 4.基于交换机和路由器组件 ​编辑 三、广域网WAN 四、网络通信基础 1.ip地址 2.端口号&#xff1a; 3.协议 4.五…

jenkins入门3

1、新建视图 视图可以理解为是item的集合&#xff0c;这样可以将item分类。新建视频可以选择加入已有的item 2、新建item 1)输入任务名称、选择一个类型&#xff0c;常用的是第一个freestyle project 2&#xff09;进行item相关配置&#xff0c;general 设置项目名字,描述,参数…

【C语言的小角落】--- 深度理解取余/取模运算

Welcome to 9ilks Code World (๑•́ ₃ •̀๑) 个人主页: 9ilk (๑•́ ₃ •̀๑) 文章专栏&#xff1a; C语言的小角落 本篇博客我们来深度理解取余/取模&#xff0c;以及它们在不同语言中出现不同现象的原因。 &#x1f3e0; 关于取整 &#x1f3b5; 向0取整…

mapbox进阶,添加路径规划控件

👨‍⚕️ 主页: gis分享者 👨‍⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍⚕️ 收录于专栏:mapbox 从入门到精通 文章目录 一、🍀前言1.1 ☘️mapboxgl.Map 地图对象1.2 ☘️MapboxDirections 控件二、🍀添加路径规划控件1. ☘️实现思路2. ☘️…

linux-25 文件管理(三)复制、移动文件,cp,mv

命令cp是copy的简写&#xff0c;而mv则是move的简写。那既然copy是用于实现复制文件的&#xff0c;那通常一般我们要指定其要复制的是谁&#xff1f;而且复制完以后保存在什么地方&#xff0c;对吧&#xff1f;那因此它的使用格式很简单&#xff0c;那就是cp srcfile dest&…

IDEA开发Java应用的初始化设置

一、插件安装 如下图所示&#xff1a; 1、Alibaba Java Coding Guidelines 2.1.1 阿里开发者规范&#xff0c;可以帮忙本地自动扫描出不符合开发者规范的代码&#xff0c;甚至是代码漏洞提示。 右击项目&#xff0c;选择《编码规约扫描》&#xff0c;可以进行本地代码规范扫…

GPU加速计算的专业云服务平台:蓝耘GPU算力平台的概述、具体应用与教学

文章目录 一、平台介绍蓝耘GPU算力平台概述平台优势与特点 二、注册与登录账号注册流程GPU服务器类型配置选择指南内存和存储容量网络带宽CPU配置 三、创建实例**实例创建步骤**镜像选择与设置 四、连接实例SSH连接方法远程桌面配置 一、平台介绍 蓝耘GPU算力平台概述 蓝耘GP…

golang:微服务架构下的日志追踪系统(二)

背景 在使用Gin框架进行服务开发时&#xff0c;我们遇到了一个日志记录的问题。由于Gin的上下文&#xff08;*gin.Context&#xff09;实现了context.Context接口&#xff0c;在调用日志记录器的Info、Warn、Error等方法时&#xff0c;直接传递Gin的上下文通常不会导致编译错误…

Vue项目整合与优化

前几篇文章&#xff0c;我们讲述了 Vue 项目构建的整体流程&#xff0c;从无到有的实现了单页和多页应用的功能配置&#xff0c;但在实现的过程中不乏一些可以整合的功能点及可行性的优化方案&#xff0c;就像大楼造完需要进行最后的项目验收改进一样&#xff0c;有待我们进一步…

网关的介绍

网关&#xff08;Gateway&#xff09;在网络技术中扮演着举足轻重的角色。为了让你更好地理解网关及其相关术语&#xff0c;我会尽量用简洁明了的语言来解释&#xff0c;同时也会穿插一些专业术语以便你深入学习。 网关的基本概念 网关&#xff0c;顾名思义&#xff0c;是网络的…

【C语言程序设计——循环程序设计】枚举法换硬币(头歌实践教学平台习题)【合集】

目录&#x1f60b; 任务描述 相关知识 一、循环控制 / 跳转语句的使用 1. 循环控制语句&#xff08;for 循环&#xff09; 2. 循环控制语句&#xff08;while 循环&#xff09; 3. 跳转语句&#xff08;break 语句&#xff09; 4. 跳转语句&#xff08;continue 语句&…

SD-WAN怎样减少异地组网的网络延迟?

在经济全球化的推动下&#xff0c;许多企业的业务已经扩展到多个国家或地区。这种情况下&#xff0c;企业需要搭建高效、稳定的网络连接&#xff0c;以确保异地的分支机构之间能够顺畅地交流。网络延迟是拉低异地组网数据传输效率的重要因素&#xff0c;直接影响到企业的运营和…

小程序学习06——uniapp组件常规引入和easycom引入语法

目录 一 组件注册 1.1 组件全局注册 1.2 组件全局引入 1.3 组件局部引入 页面引入组件方式 1.3.1 传统vue规范&#xff1a; 1.3.2 通过uni-app的easycom 二 组件的类型 2.1 基础组件列表 一 组件注册 1.1 组件全局注册 &#xff08;a&#xff09;新建compoents文件…

uniapp 微信小程序 自定义日历组件

效果图 功能&#xff1a;可以记录当天是否有某些任务或者某些记录 具体使用&#xff1a; 子组件代码 <template><view class"Accumulate"><view class"bx"><view class"bxx"><view class"plank"><…

上升沿下降沿递增

沿指令&#xff1a;P&#xff1a;上升沿 从01 导通一个扫描周期 N&#xff1a;下降沿 从10 导通一个扫描周期

大数据-268 实时数仓 - ODS层 将 Kafka 中的维度表写入 DIM

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; Java篇开始了&#xff01; MyBatis 更新完毕目前开始更新 Spring&#xff0c;一起深入浅出&#xff01; 目前已经更新到了&#xff1a; H…