Netty源码解析——服务端启动
- Netty案例复习
- Netty原理复习
- Netty服务端启动源码解析
- bind(int)
- initAndRegister()
- channelFactory.newChannel()
- init(channel)
- config().group().register(channel)
- startThread()
- run()
- register0(ChannelPromise promise)
- doBind0(...)
今天我们一起来学习Netty源码,对Netty有一个深入的认知,既能掌握其使用和原理,又能对它底层的设计有一个大概的认知
Netty案例复习
在阅读源码前,我们再看一下Netty服务端的启动代码
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
- 我们创建了两个EventLoopGroup已经,然后设置到ServerBootstrap上,bossGroup中的EventLoop处理accept事件,workerGroup中的EventLoop处理read事件时。
- 再通过ServerBootstrap设置好一些其他的参数,其中包括Netty服务端接收连接请求用的NioServerSocketChannel,NioServerSocketChannel会注册到bossGroup上,监听accept事件
- 然后设置好ChannelInitializer,服务端接收到客户端连接之后,会创建一个NioSocketChannel,ChannelInitializer就是用于初始化连接建立成功后的NioSocketChannel
- 最后就是通过ServerBootstrap的bind(int inetPort)绑定一个端口
Netty原理复习
我们再来复习一下Netty的原理,带着对Netty原理的认知去看源码,效率才会高,不至于走偏。
- 当调用了ServerBootstrap的bind(int inetPort)绑定端口后,我们注册的NioServerSocketChannel就会被注册到bossGroup上的唯一一个NioEventLoop上,然后NioEventLoop会把ServerSocketChannel注册到Selector上监听accept事件,并启动事件循环
- 当有客户端连接后,boosGroup中的EventLoop会获得一个SocketChannel然后将其包装成NioSocketChannel,然后NioServerSocketChannel对应的ChannelHandler会把它注册到workerGroup中
- workerGroup会将NioSocketChannel注册到其中一个NioEventLoop上,并使用我们配置的ChannelInitializer初始化NioSocketChannel
- workerGroup中的NioEventLoop的事件循环中监听到Channel有read事件时发生,就会调用Channel对应的ChannelPipeline进行处理
- ChannelPipeline通过责任链模式,逐一调用pipeline上的ChannelHandler处理事件
Netty服务端启动源码解析
接下来我们就开始读源码了,我们从ServerBootstrap的bind方法走起。
bind(int)
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
bind方法的作用就是创建一个Channel并绑定端口,创建的Channel类型就是我们指定的NioServerSocketChannel。
bind方法会调用doBind方法。
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
...
doBind0(regFuture, channel, localAddress, promise);
...
}
省略了非关键代码后,剩下的就是关键的两行:
- initAndRegister()方法做的事情就是创建并初始化Channel然后注册到NioEventLoop上。
- doBind0(…)方法给NioServerSocockChannel中的ServerSocketChannel绑定端口。
initAndRegister()
我们进入initAndRegister()方法,同样是只看核心代码,其他的代码不关心。
final ChannelFuture initAndRegister() {
...
channel = channelFactory.newChannel();
init(channel);
...
ChannelFuture regFuture = config().group().register(channel);
...
}
- channelFactory.newChannel():反射调用NioServerSocketChannel的构造方法进行实例化。
- init(channel):为NioServerSocketChannel的ChannelPipeline添加ChannelInitializer。
- config().group().register(channel):把NioServerSocketChannel中的ServerSocketChannel注册到workerGroup的NioEventLoop的Selector上。
channelFactory.newChannel()
channelFactory.newChannel()会进入到ReflectiveChannelFactory的newChannel()方法,里面就是调用Constructor构造器的newInstance()方法反射实例化一个Channel,这个构造器就是NioServerSocketChannel的构造器。是在我们调用ServerBootstrap的channel(NioServerSocketChannel.class)方法时设置进去的。
我们进入NioServerSocketChannel的构造方法看看。
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
newSocket(…)方法就是创建了一个NIO原生的ServerSocketChannel,然后进入重载的构造方法。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
继续调用父类的构造方法,可以看到指定了事件类型是accept事件。然后进入到父类AbstractNioChannel的构造方法。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
...
ch.configureBlocking(false);
...
}
AbstractNioChannel的构造方法除了继续调用父类的构造方法以外,还保存了ServerSocketChannel和关注的事件类型OP_ACCEPT,然后设置ServerSocketChannel为非阻塞。
继续进入父类的构造方法。
protected AbstractChannel(Channel parent) {
...
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
创建了一个unsafe对象,和一个ChannelPipeline。unsafe其实是Netty内部自己使用的一个工具类,先不用管,后面会看到怎么用,这里看看如何创建ChannelPipeline。
protected DefaultChannelPipeline(Channel channel) {
...
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
我们知道ChannelPipeline中的ChannelHandler都是包在ChannelHandlerContext里面的,而这里先创建了一个头部Context和尾部Context。
也就是这样:
channelFactory.newChannel()的大体逻辑如下:
init(channel)
我们回到initAndRegister()方法,再来看init方法。
@Override
void init(Channel channel) {
...
ChannelPipeline p = channel.pipeline();
...
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
...
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
init方法就是拿到NioServerSocketChannel的ChannelPipeline,往里面添加了一个ChannelInitializer,ChannelInitializer会在ServerSocketChannel被注册到Selector后,异步的给ChannelPipeline添加一个ServerBootstrapAcceptor,这个ServerBootstrapAcceptor是一个专门处理连接事件的Handler。当然ChannelInitializer还会从ChannelPipeline中把自己删除。
config().group().register(channel)
我们回到initAndRegister()方法,再往后看一下config().group().register(channel)。
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
进入NioEventLoopGroup的父类MultithreadEventLoopGroup的register方法中。next()方法返回一个NioEventLoop,然后调用NioEventLoop的register(channel)方法。进入到NioEventLoop的父类SingleThreadEventLoop的register方法中。
@Override
public ChannelFuture register(final ChannelPromise promise) {
...
promise.channel().unsafe().register(this, promise);
...
}
promise.channel().unsafe()获取到NioServerSocketChannel中的Unsafe对象,然后调用Unsafe对象的register(…)
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
...
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
...
}
}
Unsafe的register方法中,判断当前线程是否是eventLoop的线程,如果是,直接调用register0方法注册Channel,否则调用eventLoop的execute方法开启一个任务异步注册Channel。
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
...
}
...
}
execute方法首先调用addTask(task)方法,把这个任务放入taskQueue中。然后判断当选线程不是当前EventLoop的线程,因此调用startThread()开启EventLoop的线程,启动事件循环。
addTask方法就是把这个Runnable放入到NioEventLoop内部的队列当中taskQueue中,在NioEventLoop的事件循环的每一轮的最后,会处理taskQueue中的任务,我们就不细看了。我们接下来看一下startThread方法。
startThread()
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
...
doStartThread();
...
}
}
}
startThread方法先修改NioEventLoop的状态state为开启,防止再次调用startThread方法的时候重复开启线程。然后调用doStartThread()方法。
private void doStartThread() {
...
executor.execute(new Runnable() {
@Override
public void run() {
...
SingleThreadEventExecutor.this.run();
...
}
});
}
doStartThread()方法里面调用了executor的execute方法,异步执行SingleThreadEventExecutor.this.run()方法,这个run方法就会进入到NioEventLoop的run方法中,里面就是事件循环。
这个executor其实是一个单线程的线程池,可以看做就是NioEventLoop的唯一一个线程,startThread方法并没有再创建一个线程,而是往NioEventLoop预先创建好的线程提交一个任务,这个任务会启动NioEventLoop的事件循环。
run()
接下来进入NioEventLoop的run方法看看,所谓的事件循环是什么。
@Override
protected void run() {
...
for (;;) {
...
strategy = select(curDeadlineNanos);
if (strategy > 0) {
processSelectedKeys();
}
...
runAllTasks();
...
}
}
可以看到,就是先调用select方法,这个select方法自然是调用NioEventLoop的Selector的select方法,当然现在还没有任何Channel被注册,因此这里是不会select到东西的。然后processSelectedKeys()是处理所有就绪事件的方法,因为这里还没有Channel被注册,自然也没有就绪事件发生。因此最后只有runAllTasks()被执行,也就是获取taskQueue中刚刚放进去的任务,这个任务的执行就会触发ServerSocketChannel的注册。
runAlllTasks方法执行taskQueue中目前唯一一个的任务,也就是刚刚放进去的用于注册ServerSocketChannel的任务,这个任务会调用Unsafe的register0(ChannelPromise promise)方法。
register0(ChannelPromise promise)
private void register0(ChannelPromise promise) {
...
doRegister();
...
}
register0调用doRegister()方法,进入AbstractNioChannel的doRegister()方法。
@Override
protected void doRegister() throws Exception {
...
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
...
}
doRegister方法中把Channel注册到Selector的代码就是上面这一行,javaChannel()返回NIO原生的Channel类型,然后调用register方法,eventLoop().unwrappedSelector()返回的就是Selector,这个Selector作为register方法的参数,这样NIO的Channel就被注册到Selector中了,这里的Channel当然是ServerSocketChannel。
这里注册完ServerSocketChannel之后,就会调用ChannelInitializer初始化ChannelPipeline。
doBind0(…)
我们回调doBind方法,initAndRegister()方法就已经看完了,再看下面的doBind0方法。
private static void doBind0(...,final Channel channel,...) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
...
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
...
}
});
}
dobind0()方法也是通过NioEventLoop执行异步任务的方式去绑定端口,channel.bind(localAddress, promise)方法最终会进入NioServerSocketChannel的doBind(SocketAddress localAddress)方法。
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
然后就是调用ServerSocketChannel的bind(SocketAddress local, int backlog)方法绑定指定端口。
至此,Netty服务端就启动起来,下面是Netty服务端启动的源码流程图。
服务端启动之后,就可以接收并处理客户端的请求了,后续的流程就放到下一次再作分析。