1 介绍
在分析客户端的代码中,已经对Bootstrap启动Netty有了一个大致的认识,接下来在分析服务端时,就会相对简单。先看一下服务端简单的启动代码。
public class ChatServer {
public void start(int port) throws Exception{
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workersGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boosGroup,workersGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//业务代码
}
})
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true);
System.out.println("服务已启动,监听端口是:" + port);
//绑定端口
ChannelFuture channelFuture = bootstrap.bind(port).sync();
//等待服务器socket关闭
channelFuture.channel().closeFuture().sync();
}finally {
workersGroup.shutdownGracefully();
boosGroup.shutdownGracefully();
System.out.println("服务已关闭");
}
}
public static void main(String[] args) {
try {
new ChatServer().start(8080);
}catch (Exception e){
e.printStackTrace();
}
}
}
服务端基本写法跟客户端相比,差别不大,基本也是进行几个部分的初始化。
1、Event:无论是客户端还是服务端,都必须指定EventLoopGroup。在上面的代码中,指定了NioEventLoopGroup,表示一个NIO的EventLoopGroup,不过服务端需要指定两个EventLoopGroup,一个是boosGroup,用户处理客户端的连接请求;一个是workerGroup,用于处理与各个客户端连接的I/O操作。
2、指定Channel的类型。这里是服务端,所以使用了NioServerSocketChannel。
3、配置自定义的业务处理器Handler。
2、NioServerSocketChannel的创建
在分析客户端Channel的初始化过程时已经提到,Channel是对Java 底层Socket连接的抽象,并且知道客户端Channel的具体类型是NioSocketChannel,由此可知,服务端Channel的类型就是NioServerSocketChannel。
在客户端中,Channel类型的指定是在初始化时通过Bootstrap的channel的方法设置的,服务端也是同样的方式。
再看服务端代码,调用ServerBootstrap的channel(NioServerSocketChannel.class)方法,传入的参数是NioServerSocketChannel对象。可以确定NioServerSocketChannel的实例化是通过ReflectiveChannelFactory工厂类来完成的,而ReflectiveChannelFactory中的clazz属性被赋值为NioServerSocketChannel.class,因此当调用ReflectiveChannelFactory的newChannel方法时,就能获取一个NioServerSocketChannel的实例。newChannel方法的代码如下:
public T newChannel() {
try {
return (Channel)this.clazz.newInstance();
} catch (Throwable var2) {
throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);
}
}
总结一下。
1、ServerBootstrap中的ChannelFactory的实现类是 ReflectiveChannelFactory类。
2、创建Channel具体类型是NioServerSocketChannel。
Channel的实例化过程,其实就是调用ChannelFactory的newChannel方法,而实例化的Channel具体类型就是初始化ServerBootstrap时传给channel方法的实参。因此,上面代码案例中的服务端ServerBootstrap创建的Channel实例就是NioServerSocketChannel的实例。
3 服务端Channel的实例化
下面分析NioServerSocketChannel的实例化过程,先看一下NioServerSocketChannel的类层次结构图。
首先,来看一下NioServerSocketChannel的默认构造器。与NioSocketChannel类似,构造器都是调用newSocket方法来打开一个Java NIO Socket。不过需要注意的是,客户端的newSocket方法调用的是openSocketChannel,而服务端的newSocket调用的是openServerSocketChannel。。顾名思义,一个是客户端的Java SocketChannel,一个是服务端的Java ServerSocketChannel。代码如下:
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException var2) {
throw new ChannelException("Failed to open a server socket.", var2);
}
}
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
然后调用重载构造方法,代码如下:
public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
super((Channel)null, channel, 16);
this.config = new NioServerSocketChannelConfig(this, this.javaChannel().socket());
}
在这个构造方法中,调用父类构造方法时传入的参数是SelectionKey.OP_ACCEPT。作为对比回顾一下,在客户端Channel初始化时,传入的参数是SelectionKey.OP_READ。在服务启动后需要监听客户端的连接请求,因此在这里设置SelectionKey.OP_ACCEPT,也就是通知Selector监听客户端的连接请求。
接下俩,和客户单对比分析,逐级调用父类的构造方法,首先调用NioServerSocketChannel的构造器,其次调用AbstractNioMessageChannel的构造器,最后调用AbstractChannel的构造器。同样的,在AbstractChannel中实例化一个Unsafe和Pipeline,代码如下:
protected AbstractChannel(Channel parent) {
this.parent = parent;
this.id = this.newId();
this.unsafe = this.newUnsafe();
this.pipeline = this.newChannelPipeline();
}
不过这里需要注意的是,客户端的Unsafe是AbstractNioByteChannel.NioByteUnsafe的实例,而服务端的Unsafe是AbstractNioMessageChannel.AbstractNioUnsafe的实例。 AbstractNioMessageChannel重写了newUnsafe方法,代码如下:
protected AbstractNioChannel.AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
总结一下在NioServerSocketChannel实例化过程中执行的逻辑。
1、调用NioServerSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 方法创建一个新的Java NIO原生的ServerSocketChannel对象。
2、实例化AbstractChannel对象并给属性赋值,具体赋值属性如下:
(1)parent:设置为默认null
(2)unsafe:通过调用newUnsafe方法,实例化一个Unsafe对象,其类型是AbstractNioMessageChannel.AbstractNioUnsafe内部类。
(3)pipeline:赋值的是DefaultChannelPipeline的实例。
3、实例化AbstractNioChannel对象并给属性赋值,具体赋值的属性如下:
(1)ch:被赋值为Java NIO原生的ServerSocketChannel对象,通过调用NioSERverSocketChannel的newSocket方法获取。
(2)readInterestOp:被赋值为默认值SelectionKey.OP_ACCEPT。
(3)ch:被设置为非阻塞,也就是调用ch.configureBlocking(false)方法。
4、给NioServerSocketChannel对象的config属性赋值为new NioServerSocketChannelConfig(this,javaChannel().socket())。
4 ChannelPipeline初始化与Channel注册到Selector
与客户端相同,参考:Netty学习——源码篇2 客户端Bootstrap(一)
5 bossGroup与workerGroup
在客户端初始化的时候,初始化了一个EventLoopGroup对象,而在服务端初始化的时候,设置了两个EventLoopGroup:一个是bossGroup,另一是workerGroup。这两个EventLoopGroup的作用是什么?
其实,bossGroup只用于服务端的accept,也就是用户处理客户端新连接接入的请求。可以把Netty比做一个饭店,bossGroup就像一个大堂经理,当客户来吃饭时,大堂经理就会引导客户就做。而workerGroup就像实际干活的厨师,客户可以稍作休息,而此时厨师(workerGroup)就开始工作了。bossGroup与workerGroup的关系如下图:
首先,服务端的bossGroup不断的监听是否有客户端的连接,当发现有一个新的客户端连接到来时,bossGroup就会为此连接初始化各项资源;然后,从workerGroup中选出一个EventLoop绑定到此客户端连接中;接下来,服务端与客户端的交互过程将全部在此分配的EventLoop中完成。
在ServerBootstrap初始化时调用了bootstrap.group(bossGroup,workerGroup),并设置了两个EventLoopGroup,代码如下:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
} else if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
} else {
this.childGroup = childGroup;
return this;
}
}
显然,这个方法初始化了两个属性,一个是group = parentGroup,它是在super.group(parentGroup)中完成初始化的。另一个是this.childGroup = childGroup。接着从应用程序的启动代码看,调用了bootstrap.bind()方法来监听一个本地端口,bind方法会触发如下调用链:
AbstractBootstrap.bind()->AbstractBootstrap.doBind()->initAndRegister()
代码看到这里,发现对于AbstractBootstrap的initAndRegister方法已经很熟悉了,再来看一下这个方法:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = this.channelFactory.newChannel();
this.init(channel);
} catch (Throwable var3) {
if (channel != null) {
channel.unsafe().closeForcibly();
}
return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
ChannelFuture regFuture = this.config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
这里的group()方法返回的是上面提到的bossGroup,而这里的Channel其实就是NioServerSocketChannel的实例,因此可以猜测group.register(channel)将bossGroup和NioServerSocketChannel关联起来。那么workerGroup具体是在哪里与NioServerSocketChannel关联的呢?继续看init(channel)方法:
void init(Channel channel) throws Exception {
Map<ChannelOption<?>, Object> options = this.options0();
synchronized(options) {
channel.config().setOptions(options);
}
Map<AttributeKey<?>, Object> attrs = this.attrs0();
synchronized(attrs) {
Iterator i$ = attrs.entrySet().iterator();
while(true) {
if (!i$.hasNext()) {
break;
}
Map.Entry<AttributeKey<?>, Object> e = (Map.Entry)i$.next();
AttributeKey<Object> key = (AttributeKey)e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = this.childGroup;
final ChannelHandler currentChildHandler = this.childHandler;
final Map.Entry[] currentChildOptions;
synchronized(this.childOptions) {
currentChildOptions = (Map.Entry[])this.childOptions.entrySet().toArray(newOptionArray(this.childOptions.size()));
}
final Map.Entry[] currentChildAttrs;
synchronized(this.childAttrs) {
currentChildAttrs = (Map.Entry[])this.childAttrs.entrySet().toArray(newAttrArray(this.childAttrs.size()));
}
p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = ServerBootstrap.this.config.handler();
if (handler != null) {
pipeline.addLast(new ChannelHandler[]{handler});
}
ch.eventLoop().execute(new Runnable() {
public void run() {
pipeline.addLast(new ChannelHandler[]{new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
}
});
}
}});
}
实际上,init()方法在ServerBootstrap中被重写了,从上面的代码中看到,它为Pipeline添加了一个ChannelInitializer,而这个ChannelInitializer中添加了一个非常关键的ServerBootstrapAccept的Handler。先来关注ServerBootstrapAcceptor类。在ServerBootstrapAcceptor中重写了channelRead()方法,其代码如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel)msg;
child.pipeline().addLast(new ChannelHandler[]{this.childHandler});
Map.Entry[] arr$ = this.childOptions;
int len$ = arr$.length;
int i$;
Map.Entry e;
for(i$ = 0; i$ < len$; ++i$) {
e = arr$[i$];
try {
if (!child.config().setOption((ChannelOption)e.getKey(), e.getValue())) {
ServerBootstrap.logger.warn("Unknown channel option: " + e);
}
} catch (Throwable var10) {
ServerBootstrap.logger.warn("Failed to set a channel option: " + child, var10);
}
}
arr$ = this.childAttrs;
len$ = arr$.length;
for(i$ = 0; i$ < len$; ++i$) {
e = arr$[i$];
child.attr((AttributeKey)e.getKey()).set(e.getValue());
}
try {
this.childGroup.register(child).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
ServerBootstrap.ServerBootstrapAcceptor.forceClose(child, future.cause());
}
}
});
} catch (Throwable var9) {
forceClose(child, var9);
}
}
ServerBootstrapAcceptor中的childGroup是构造此对象时传入的currentChildGroup,也就是workerGroup对象。而这里的Channel是NioSocketChannel的实例,因此childGroup的register方法就是将workerGroup中的某个EventLoop和NioSocketChannel关联。那么,ServerBootstrapAcceptor的channelRead()方法是在哪里被调用的呢?其实当一个Client连接到Server时,Java 底层 NIO的ServerSocketChannel就会有一个SelectionKe.OP_ACCEPT的事件就绪,接着会调用NioServerSocketChannel的doReadMessage方法,代码如下:
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = this.javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable var6) {
logger.warn("Failed to create a new channel from an accepted socket.", var6);
try {
ch.close();
} catch (Throwable var5) {
logger.warn("Failed to close a socket.", var5);
}
}
return 0;
}
在doReadMessage方法中,通过调用javaChannel().accept方法获取客户端新连接的SocketChannel对象,紧接着实例化一个NioSocketChannel,并且传入NioServerSocketChannel对象。由此可知,创建的NioSocketChannel的父类Channel就是NioServerSocketChannel实例。接下来利用Netty的ChannelPipeline机制,将读取时间逐级发送给各个Handler中,于是就会触发ServerBootstrapAccept的channelRead()方法。
6 服务端Selector事件轮询
回到服务端ServerBootstrap的启动代码,它是从bind方法开始。ServerBootstrap的bind方法实际上就是其父类AbstractBootstrap的bind方法,来看代码:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
在doBind0方法中,调用EventLoop的execute方法,代码如下:
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
execute()方法主要就是创建线程,将线程添加到EventLoop的无锁化串行任务队列。重点关注startThread()方法,代码如下:
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
发现startThread方法最终调用的就是SingleThreadEventExecutor.this.run()方法,这个this就是NioEventLoop对象,代码如下:
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
上面代码主要就是用一个死循环不断地轮询SelectionKey。select()方法主要用来解决JDK空轮询bug,而processSelectedKeys就是针对不同的轮询事件进行处理。如果客户端有数据写入,最终也会调用AbstractNioMessageChannel的doReadMessages方法。下面总结一下Selector的轮询流程。
1、Selector事件轮询是从EventLoop的execute方法开始的。
2、在EventLoop 的execute方法中,会为每一个任务都创建一个独立的线程,并保存到无锁化串行任务队列。
3、线程任务队列的每个任务实际调用的是NioEventLoop的run方法。
4、在run方法中调用processSelectedKeys处理轮询事件。