Netty的解码器和编码器

链路图

一个完整的RPC请求中,netty对请求数据和响应数据的处理流程如下图所示

a6c09e60ef19479a97f2e519d10006ad.png

网络线路中传输的都是二进制数据,之后netty将二进制数据解码乘POJO对象,让客户端或者服务端程序处理。

解码的工具称为解码器,是一个入站处理器InBound。

编码的工具称为编码器,是一个处长处理器OutBound。

解码器

原理

解码器作为一个入站处理器,它需要将上一个入站处理器传过来的输入数据进行数据的编码或者格式转换,然后输出到下一站的入站处理器。

通常使用的ByteToMessageDecoder解码器将输入类型为ByteBuf缓冲区的数据进行解码,输出一个一个的POJO对象。

ByteToMessageDecoder是一个抽象类,继承关系如图

4ae04200c5d24576803704660b8443a8.png

protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

ByteToMessageDecoder使用了模板模式,只定义了解码的流程,具体的解码逻辑由子类完成。也就是开放了decode解码方法,由具体的解码器实现。

重申一下Netty对于handler的管理是通过通道pipeline完成的,所以解码器后面的处理器可以是业务处理器。

业务处理器接收解码结果,进行业务处理。

解码器中有一个比较重要的实现是ReplayingDecoder(也是一个抽象类),它在读取ByteBuf缓冲区的数据之前,需要检查缓冲区是否有足够的字节,如果缓冲区中字节足够,则会正常读取,反之,则会停止解码。等待下一次IO时间到来时再读取。

ReplayingDecoder在内部定义了一个新的二进制缓冲区类,对ByteBuf缓冲区进行了修饰,也就是ReplayingDecoderBuffer。

也就是说,继承ReplayingDecoder的子类解码器收到的二进制数据是经过ReplayingDecoderBuffer修饰过,判断过的。不是直接读取的ByteBuf中的数据。

ReplayingDecoder除了对ByteBuf数组的修饰以外,另一个作用,也更重要的作用是做分包传输。

我们知道底层通信协议是分包传输的。也就是我们预期的包大小和顺序可能和实际的并不一样,这时候就可以通过ReplayingDecoder来处理,ReplayingDecoder通过state属性来控制状态变化。比如如下sock鉴权解码器

public class SocksAuthRequestDecoder extends ReplayingDecoder<State> {

    private String username;

    public SocksAuthRequestDecoder() {
        super(State.CHECK_PROTOCOL_VERSION);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        switch (state()) {
            case CHECK_PROTOCOL_VERSION: {
                if (byteBuf.readByte() != SocksSubnegotiationVersion.AUTH_PASSWORD.byteValue()) {
                    out.add(SocksCommonUtils.UNKNOWN_SOCKS_REQUEST);
                    break;
                }
                checkpoint(State.READ_USERNAME);
            }
            case READ_USERNAME: {
                int fieldLength = byteBuf.readByte();
                username = SocksCommonUtils.readUsAscii(byteBuf, fieldLength);
                checkpoint(State.READ_PASSWORD);
            }
            case READ_PASSWORD: {
                int fieldLength = byteBuf.readByte();
                String password = SocksCommonUtils.readUsAscii(byteBuf, fieldLength);
                out.add(new SocksAuthRequest(username, password));
                break;
            }
            default: {
                throw new Error();
            }
        }
        ctx.pipeline().remove(this);
    }

    @UnstableApi
    public enum State {
        CHECK_PROTOCOL_VERSION,
        READ_USERNAME,
        READ_PASSWORD
    }
}

以上是偏分阶段解码,适用于那些固定长度的数据,比如整型等,但对于字符串来说,可长可短,没有具体的长度限制。如果用ReplayingDecoder来实现

@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        switch (state()) {
            case PARSE_1: {
                //基于Header-Content协议传输,Header中带有content长度,用一个int长度标识即可
                length = in.readInt();
                inBytes = new byte[];
                break;
            }
            case PARSE_2: {
                in.readBytes(inBytes,0,length);
                out.add(new String(inBytes,"UTF-8"));
            }
            default: {
                throw new Error();
            }
        }
        ctx.pipeline().remove(this);
    }

但其实对于比较复杂的业务场景中,不太建议使用ReplayingDecoder,主要原因是ReplayingDecoer在解析速度上相对较差,试想一下,replayingDecoder长度不够时,会停止解码。也就是说一个请求会被解码多次才可能最终完成。

对于字符串分包传输来说,更适合直接继承ByteToMessageDecoder基类来完成Header-Content协议的解析

@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        if(buf.readableBytes()<4){
            //可读字节小于4,消息头还没读满,返回。(假设Header是一个int的数据
            return;        
        }
        buf.markReaderIndex();
        int length = buf.readInt();
        if(buf.readableBytes()<length){
            buf.resetReaderIndex();        
        }
        byte[] inBytes = new byte[length];
        buf.readBytes(inBytes,0,length);
        out.add(new String(inBytes,"UTF-8"));
    }

除了ByteToMessageDecoder这种将二进制数据转化为POJO对象的解码器以外,还有将一种POJO转为另一种POJO对象的解码器,MessageToMessageDecoder,不同的是,后者需要指明泛型类型。比如Integer转为String,这时候泛型类型为Integer。

Netty内置的开箱即用的Decoder

FixedLengthFrameDecoder-固定长度数据包解码器

他会把入站ByteBuf数据包拆分成一个个长度为n的数据包,然后发往下一个channelHandler入站处理器

LineBasedFrameDecoder-行分割数据包解码器

如果ByteBuf数据包使用换行符/回车符作为数据包的边界分隔符。这时他会把数据包按换行符/回车符拆分成一个个数据包。

有一个行最大长度限制,如果超过这个长度还没有发现分隔符,会抛出异常

DelimiterFrameDecoder-自定义分隔符数据包解码器

他会按照自定义分隔符将ByteBuf数据包进行拆分

LengthFieldBasedFrameDecoder-自定义长度数据包解码器

基于灵活长度的数据包,在ByteBuf数据包中,加了一个长度字段,保存了原始数据包长度,解码的时候,会按照这个长度进行原始数据包的提取。

一般基于Header-Content协议的数据包,都建议使用这个解码器

public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {

   
    private final int maxFrameLength;        //发送的数据包最大长度
    private final int lengthFieldOffset;     //长度字段偏移量
    private final int lengthFieldLength;     //长度字段自己占用的字节数
    private final int lengthAdjustment;      //长度字段的偏移量矫正,比如长度后面还有两个字节用于存储别的信息,那么该值为2
    private final int initialBytesToStrip;   //丢弃的起始字节数
    ...
}

编码器

原理

所谓的编码器就是服务端应用程序处理完之后,一般会有一个响应结果Response。也就是一个Java POJO对象。需要将他编码为最终ByteBuf二进制类型。通过流水线写入到底层的Java通道。

上面说,解码器是一个入站处理器,那么编码器就是一个出站处理器。也就是OutboundHandler。处理逻辑为每个出站处理器会将上一个出站处理器的结果作为输入,经过处理后,传递给下一个出站处理器,直至最后写入Java通道。

由于出站处理器是从后向前执行的,所以第一个处理器一定是需要将结果处理成ByteBuf类型的数据。

MessageToByteEncoder同ByteToMessageDecoder一样都是一个抽象类,用模板模式。其中encode方法由子类实现。

在最后一步之前,可能会需要将一种POJO对象转成另一种POJO对象,就像解码器中的MessageToMessageDecoder一样,编码器也有同样的MessageToMessageEncoder解码器抽象类。

编解码器

所谓的编解码器也就是把解码器和编码器放在同一个类中,这个类就叫做ByteToMessageCodec,需要同时实现encode和decode方法。

不过这样的话,解码和编码的不同的代码就会出现在一个类中。出现逻辑混乱。Netty提供了另一种方式可以让编码代码和解码代码放在两个类,同时把编码工作和解码工作组合起来

编解码组合器

这个编解码组合器称为CombinedChanneldDuplexHandler组合器,比如客户端的编解码组合器就是用的这种方式

public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResponseDecoder, HttpRequestEncoder>
        implements HttpClientUpgradeHandler.SourceCodec {
            
            ...
}

public class HttpResponseDecoder extends HttpObjectDecoder {
    ...
}

public abstract class HttpObjectDecoder extends ByteToMessageDecoder {
    
    private enum State {
        SKIP_CONTROL_CHARS,
        READ_INITIAL,
        READ_HEADER,
        READ_VARIABLE_LENGTH_CONTENT,
        READ_FIXED_LENGTH_CONTENT,
        READ_CHUNK_SIZE,
        READ_CHUNKED_CONTENT,
        READ_CHUNK_DELIMITER,
        READ_CHUNK_FOOTER,
        BAD_MESSAGE,
        UPGRADED
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        if (resetRequested) {
            resetNow();
        }

        switch (currentState) {
        case SKIP_CONTROL_CHARS:
            // Fall-through
        case READ_INITIAL: try {
            AppendableCharSequence line = lineParser.parse(buffer);
            if (line == null) {
                return;
            }
            String[] initialLine = splitInitialLine(line);
            if (initialLine.length < 3) {
                // Invalid initial line - ignore.
                currentState = State.SKIP_CONTROL_CHARS;
                return;
            }

            message = createMessage(initialLine);
            currentState = State.READ_HEADER;
            // fall-through
        } catch (Exception e) {
            out.add(invalidMessage(buffer, e));
            return;
        }
        case READ_HEADER: try {
            State nextState = readHeaders(buffer);
            if (nextState == null) {
                return;
            }
            currentState = nextState;
            switch (nextState) {
            case SKIP_CONTROL_CHARS:
                // fast-path
                // No content is expected.
                out.add(message);
                out.add(LastHttpContent.EMPTY_LAST_CONTENT);
                resetNow();
                return;
            case READ_CHUNK_SIZE:
                if (!chunkedSupported) {
                    throw new IllegalArgumentException("Chunked messages not supported");
                }
                // Chunked encoding - generate HttpMessage first.  HttpChunks will follow.
                out.add(message);
                return;
            default:
                /**
                 * <a href="https://tools.ietf.org/html/rfc7230#section-3.3.3">RFC 7230, 3.3.3</a> states that if a
                 * request does not have either a transfer-encoding or a content-length header then the message body
                 * length is 0. However for a response the body length is the number of octets received prior to the
                 * server closing the connection. So we treat this as variable length chunked encoding.
                 */
                long contentLength = contentLength();
                if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
                    out.add(message);
                    out.add(LastHttpContent.EMPTY_LAST_CONTENT);
                    resetNow();
                    return;
                }

                assert nextState == State.READ_FIXED_LENGTH_CONTENT ||
                        nextState == State.READ_VARIABLE_LENGTH_CONTENT;

                out.add(message);

                if (nextState == State.READ_FIXED_LENGTH_CONTENT) {
                    // chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT state reads data chunk by chunk.
                    chunkSize = contentLength;
                }

                // We return here, this forces decode to be called again where we will decode the content
                return;
            }
        } catch (Exception e) {
            out.add(invalidMessage(buffer, e));
            return;
        }
        case READ_VARIABLE_LENGTH_CONTENT: {
            // Keep reading data as a chunk until the end of connection is reached.
            int toRead = Math.min(buffer.readableBytes(), maxChunkSize);
            if (toRead > 0) {
                ByteBuf content = buffer.readRetainedSlice(toRead);
                out.add(new DefaultHttpContent(content));
            }
            return;
        }
        case READ_FIXED_LENGTH_CONTENT: {
            int readLimit = buffer.readableBytes();

            // Check if the buffer is readable first as we use the readable byte count
            // to create the HttpChunk. This is needed as otherwise we may end up with
            // create an HttpChunk instance that contains an empty buffer and so is
            // handled like it is the last HttpChunk.
            //
            // See https://github.com/netty/netty/issues/433
            if (readLimit == 0) {
                return;
            }

            int toRead = Math.min(readLimit, maxChunkSize);
            if (toRead > chunkSize) {
                toRead = (int) chunkSize;
            }
            ByteBuf content = buffer.readRetainedSlice(toRead);
            chunkSize -= toRead;

            if (chunkSize == 0) {
                // Read all content.
                out.add(new DefaultLastHttpContent(content, validateHeaders));
                resetNow();
            } else {
                out.add(new DefaultHttpContent(content));
            }
            return;
        }
        /**
         * everything else after this point takes care of reading chunked content. basically, read chunk size,
         * read chunk, read and ignore the CRLF and repeat until 0
         */
        case READ_CHUNK_SIZE: try {
            AppendableCharSequence line = lineParser.parse(buffer);
            if (line == null) {
                return;
            }
            int chunkSize = getChunkSize(line.toString());
            this.chunkSize = chunkSize;
            if (chunkSize == 0) {
                currentState = State.READ_CHUNK_FOOTER;
                return;
            }
            currentState = State.READ_CHUNKED_CONTENT;
            // fall-through
        } catch (Exception e) {
            out.add(invalidChunk(buffer, e));
            return;
        }
        case READ_CHUNKED_CONTENT: {
            assert chunkSize <= Integer.MAX_VALUE;
            int toRead = Math.min((int) chunkSize, maxChunkSize);
            if (!allowPartialChunks && buffer.readableBytes() < toRead) {
                return;
            }
            toRead = Math.min(toRead, buffer.readableBytes());
            if (toRead == 0) {
                return;
            }
            HttpContent chunk = new DefaultHttpContent(buffer.readRetainedSlice(toRead));
            chunkSize -= toRead;

            out.add(chunk);

            if (chunkSize != 0) {
                return;
            }
            currentState = State.READ_CHUNK_DELIMITER;
            // fall-through
        }
        case READ_CHUNK_DELIMITER: {
            final int wIdx = buffer.writerIndex();
            int rIdx = buffer.readerIndex();
            while (wIdx > rIdx) {
                byte next = buffer.getByte(rIdx++);
                if (next == HttpConstants.LF) {
                    currentState = State.READ_CHUNK_SIZE;
                    break;
                }
            }
            buffer.readerIndex(rIdx);
            return;
        }
        case READ_CHUNK_FOOTER: try {
            LastHttpContent trailer = readTrailingHeaders(buffer);
            if (trailer == null) {
                return;
            }
            out.add(trailer);
            resetNow();
            return;
        } catch (Exception e) {
            out.add(invalidChunk(buffer, e));
            return;
        }
        case BAD_MESSAGE: {
            // Keep discarding until disconnection.
            buffer.skipBytes(buffer.readableBytes());
            break;
        }
        case UPGRADED: {
            int readableBytes = buffer.readableBytes();
            if (readableBytes > 0) {
                // Keep on consuming as otherwise we may trigger an DecoderException,
                // other handler will replace this codec with the upgraded protocol codec to
                // take the traffic over at some point then.
                // See https://github.com/netty/netty/issues/2173
                out.add(buffer.readBytes(readableBytes));
            }
            break;
        }
        default:
            break;
        }
    }
    ...
}

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/352855.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

BAT学习笔记:详解环境变量及其所有创建方法

文章目录 一、初识环境变量二、什么是环境变量三、为什么需要环境变量四、环境变量的分类五、环境变量的设置 一、初识环境变量 1.windows 的搜索框中输入 查看高级系统设置。点击打开系统属性窗口。 2. 在系统属性窗口中&#xff0c;点击右下方的“环境变量”打开环境变量设…

Linux服务器配置与管理(第二次实验)

实验目的及具体要求 目的 1.掌握基于命令行的文件操作 2.掌握基于命令行的目录操作 3.掌握用户账户的命令行操作 4.掌握组账户的命令行操作 5.熟悉磁盘分区操作 6.掌握调整优先级的方法 具体要求 1.掌握基于命令行的文件和目录操作 ①创建测试目录 ②创建文件 ③复…

解析MySQL生产环境CPU使用率过高的排查与解决方案

引言 在生产环境中&#xff0c;MySQL作为一个关键的数据库组件&#xff0c;其性能对整个系统的稳定性至关重要。然而&#xff0c;有时候我们可能会遇到MySQL CPU使用率过高的问题&#xff0c;这可能导致系统性能下降&#xff0c;应用页面访问减慢&#xff0c;甚至影响到用户体…

代码随想录算法训练营第十七天 |110.平衡二叉树,257.二叉树的所有路径,404.左叶子之和(待补充)

110.平衡二叉树 1、题目链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 2、文章讲解&#xff1a;代码随想录 3、题目&#xff1a; 给定一个二叉树&#xff0c;判断它是否是高度平衡的二叉树。 本题中&#xff0c;一棵高度平衡二…

前端工程化之:webpack1-6(编译过程)

一、webpack编译过程 webpack 的作用是将源代码编译&#xff08;构建、打包&#xff09;成最终代码。 整个过程大致分为三个步骤&#xff1a; 初始化编译输出 1.初始化 初始化时我们运行的命令 webpack 为核心包&#xff0c; webpack-cli 提供了 webpack 命令&#xff0c;通过…

Go 命令行解析 flag 包之快速上手

本篇文章是 Go 标准库 flag 包的快速上手篇。 概述 开发一个命令行工具&#xff0c;视复杂程度&#xff0c;一般要选择一个合适的命令行解析库&#xff0c;简单的需求用 Go 标准库 flag 就够了&#xff0c;flag 的使用非常简单。 当然&#xff0c;除了标准库 flag 外&#x…

架构整洁之道——价值维度与编程范式

1 设计与架构究竟是什么 结论&#xff1a;二者没有任何区别&#xff0c;一丁点区别都没有。 架构图里实际上包含了所有底层设计细节&#xff0c;这些细节信息共同支撑了顶层的架构设计&#xff0c;底层设计信息和顶层架构设计共同组成了整个架构文档。底层设计细节和高层架构信…

Neo4j 国内镜像下载与安装

Neo4j 5.x 简体中文版指南 社区版&#xff1a;https://neo4j.com/download-center/#community 链接地址&#xff08;Linux版&#xff09;&#xff1a;https://neo4j.com/artifact.php?nameneo4j-community-3.5.13-unix.tar.gz 链接地址&#xff08;Windows&#xff09;&#x…

如何使用react框架进行两个html页面的切换?

如何使用react框架进行两个html页面的切换? 项目背景首先是古老的做法login.htmlindex.html 正文->react框架如何设置两个页面的跳转?配置react框架的环境react框架如何实现两个页面的跳转? 项目背景 古老的html页面跳转的做法无法在react框架中直接适配,所以非常有必要…

MySQL-进阶-索引

一、索引概述 1、介绍 2、有误索引搜索效率演示 3、优缺点 二、索引结构 1、B-Tree&#xff08;多路平衡查找树&#xff09; 2、BTree 3、Hash 三、索引分类 四、索引语法 1、语法 2、案例 五、SQL性能分析 1、查看执行频次 2、慢查询日志 3、show-profile 4、explain 六、索…

redis 入门

一、什么是redis? redis是c语言编写的高性能(读的速度是110000次/s,写的速度是81000次/s)的k-v形式的数据库&#xff0c;数据存在内存中 二、redis的使用场景&#xff1f; 数据量小&#xff0c;访问量大 三、redis的启动和关闭 启动&#xff1a; 打开cmd&…

2. HarmonyOS应用开发DevEcoStudio准备-1

2. HarmonyOS应用开发DevEcoStudio准备-1 下载 DevEco Studio 进入HUAWEI DevEco Studio产品页产品页。 单击下载列表右侧的按钮&#xff0c;下载 DevEco Studio。 安装 DevEco Studio 下载完成后&#xff0c;双击下载的 deveco-studio-xxxx.exe&#xff0c;进入 DevEco St…

gitee建库并git

箴言&#xff1a;书山有路勤为径 文章目录 前言一、gitee导入ssh二、gitee建库三、克隆到本地四、关联本地工程到远程仓库五、push流程总结 前言 nodejs每天的学习都有代码产出&#xff0c;转念一想不如在码云上面搞个仓库&#xff0c;也经历了些许波折&#xff0c;往常也建了…

接口测试工具开发文档

1 开发规划 1.1 开发人员 角 色 主要职责 负责模块 人员 备注 n xxx模块 xxx 1.2 开发计划 <附开发计划表> 1.3 开发环境和工具 开发工具 工具 作用 Notepad 编辑器 Perl 解释器 2 总体设计 设计思路&#xff1a;因为测试app和server。首先必须…

LeetCode.11. 盛最多水的容器

题目 题目链接 分析 这道题的意思就是让我们找两个下标&#xff0c;以这两个下标组成的线为底&#xff0c;高度取这两个位置对应数字的最小值为高&#xff0c;组成一个长方形&#xff0c;求长方形最大的面积可以为多少。 暴力的解法是什么&#xff1f;&#xff1f;&#xf…

【Linux】开始使用 vim 吧!!!

Linux 1 what is vim &#xff1f;2 vim基本概念3 vim的基本操作 &#xff01;3.1 vim的快捷方式3.1.1 复制与粘贴3.1.2 撤销与剪切3.1.3 字符操作 3.2 vim的光标操作3.3 vim的文件操作 总结Thanks♪(&#xff65;ω&#xff65;)&#xff89;感谢阅读下一篇文章见&#xff01;…

工业4.0前沿:8DI/4DO/6AI RTU在石油管道监测中的应用

在当前数字化转型的大潮下&#xff0c;石油化工行业的智能化进程正以前所未有的速度推进。其中&#xff0c;物联网技术作为连接物理世界与数字世界的桥梁&#xff0c;在管道监控与安全管理领域发挥着至关重要的作用。一款专为石油化工管道设计的全网通物联网RTU终端应运而生&am…

消息中间件之RocketMQ(五)

RocketMQ高性能背后的核心原理 1.消息主从复制 如果Broker以一个集群的方式部署&#xff0c;会有一个master节点和多个Slave节点&#xff0c;消息需要从master复制到slave上&#xff0c;而消息复制的方式分为同步复制和异步复制。 同步复制: 同步复制是等Master和Slave都写入…

为什么网页打开慢?是服务器的问题吗?

当我们遇到网页加载缓慢时&#xff0c;首先想到的可能是服务器的问题。的确&#xff0c;服务器是影响网页加载速度的一个重要因素。然而&#xff0c;这并非是唯一的原因。实际上&#xff0c;网页加载速度受多种因素影响&#xff0c;包括但不限于服务器、网络带宽、DNS解析时间、…

linux0.11源码看信号的处理流程

序 日常Linux写代码或者使用中难免会使用siganl&#xff0c;包括我们使用ctrl-c结束程序&#xff0c;使用kill命令发送信号&#xff0c;或者说程序core后操作系统向程序发送的信号&#xff0c;以及我们程序内部自定义的信号处理。 我们选择linux0.11一个原因是它比较简单&…