Netty应用(十) 之 自定义编解码器 自定义通信协议

目录

25.自定义编解码器

25.1 自定义编解码器编码

25.2 自定义编解码器的总结和补充

26.自定义通信协议

26.1 关于通信协议的关注点

26.2 自定义通信协议的格式

26.3 编解码


25.自定义编解码器

有了上面这个大体框架的流程之后,我们来聊一个非常特殊的:

比如我们在客户端想把字符串"10-20"经过编码后转为long类型,然后转为二进制存储到ByteBuf中,通过网络IO发出去,服务端接收到ByteBuf后,解码为long类型

那么在netty中是没有这么个编解码器的,所以我们需要自己去实现这个编解码器。在框架中,你要扩展自定义一个组件也好,功能也罢,需要遵循它们的规范,也就是继承它的一些接口或者实现它的一些实现类。比如:spring中的Event那个事件通知。

你自定义的东西只有实现别的规范,这样才能被框架体系发现你自定义的类,进而你自定义的东西才能和框架融为一体,融为一体后才能实现你的功能,框架才会调用你。

而对于编码器,我们需要继承的是MessageToByteEncoder,对于解码器,我们需要继承的是ByteToMessageDecoder。然后在pipline中生效,这就是它的逻辑。

于是基于这个逻辑来实现我们的自定义编解码器

25.1 自定义编解码器编码

需求:

1.我们在客户端发送一个字符串"10~20",这样的数据,然后我们希望发送出去的时候是以分隔符为界限,发出去两个long类型的数据

2.服务端接收到之后把数据从ByteBuf中取出来

需求分析:

这就是需要一组编解码器,编码器把字符串分隔开,然后转为两个long发出去

解码器把接收到的ByteBuf数据拿到手,解码转为long类型处理

具体实现如下:

  • 客户端实现
package com.messi.netty_core_02.netty10;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;

import java.net.InetSocketAddress;

public class MyNettyClient {


    public static void main(String[] args) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group);

        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                //日志输出的编码器
                pipeline.addLast(new LoggingHandler());
                //我们自定义的编码器
                pipeline.addLast(new MyLong2ByteEncoder());
            }
        });

        Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();
        channel.writeAndFlush("10-20");
        group.shutdownGracefully();
    }



}
package com.messi.netty_core_02.netty10;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyLong2ByteEncoder extends MessageToByteEncoder<String> {

    private static final Logger log = LoggerFactory.getLogger(MyLong2ByteEncoder.class);

    /**
     *
     * @param ctx           上下文对象
     * @param msg           等待编码的数据
     * @param out           编码后的数据为二进制字节流格式,都存储到out中交给Netty,netty负责把ByteBuf数据交给socket缓冲区
     *                      socket缓冲区会把数据交给网络IO通信发给服务端
     * @throws Exception
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        log.info("MyLong2ByteEncoder.encode start ~~~ ") ;
        if (msg == null) {
            return;
        }
        String[] msgs = msg.split("-");
        for (String message : msgs) {
            long longMsg = Long.parseLong(message);
            //每一个long类型的数据都在ByteBuf中占用8字节的大小空间
            out.writeLong(longMsg);
        }
    }
}
  • 服务端实现
package com.messi.netty_core_02.netty10;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyNettyServer {

    private static final Logger log = LoggerFactory.getLogger(MyNettyServer.class);

    public static void main(String[] args) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(new NioEventLoopGroup(1),new NioEventLoopGroup(8));
        DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new LoggingHandler());
                pipeline.addLast(new MyByte2LongDecoder());
                pipeline.addLast(defaultEventLoopGroup,new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        if(msg instanceof Long) {
                            Long data = (Long) msg;
                            log.info("得到的客户端输入为:{}",data) ;
                        }
                    }
                });
            }
        });

        serverBootstrap.bind(8000);
    }

}
package com.messi.netty_core_02.netty10;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class MyByte2LongDecoder extends ByteToMessageDecoder {

    private static final Logger log = LoggerFactory.getLogger(MyByte2LongDecoder.class);

    /**
     *
     * @param ctx          上下文对象
     * @param in           网络IO传输过来的数据,存储到ByteBuf类型的in中
     * @param out          解码处理完后,把每一个"Message"都存储到out这一集合中,便于后续Netty遍历该out集合进行执行pipeline流水线的Handler操作
     *                     Netty是以消息Message为单位进行处理数据的,有多少Message,就执行多少次pipeline
     *                     ---->
     *                     如果字节数不超过滑动窗口,socket缓冲区,ByteBuf缓冲区,客户端只发一次数据即可,为什么服务端能划分出多个Message呢?
     *                     原因很简单:每调用一次decode进行处理ByteBuf,那么就会封装成一个Message。
     *                     如果服务端一次没有处理完ByteBuf,那么会进行第二次调用decode方法的处理,则第二次处理ByteBuf,则又会生成一个新的Message
     *                     以此类推,直到ByteBuf中的数据都处理完。
     *                     调用n次decode方法,处理ByteBuf n次,存储n个Message到out集合中
     *                     ---->
     *                     啥时候意味着ByteBuf中的数据都处理完?
     *                     其实还是指针的移动,思考一下之前NIO的各种指针,当读指针到达limit指针边界后就意味着ByteBuf处理完毕
     * @throws Exception
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        log.info("MyByte2LongDecoder解码器start~~~");
        //获得ByteBuf中读写指针之间可读的数据长度
        int readableBytes = in.readableBytes();
        //如果客户端写的数据是long占8字节,这里是判断长度字节够不够,如果不够,那么说明有问题
        if (readableBytes >= 8) {
            //发送的是"10-20",第一次拿到的是10L(long类型),第二次是20L
            long reciveLong = in.readLong();
            out.add(reciveLong);
        }
    }
}
  • 测试输出

客户端输出:

由于自定义编码中去除了分隔符"-",所以只发送16字节的数据给服务端。注意:是一次性发送16字节的数据给服务端

服务端输出:

如下图所示,服务端自定义解码器调用了两次,为什么?

因为在自定义解码器的逻辑中,一次只处理了ByteBuf中8字节的数据大小,所以一次处理不完ByteBuf,所以会多次处理,多次处理会多次调用decode方法,会多次加入"Message"到out这一List集合中。由于out这一集合中有多个Message,所以pipeline流水线的Handler会执行多次。

总结:

首先明确一点,客户端对于这两个long类型的数字,1020,他是只发送了一次的,因为我们首先看到这个数据的结构,其次日志只输出了一次。

再次我们来看服务端日志,数据是处理了两次的,因为输出了两次,解码器处理了两次。那么这是为

啥。

1、首先不是半包黏包,因为我们的缓冲区不至于连两个long都拿不到。

2、因为是两个消息,我们的数据在被解码器处理之后客户端那里其实是把数据处理位两个long发过来

的,服务端这里解码处理其实得到两个消息message,添加到out集合里面是两个消息。然后就处理了两次,这里有个问题就是,当你的ByteBuf的数据一次没处理完,那就会继续调用decode方法,进行再次处理,因为我们的数据是16字节,第一次值处理了八个字节,因为我们读取的就是一个readLonglong就是读取读写指针的前八个字节,所以处理了两次。

  • 为了验证上述结论,我们更改一下服务端以及自定义解码器,其余的代码不修改:
package com.messi.netty_core_02.netty10;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyNettyServer {

    private static final Logger log = LoggerFactory.getLogger(MyNettyServer.class);

    public static void main(String[] args) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(new NioEventLoopGroup(1),new NioEventLoopGroup(8));
        DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new LoggingHandler());
                pipeline.addLast(new MyByte2LongDecoder());
                pipeline.addLast(defaultEventLoopGroup,new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//                        if(msg instanceof Long) {
//                            Long data = (Long) msg;
//                            log.info("得到的客户端输入为:{}",data) ;
//                        }
                        //验证读取n次ByteBuf,调用n次decode方法,对应有n个Message,会添加n个Message到out集合
                        //netty拿到该out集合,会遍历并且让每一个Message都执行一遍pipeline流水线
                        log.info("得到的客户端输入为:{}",msg.toString());
                    }
                });
            }
        });

        serverBootstrap.bind(8000);
    }

}
package com.messi.netty_core_02.netty10;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class MyByte2LongDecoder extends ByteToMessageDecoder {

    private static final Logger log = LoggerFactory.getLogger(MyByte2LongDecoder.class);

    /**
     *
     * @param ctx          上下文对象
     * @param in           网络IO传输过来的数据,存储到ByteBuf类型的in中
     * @param out          解码处理完后,把每一个"Message"都存储到out这一集合中,便于后续Netty遍历该out集合进行执行pipeline流水线的Handler操作
     *                     Netty是以消息Message为单位进行处理数据的,有多少Message,就执行多少次pipeline
     *                     ---->
     *                     如果字节数不超过滑动窗口,socket缓冲区,ByteBuf缓冲区,客户端只发一次数据即可,为什么服务端能划分出多个Message呢?
     *                     原因很简单:每调用一次decode进行处理ByteBuf,那么就会封装成一个Message。
     *                     如果服务端一次没有处理完ByteBuf,那么会进行第二次调用decode方法的处理,则第二次处理ByteBuf,则又会生成一个新的Message
     *                     以此类推,直到ByteBuf中的数据都处理完。
     *                     调用n次decode方法,处理ByteBuf n次,存储n个Message到out集合中
     *                     ---->
     *                     啥时候意味着ByteBuf中的数据都处理完?
     *                     其实还是指针的移动,思考一下之前NIO的各种指针,当读指针到达limit指针边界后就意味着ByteBuf处理完毕
     * @throws Exception
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        log.info("MyByte2LongDecoder解码器start~~~");
        //获得ByteBuf中读写指针之间可读的数据长度
        int readableBytes = in.readableBytes();
        //如果客户端写的数据是long占8字节,这里是判断长度字节够不够,如果不够,那么说明有问题
        if (readableBytes >= 8) {
            //发送的是"10-20",第一次拿到的是10L(long类型),第二次是20L
//            long reciveLong = in.readLong();
//            out.add(reciveLong);

            //如果一次读取16字节,是不是可以一次读完ByteBuf,那么就只会添加一个Message到out集合中,那么只会调用一次pipeline流水线
            out.add(in.readBytes(16));
        }
    }
}

测试输出:

客户端:客户端还是只发了一次数据就完事了

服务端:

接收到客户端一次发送的数据后存储到ByteBuf中,但是会多次解析读取该ByteBuf,多次调用decode方法,把每一次decode解码的Message存储到out集合中,把out交给netty,之后会遍历out集合,让每一个Message都对应调用一遍pipeline流水线。pipeline流水线的Handler拿到的数据就是out集合中解码后的数据

,然后我们就可以根据该解码后的数据去做一系列的操作了。

为什么只执行一次?

因为ByteBuf被一次读取完了,哈哈哈,那么就肯定只执行一次decode方法了。所以out集合中只会添加一个Message,那么只会调用一遍pipeline流水线的Handler。

25.2 自定义编解码器的总结和补充

1.自定义编码器

继承 MessageToByteEncoder,自定义实现其encode方法的逻辑

2.自定义解码器

继承 ByteToMessageDecoder,自定义实现其decode方法的逻辑

其实还有一种设计方法,我们前面说过存在一个集合体,拥有编解码器的能力的一个类。

ByteToMessageCodec,于是我们可以继承这个类,实现两个方法一个是decode一个是encode

这种方式就可以替代上述两种自定义编码器和解码器,操作步骤如下:

  • MyLongCodec类的设计
package com.messi.netty_core_02.netty10;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class MyLongCodec extends ByteToMessageCodec<String> {

    private static final Logger log = LoggerFactory.getLogger(MyLongCodec.class);


    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        log.info("MyLong2ByteEncoder.encode start ~~~ ") ;
        if (msg == null) {
            return;
        }
        String[] msgs = msg.split("-");
        for (String message : msgs) {
            long longMsg = Long.parseLong(message);
            //每一个long类型的数据都在ByteBuf中占用8字节的大小空间
            out.writeLong(longMsg);
        }
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        log.info("MyByte2LongDecoder解码器start~~~");
        //获得ByteBuf中读写指针之间可读的数据长度
        int readableBytes = in.readableBytes();
        //如果客户端写的数据是long占8字节,这里是判断长度字节够不够,如果不够,那么说明有问题
        if (readableBytes >= 8) {
            //发送的是"10-20",第一次拿到的是10L(long类型),第二次是20L
//            long reciveLong = in.readLong();
//            out.add(reciveLong);

            //如果一次读取16字节,是不是可以一次读完ByteBuf,那么就只会添加一个Message到out集合中,那么只会调用一次pipeline流水线
            out.add(in.readBytes(16));
        }
    }
}
  • 客户端与服务端的设计如下
package com.messi.netty_core_02.netty10;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;

import java.net.InetSocketAddress;

public class MyNettyClient {


    public static void main(String[] args) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group);

        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                //日志输出的编码器
                pipeline.addLast(new LoggingHandler());
                //我们自定义的编码器
//                pipeline.addLast(new MyLong2ByteEncoder());
                pipeline.addLast(new MyLongCodec());
            }
        });

        Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();
        channel.writeAndFlush("10-20");
        group.shutdownGracefully();
    }



}
package com.messi.netty_core_02.netty10;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyNettyServer {

    private static final Logger log = LoggerFactory.getLogger(MyNettyServer.class);

    public static void main(String[] args) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(new NioEventLoopGroup(1),new NioEventLoopGroup(8));
        DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new LoggingHandler());
//                pipeline.addLast(new MyByte2LongDecoder());
                pipeline.addLast(new MyLongCodec());
                pipeline.addLast(defaultEventLoopGroup,new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//                        if(msg instanceof Long) {
//                            Long data = (Long) msg;
//                            log.info("得到的客户端输入为:{}",data) ;
//                        }
                        //验证读取n次ByteBuf,调用n次decode方法,对应有n个Message,会添加n个Message到out集合
                        //netty拿到该out集合,会遍历并且让每一个Message都执行一遍pipeline流水线
                        log.info("得到的客户端输入为:{}",msg.toString());
                    }
                });
            }
        });

        serverBootstrap.bind(8000);
    }

}

测试输出:

客户端:

服务端:

测试通过,成功。

其实还有一种解码器ReplayingDecoder

这个解码器是netty封装的,这个解码器比较强大,我们在使用这个解码器的时候是不需要做一些安全性校验的。比如我们之前的解码器,做了一个长度判断,如下伪代码所示:

// 获得byteBuf中读写指针之间可读的数据长度

int readableBytes = in.readableBytes();

// 我们客户端写的数据是long占八字节,所以这里判断接收到的数据长度是不是够

if(readableBytes >= 8){

这个检验是因为我们担心服务端接收到的长度不够,丢失数据精度或者是异常抛出等等。如果不做长度判断,那么这一次读取错了,以后只会越读越错误。

甚至更有可能:接收到的根本不是这个类型的数据!比如说发送过来的是:两个整型或8个Byte类型,这一共也是8字节,所以肯定会有问题。

如果出现以上问题,你读错误一次,就算你发现该错误或抛出异常,也无法挽回,为什么无法挽回呢?因为你读取ByteBuf的数据是会改变读指针的,一旦改变指针,你就无法再往回读了。

所以这里我们可以使用到之前NIO总结的一种回滚指针的解决思路,当然netty中也有对应的回滚指针解决方案,修改代码如下所示:

package com.messi.netty_core_02.netty10;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class MyByte2LongDecoder2 extends ByteToMessageDecoder {

    private static final Logger log = LoggerFactory.getLogger(MyByte2LongDecoder2.class);

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        log.info("MyByte2LongDecoder2解码器执行");

        int readableBytes = in.readableBytes();
        if (readableBytes >= 8) {
            //标记读指针此时的位置,错误时回滚到该位置
            in.markReaderIndex();
            try {
                //发过来的是"10-20",第一次读取的是20L,第二次读取的是20L
                long reciveLong = in.readLong();
                out.add(reciveLong);
            } catch (Exception e) {
                //发生异常时,回滚到标记位置
                in.resetReaderIndex();
            }

        }
    }
}

但是你要是继承实现了ReplayingDecoder,这个解码器,那你那些安全校验,全部取消,变为如下这样。

代码如下:

package com.messi.netty_core_02.netty10;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.ReplayingDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class MyByte2LongDecoder3 extends ReplayingDecoder {

    private static final Logger log = LoggerFactory.getLogger(MyByte2LongDecoder2.class);

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        log.info("MyByte2LongDecoder2解码器执行");
        //发过来的是"10-20",第一次读取的是20L,第二次读取的是20L
        long reciveLong = in.readLong();
        out.add(reciveLong);
    }
    
    
}

原理很简单:底层帮你处理了所有的安全问题,重置了ByteBuf,传递给你的是一个安全的处理过的ByteBuf,不管你什么异常什么问题,都会给你解决。

# 所以你的netty编程,要考虑如下方面:

1. ByteBuf的读写指针要考虑是不是正确,错了是不是能回滚。

2. ByteBuf是不是合理释放了。

26.自定义通信协议

26.1 关于通信协议的关注点

网络传输有很多协议了,像HTTP这种都是应用层协议,但是有时候我们在做网络通信的时候,需要自己定义一些协议,我们能够自定义出来的协议都是属于上层(应用层)的协议,因为都是基于底层封装好的TCP/UDP网络协议栈的基础协议。我们自定义出一些协议,来约束双方的接收和发送,这种时候需要我们来自定义通信协议,也就是消息的格式。

说白了就是客户端与服务端传输的格式,样式

在这个数据传递的过程中,你需要注意的就是除了基本数据的定义格式,还需要注意传递数据的长度,不然会出现半包粘包问题。

而且除了你的业务数据,还需要一些元数据(如:数据长度等),这些都要考虑,来举一个登录的例子:

版本1:

我们设计一个消息结构,就是json格式的内容:{"name":"xxx","password":"xxx"}

登录信息,无外乎就是账号密码。我们这么设计没毛病,但是没意义,首先没有数据长度,会出现半包粘包问题。

于是我们需要增加一个数据长度

版本2:

协议正文,或者叫做协议体,消息正文依然是账号密码

{"name":"xxx","password":"123456"}

正文长度:告诉服务端你一次读取多少数据,避免半包粘包

魔数:每个系统之间的一个读取标识,比如用来标识我们是一个集团或系统的,你就需要携带一个CAFEBABY过来,我就知道你的消息是我们系统的了,不携带这个魔数一律认为不合规,就丢弃掉。这是一个牌号,校验使用的,不然你随意一个系统知道ip,端口号就都能给我发数据,这不就乱了。

协议版本号:协议可能升级,告诉服务端当前协议是哪一个版本,服务端就采用对应的逻辑操作

指令类型:业务操作的编号,比如1是登录,2是注册,3是注销等等,告诉服务端具体使用的是哪种操作,http设计的时候实际上不需要这个,因为它没有什么业务操作,就是收发数据,然后就没了。但是你使用http协议想要做类型区别的话,那么你要做类型区别那就在请求里面加你的参数,然后服务端后台去判断。比如你url里面是delete那么就走去delete删除业务

26.2 自定义通信协议的格式

json序列化方式:

在有了上面的考虑点,我们现在可以定义一个我们消息的格式了,在定义格式之前,我们先考虑以什么形式构建,我们最熟悉的格式就是json,用json格式来组织起来上面的注意点。我们来看一个现在的结构。

大体上我们在客户端组织的消息格式就是这样的,然后我们经过封装为ByteBuf发送出去。

在服务端,用new JsonObjectDecode完成解码,进行后续的handler业务处理。

json的设计格式还行,json的字段本身就是能随便加的,但是这个格式有问题,就是他的长度太大了,因为我们这个json格式他全是字符串,在传输过程中不如二进制,或者其他的编码格式更加精简和体量小。但是json也有他的好处,就是可读性强,内部处理很方便。所以我们要是非要用json,建议除了消息体其他都用二进制。

  • 但是一般在高并发开发中,并且如果是抛开SpringCloud-Web那一套Http协议,在netty原生开发时,我们可以自定义协议自定义序列化方式的话,我们有如下序列化方式的选择建议:

头信息(头信息是指除了消息正文content这一字段外的其他字段数据):一般都使用二进制byte直接存储

消息正文:json/java序列化/protobuf

为什么头信息不也直接使用java序列化或protobuf呢?java序列化或protobuf不也都是序列化成二进制?

答案其实很简单,直接写入二进制byte不带有对存储数据的额外描述信息。java序列化或protobuf把数据序列化成二进制,但是会带有许多额外的描述信息来保证可以反序列化成功,相比直接写入byte 占用的空间肯定要大很多。

为什么要使用java序列化或protobuf序列化取代json?

json序列化后的数据格式带有太多无用的符号,并且json序列化后的是字符串,所有的数据都是字符格式,所谓字符格式,一个普通字符占2字节,一个汉字占用3字节。而你protobuf或java序列化后的数据是二进制格式,8个二进制等于1字节,高下立判?

并且序列化后需要进行统一编码成二进制字节流存储到ByteBuf或其他应用缓冲区中,便于后续发给socket内核缓冲区。如果采用json方式,字符编码成二进制字节格式,又是很大的性能消耗。但是如果采用java序列化或protobuf方式,本身序列化后就成为了二进制字节格式,不存在字符解析的性能消耗。是不是也是很大的性能优化?

补充:但是一般来说,我们使用的protobuf方式,而不采用java默认的序列化方式,因为protobuf性能更高。

  • 由于序列化方式的变化选择,我们得出一种最终方案

使用netty做自定义协议时:

头信息:

客户端编码使用二进制直接写入,服务端收到消息后解码直接读取二进制

消息正文:

客户端使用protobuf方式序列化原生消息成二进制数据,服务端使用protobuf反序列化二进制数据成原生消息数据

  • 由于序列化方式选择的不确定性,所以客户端发消息的时候要多加一个"序列化的方式"字段

该字段是用来标识选择的序列化方式是啥?

于是协议的内容就变成如下这样:

协议正文(或叫做协议体):

消息正文依然是账号密码,{"name":"xxx","password","123456"}

正文长度:

告诉服务端:我发的数据长度。所以服务端知道一次读取多少,避免半包粘包问题的发生

魔数:

每个系统之间的一个读取标识,比如用来标识我们是一个集团或系统的,你就携带一个CAFEBABY过来,我就知道你的消息是我们系统的了,不携带这个魔数则一律认为该请求消息是不合理的,就丢弃掉,这是一个牌号,校验使用的。不然你随意一个系统要是知道服务端的ip,端口的话,那岂不是都能给服务端发数据,那么不是乱了。

协议版本号:

协议可能升级,告诉服务端当前协议是哪一个版本,服务端采用对应的逻辑操作

指令类型:

业务操作的编号,比如1是登录,2是注册,3是注销等等,告诉服务端具体使用哪种操作。http设计的时候实际上不需要这个,因为他没什么业务操作,就是收发数据,没了。但是你使用http协议想要做类型区别的话,那么你要做类型区别那就在请求里面加你的参数,然后服务端后台去判断。比如你url里面是delete那么就走去delete删除业务

序列化的方式:

比如:1代表json 2代表protobuf 3代表Hessian 4代表java默认的序列化方式

26.3 编解码

在设计好了编码格式之后,我们就可以来考虑编解码了。

在这个过程中,我们的数据被封装成ByteBuf就发送出去了,但是思考一个问题,作为客户端你是写业务代码的,你发出去的应该就是一个纯净的对象,你不能在对象数据的属性中加上一个魔数,版本号什么的。你需要把对象数据之外的元数据加在对象外面并且协同对象一起发送过去,这个封装的过程我们就需要交给客户端对应的编码器去做。ofcourse,解析这个封装好的消息数据的过程肯定是交给接收消息的服务端解码器去做,服务端解析得到消息正文以及元数据,便于做之后的一系列业务操作。

编码过程如下

  • 编写消息类Message
package com.messi.netty_core_02.netty11;

import java.io.Serializable;

public class Message implements Serializable {

    private String username;

    private String password;

    public Message() {

    }

    public Message(String username,String password) {
        this.username = username ;
        this.password = password ;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "Message{" +
                "username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}
  • 客户端自定义编码器
package com.messi.netty_core_02.netty11;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;

public class MyMessage2ByteEncoder extends MessageToByteEncoder<Message> {

    private static final Logger log = LoggerFactory.getLogger(MyMessage2ByteEncoder.class);

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        log.info("MyMessage2ByteEncoder.encode");

        //1.魔数 8字节
        out.writeBytes("leomessi".getBytes());
        //2.版本 1字节
        out.writeByte(1);
        //3.序列化方式 1字节 1代表json 2代表protobu 3代表hessian
        out.writeByte(1);
        //4.指令功能 1字节 1代表登录 2代表注册
        out.writeByte(1);
        //5.正文长度 4字节 使用int整型标识
        ObjectMapper objectMapper = new ObjectMapper();
        String jsonContent = objectMapper.writeValueAsString(msg);
        out.writeInt(jsonContent.length());
        //6.正文,直接写char序列
        out.writeCharSequence(jsonContent, Charset.defaultCharset());
    }
}
  • 客户端
package com.messi.netty_core_02.netty11;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;

import java.net.InetSocketAddress;

public class MyNettyClient {

    public static void main(String[] args) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new LoggingHandler());
                pipeline.addLast(new MyMessage2ByteEncoder());
            }
        });
        Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();
        channel.writeAndFlush(new Message("leo","101010"));

        System.out.println("MyNettyClient.main");
        group.shutdownGracefully();
    }


}
  • 服务端自定义解码器
package com.messi.netty_core_02.netty11;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.List;

public class MyByte2MessageDecoder extends ByteToMessageDecoder {

    private static final Logger log = LoggerFactory.getLogger(MyByte2MessageDecoder.class);

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        log.info("MyByte2MessageDecoder.decode");

        //魔数
        ByteBuf magic = in.readBytes(8);
        log.info("魔数为:{}",magic);

        //协议版本号
        byte version = in.readByte();
        log.info("协议版本号为:{}",version);

        //序列化方式
        byte serializableType = in.readByte();
        log.info("序列化方式为:{}",serializableType) ;

        //指令功能
        byte funcNo = in.readByte();
        log.info("指令功能为:{}",funcNo) ;

        //正文长度
        int contentLength = in.readInt();
        log.info("正文长度为:{}",contentLength) ;

        Message message = null ;

        if (serializableType == 1 ){
            //说明序列化方式为json,此时服务端就要使用json反序列化方式进行反序列化
            ObjectMapper objectMapper = new ObjectMapper();
            message = objectMapper.readValue(in.readCharSequence(contentLength, Charset.defaultCharset()).toString(),Message.class);
        }

        out.add(message);

    }
}
  • 服务端
package com.messi.netty_core_02.netty11;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @Description TODO
 * @Author etcEriksen
 * @Date 2023/12/20 9:43
 * @Version 1.0
 */
public class MyNettyServer {

    private static final Logger log = LoggerFactory.getLogger(MyNettyServer.class);

    public static void main(String[] args) {

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.group(new NioEventLoopGroup(1),new NioEventLoopGroup(8));
        DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();

                /**
                 * maxFrameLength:数据包最大长度为1024字节
                 * lengthFieldOffset:数据长度字段前面有多少字节的偏移?
                 * lengthAdjustment:数据长度字段与真实数据体之间有多少距离?
                 * initialBytesToStrip:最终服务端输出的数据去除前面多少字节的长度?
                 * 具体见:Netty应用04-Netty这一笔记
                 */
                pipeline.addLast(new LoggingHandler());
                pipeline.addLast(new LengthFieldBasedFrameDecoder
                        (1024,11,4,0,0));
                pipeline.addLast(new MyByte2MessageDecoder());
                pipeline.addLast(defaultEventLoopGroup,new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        Message message = (Message) msg ;
                        log.info("服务端接收到客户端的数据为:{}",message) ;
                    }
                });
            }
        });

        serverBootstrap.bind(8000);
        System.out.println("MyNettyServer.main");
    }

}
  • 输出

客户端输出:

服务端输出:

  • 补充

总结:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/385465.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

用脑想问题还是用心驱动脑?

昨天回答了几个朋友的问题&#xff0c;我发现提问题的人很少&#xff0c;这让我想起之前讲的小妞子的故事&#xff0c;我问了她好几个月的同一句话&#xff1a;你有问题吗&#xff1f; 结果她很反感&#xff0c;嘿嘿。其实吧&#xff0c;我讲的很多东西都是实的&#xff0c;反而…

【新手必看】解决GitHub打不开问题,亲测有效

&#x1f44b; Hi, I’m 货又星&#x1f440; I’m interested in …&#x1f331; I’m currently learning …&#x1f49e; I’m looking to collaborate on …&#x1f4eb; How to reach me … README 目录&#xff08;持续更新中&#xff09; 各种错误处理、爬虫实战及模…

冰雪遮盖着伏尔加河

三套车 - 杨洪基词&#xff1a;李幼客 曲&#xff1a;彼得格鲁波基 冰雪遮盖着伏尔加河 冰河上跑着三套车 有人在唱着忧郁的歌 唱歌的是那赶车的人小伙子你为什么忧愁 为什么低着你的头是谁叫你这样伤心 问他的是那乘车的人 你看吧这匹可怜的老马 它跟我走遍天涯可恨那财主要把…

MQTT的学习与应用

文章目录 一、什么是MQTT二、MQTT协议特点三、MQTT应用领域四、安装Mosquitto五、如何学习 MQTT 一、什么是MQTT MQTT&#xff08;Message Queuing Telemetry Transport&#xff09;是一种轻量级的消息传输协议&#xff0c;设计用于在低带宽、不稳定的网络环境中进行高效的通信…

【头歌·计组·自己动手画CPU】二、运算器设计(讲解版) 【计算机硬件系统设计】

&#x1f57a;作者&#xff1a; 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux &#x1f618;欢迎关注&#xff1a;&#x1f44d;点赞&#x1f64c;收藏✍️留言 &#x1f3c7;码字不易&#xff0c;你的&#x1f44d;点赞&#x1f64c;收藏❤️关注对我真的很重要&…

蓝桥杯——第 5 场 小白入门赛(c++详解!!!)

文章目录 1 十二生肖基本思路&#xff1a; 2 欢迎参加福建省大学生程序设计竞赛基本思路&#xff1a;代码&#xff1a; 3 匹配二元组的数量基本思路&#xff1a;代码: 4 元素交换基本思路&#xff1a;代码&#xff1a; 5 下棋的贝贝基本思路&#xff1a;代码&#xff1a; 6 方程…

8868体育助力西甲皇家马德里足球俱乐部 皇马占据争冠优势

西甲的皇家马德里足球俱乐部是8868体育合作的俱乐部之一&#xff0c;这支拥有悠久历史和辉煌成就的豪门球队&#xff0c;本赛季再次展现了它的强大实力。18胜4平1负&#xff0c;暂居榜首&#xff0c;这样的成绩足以让任何对手望而却步。然而&#xff0c;足球场上的对决永远充满…

【开源】JAVA+Vue.js实现海南旅游景点推荐系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 用户端2.2 管理员端 三、系统展示四、核心代码4.1 随机景点推荐4.2 景点评价4.3 协同推荐算法4.4 网站登录4.5 查询景点美食 五、免责说明 一、摘要 1.1 项目介绍 基于VueSpringBootMySQL的海南旅游推荐系统&#xff…

任务调度

1.学习目标 1.1 定时任务概述 1.2 jdk实现任务调度 1.3 SpringTask实现任务调度 1.4 Spring-Task 分析 1.5 Cron表达式 https://cron.qqe2.com/ 2. Quartz 基本应用 2.1 Quartz 基本介绍 2.2 Quartz API介绍 2.3 入门案例 <dependency> <groupId>org.springframe…

春晚联排第四次,明星们的行头成焦点。央视门前的星光熠熠。

♥ 为方便您进行讨论和分享&#xff0c;同时也为能带给您不一样的参与感。请您在阅读本文之前&#xff0c;点击一下“关注”&#xff0c;非常感谢您的支持&#xff01; 文 |猴哥聊娱乐 编 辑|徐 婷 校 对|侯欢庭 近期&#xff0c;随着2024龙年央视春晚的彩排逐渐进入高潮&am…

【c语言】字符串常见函数 上

&#x1f388;个人主页&#xff1a;甜美的江 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;c语言 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进步&a…

SSM+SpringBoot框架

单例bean是线程安全的吗 AOP Spring事务失效 第四种&#xff0c;在方法内部使用&#xff0c;需要用代理类调用此方法 bean生命周期 bean的循环依赖 SpringMVC执行流程 、 SpringBoot自动配置原理 Spring常见注解 MyBatis执行流程 MyBatis延迟加载 MyBatis缓存

代码随想录算法训练营Day56|583. 两个字符串的删除操作、72. 编辑距离

目录 583. 两个字符串的删除操作 前言 思路 算法实现 法二 72. 编辑距离 前言 思路 算法实现 总结 583. 两个字符串的删除操作 题目链接 文章链接 前言 本题与上一题不同的子序列相比&#xff0c;变化就是两个字符串都可以进行删除操作了。 思路 利用动规五部曲进…

【Vitis】Vitis HLS简介

Vitis HLS简介 Vitis™HLS是一种高层次综合工具&#xff0c;支持将C、C和OpenCL™函数硬连线到器件逻辑互连结构和RAM/DSP块上。 Vitis HLS可在Vitis应用加速开发流程中实现硬件内核&#xff0c;并使用C/C语言代码在VivadoDesign Suite中为赛灵思器件设计开发RTL IP。 【Vitis…

【Ubuntu】在.bashrc文件中误设置环境变量补救方法

这里是vim也不在PATH中了&#xff0c;因为 解决方法就是在输入vim之后提示的vim路径下用vim打开该文件&#xff0c;然后改回来

C++ Primer 第 5 版 第 6 章习题答案

文章目录 6.16.26.36.46.56.66.76.86.106.116.126.136.146.156.166.176.186.196.206.216.226.236.246.256.266.276.286.296.306.316.326.336.346.356.366.376.386.396.406.416.426.436.446.456.466.476.486.496.506.516.526.536.546.556.56 6.1 形参出现在函数定义的地方&…

Vue3.0(六):VueX 4.x详解

Vuex4状态管理 什么是状态管理 在开发中&#xff0c;我们的应用程序需要处理各种各样的数据&#xff0c;这些数据需要保存在应用程序的某一个位置&#xff0c;对于这些数据的管理&#xff0c;就是 状态管理目前前端项目越来越复杂&#xff0c;多组件共享同一数据的状态很常见…

嵌入式Qt Qt Creator安装与工程介绍

一.Qt概述 什么是Qt&#xff1a;Qt是一个跨平台的C图形用户界面应用程序框架。它为应用程序开发者提供建立图形界面所需的所有功能。它是完全面向对象的&#xff0c;很容易扩展&#xff0c;并且允许真正的组件编程。 二.Qt Creator下载安装 下载地址&#xff1a;Index of /a…

compile error ESP32cam.h no such file or directory

解决方法 可以参考这篇文章: But first, you will need to download the esp32cam.h library. For this go to Github and download the esp32cam Zip. GitHub - yoursunny/esp32cam: OV2640 camera on ESP32-CAM, Arduino library 具体就是下面的这篇重要的文章 :