接上篇:Netty学习——高级篇1 拆包 、粘包与编解码技术,本章继续介绍Netty的其他解码器
1 DelimiterBasedFrameDecoder分隔符解码器
DelimiterBasedFrameDecoder 分隔符解码器是按照指定分隔符进行解码的解码器,通过分隔符可以将二进制流拆分成完整的数据包。回车换行符解码器实际上是一种特殊的DelimiterBasedFrameDecoder解码器。
分隔符解码器在实际工作中有很广泛的应用,很多简单的文本私有协议都以特殊的分隔符作为消息结束的标识,特别是那些使用长连接的基于文本的私有协议。关于分隔符的指定,并非以Char或者String作为构造参数,而是以Bytebuf。下面就结合实际例子给出它的用法。例如消息是以“$_”作为分隔符,服务端或者客户端初始化ChannelPipeline的代码实例如下:
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter)); ch.pipeline().addLast(new StringDecoder());
DelimiterBasedFrameDecoder同样继承了ByteToMessageDecoder并重写了decode方法,来看其中的一个构造方法。
public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) { this(maxFrameLength, true, delimiter); }
其中,参数 maxFrameLength 代表最大长度,delimiter是个可变参数,可以支持多个分隔符进行解码。跟进decode方法。
@Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } }
这里同样调用了重载的decode方法并将解析好的数据添加到集合List中,其父类就可以遍历Out,并将内容传播。重载的decode方法代码如下:
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
//行处理器(1)
if (lineBasedDecoder != null) {
return lineBasedDecoder.decode(ctx, buffer);
}
int minFrameLength = Integer.MAX_VALUE;
ByteBuf minDelim = null;
//找到最小长度的分隔符
for (ByteBuf delim: delimiters) {
//每个分隔符分隔的数据包长度
int frameLength = indexOf(buffer, delim);
if (frameLength >= 0 && frameLength < minFrameLength) {
minFrameLength = frameLength;
minDelim = delim;
}
}
//解码(3)
//已经找到分隔符
if (minDelim != null) {
int minDelimLength = minDelim.capacity();
ByteBuf frame;
//当前分隔符是否处于丢弃模式
if (discardingTooLongFrame) {
// 首先设置为非丢弃模式
discardingTooLongFrame = false;
//丢弃
buffer.skipBytes(minFrameLength + minDelimLength);
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
if (!failFast) {
fail(tooLongFrameLength);
}
return null;
}
//处于非丢弃模式
//当前找到的数据包,大于允许的数据包
if (minFrameLength > maxFrameLength) {
// 当前数据包+最小分隔符的长度 全部丢弃.
buffer.skipBytes(minFrameLength + minDelimLength);
//传递异常事件
fail(minFrameLength);
return null;
}
//如果是正常的长度
//解析出来的数据包是否忽略分隔符
if (stripDelimiter) {
//如果不包含分隔符
//截取
frame = buffer.readRetainedSlice(minFrameLength);
//跳过分隔符
buffer.skipBytes(minDelimLength);
} else {
//截取包含分隔符的长度
frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
}
return frame;
} else {
//如果没有找到分隔符
//飞丢弃模式
if (!discardingTooLongFrame) {
//可读字节大于允许的解析出来的长度
if (buffer.readableBytes() > maxFrameLength) {
// 将这个长度记录下
tooLongFrameLength = buffer.readableBytes();
//跳过这段长度
buffer.skipBytes(buffer.readableBytes());
//标记当前处于丢弃状态
discardingTooLongFrame = true;
if (failFast) {
fail(tooLongFrameLength);
}
}
} else {
// Still discarding the buffer since a delimiter is not found.
tooLongFrameLength += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
}
这个方法较长,通过拆分进行剖析:
1、行处理器;2、找到最小长度分隔符;3、解码。首先看第一步,行处理器的代码:
if (lineBasedDecoder != null) { return lineBasedDecoder.decode(ctx, buffer); }
首先判断成员变量linebasedDecoder 是否为空,如果不为空则直接调用lineBasedDecoder的decode方法进行解析,lineBasedDecoder实际上就是LineBasedFrameDecoder解码器。这个成员变量会在分隔符是\r\n的时候进行初始化。看一下初始化该属性的构造方法:
public DelimiterBasedFrameDecoder( int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) { validateMaxFrameLength(maxFrameLength); if (delimiters == null) { throw new NullPointerException("delimiters"); } if (delimiters.length == 0) { throw new IllegalArgumentException("empty delimiters"); } //如果是基于行的分隔 if (isLineBased(delimiters) && !isSubclass()) { //初始化行处理器 lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast); this.delimiters = null; } else { this.delimiters = new ByteBuf[delimiters.length]; for (int i = 0; i < delimiters.length; i ++) { ByteBuf d = delimiters[i]; validateDelimiter(d); this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes()); } lineBasedDecoder = null; } this.maxFrameLength = maxFrameLength; this.stripDelimiter = stripDelimiter; this.failFast = failFast; }
这里的 isLineBased(delimiters) 会判断是否是基于行的分隔,代码如下:
private static boolean isLineBased(final ByteBuf[] delimiters) { //分隔符长度不为2 if (delimiters.length != 2) { return false; } //获得第一个分隔符 ByteBuf a = delimiters[0]; //获得第二个分隔符 ByteBuf b = delimiters[1]; if (a.capacity() < b.capacity()) { a = delimiters[1]; b = delimiters[0]; } //确保a是\r\n分隔符,确保b是\n分隔符 return a.capacity() == 2 && b.capacity() == 1 && a.getByte(0) == '\r' && a.getByte(1) == '\n' && b.getByte(0) == '\n'; }
首先判断长度不等于2,直接返回false。然后获取第一个ByteBuf和第二个ByteBuf,判断a的第一个分隔符是不是\r,a第二个分隔符是不是\n,b的第一个分隔符是不是\n,如果都为true,则条件成立。回到decode中,看第二步,找到最小长度的分隔符。这里最小长度的分隔符,就是从读指针开始找到最近的分隔符,代码如下:
for (ByteBuf delim: delimiters) { //每个分隔符分隔的数据包长度 int frameLength = indexOf(buffer, delim); if (frameLength >= 0 && frameLength < minFrameLength) { minFrameLength = frameLength; minDelim = delim; } }
这里会遍历所有的分隔符,找到每个分隔符到读指针的数据包长度。在通过if判断,找到长度最小的数据包的长度,然后保存当前数据包的分隔符,如下图所示:
假设A和B同为分隔符,因为A分隔符到读指针的长度小于B分隔符到读指针的长度,所以会找到最小的分隔符A,分隔符的最小长度就是readIndex到A的长度。继续看第三步,解码。
if (minDelim != null) 表示已经找到最小长度分隔符,继续看if语句块中的逻辑,代码如下:
int minDelimLength = minDelim.capacity(); ByteBuf frame; if (discardingTooLongFrame) { // We've just finished discarding a very large frame. // Go back to the initial state. discardingTooLongFrame = false; buffer.skipBytes(minFrameLength + minDelimLength); int tooLongFrameLength = this.tooLongFrameLength; this.tooLongFrameLength = 0; if (!failFast) { fail(tooLongFrameLength); } return null; } if (minFrameLength > maxFrameLength) { // Discard read frame. buffer.skipBytes(minFrameLength + minDelimLength); fail(minFrameLength); return null; } if (stripDelimiter) { frame = buffer.readRetainedSlice(minFrameLength); buffer.skipBytes(minDelimLength); } else { frame = buffer.readRetainedSlice(minFrameLength + minDelimLength); } return frame;
if (discardingTooLongFrame) 表示当前是否处于非丢弃模式,如果是丢弃模式,则进入if块中。因为第一个不是丢弃哦是,所以先分析if后面的逻辑。if (minFrameLength > maxFrameLength) 判断当前找到的数据包长度大于最大长度,这个最大长度是创建解码器的时候设置的,如果超过了最大长度,就通过 buffer.skipBytes(minFrameLength + minDelimLength);方式跳过数据包+分隔符的长度,也就是将这部分数据进行完全丢弃。继续往下看,如果长度不大于最大允许长度,则通过 if (stripDelimiter) 判断解析出来的数据包是否包含分隔符,如果不包含分隔符,则截取数据包的长度后,跳过分隔符。
再回看 if (discardingTooLongFrame)中if里的逻辑,也就是丢弃模式。首先将discardingTooLongFrame 设置为false,标记为飞丢弃模式,然后通过buffer.skipBytes(minFrameLength + minDelimLength) 将数据包+分隔符长度的字节数跳过,也就是进行丢弃,之后抛出异常。在分析没有找到分隔符的逻辑处理,也就是if (minDelim != null) 的else语句,代码如下:
else { if (!discardingTooLongFrame) { if (buffer.readableBytes() > maxFrameLength) { // Discard the content of the buffer until a delimiter is found. tooLongFrameLength = buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); discardingTooLongFrame = true; if (failFast) { fail(tooLongFrameLength); } } } else { // Still discarding the buffer since a delimiter is not found. tooLongFrameLength += buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); } return null; }
首先通过 if (!discardingTooLongFrame) 判断是否为非丢弃模式,如果是非丢弃模式,则进入if里。在if里,通过 if (buffer.readableBytes() > maxFrameLength) 判断当前可读字节数是否大于最大允许的长度,如果大于最大允许的长度,则将可读字节数设置到tooLongFrameLength 的属性中,代表丢弃的字节数,然后通过 buffer.skipBytes(buffer.readableBytes()) 将累加器中所有的可读字节进行丢弃,最后将 discardingTooLongFrame 设置为true,也就是丢弃模式,之后抛出异常。如果 if (!discardingTooLongFrame) 结果为false,也就是当前处于丢弃模式,则追加tooLongFrameLength 也就是丢弃的字节数的长度,并通过buffer.skipBytes(buffer.readableBytes()) 将所有的字节继续进行丢弃。
2 FixedLengthFrameDecoder固定长度解码器
FixedLengthFrameDecoder 固定长度解码器能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包和拆包等问题,非常实用。
对于定长消息,如果消息实际长度小于定长,则往往会进行补位操作,它在一定程度长导致了空间和资源的浪费。但是它的优先也非常明显,编解码比较简单,因此在实际项目中有一定的应用场景。
利用FixedLengthFrameDecoder 解码器,无论一次接收到多少数据报文,它都会按照构造函数中设置的固定长度进行解码,如果是半包消息,FixedLengthFrameDecoder会缓存半包消息并等待下个包到达后进行拼包,直到读取到一个完整的包。假如单条消息的长度是20字节,使用FixedLengthFrameDecoder 解码器的效果如下:
类的定义代码如下:
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
//长度大小
private final int frameLength;
public FixedLengthFrameDecoder(int frameLength) {
if (frameLength <= 0) {
throw new IllegalArgumentException(
"frameLength must be a positive integer: " + frameLength);
}
//保存当前的frameLength
this.frameLength = frameLength;
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//通过ByteBuf解码,解码到对象之后添加到Out上
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//字节是否小于这个固定长度
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readRetainedSlice(frameLength);
}
}
}
通过代码发现,FixedLengthFrameDecoder继续ByteToMessageDecoder,重写了decode方法,这个类只有一个叫 frameLength 的属性,并在构造方法中初始化了该属性。再看decode方法,在decode方法中又调用了自身另一个重载的decode方法进行解析,解析出来之后将解析后的数据放在集合Out中。再看重载的decode方法,重载的decode方法中首先判断累加器的字节数是否小于固定长度,如果小于固定长度则返回null,代表不是一个完整的数据包。如果大于等于固定长度,则直接从累加器中截取这个长度的数值,in.readRetainedSlice(frameLength) 会返回一个新的截取后的ByteBuf,并将原来的累加器读指针后移frameLength个字节。如果累加器中还有数据,则通过ByteToMessageDecoder中callDecode()方法里的while循环方式,继续进行解码。