上篇:Netty学习——源码篇6 Pipeline设计原理 已经知道AbstractChannelHandlerContext中有Inbound和Outbound两个boolean变量,分别用于识别Context所对应的Handler的类型。
1、Inbound为true时,表示其对应的ChannelHandler是ChannelInboundHandler的子类。
2、Outbound为true时,表示其对应的ChannelHandler是ChannelOutboundHandler的子类。
这两个属性到底有什么作用呢?还要从ChannelPipeline的事件传播类型说起。Netty中的传播事件可以分为两种:Inbound事件和Outbound事件。以下是Netty官网针对这两个事件的说明。
由上可以看出,Inbound和Outbound事件的流向是不一样的,Inbound事件的流向是从下至上的,而Outbound恰好相反,是从下到上。并且Inbound方法是通过调用相应的ChannelHandlerContext.fireIN_EVT()方法来传递的,而Outbound方法是通过ChannelHandlerContext的fireChannelRegister()调用会发送一个ChannelRegistered的Inbound给下一个ChannelHandlerContext,而ChannelHandlerContext的bind()方法调用时会发送一个bind的Outbound事件给下一个ChannelHandlerContext。
Inbound事件传播方法代码如下:
public interface ChannelInboundHandler extends ChannelHandler {
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
*/
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} is now active
*/
void channelActive(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
* end of lifetime.
*/
void channelInactive(ChannelHandlerContext ctx) throws Exception;
/**
* Invoked when the current {@link Channel} has read a message from the peer.
*/
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
/**
* Invoked when the last message read by the current read operation has been consumed by
* {@link #channelRead(ChannelHandlerContext, Object)}. If {@link ChannelOption#AUTO_READ} is off, no further
* attempt to read an inbound data from the current {@link Channel} will be made until
* {@link ChannelHandlerContext#read()} is called.
*/
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called if an user event was triggered.
*/
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
/**
* Gets called once the writable state of a {@link Channel} changed. You can check the state with
* {@link Channel#isWritable()}.
*/
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called if a {@link Throwable} was thrown.
*/
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
Outbound事件传播方法的代码如下:
public interface ChannelOutboundHandler extends ChannelHandler {
/**
* Called once a bind operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the bind operation is made
* @param localAddress the {@link SocketAddress} to which it should bound
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error accour
*/
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
* Called once a connect operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the connect operation is made
* @param remoteAddress the {@link SocketAddress} to which it should connect
* @param localAddress the {@link SocketAddress} which is used as source on connect
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error accour
*/
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
* Called once a disconnect operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the disconnect operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error accour
*/
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Called once a close operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the close operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error accour
*/
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Called once a deregister operation is made from the current registered {@link EventLoop}.
*
* @param ctx the {@link ChannelHandlerContext} for which the close operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error accour
*/
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Intercepts {@link ChannelHandlerContext#read()}.
*/
void read(ChannelHandlerContext ctx) throws Exception;
/**
* Called once a write operation is made. The write operation will write the messages through the
* {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
* {@link Channel#flush()} is called
*
* @param ctx the {@link ChannelHandlerContext} for which the write operation is made
* @param msg the message to write
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error accour
*/
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
/**
* Called once a flush operation is made. The flush operation will try to flush out all previous written messages
* that are pending.
*
* @param ctx the {@link ChannelHandlerContext} for which the flush operation is made
* @throws Exception thrown if an error accour
*/
void flush(ChannelHandlerContext ctx) throws Exception;
}
可以发现,Inbound类似于事件回调(响应请求的事件),而Outbound类似于主动触发(发起请求的事件)。注意,如果捕获了一个事件,并且想让这个事件继续传递下去,需要调用Context对应的fireXXX()方法。
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception{
System.out.println("连接成功");
ctx.fireChannelActive();
}
}
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception{
System.out.println("客户端关闭");
ctx.close(promise);
}
}
如上面的代码所示,MyInboundHandler收到了一个channelActive事件,它在处理后,如果希望将事件继续传播下去,那么需要接着调用ctx.fireChannelActive()方法。
下面用一个代码案例了解一下Pipeline的传播机制。分别编写InboundHandlerA、InboundHandlerB、InboundHandlerC和OutboundandlerA、OutboundandlerB、OutboundandlerC类。
public class InboundHandlerA extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx,Object msg){
System.out.println("InboundHandlerA");
ctx.fireChannelRead(msg);
}
}
public class InboundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
System.out.println("InboundHandlerB");
ctx.fireChannelRead(msg);
}
}
public class InboundHandlerC extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
System.out.println("InboundHandlerC");
ctx.fireChannelRead(msg);
}
}
以上三个类都调用了ctx.fireChannelRead()方法向下传播。
public class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
System.out.println("OutboundHandlerA write");
ctx.write(msg,promise);
}
}
public class OutboundHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
System.out.println("OutboundHandlerB write");
ctx.write(msg,promise);
}
@Override
public void handlerAdded(final ChannelHandlerContext ctx) throws Exception{
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
ctx.channel().write("hello");
}
},3, TimeUnit.SECONDS);
}
}
public class OutboundHandlerC extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
System.out.println("OutboundHandlerC write");
ctx.write(msg,promise);
}
}
上面的三个类都调用了ctx.write()方法。下面编写测试代码,来了解其传播顺序。先编写服务端代码。PipelineServer类主要完成Pipeline的注册工作,代码如下:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class PipelineServer {
public void start(int port) throws Exception{
NioEventLoopGroup bossGroup= new NioEventLoopGroup();
NioEventLoopGroup workerGroup= new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception{
//InboundHandler的执行顺序,应该是 A B C
ch.pipeline().addLast(new InboundHandlerA());
ch.pipeline().addLast(new InboundHandlerB());
ch.pipeline().addLast(new InboundHandlerC());
//Outbound的执行顺序应该是C B A
ch.pipeline().addLast(new OutboundHandlerA());
ch.pipeline().addLast(new OutboundHandlerB());
ch.pipeline().addLast(new OutboundHandlerC());
}
}).option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
}finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception{
PipelineServer server = new PipelineServer();
server.start(8080);
}
}
PipelineClient类,与服务端建立连接并向服务端发送数据,代码如下:
public class PipelineClient {
public void connect(String host,int port) throws Exception{
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE,true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception{
ch.pipeline().addLast(new ClientInhandler());
}
});
ChannelFuture f = b.connect(host,port).sync();
f.channel().closeFuture().sync();
}finally {
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception{
PipelineClient client = new PipelineClient();
client.connect("127.0.0.1",8080);
}
}
ClientHandler类,完成向服务端发送数据的动作,代码如下:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class ClientInhandler extends ChannelInboundHandlerAdapter {
//读取服务端的消息
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
System.out.println("clientInHandler.channelRead");
ByteBuf result = (ByteBuf) msg;
byte[] result1 = new byte[result.readableBytes()];
result.readBytes(result1);
result.release();
ctx.close();
System.out.println("server infomation:" + new String(result1));
}
//当连接建立的时候向服务端发送消息,channelActive 事件在连接建立的时候会被触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception{
System.out.println("ClientHandler.channelActive");
String msg = "are you ok";
ByteBuf encoded = ctx.alloc().buffer(4 * msg.length());
encoded.writeBytes(msg.getBytes());
ctx.write(encoded);
ctx.flush();
}
}
接下来运行测试代码,分别启动PipelineServer和PipelineClient,得到的运行结果如下图:
从运行结果上看,Handler的传播顺序:从Inbound开始顺序执行,然后从Outbound逆序执行。