Netty自定义消息协议的实现逻辑处理粘包拆包、心跳机制

Netty

  • 自定义消息协议的实现逻辑
    • 自定义编码器
  • 心跳机制
    • 实现客户端发送心跳包

自定义消息协议的实现逻辑

在这里插入图片描述
消息协议:这一次消息需要包含两个部分,即消息长度和消息内容本身。
自定义消息编码器︰消息编码器将客户端发送的消息转换成遵守消息协议的消息,即包含消息长度和消息内容的消息
自定义消息解码器∶消息解码器根据消息协议的消息长度,来获得指定长度的消息内容。

自定义编码器

自定义消息协议:

//自定义消息协议
public class MessageProtocal {
    //消息的长度
    private int length;
    //消息的内容
    private byte[] content;

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}

客户端基本代码

public class NettyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup(1);
        Bootstrap bootstrap = new Bootstrap();
        //设置相关的参数
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //添加处理器,分包编码器
                        pipeline.addLast(new MessageEncoder());
                        //添加具体的业务处理器
                        pipeline.addLast(new NettyMessageClientHandler());
                    }
                });
        System.out.println("客户端启动了");
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
        channelFuture.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}

客户端业务代码

public class NettyMessageClientHandler extends SimpleChannelInboundHandler<MessageProtocal> {
    //连接通道创建后要向服务端发送消息

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for(int i=0;i<200;i++){
            String msg = "西安科技大学";
            //创建消息协议对象
            MessageProtocal messageProtocal = new MessageProtocal();
            messageProtocal.setLength(msg.getBytes(StandardCharsets.UTF_8).length);
            messageProtocal.setContent(msg.getBytes(StandardCharsets.UTF_8));
            //发送协议对象,注意此时ctx只能发送Bytebuf数据,因此需要用编码器把它编码成Bytebuf数据
            ctx.writeAndFlush(messageProtocal);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception {

    }
}

自定义编码器

public class MessageEncoder extends MessageToByteEncoder<MessageProtocal> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocal msg, ByteBuf out) throws Exception {
        out.writeInt(msg.getLength());
        out.writeBytes(msg.getContent());
    }
}

服务端基本代码

public class NettyServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup boosGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boosGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //添加解码器
                        pipeline.addLast(new MessageDecoder());
                        pipeline.addLast(new NettyMessageServerHandler());
                    }
                });
        System.out.println("Netty的服务端启动了");
        ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
        channelFuture.channel().closeFuture().sync();
        boosGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
}

自定义解码器

//自定义解码器代码
public class MessageDecoder extends ByteToMessageDecoder {
    int length = 0;


    //ctx
    //in:客户端发送来的MessageProtocol编码后的ByteBuf数据
    //out:out里的数据会被放行到下一个handler把解码出来的MessageProtocol放到out里面
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("ByteBuf:"+in);
        //获得前面的4个字节的数据 == 描述实际内容的长度
        if(in.readableBytes()>=4){
            //ByteBuf里面可能有MessageProtocol数据
            if(length==0){
                length = in.readInt();
            }
            //length = 15
            if(in.readableBytes()<length){
                //说明数据还没到齐,等待下一次调用decode
                System.out.println("当前数据量不够,继续等待");
                return;
            }
            //可读数据量>=length ==> 意味着这一次的MessageProtocol的内容已经到齐了
            //创建了一个指定length长度的字节数组
            byte[] content = new byte[length];
            //把ByteBuf里面的指定长度的数据读到content数组中
            in.readBytes(content);
            //创建协议MessageProtocol对象赋值
            MessageProtocal messageProtocal = new MessageProtocal();
            messageProtocal.setLength(length);
            messageProtocal.setContent(content);
            out.add(messageProtocal);
            length=0;
        }
    }
}

服务端业务处理代码

public class NettyMessageServerHandler extends SimpleChannelInboundHandler<MessageProtocal> {


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception {
        System.out.println("---服务器收到的数据---");
        System.out.println("消息的长度:"+msg.getLength());
        System.out.println("消息的内容:"+new String(msg.getContent(), StandardCharsets.UTF_8));
    }
}

运行结果:
在这里插入图片描述


心跳机制

在分布式系统中,心跳机制常常在注册中心组件中提及,比如Zookeeper、Eureka、Nacos等,通过维护客户端的心跳,来判断客户端是否正常在线。如果客户端达到超时次数等预设的条件时,服务端将释放客户端的连接资源。
试想一下,当我们一个用来写数据的通道,它虽然没有下线,但这个通道长时间都不写数据了,是不是我们可以利用心跳机制,关闭此类通道及其对应的客户端

在这里插入图片描述

实现客户端发送心跳包

客户端基本代码

public class NettyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup(1);
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //添加编解码器
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());

                        pipeline.addLast(new NettyClientHandler());
                    }
                });
        System.out.println("客户端启动了");
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",9090).sync();
        //模拟向服务端发送心跳数据
        String packet = "heartbeat packet";
        Random random = new Random();
        Channel channel = channelFuture.channel();
        while (channel.isActive()){
            //随机的事件来实现时间间隔等待
            int num = random.nextInt(10);
            Thread.sleep(num*1000);
            channel.writeAndFlush(packet);
        }
        group.shutdownGracefully();
    }
}

客户端拦截器

public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println("客户端收到的数据"+s);
    }
}

在这里插入图片描述
IdleStateHandler类描述三种空闲状态
读空闲:在指定时间间隔内没有从Channel中读到数据,将会创建状态为READER_IDLE的IdleStateEvent对象。
写空闲︰在指定时间间隔内没有数据写入到Channel中,将会创建状态为WRITER_IDLE的ldleStateEvent对象。
读写空闲:在指定时间间隔内Channel中没有发生读写操作,将会创建状态为ALL_IDLE的ldleStateEvent对象。

服务端基本代码

public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        //超时状态处理器会在服务端发现有超过3秒没有没有发生读操作的话会触发超时事件
                        //创建出IdleStateEvent对象,将该对象交给下一个Handler
                        pipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS));
                        //HeartbeatServerHandler必领重写userEventTriggered方法,用来做具体的超时的业务处理
                        pipeline.addLast(new HeartbeatServerHandler());
                    }
                });
        System.out.println("Netty服务端启动了");
        ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
        channelFuture.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
}

服务端业务代码

public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {
    int readIdleTimes = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println("服务端收到的心跳"+s);
        channelHandlerContext.writeAndFlush("服务端已经收到了心跳");
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent)evt;
        switch (event.state()){
            case READER_IDLE:
                readIdleTimes++;
                break;
            case WRITER_IDLE:
                System.out.println("写超时");
                break;
            case ALL_IDLE:
                System.out.println("读写超时");
                break;
        }
        if(readIdleTimes>3){
            System.out.println("读超时超过三次,关闭连接");
            ctx.writeAndFlush("超时关闭");
            ctx.channel().close();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/56476.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

关于latch up的重读

衬底电流容易导致寄生三极管导通(衬底电阻衬底电流》衬底压差)&#xff0c;更容易触发latchup&#xff1b; 一般常用的实际产品中会用衬底隔离的器件来做负压器件&#xff1b;用DNW&NBL组成一个隔离盆将整个负压区和正常电路分开&#xff0c;DNW&NBL接高电压&#xff1…

抄写Linux源码(Day7:读闪客文章第二回 “自己给自己挪个地儿”)

闪客文章地址&#xff1a;https://mp.weixin.qq.com/s?__bizMzk0MjE3NDE0Ng&mid2247499274&idx1&sn23885b5b1344a1425f5a971d06ad2e7d&chksmc2c584a7f5b20db1b0a75ea896e7218a9f8bcd006e68f53693bab240b13f9e2fb0ec0c9b9a6a&cur_album_id2123743679373688…

iMX6ULL驱动开发 | 让imx6ull开发板支持usb接口FC游戏手柄

手边有一闲置的linux开发板iMX6ULL一直在吃灰&#xff0c;不用来搞点事情&#xff0c;总觉得对不住它。业余打发时间就玩起来吧&#xff0c;总比刷某音强。从某多多上8块儿大洋买来一个usb接口的游戏手柄&#xff0c;让开发板支持以下它&#xff0c;后续就可以接着在上面玩童年…

BUU [网鼎杯 2020 朱雀组]phpweb

BUU [网鼎杯 2020 朱雀组]phpweb 众生皆懒狗。打开题目&#xff0c;只有一个报错&#xff0c;不知何从下手。 翻译一下报错&#xff0c;data()函数:,还是没有头绪&#xff0c;中国有句古话说的好“遇事不决抓个包” 抓个包果然有东西&#xff0c;仔细一看这不就分别是函数和参…

软件外包开发的JAVA开发框架

Java的开发框架有很多&#xff0c;以下是一些常见的Java开发框架及其特点&#xff0c;每个框架都有其特定的使用场景和优势&#xff0c;开发者可以根据项目的需求选择合适的框架。今天和大家介绍常见的框架及特点&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&…

【Golang 接口自动化01】使用标准库net/http发送Get请求

目录 发送Get请求 响应信息 拓展 资料获取方法 发送Get请求 使用Golang发送get请求很容易&#xff0c;我们还是使用http://httpbin.org作为服务端来进行演示。 package mainimport ("bytes""fmt""log""net/http""net/url&qu…

echarts图表基本使用

折线图 import * as echarts from echarts;const chartDom document.getElementById(main); const myChart echarts.init(chartDom); const option {xAxis: {type: category,data: [Mon, Tue, Wed, Thu, Fri, Sat, Sun]},yAxis: {type: value},series: [{data: [820, 932, …

【HarmonyOS】键盘遮挡输入框时,实现输入框显示在键盘上方

【关键字】 harmonyOS、键盘遮挡input&#xff0c;键盘高度监听 【写在前面】 在使用API6、API7开发HarmonyOS应用时&#xff0c;常出现页面中需要输入input&#xff0c;但是若input位置在页面下方&#xff0c;在input获取焦点的时候&#xff0c;会出现软键盘挡住input情况&a…

【JAVA】String ,StringBuffer 和 StringBuilder 三者有何联系?

个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【❤️初识JAVA】 文章目录 前言StringBufferStringBuffer方法 StringBuilderStringBuilder方法 String &#xff0c;StringBuffer 和 StringBuilder的区别String和StringBuffer互相转换 前言 在之前的文章…

解密Redis:应对面试中的缓存相关问题

文章目录 1. 缓存穿透问题及解决方案2. 缓存击穿问题及解决方案3. 缓存雪崩问题及解决方案4. Redis的数据持久化5. Redis的过期删除策略和数据淘汰策略6. Redis分布式锁和主从同步7. Redis集群方案8. Redis的数据一致性保障和高可用性方案 导语&#xff1a; 在面试过程中&#…

mysql 锁

1. 概述 锁 是计算机协调多个进程或线程 并发访问某一资源 的机制。在程序开发中会存在多线程同步的问题&#xff0c;当多个线程并发访问某个数据的时候&#xff0c;尤其是针对一些敏感数据&#xff08;比如订单&#xff0c;金额等&#xff09;&#xff0c;我们就需要保证这个数…

【redis】创建集群

这里介绍的是创建redis集群的方式&#xff0c;一种是通过create-cluster配置文件创建部署在一个物理机上的伪集群&#xff0c;一种是先在不同物理机启动单体redis&#xff0c;然后通过命令行使这些redis加入集群的方式。 一&#xff0c;通过配置文件创建伪集群 进入redis源码…

R语言【Tidyverse、Tidymodel】的机器学习方法

机器学习已经成为继理论、实验和数值计算之后的科研“第四范式”&#xff0c;是发现新规律&#xff0c;总结和分析实验结果的利器。机器学习涉及的理论和方法繁多&#xff0c;编程相当复杂&#xff0c;一直是阻碍机器学习大范围应用的主要困难之一&#xff0c;由此诞生了Python…

牛客网Verilog刷题——VL55

牛客网Verilog刷题——VL55 题目答案 题目 请用Verilog实现4位约翰逊计数器&#xff08;扭环形计数器&#xff09;&#xff0c;计数器的循环状态如下&#xff1a;   电路的接口如下图所示&#xff1a; 输入输出描述&#xff1a; 信号类型输入/输出位宽描述clkwireInput1系统…

数据库的分库分表

#!/bin/bash ######################### #File name:db_fen.sh #Version:v1.0 #Email:admintest.com #Created time:2023-07-29 09:18:52 #Description: ########################## MySQL连接信息 db_user"root" db_password"RedHat123" db_cmd"-u${…

Gitlab CI/CD笔记-第一天-GitOps和以前的和jenkins的集成的区别

一、GitOps-CI/CD的流程图 简单解释&#xff1a; 1.提交代码 2.编译构建 3.测试 4.部署 二、gitlab的实现 1、Runer 1.这个就是jenkins里的worker-slave的角色&#xff0c; 2.git-lab server 下发任务&#xff0c;Runner执行。 3.这个R…

windows上给oracle打补丁注意事项

打补丁的过程 1、升级opatch工具&#xff0c;检查剩余空间用于存放ORACLE_HOME的备份&#xff0c;设置oracle_home环境变量,通过readme中的先决条件来检查现有补丁是否和本次补丁冲突 2、opatch apply 升级数据库软件&#xff0c;这个必须数据库文件不要被进程调用 在windows上…

路由动态选择协议之RIP(路由信息协议)

软件&#xff1a;cicso packet tracer 8.0 拓扑图&#xff1a;路由器&#xff1a;Router-PT、连接线&#xff1a;Serial DTE、连接口&#xff1a;Serial口 1、配置基础ip R1配置&#xff1a;虚拟接口——1.1.1.1&#xff1b;S3/0——192.168.1.1 R1(config)#int s3/0 R1(con…

Spring之BeanDefinition(三)

Spring之BeanDefinition&#xff08;三&#xff09; 文章目录 Spring之BeanDefinition&#xff08;三&#xff09;一、Spring的启动类三行代码研究二、Spring创建工厂类型和属性三、Spring中内置的BeanDefinition四、注册配置类五、BeanDefinition总结 一、Spring的启动类三行代…

C++ 对象的生存期

对象&#xff08;包括简单变量&#xff09;都有诞生和消失的时刻。对象诞生到结束的这段时间就是它的生存期。在生存期内&#xff0c;对象将保持它的状态&#xff08;即数据成员的值&#xff09;&#xff0c;变量也将保持它的值不变&#xff0c;直到它们被更新为止。对象的生存…