netty http2 server侧的核心逻辑个人认为,主要在编解码处理器和Stream Transform Channel这块,分别处理Http2 消息帧的编解码,以及连接的多流处理机制。对应用的处理类分别:
ChannelHandler | Desc |
---|---|
io.netty.handler.codec.http2.Http2FrameCodec | 负责http2帧和消息的编解码 |
io.netty.handler.codec.http2.Http2MultiplexHandler | 负责流的连接通道复用,将Stream转为Http2MultiplexHandlerStreamChannel |
对于netty http2 server的搭建代码,可以见前面的文章:HTTP2: netty http2 server demo
Http2FrameCodec 和Http2MultiplexHandler源码分析
Http2FrameCodec会对Header和Data帧进行解码,先看看调用栈:
获取StreamId
对帧的解码第一个核心逻辑是在io.netty.handler.codec.http2.DefaultHttp2FrameReader#readFrame(ChannelHandlerContext, ByteBuf, Http2FrameListener),此时ChannelHandlerContext对应的Channel是NioSocketChannel,还没有到StreamChannel。看看该方法的代码逻辑:
@Override
public void readFrame(ChannelHandlerContext ctx, ByteBuf input, Http2FrameListener listener)
throws Http2Exception {
if (readError) {
input.skipBytes(input.readableBytes());
return;
}
try {
do {
if (readingHeaders) {
processHeaderState(input);
if (readingHeaders) {
// Wait until the entire header has arrived.
return;
}
}
// The header is complete, fall into the next case to process the payload.
// This is to ensure the proper handling of zero-length payloads. In this
// case, we don't want to loop around because there may be no more data
// available, causing us to exit the loop. Instead, we just want to perform
// the first pass at payload processing now.
processPayloadState(ctx, input, listener);
if (!readingHeaders) {
// Wait until the entire payload has arrived.
return;
}
} while (input.isReadable());
} catch (Http2Exception e) {
readError = !Http2Exception.isStreamError(e);
throw e;
} catch (RuntimeException e) {
readError = true;
throw e;
} catch (Throwable cause) {
readError = true;
PlatformDependent.throwException(cause);
}
}
该方法会对Socket中的字节进行解码,首先是处理帧头的数据,由processHeaderState(ByteBuf)处理,该方法会读取payloadLength、frameType、flags以及streamId信息,并进行正确性校验,如果校验失败,会抛出相应的Http2Exception异常。代码如下:
private void processHeaderState(ByteBuf in) throws Http2Exception {
if (in.readableBytes() < FRAME_HEADER_LENGTH) {
// Wait until the entire frame header has been read.
return;
}
// Read the header and prepare the unmarshaller to read the frame.
payloadLength = in.readUnsignedMedium();
if (payloadLength > maxFrameSize) {
throw connectionError(FRAME_SIZE_ERROR, "Frame length: %d exceeds maximum: %d", payloadLength,
maxFrameSize);
}
frameType = in.readByte();
flags = new Http2Flags(in.readUnsignedByte());
streamId = readUnsignedInt(in);
// We have consumed the data, next time we read we will be expecting to read the frame payload.
readingHeaders = false;
switch (frameType) {
case DATA:
verifyDataFrame();
break;
case HEADERS:
verifyHeadersFrame();
break;
case PRIORITY:
verifyPriorityFrame();
break;
case RST_STREAM:
verifyRstStreamFrame();
break;
case SETTINGS:
verifySettingsFrame();
break;
case PUSH_PROMISE:
verifyPushPromiseFrame();
break;
case PING:
verifyPingFrame();
break;
case GO_AWAY:
verifyGoAwayFrame();
break;
case WINDOW_UPDATE:
verifyWindowUpdateFrame();
break;
case CONTINUATION:
verifyContinuationFrame();
break;
default:
// Unknown frame type, could be an extension.
verifyUnknownFrame();
break;
}
}
当帧头信息处理完后,就处理fram payload了,这个逻辑由processPayloadState(ChannelHandlerContext, ByteBuf, Http2FrameListener)方法完成。它会根据frameType进行执行相应解码方法,当把帧的数据处理好后,再调用Http2FrameListener对应的onxxxx()方法,执行下一步的逻辑。processPayloadState(ChannelHandlerContext, ByteBuf, Http2FrameListener)方法的代码逻辑如下:
private void processPayloadState(ChannelHandlerContext ctx, ByteBuf in, Http2FrameListener listener)
throws Http2Exception {
if (in.readableBytes() < payloadLength) {
// Wait until the entire payload has been read.
return;
}
// Only process up to payloadLength bytes.
int payloadEndIndex = in.readerIndex() + payloadLength;
// We have consumed the data, next time we read we will be expecting to read a frame header.
readingHeaders = true;
// Read the payload and fire the frame event to the listener.
switch (frameType) {
case DATA:
//处理data帧数据
readDataFrame(ctx, in, payloadEndIndex, listener);
break;
case HEADERS:
//处理Header首帧数据
readHeadersFrame(ctx, in, payloadEndIndex, listener);
break;
...
case CONTINUATION:
//处理Header或PushPromise的连续帧数据
readContinuationFrame(in, payloadEndIndex, listener);
break;
default:
readUnknownFrame(ctx, in, payloadEndIndex, listener);
break;
}
in.readerIndex(payloadEndIndex);
}
处理Header帧数据
Header帧体的处理分两种情况,分别是Header首帧和首帧后面跟着的CONTINUATION Header帧, 两种处理方法在processHeaderState()方法中的校验逻辑也不同,如下:
对于Header首帧的校验:
// Header首帧校验
private void verifyHeadersFrame() throws Http2Exception {
verifyAssociatedWithAStream();
verifyNotProcessingHeaders();
verifyPayloadLength(payloadLength);
int requiredLength = flags.getPaddingPresenceFieldLength() + flags.getNumPriorityBytes();
if (payloadLength < requiredLength) {
throw streamError(streamId, FRAME_SIZE_ERROR,
"Frame length too small." + payloadLength);
}
}
//Continuation 帧的校验
private void verifyContinuationFrame() throws Http2Exception {
verifyAssociatedWithAStream();
verifyPayloadLength(payloadLength);
if (headersContinuation == null) {
throw connectionError(PROTOCOL_ERROR, "Received %s frame but not currently processing headers.",
frameType);
}
if (streamId != headersContinuation.getStreamId()) {
throw connectionError(PROTOCOL_ERROR, "Continuation stream ID does not match pending headers. "
+ "Expected %d, but received %d.", headersContinuation.getStreamId(), streamId);
}
if (payloadLength < flags.getPaddingPresenceFieldLength()) {
throw streamError(streamId, FRAME_SIZE_ERROR,
"Frame length %d too small for padding.", payloadLength);
}
}
private void verifyNotProcessingHeaders() throws Http2Exception {
if (headersContinuation != null) {
throw connectionError(PROTOCOL_ERROR, "Received frame of type %s while processing headers on stream %d.",
frameType, headersContinuation.getStreamId());
}
}
private void verifyPayloadLength(int payloadLength) throws Http2Exception {
if (payloadLength > maxFrameSize) {
throw connectionError(PROTOCOL_ERROR, "Total payload length %d exceeds max frame length.", payloadLength);
}
}
private void verifyAssociatedWithAStream() throws Http2Exception {
if (streamId == 0) {
throw connectionError(PROTOCOL_ERROR, "Frame of type %s must be associated with a stream.", frameType);
}
}
Header首帧处理
Header首帧校验项:
Item | Method | Exception |
---|---|---|
streamId是否正常(!=0) | verifyAssociatedWithAStream | 异常:streamId==0 |
headersContinuation 是否为空 | verifyNotProcessingHeaders | headersContinuation!=null,headersContinuation是处理continuation header帧的一种手段,出现headersContinuation!=null时,说明流的帧数据出现错乱 |
payloadLength<=maxFrameSize | verifyPayloadLength | 帧的大小超出了上限 |
payloadLength >= (flags.getPaddingPresenceFieldLength() + flags.getNumPriorityBytes()) | 帧体长度太小 |
Header帧的处理逻辑在readHeadersFrame(ChannelHandlerContext, ByteBuf, int, Http2FrameListener)方法中, 该方法有个priority information的处理分支,这个暂时不看。代码逻辑如下:
private void readHeadersFrame(final ChannelHandlerContext ctx, ByteBuf payload, int payloadEndIndex,
Http2FrameListener listener) throws Http2Exception {
final int headersStreamId = streamId;
final Http2Flags headersFlags = flags;
final int padding = readPadding(payload);
verifyPadding(padding);
// The callback that is invoked is different depending on whether priority information
// is present in the headers frame.
if (flags.priorityPresent()) {
...
return;
}
// The priority fields are not present in the frame. Prepare a continuation that invokes
// the listener callback without priority information.
headersContinuation = new HeadersContinuation() {
@Override
public int getStreamId() {
return headersStreamId;
}
@Override
public void processFragment(boolean endOfHeaders, ByteBuf fragment, int len,
Http2FrameListener listener) throws Http2Exception {
final HeadersBlockBuilder hdrBlockBuilder = headersBlockBuilder();
hdrBlockBuilder.addFragment(fragment, len, ctx.alloc(), endOfHeaders);
if (endOfHeaders) {
listener.onHeadersRead(ctx, headersStreamId, hdrBlockBuilder.headers(), padding,
headersFlags.endOfStream());
}
}
};
// Process the initial fragment, invoking the listener's callback if end of headers.
int len = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);
headersContinuation.processFragment(flags.endOfHeaders(), payload, len, listener);
resetHeadersContinuationIfEnd(flags.endOfHeaders());
}
该方法会创建一个HeadersContinuation对象,主要是用于对Continuation Header帧数据的累加。
帧数据的读取的代码逻辑是通过headersContinuation.processFragment()方法去读取,并继续解析向下调用,处理完后关闭并销毁headersContinuation对象。代码逻辑:
int len = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);
headersContinuation.processFragment(flags.endOfHeaders(), payload, len, listener);
resetHeadersContinuationIfEnd(flags.endOfHeaders());
Header 处理完成后关闭并销毁headersContinuation对象:
private void resetHeadersContinuationIfEnd(boolean endOfHeaders) {
if (endOfHeaders) {
closeHeadersContinuation();
}
}
private void closeHeadersContinuation() {
if (headersContinuation != null) {
headersContinuation.close();
headersContinuation = null;
}
}
HeadersContinuation.processFragment()方法会调用HeadersBlockBuilder.addFragment()方法将Header帧的数据读入另一个ByteBuf对象headerBlock中,在Continuation Header帧数据读取时,也是同样的逻辑会读入到headerBlock中,这样Header完整的字节数据都会在headerBlock中。
代码逻辑:
public void processFragment(boolean endOfHeaders, ByteBuf fragment, int len,
Http2FrameListener listener) throws Http2Exception {
final HeadersBlockBuilder hdrBlockBuilder = headersBlockBuilder();
hdrBlockBuilder.addFragment(fragment, len, ctx.alloc(), endOfHeaders);
if (endOfHeaders) {
listener.onHeadersRead(ctx, headersStreamId, hdrBlockBuilder.headers(), padding,
headersFlags.endOfStream());
}
}
HeadersBlockBuilder的代码逻辑:
final HeadersBlockBuilder headersBlockBuilder() {
return builder;
}
final void addFragment(ByteBuf fragment, int len, ByteBufAllocator alloc,
boolean endOfHeaders) throws Http2Exception {
if (headerBlock == null) {
if (len > headersDecoder.configuration().maxHeaderListSizeGoAway()) {
headerSizeExceeded();
}
if (endOfHeaders) {
// Optimization - don't bother copying, just use the buffer as-is. Need
// to retain since we release when the header block is built.
headerBlock = fragment.readRetainedSlice(len);
} else {
headerBlock = alloc.buffer(len).writeBytes(fragment, len);
}
return;
}
if (headersDecoder.configuration().maxHeaderListSizeGoAway() - len <
headerBlock.readableBytes()) {
headerSizeExceeded();
}
if (headerBlock.isWritable(len)) {
// The buffer can hold the requested bytes, just write it directly.
headerBlock.writeBytes(fragment, len);
} else {
// Allocate a new buffer that is big enough to hold the entire header block so far.
ByteBuf buf = alloc.buffer(headerBlock.readableBytes() + len);
buf.writeBytes(headerBlock).writeBytes(fragment, len);
headerBlock.release();
headerBlock = buf;
}
}
HeadersContinuation.processFragment()方法中,在HeadersBlockBuilder.addFragment()方法将Header帧的数据读取并加入到headerBlock中后,会判断Header帧是否结束,如果结束了,则会执行hdrBlockBuilder.headers()将headerBlock中的字节解码在io.netty.handler.codec.http2.Http2Headers对象 ,再调用Http2FrameListener.onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream)方法,继续执行后面的Header解析、创建StreamChannel、处理Header消息等逻辑。
Continuation Header帧处理
Continuation Header帧是Header帧的数据补充,另外Continuation帧还可以对PushPromise帧进行数据补充。Continuation Header帧的处理方式跟Header帧没多大差别。
Continuation Header帧校验项:
Item | Method | Exception |
---|---|---|
streamId是否正常(!=0) | verifyAssociatedWithAStream | 异常:streamId==0 |
payloadLength<=maxFrameSize | verifyPayloadLength | 帧的大小超出了上限 |
headersContinuation 是否不为空 | headersContinuation==null,headersContinuation是处理continuation header帧的一种手段,出现headersContinuation=null时,说明流的帧数据出现错乱 | |
streamId != headersContinuation.getStreamId() | 如果当前帧的streamId与headersContinuation的streamId不同, 流的帧传输或读取出现了乱序 | |
payloadLength >= flags.getPaddingPresenceFieldLength() | 帧体长度太小 |
代码逻辑如下:
private void verifyContinuationFrame() throws Http2Exception {
verifyAssociatedWithAStream();
verifyPayloadLength(payloadLength);
if (headersContinuation == null) {
throw connectionError(PROTOCOL_ERROR, "Received %s frame but not currently processing headers.",
frameType);
}
if (streamId != headersContinuation.getStreamId()) {
throw connectionError(PROTOCOL_ERROR, "Continuation stream ID does not match pending headers. "
+ "Expected %d, but received %d.", headersContinuation.getStreamId(), streamId);
}
if (payloadLength < flags.getPaddingPresenceFieldLength()) {
throw streamError(streamId, FRAME_SIZE_ERROR,
"Frame length %d too small for padding.", payloadLength);
}
}
Continuation Header帧的数据读入更是同Header帧的数据读入没什么区别,都是调用HeadersContinuation.processFragment()方法,通过HeadersBlockBuilder.addFragment()方法将Continuation Header帧的数据读取并加入到headerBlock中,然后判断Header帧是否结束,如果结束了,则会执行hdrBlockBuilder.headers()将headerBlock中的字节解码在io.netty.handler.codec.http2.Http2Headers对象 ,再调用Http2FrameListener.onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream)方法,继续执行后面的Header解析、创建StreamChannel、处理Header消息等逻辑。
代码逻辑如下:
private void readContinuationFrame(ByteBuf payload, int payloadEndIndex, Http2FrameListener listener)
throws Http2Exception {
// Process the initial fragment, invoking the listener's callback if end of headers.
headersContinuation.processFragment(flags.endOfHeaders(), payload,
payloadEndIndex - payload.readerIndex(), listener);
resetHeadersContinuationIfEnd(flags.endOfHeaders());
}
疑问
这里其实有个疑问,为什么headersContinuation是做为实例对象放在DefaultHttp2FrameReader中,Http2的多流之间的帧数据是可以中间穿插串起来的,如果相连的两个Header帧和Continuation Header帧分别是不同流的,那不是就出问题了么?
后来我明白过来了,虽然多流之间的帧数据是可以中间穿插串起来,但是仍是使用同一个NioChannel,在写数据时保证Header帧和Continuation Header帧是紧紧相连一起写出去的,那么出现问题的可能性就几乎没有了。
Header解码
Header解码的动作入口是在headersContinuation.processFragment() -> hdrBlockBuilder.headers() -> DefaultHttp2HeadersDecoder.decodeHeaders() -> HpackDecoder.decode()。所以最终的解码工作是由io.netty.handler.codec.http2.HpackDecoder.decode()方法完成的。
调用栈如下:
HpackDecoder维护了一组状态常量,代表的是当前对Header的读取状态,不同的状态做的事情是不一样的,HpackDecoder通过一个While循环来读取Header,因为一个Frame里面包含若干个Header,根据读取到的数据判断,State会不断变化流转。
State | 说明 |
---|---|
READ_HEADER_REPRESENTATION | 读取header的初试状态 |
READ_INDEXED_HEADER | 读取被索引的完整header,即name和value均被索引 |
READ_INDEXED_HEADER_NAME | 读取被索引的header name |
READ_LITERAL_HEADER_NAME_LENGTH_PREFIX | name未被索引,读取name长度前缀,判断是否使用哈夫曼编码 |
READ_LITERAL_HEADER_NAME_LENGTH | name未被索引,读取name长度 |
READ_LITERAL_HEADER_NAME | name未被索引,读取header name |
READ_LITERAL_HEADER_VALUE_LENGTH_PREFIX | 读取value长度前缀,判断是否使用哈夫曼编码 |
READ_LITERAL_HEADER_VALUE_LENGTH | 读取value长度 |
READ_LITERAL_HEADER_VALUE | value未被索引,读取value |
更多详细的解读见Netty对HPACK头部压缩的支持
与Http2MultiplexHandler 的集成处理
当HttpFrameCodec将帧解码之后,会通过Http2FrameListener进行封装处理,然后再调用netty的相应逻辑,与Http2MultiplexHandler的userEventTriggered()方法和channelRead()打通连起来。
Http2MultiplexHandler的两个方法主要逻辑:
Method | Desc |
---|---|
userEventTriggered() | 将Http2FrameCodec.DefaultHttp2FrameStream对象转换成Http2MultiplexHandlerStreamChannel对象,将触发Http2MultiplexHandlerStreamChannel对应pipeline的fireChannelRegistered()方法 |
channelRead() | 将Http2StreamFrame对象(DefaultHttp2HeadersFrame/DefaultHttp2DataFrame)传为参数,执行AbstractHttp2StreamChannel.pipeline的fireChannelRead()方法。完成从NioSocketChannel.ChannelHandler到AbstractHttp2StreamChannel.ChannelHandler的转换 |
创建Http2Stream并触发ChannelHandlerContext.fireUserEventTriggered(Http2FrameStreamEvent)
HeadersContinuation.processFragment()方法调用的Http2FrameListener.onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream)方法逻辑中,Http2FrameListener是DefaultHttp2ConnectionDecoder$FrameReadListener对象,在该对象的onHeadersRead()方法中,会在Http2Connection先通过streamId查找Http2Stream对象,如果没有,则执行Http2Connection.DefaultEndpoint.createStream()新建一个Http2Stream对象,并将其激活。
调用栈如下:
DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead()方法代码逻辑如下:
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
Http2Stream stream = connection.stream(streamId);
boolean allowHalfClosedRemote = false;
if (stream == null && !connection.streamMayHaveExisted(streamId)) {
// 创建Http2Stream
stream = connection.remote().createStream(streamId, endOfStream);
// Allow the state to be HALF_CLOSE_REMOTE if we're creating it in that state.
allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE;
}
if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, "HEADERS")) {
return;
}
boolean isInformational = !connection.isServer() &&
HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL;
if ((isInformational || !endOfStream) && stream.isHeadersReceived() || stream.isTrailersReceived()) {
throw streamError(streamId, PROTOCOL_ERROR,
"Stream %d received too many headers EOS: %s state: %s",
streamId, endOfStream, stream.state());
}
switch (stream.state()) {
case RESERVED_REMOTE:
stream.open(endOfStream);
break;
case OPEN:
case HALF_CLOSED_LOCAL:
// Allowed to receive headers in these states.
break;
case HALF_CLOSED_REMOTE:
if (!allowHalfClosedRemote) {
throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
stream.id(), stream.state());
}
break;
case CLOSED:
throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
stream.id(), stream.state());
default:
// Connection error.
throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(),
stream.state());
}
stream.headersReceived(isInformational);
encoder.flowController().updateDependencyTree(streamId, streamDependency, weight, exclusive);
//触发...
listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endOfStream);
// If the headers completes this stream, close it.
if (endOfStream) {
lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
}
}
Http2Connection.DefaultEndpoint.createStream() 方法代码逻辑如下:
public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
State state = activeState(streamId, IDLE, isLocal(), halfClosed);
checkNewStreamAllowed(streamId, state);
// Create and initialize the stream.
DefaultStream stream = new DefaultStream(streamId, state);
incrementExpectedStreamId(streamId);
addStream(stream);
stream.activate();
return stream;
}
创建好Http2Stream后,同时调用Http2Stream.activate()方法进行激活。
关键调用栈
触发ChannelHandlerContext.fireUserEventTriggered(Http2FrameStreamEvent)方法的调用链:
Method | Desc | |
---|---|---|
- | DefaultHttp2FrameReader$HeadersContinuation.processFragment() | 从IO中读取帧数据到添加到DefaultHttp2FrameReader$HeadersBlockBuilder.headerBlock中,当Header数据结束时,调用 Http2FrameListener.onHeadersRead()方法 |
- | DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead() | 根据streamId从Http2Connection查找Http2Stream对象,如果没有,则执行Http2Connection.DefaultEndpoint.createStream()新建一个Http2Stream对象,并将其激活 (详情见上面的创建Http2Stream并触发ChannelHandlerContext.fireUserEventTriggered(Http2FrameStreamEvent))。再然后调用Http2EmptyDataFrameListener.onHeadersRead()方法向下执行 |
1 | Http2Connection.DefaultEndpoint.createStream() | 创建DefaultHttp2Connection.DefaultStream(Http2Stream)对象 |
2 | DefaultHttp2Connection.DefaultStream.activate() | 流的激活方法 |
3 | DefaultHttp2Connection.ActiveStreams.activate() | 激活流 |
4 | DefaultHttp2Connection.ActiveStreams.addToActiveStreams() | 添加到集合DefaultHttp2Connection.ActiveStreams.streams中,并调用Http2FrameCodec$ConnectionListener.onStreamActive()方法 |
5 | Http2FrameCodec.onStreamActive0() | 将DefaultHttp2Connection.DefaultStream对象包装成Http2FrameCodec.DefaultHttp2FrameStream对象 |
6 | Http2FrameCodec.onHttp2StreamStateChanged() | 调用ChannelHandlerContext.fireUserEventTriggered()方法,触发Http2FrameStreamEvent |
7 | ChannelHandlerContext.fireUserEventTriggered() | |
8 | ChannelHandlerContext.invokeUserEventTriggered() | 触发下一个AbstractChannelHandlerContext.invokeUserEventTriggered()并使用下一个ChannelHandler绑定的EventExecutor执行。 相当于触发ChannelInboundHandler链的userEventTriggered() |
9 | ChannelInboundHandler.userEventTriggered() | |
10 | Http2MultiplexHandler.userEventTriggered() | 将Http2FrameCodec.DefaultHttp2FrameStream对象转换成Http2MultiplexHandlerStreamChannel对象,并将新建的Http2MultiplexHandlerStreamChannel对象注册到EventLoop中 |
11 | SingleThreadEventLoop.register() | |
12 | AbstractHttp2StreamChannel.Http2ChannelUnsafe.register() | 触发AbstractHttp2StreamChannel.pipeline 的fireChannelRegistered()方法,但并没有使用上面传递过来的EventLoop |
13 | DefaultChannelPipeline.fireChannelRegistered() | 触发AbstractHttp2StreamChannel.pipeline 的fireChannelRegistered()方法,并执行第一个DefaultChannelPipeline.AbstractChannelHandlerContext的invokeChannelRegistered()方法 |
14 | ChannelInboundHandler.channelRegistered() | 执行ChannelInboundHandler的channelRegistered(ChannelHandlerContext)方法 |
关键代码逻辑
Http2MultiplexHandler.userEventTriggered()方法会将Stream转换为Http2MultiplexHandlerStreamChannel,实现一个连接的多流Channel隔离,每个流可以对数据字节(ByteBuf)单独处理。
代码逻辑如下:
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof Http2FrameStreamEvent) {
Http2FrameStreamEvent event = (Http2FrameStreamEvent) evt;
DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) event.stream();
if (event.type() == Http2FrameStreamEvent.Type.State) {
switch (stream.state()) {
case HALF_CLOSED_LOCAL:
if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
// Ignore everything which was not caused by an upgrade
break;
}
// fall-through
case HALF_CLOSED_REMOTE:
// fall-through
case OPEN:
if (stream.attachment != null) {
// ignore if child channel was already created.
break;
}
final AbstractHttp2StreamChannel ch;
// We need to handle upgrades special when on the client side.
if (stream.id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && !isServer(ctx)) {
// We must have an upgrade handler or else we can't handle the stream
if (upgradeStreamHandler == null) {
throw connectionError(INTERNAL_ERROR,
"Client is misconfigured for upgrade requests");
}
// 客户端升级流场景,将Stream转换创建Http2MultiplexHandlerStreamChannel
ch = new Http2MultiplexHandlerStreamChannel(stream, upgradeStreamHandler);
ch.closeOutbound();
} else {
//将Stream转换创建Http2MultiplexHandlerStreamChannel
ch = new Http2MultiplexHandlerStreamChannel(stream, inboundStreamHandler);
}
// 触发Http2MultiplexHandlerStreamChannel.pipeline的fireChannelRegistered()方法
ChannelFuture future = ctx.channel().eventLoop().register(ch);
if (future.isDone()) {
registerDone(future);
} else {
future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
}
break;
case CLOSED:
AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;
if (channel != null) {
channel.streamClosed();
}
break;
default:
// ignore for now
break;
}
}
return;
}
ctx.fireUserEventTriggered(evt);
}
接收并处理Http2HeadersFrame消息
最终Http2Headers对象会转换为DefaultHttp2HeadersFrame对象,经由Http2MultiplexHandler.channelRead()的处理,调用AbstractHttp2StreamChannel.pipeline的fireChannelRead()方法,DefaultHttp2HeadersFrame对象对象做为参数,执行 AbstractHttp2StreamChannel.pipeline中ChannelInboundHandler的channelRead()方法。完成从NioSocketChannel.ChannelHandler到AbstractHttp2StreamChannel.ChannelHandler的转换。
调用栈如下:
关键调用栈如下
Method | Desc | |
---|---|---|
1 | DefaultHttp2FrameReader$HeadersContinuation.processFragment() | 从IO中读取帧数据到添加到DefaultHttp2FrameReader$HeadersBlockBuilder.headerBlock中,当Header数据结束时,调用 Http2FrameListener.onHeadersRead()方法 |
2 | DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead() | 根据streamId从Http2Connection查找Http2Stream对象,如果没有,则执行Http2Connection.DefaultEndpoint.createStream()新建一个Http2Stream对象,并将其激活 (详情见上面的创建Http2Stream并触发ChannelHandlerContext.fireUserEventTriggered(Http2FrameStreamEvent))。再然后调用Http2EmptyDataFrameListener.onHeadersRead()方法向下执行 |
3 | Http2EmptyDataFrameListener.onHeadersRead() | 调用 Http2FrameCodec$FrameListener.onHeadersRead()方法 |
4 | Http2FrameCodec$FrameListener.onHeadersRead() | 将Http2Headers对象转换为DefaultHttp2HeadersFrame对象,并调用onHttp2Frame()方法继续向下执行 |
5 | Http2FrameCodec.onHttp2Frame() | 将DefaultHttp2HeadersFrame对象做为参数触发DefaultChannelHandlerContext.fireChannelRead()方法 |
6 | AbstractChannelHandlerContext.fireChannelRead() | 触发DefaultChannelHandlerContext.invokeChannelRead()方法 |
7 | Http2MultiplexHandler.channelRead() | 将Http2StreamFrame对象(DefaultHttp2HeadersFrame/DefaultHttp2DataFrame)传为参数,执行Http2MultiplexHandler.Http2MultiplexHandlerStreamChannel.pipeline的fireChannelRead()方法。完成从NioSocketChannel.ChannelHandler到AbstractHttp2StreamChannel.ChannelHandler的转换 |
8 | AbstractHttp2StreamChannel.fireChildRead() | |
9 | AbstractHttp2StreamChannel.Http2ChannelUnsafe.doRead0() | 触发AbstractHttp2StreamChannel.pipeline 的fireChannelRegistered()方法 |
10 | DefaultChannelPipeline.fireChannelRead() | 触发AbstractHttp2StreamChannel.pipeline 的fireChannelRead()方法,并执行第一个DefaultChannelPipeline.AbstractChannelHandlerContext的invokeChannelRead()方法 |
AbstractChannelHandlerContext.invokeChannelRead() | netty 逻辑 | |
AbstractChannelHandlerContext.fireChannelRead() | netty 逻辑 | |
AbstractChannelHandlerContext.invokeChannelRead() | netty 逻辑 | |
ChannelInboundHandlerAdapter.channelRead() | netty 逻辑 |
关键代码逻辑
DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead()相关代码逻辑:
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
Http2Stream stream = connection.stream(streamId);
boolean allowHalfClosedRemote = false;
if (stream == null && !connection.streamMayHaveExisted(streamId)) {、
//创建Http2Stream,并触发Http2MultiplexHandlerStreamChannel.pipeline的fireUserEventTriggered()和fireChannelRegistered()
stream = connection.remote().createStream(streamId, endOfStream);
// Allow the state to be HALF_CLOSE_REMOTE if we're creating it in that state.
allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE;
}
...
stream.headersReceived(isInformational);
encoder.flowController().updateDependencyTree(streamId, streamDependency, weight, exclusive);
//Http2Headers对象转换为DefaultHttp2HeadersFrame对象,并将DefaultHttp2HeadersFrame对象做为参数触发DefaultChannelHandlerContext.fireChannelRead()方法,然后调用Http2MultiplexHandler.channelRead()方法
listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endOfStream);
// If the headers completes this stream, close it.
if (endOfStream) {
lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
}
}
Http2FrameCodec$FrameListener.onHeadersRead() 方法逻辑:
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endOfStream) {
// 将Http2Headers对象转换为DefaultHttp2HeadersFrame对象,并调用onHttp2Frame()方法继续向下执行
onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding)
.stream(requireStream(streamId)));
}
Http2FrameCodec.onHttp2Frame()方法逻辑:
void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
ctx.fireChannelRead(frame);
}
Http2MultiplexHandler.channelRead()方法逻辑:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
parentReadInProgress = true;
if (msg instanceof Http2StreamFrame) {
if (msg instanceof Http2WindowUpdateFrame) {
// We dont want to propagate update frames to the user
return;
}
Http2StreamFrame streamFrame = (Http2StreamFrame) msg;
DefaultHttp2FrameStream s =
(DefaultHttp2FrameStream) streamFrame.stream();
AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) s.attachment;
if (msg instanceof Http2ResetFrame) {
// Reset frames needs to be propagated via user events as these are not flow-controlled and so
// must not be controlled by suppressing channel.read() on the child channel.
channel.pipeline().fireUserEventTriggered(msg);
// RST frames will also trigger closing of the streams which then will call
// AbstractHttp2StreamChannel.streamClosed()
} else {
// 将Http2StreamFrame对象(DefaultHttp2HeadersFrame/DefaultHttp2DataFrame)传为参数,执行Http2MultiplexHandler.Http2MultiplexHandlerStreamChannel.pipeline的fireChannelRead()方法。
//完成从NioSocketChannel.ChannelHandler到AbstractHttp2StreamChannel.ChannelHandler的转换
channel.fireChildRead(streamFrame);
}
return;
}
if (msg instanceof Http2GoAwayFrame) {
// goaway frames will also trigger closing of the streams which then will call
// AbstractHttp2StreamChannel.streamClosed()
onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) msg);
}
// Send everything down the pipeline
ctx.fireChannelRead(msg);
}
处理Data帧
HttpFrameCodec读取Data帧数据跟的逻辑跟读取Header帧数据的逻辑非常相似,不同的有几点:
- Data帧没有Continuation帧需要处理
- Data帧不会创建Http2Stream使用,而是使用之前处理Header帧时已经创建好的Http2Stream对象
- Data帧的数据读取后,不需要解码,直接包装成往下传DefaultHttp2DataFrame对象往下传递
调用栈如下:
跟读取Header帧和Continuation帧一样,读取入口也是在DefaultHttp2FrameReader.readFrame()方法中。主要代码逻辑如下:
public void readFrame(ChannelHandlerContext ctx, ByteBuf input, Http2FrameListener listener)
throws Http2Exception {
if (readError) {
input.skipBytes(input.readableBytes());
return;
}
try {
do {
if (readingHeaders) {
//校验
processHeaderState(input);
if (readingHeaders) {
// Wait until the entire header has arrived.
return;
}
}
// the first pass at payload processing now.
//读取
processPayloadState(ctx, input, listener);
if (!readingHeaders) {
// Wait until the entire payload has arrived.
return;
}
} while (input.isReadable());
} catch (Http2Exception e) {
readError = !Http2Exception.isStreamError(e);
throw e;
} catch (RuntimeException e) {
readError = true;
throw e;
} catch (Throwable cause) {
readError = true;
PlatformDependent.throwException(cause);
}
}
private void processHeaderState(ByteBuf in) throws Http2Exception {
if (in.readableBytes() < FRAME_HEADER_LENGTH) {
// Wait until the entire frame header has been read.
return;
}
// Read the header and prepare the unmarshaller to read the frame.
payloadLength = in.readUnsignedMedium();
if (payloadLength > maxFrameSize) {
throw connectionError(FRAME_SIZE_ERROR, "Frame length: %d exceeds maximum: %d", payloadLength,
maxFrameSize);
}
frameType = in.readByte();
flags = new Http2Flags(in.readUnsignedByte());
streamId = readUnsignedInt(in);
// We have consumed the data, next time we read we will be expecting to read the frame payload.
readingHeaders = false;
switch (frameType) {
case DATA:
verifyDataFrame();
break;
...
default:
// Unknown frame type, could be an extension.
verifyUnknownFrame();
break;
}
}
private void processPayloadState(ChannelHandlerContext ctx, ByteBuf in, Http2FrameListener listener)
throws Http2Exception {
if (in.readableBytes() < payloadLength) {
// Wait until the entire payload has been read.
return;
}
// Only process up to payloadLength bytes.
int payloadEndIndex = in.readerIndex() + payloadLength;
readingHeaders = true;
// Read the payload and fire the frame event to the listener.
switch (frameType) {
case DATA:
//读取Data帧数据
readDataFrame(ctx, in, payloadEndIndex, listener);
break;
...
}
in.readerIndex(payloadEndIndex);
}
//Data帧校验方法
private void verifyDataFrame() throws Http2Exception {
verifyAssociatedWithAStream();
verifyNotProcessingHeaders();
verifyPayloadLength(payloadLength);
if (payloadLength < flags.getPaddingPresenceFieldLength()) {
throw streamError(streamId, FRAME_SIZE_ERROR,
"Frame length %d too small.", payloadLength);
}
}
//Data帧数据读取方法
private void readDataFrame(ChannelHandlerContext ctx, ByteBuf payload, int payloadEndIndex,
Http2FrameListener listener) throws Http2Exception {
int padding = readPadding(payload);
verifyPadding(padding);
// Determine how much data there is to read by removing the trailing
// padding.
int dataLength = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);
//将frame payload部分的数据切片之后返回新的一个ByteBuf对象直接向下传递
ByteBuf data = payload.readSlice(dataLength);
listener.onDataRead(ctx, streamId, data, padding, flags.endOfStream());
}
Data帧校验项
Item | Method | Exception |
---|---|---|
streamId是否正常(!=0) | verifyAssociatedWithAStream | 异常:streamId==0 |
headersContinuation 是否为空 | verifyNotProcessingHeaders | headersContinuation!=null,headersContinuation是处理continuation header帧的一种手段,出现headersContinuation!=null时,说明流的帧数据出现错乱 |
payloadLength<=maxFrameSize | verifyPayloadLength | 帧的大小超出了上限 |
payloadLength >= flags.getPaddingPresenceFieldLength() | 帧体长度太小 |
关键调用栈如下
Method | Desc | |
---|---|---|
1 | DefaultHttp2FrameReader.readDataFrame() | 将frame payload部分的数据切片之后返回新的一个ByteBuf对象直接向下传递 |
2 | DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead() | |
3 | Http2FrameCodec$FrameListener.onDataRead() | 将ByteBuf类型的frame payload数据包装成DefaultHttp2DataFrame对象,向下传递执行 |
4 | Http2FrameCodec.onHttp2Frame() | 将DefaultHttp2DataFrame对象做为参数触发DefaultChannelHandlerContext.fireChannelRead()方法 |
5 | AbstractChannelHandlerContext.fireChannelRead() | 触发DefaultChannelHandlerContext.invokeChannelRead()方法 |
6 | Http2MultiplexHandler.channelRead() | 将Http2StreamFrame对象(DefaultHttp2HeadersFrame/DefaultHttp2DataFrame)传为参数,执行Http2MultiplexHandler.Http2MultiplexHandlerStreamChannel.pipeline的fireChannelRead()方法。完成从NioSocketChannel.ChannelHandler到AbstractHttp2StreamChannel.ChannelHandler的转换 |
7 | AbstractHttp2StreamChannel.fireChildRead() | |
8 | AbstractHttp2StreamChannel.Http2ChannelUnsafe.doRead0() | 触发AbstractHttp2StreamChannel.pipeline 的fireChannelRegistered()方法 |
9 | DefaultChannelPipeline.fireChannelRead() | 触发AbstractHttp2StreamChannel.pipeline 的fireChannelRead()方法,并执行第一个DefaultChannelPipeline.AbstractChannelHandlerContext的invokeChannelRead()方法 |
AbstractChannelHandlerContext.invokeChannelRead() | netty 逻辑 | |
AbstractChannelHandlerContext.fireChannelRead() | netty 逻辑 | |
AbstractChannelHandlerContext.invokeChannelRead() | netty 逻辑 | |
ChannelInboundHandlerAdapter.channelRead() | netty 逻辑 |
关键代码逻辑
DefaultHttp2FrameReader.readDataFrame():
private void readDataFrame(ChannelHandlerContext ctx, ByteBuf payload, int payloadEndIndex,
Http2FrameListener listener) throws Http2Exception {
int padding = readPadding(payload);
verifyPadding(padding);
// Determine how much data there is to read by removing the trailing
// padding.
int dataLength = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);
ByteBuf data = payload.readSlice(dataLength);
listener.onDataRead(ctx, streamId, data, padding, flags.endOfStream());
}
将frame payload部分的数据切片之后返回新的一个ByteBuf对象直接向下传递。
Http2FrameCodec$FrameListener.onDataRead() 方法的代码逻辑:
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) {
onHttp2Frame(ctx, new DefaultHttp2DataFrame(data, endOfStream, padding)
.stream(requireStream(streamId)).retain());
// We return the bytes in consumeBytes() once the stream channel consumed the bytes.
return 0;
}
ByteBuf类型的frame payload数据包装成DefaultHttp2DataFrame对象,调用Http2FrameCodec.onHttp2Frame() 将DefaultHttp2DataFrame对象做为参数触发DefaultChannelHandlerContext.fireChannelRead()方法,这里的逻辑跟处理Header帧时一样都 是进入netty的逻辑了,会调用 io.netty.channel.ChannelHandlerContext.fireChannelRead(),直到运行到Http2MultiplexHandler.channelRead()方法中,根据Http2Stream拿到对应的Http2MultiplexHandlerStreamChannel对象,将DefaultHttp2DataFrame对象做为参数,执行Http2MultiplexHandler.Http2MultiplexHandlerStreamChannel.pipeline的fireChannelRead()方法。完成从NioSocketChannel.ChannelHandler到AbstractHttp2StreamChannel.ChannelHandler的转换。
参考
Netty对HPACK头部压缩的支持