1 Channel简介
在Netty中,Channel相当于一个Socket的抽象,它为用户提供了关于Socket状态(是连接还是断开)以及对Socket的读写等操作。每当Netty建立了一个连接,都创建一个与其对应的Channel实例。
除了TCP,Netty还支持很多其他的协议,并且每种协议还有NIO和OIO(传统的阻塞IO)版本的区别。不同协议不同阻塞类型的连接都有不同的Channel类型与之对应,下表对一些常用的Channel做了简单介绍。
来看一下Channel的总体类图,如下图:
2 NioSocketChannel的创建
Bootstrap是Netty提供的一个便利的工厂类,可以通过它来完成客户端或者服务端的Netty的初始化。
首先,从客户端开始分析。
public class ChatClient {
public ChatClient connect(int port,String host,final String name){
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
System.out.println("初始化channel:" + socketChannel);
}
});
//发起同步连接操作
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
//关闭,释放线程资源
group.shutdownGracefully();
}
return this;
}
public static void main(String[] args) {
new ChatClient().connect(8080,"127.0.0.1","jay");
}
}
分析如下:
1、EventLoopGroup:不论是服务端还是客户端,都必须指定EventLoopGroup。在本例中,指定了NioEventLoopGroup,表示一个NIO的EventLoopGroup。
2、ChannelType:指定Channel的类型。因为是客户端,所以使用了NioSocketChannel。
3、Handler:设置处理数据的Handler。
客户端启动Bootstrap后都做了哪些工作?看一下NioSocketChannel的类层次结构图,如下:
回到在客户端连接代码的初始化Bootstrap中,该方法调用了一个channel方法,传入的参数是NioSocketChannel.class,在这个方法中其实就是初始化了一个ReflectiveChannelFactory的对象,代码实现如下:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
} else {
return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
}
}
而ReflectiveChannelFactory实现了ChannelFactory接口,它提供了唯一的方法,即newChannel方法。顾名思义,ChannelFactory就是创建Channel的工厂类。进入ReflectiveChannelFactory的newChannel方法,其实现代码如下:
public T newChannel() {
try {
return (Channel)this.clazz.newInstance();
} catch (Throwable var2) {
throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);
}
}
根据上面的代码,可以得出以下结论。
1、Bootstrap中的ChannelFactory实现类是ReflectiveChannelFactory。
2、通过channel方法创建的Channel具体类型是NioSocketChannel。
Channel的实例化过程其实就是调用ChannelFactory的newChannel方法,而实例化的Channel具体类型又和初始化Bootstrap时传入的channel方法的参数有关。因此对于客户端的Bootstrap而言,创建的Channel实例就是NioSocketChannel。
3 客户端Channel初始化
上面提到了如何设置一个Channel的类型,并且了解到Channel是通过ChannelFactory的newChannel方法来实例化的,那么ChannelFactory的newChannel方法在哪里调用呢?其调用链路如下图:
在AbstractBootstrap的initAndRegister方法中,调用ChannelFactory的newChannel方法来创建一个NioSocketChannel的实例,代码如下:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = this.channelFactory.newChannel();
this.init(channel);
} catch (Throwable var3) {
if (channel != null) {
channel.unsafe().closeForcibly();
}
return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
ChannelFuture regFuture = this.config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
在newChannel方法中,利用反射机制调用类对象newInstance()方法来创建一个新的Channel实例,相当于调用NioSocketChannel的默认构造方法。 NioSocketChannel默认的构造方法代码如下:
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
这里的代码比较关键,可以看到,在这个构造器中首先会调用newSocket()方法来打开一个新的Java NIO的SocketChannel对象。
private static java.nio.channels.SocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openSocketChannel();
} catch (IOException var2) {
throw new ChannelException("Failed to open a socket.", var2);
}
}
然后调用父类,即AbstractNioByteChannel构造器.
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, 1);
}
同时,传入参数,parent的值默认为null,ch为之前调用newSocket()方法创建的Java NIO的SocketChannel对象,因此新创建的NioSocketChannel对象中的parent暂时是null。接着会调用父类的AbstractNioChannel构造器,并传入实际参数readInterestOp=SelectionKey.OP_READ。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException var7) {
try {
ch.close();
} catch (IOException var6) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a partially initialized socket.", var6);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", var7);
}
}
最后会调用父类AbstractNioChannel的构造器。
至此,NioSocketChannel就完成了初始化,总结一下NioSocketChannel初始化所做的流程:
1、调用NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER)打开一个新的Java NioSocketChannel。
2、初始化AbstractChannel对象并给属性赋值,具体赋值的属性如下:
(1)id:每个Channel都会被分配一个唯一的id。
(2)parent:属性值默认为null。
(3)unsafe:通过调用newUnsafe()方法实例化一个Unsafe对象,它的类型是AbsractNioByteChannel.NioByteUnsafe内部类。
(4)pipeline:是通过调用new DefaultChannelPipeline(this)新创建的实例。
3、AbstractNIOChannel中被赋值的属性如下:
(1)ch:被赋值为Java 原生SocketChannel,即NioSocketChannel的newSocket()方法返回的Java NIO SocketChannel。
(2)readInterestOp:被赋值为SelectionKey.OP_READ。
(3)ch:被配置为非阻塞,即调用ch.configureBlocking(false)方法
4、NioSocketChannel中被赋值的属性:config = new NioSocketChannelConfig(this,socket.socket())。
4 ChannelPipeline的初始化
上面在分析NioSocketChannel的初始化过程中,漏掉了一个关键的部分,即ChannelPipeline的初始化。在实例化一个Channel时,必须要实例化一个ChannelPipeline。在AbstractChannel的构造器中看到了Pipeline属性被初始化为DefaultChannelPipeline的实例。DefaultChannelPipeline构造器的代码如下:
protected DefaultChannelPipeline(Channel channel) {
this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
this.voidPromise = new VoidChannelPromise(channel, true);
this.tail = new DefaultChannelPipeline.TailContext(this);
this.head = new DefaultChannelPipeline.HeadContext(this);
this.head.next = this.tail;
this.tail.prev = this.head;
}
DefaultChannelPipeline 的构造器需要传入一个Channel,而这个Channel其实就是实例化的NioSocketChannel对象,DefaultChannelPipeline会将这个 NioSocketChannel对象保存在Channel属性中。DefaultChannelPipeline中还有两个属性是双向链表的头和尾,即Head和Tail。其实在DefaultChannelPipeline中维护了一个以AbstractChannelHandlerContext为节点元素的双向链表,这个链表是NEtty实现Pipeline机制的关键。先看HeadContext的继承层次结构,如下图所示:
TailContext的继承层次结构图如下:
可以看到,链表中Head是一个ChannelOutBoundHandler,而Tail是一个ChannelInBoundHandler。接着看HeadContext的构造器代码:
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, false, true);
this.unsafe = pipeline.channel().unsafe();
this.setAddComplete();
}
它调用了父类AbstractChannelHandlerContext的构造器,并传入擦承诺书inbound=false,outbound=true。而TailContext的构造器与HeadContext刚好相反。
5 EventLoop的初始化
回到最开始ChatClient用户代码中,一开始就实例化了一个NioEventLoopGrouop的对象,因此就从它的构造器中追踪EventLoop的初始化过程。首先来看NioEventLoopGrouop的类继承层次结构图:
NioEventLoop中有几个重载的构造器,不过内容都没有太大的区别,最终都调用父类MultithreadEventLoopGroup的构造器,代码如下:
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
如果传入的线程数nThreads是0,那么Netty会设置默认的线程数DEFAULT_EVENT_LOOP_THREADS,而这个默认的线程数怎么确定的呢?首先确定 DEFAULT_EVENT_LOOP_THREADS的值
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
Netty首先从系统属性中获取“io.netty.eventLoopThreads”的值,如果没有设置,就返回默认值,即CPU核数*2。回到MultithreadEventLoopGroup构造器中会继续调用父类MultithreadEventExecutorGroup的构造器。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
this.terminatedChildren = new AtomicInteger();
this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
} else {
if (executor == null) {
executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
}
this.children = new EventExecutor[nThreads];
int j;
for(int i = 0; i < nThreads; ++i) {
boolean success = false;
boolean var18 = false;
try {
var18 = true;
this.children[i] = this.newChild((Executor)executor, args);
success = true;
var18 = false;
} catch (Exception var19) {
throw new IllegalStateException("failed to create a child event loop", var19);
} finally {
if (var18) {
if (!success) {
int j;
for(j = 0; j < i; ++j) {
this.children[j].shutdownGracefully();
}
for(j = 0; j < i; ++j) {
EventExecutor e = this.children[j];
try {
while(!e.isTerminated()) {
e.awaitTermination(2147483647L, TimeUnit.SECONDS);
}
} catch (InterruptedException var20) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
if (!success) {
for(j = 0; j < i; ++j) {
this.children[j].shutdownGracefully();
}
for(j = 0; j < i; ++j) {
EventExecutor e = this.children[j];
try {
while(!e.isTerminated()) {
e.awaitTermination(2147483647L, TimeUnit.SECONDS);
}
} catch (InterruptedException var22) {
Thread.currentThread().interrupt();
break;
}
}
}
}
this.chooser = chooserFactory.newChooser(this.children);
FutureListener<Object> terminationListener = new FutureListener<Object>() {
public void operationComplete(Future<Object> future) throws Exception {
if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {
MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null);
}
}
};
EventExecutor[] arr$ = this.children;
j = arr$.length;
for(int i$ = 0; i$ < j; ++i$) {
EventExecutor e = arr$[i$];
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet(this.children.length);
Collections.addAll(childrenSet, this.children);
this.readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
}
继续进入newChooser方法查看实现逻辑,代码如下DefaultEventExecutorChooserFactory:
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() {
}
public EventExecutorChooser newChooser(EventExecutor[] executors) {
return (EventExecutorChooser)(isPowerOfTwo(executors.length) ? new DefaultEventExecutorChooserFactory.PowerOfTowEventExecutorChooser(executors) : new DefaultEventExecutorChooserFactory.GenericEventExecutorChooser(executors));
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
public EventExecutor next() {
return this.executors[Math.abs(this.idx.getAndIncrement() % this.executors.length)];
}
}
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
public EventExecutor next() {
return this.executors[this.idx.getAndIncrement() & this.executors.length - 1];
}
}
}
上面代码主要表达的意思是:如果nThreads是2的平方,则使用PowerOfTowEventExecutorChooser,否则使用GenericEventExecutorChooser。这两个Chooser都重写next()方法。next()方法的主要功能就是将数组索引循环位移,如下图所示:
当索引移动到最后一个位置时,再调用next方法就会将索引位置重新指向0,如下图所示:
分析到这里,已经非常清楚 MultithreadEventLoopGroup中的处理逻辑,简单总结如下:
1、创建一个大小为nThreads的SingleThreadEventExecutor数组。
2、根据nThreads的大小,创建不同的Chooser,如果nThreads是2的平方,则使用PowerOfTowEventExecutorChooser,否则使用GenericEventExecutorChooser。不论使用哪个Chooser,它们的功能都是一样的,即从children数组中选出一个合适的EventExecutor实例。
3、调用newChild()方法初始化children数组。
根据上面代码,知道了MultithreadEventLoopGroup内部维护了一个EventExecutor数组,而Netty的EventLoopGroup的实现机制其实就建立在MultithreadEventLoopGroup之上。每当Netty需要一个EventLoop时,都会调用next方法获取一个可用的EventLoop。
6 将Channel注册到Selector
前面提到Channel会在Bootstrap的initAndRegister中进行初始化,但是这个方法还会将初始化好的channel注册到NioEventLoop的Selector中。
当Channel初始化后,紧接着会调用group().register()方法来向Selector注册Channel。继续跟踪,会发现其调用链路如下图所示:
通过跟踪链路,最终发现在AbstractBootstrap的initAndRegister方法中调用的是Unsafe的register方法,接下来看一下AbstractChannel代码:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
} else if (AbstractChannel.this.isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
} else if (!AbstractChannel.this.isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
} else {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
this.register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
public void run() {
AbstractUnsafe.this.register0(promise);
}
});
} catch (Throwable var4) {
AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
this.closeForcibly();
AbstractChannel.this.closeFuture.setClosed();
this.safeSetFailure(promise, var4);
}
}
}
}
首先,将EventLoop赋值给Channel的eventLoop属性,我们直到EventLoop对象其实是通过MultithreadEventLoopGroup的next方法获取的,根据前面的分析,可以确定next方法返回的eventLoop对象是NioEventLoop实例。register方法接着调用了register0方法,代码如下:
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !this.ensureOpen(promise)) {
return;
}
boolean firstRegistration = this.neverRegistered;
AbstractChannel.this.doRegister();
this.neverRegistered = false;
AbstractChannel.this.registered = true;
AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
this.safeSetSuccess(promise);
AbstractChannel.this.pipeline.fireChannelRegistered();
if (AbstractChannel.this.isActive()) {
if (firstRegistration) {
AbstractChannel.this.pipeline.fireChannelActive();
} else if (AbstractChannel.this.config().isAutoRead()) {
this.beginRead();
}
}
} catch (Throwable var3) {
this.closeForcibly();
AbstractChannel.this.closeFuture.setClosed();
this.safeSetFailure(promise, var3);
}
}
register0方法又调用了AbsractNioChannel的doRegister方法,代码如下:
protected void doRegister() throws Exception {
boolean selected = false;
while(true) {
try {
this.selectionKey = this.javaChannel().register(this.eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException var3) {
if (selected) {
throw var3;
}
this.eventLoop().selectNow();
selected = true;
}
}
}
看到javaChannel()这个方法,我们在前面就知道,它返回的是一个java NIO的SocketChannel对象。到了这里,就将SocketChannel注册到与eventLoop关联的Selector上了。
总结一下Channel的注册过程,具体如下:
1、在AbstractBootstrap的initAndRegister()方法中,通过group().register(channel)调用MultithreadEventLoopGroup的register()方法。
2、在MultithreadEventLoopGroup的register()方法中,调用next方法获取一个可用的SingleThreadEventLoop,然后调用它的register方法。
3、在SingleThreadEventLoop的register方法中,调用channel.unsafe().register(this,promise)方法获取Channel的unsafe()底层操作对象,然后调用Unsafe的register方法。
4、在AbstractUnsafe的register方法中,调用register0方法注册到Channel对象。
5、在AbstractUnsafe的register0方法中,调用AbstractNioChannel的doRegister方法。
6、AbstractNioChannel的doRegiter方法通过javaChannel.register(eventLoop().selector,0,this)将Channel对应的Java NIO的SocketChannel注册到一个eventLoop的Selector中,并且将当前Channel作为Attachment与SocketChannel关联。
总的来说,Channel的注册过程所做的工作就是将Channel与对应的EventLoop进行关联。因此,在Netty中,每个Channel都会关联一个特定的EventLoop,并且这个Channel中的所有I/O操作都是在这个EventLoop中执行的;当关联好Channel和EventLoop后,会继续调用底层的Java NIO的SocketChannel对象的register方法,将底层Java NIO的SocketChannel注册到指定的Selector中。通过这两步,就完成了Netty对Channel的注册过程。