文章目录
- Pre
- 问题说明
- NIO Code
- Netty是如何解决的?
- 源码分析
- 入口
- 源码分析
- selectCnt
- selectRebuildSelector
Pre
Netty Review - ServerBootstrap源码解析
Netty Review - NioServerSocketChannel源码分析
Netty Review - 服务端channel注册流程源码解析
问题说明
NIO空轮询(Empty Polling)是指在使用Java NIO 时,当Selector上注册的Channel没有就绪事件时,Selector.select()方法会返回0,但该方法会导致CPU空转,因为它会不断地调用操作系统的底层select系统调用。这种现象被称为NIO空轮询的bug。
NIO空轮询的问题源于Java NIO的Selector(选择器)机制。在NIO中,Selector负责监视多个Channel的事件,当某个Channel有事件发生时,Selector会将该Channel的就绪事件返回给应用程序进行处理。但是,如果Selector的select方法返回0,表示当前没有任何Channel处于就绪状态,此时,如果应用程序不进行任何处理,就会导致空轮询。
在早期版本的JDK中,Java NIO的实现对于空轮询问题没有进行有效的处理,导致在高并发、高负载的网络应用中,会造成CPU资源的浪费。空轮询问题的存在会降低系统的性能,并可能引发系统负载过高、响应缓慢等问题。
因此,对于网络应用来说,解决NIO空轮询的问题是非常重要的。后续版本的JDK和一些框架(比如Netty)针对这一问题进行了优化和改进,采取了一些措施来有效地避免空轮询,提高了系统的性能和稳定性。
在Netty中,通过使用基于事件驱动的模型,避免了空轮询的问题。Netty使用了单线程模型,基于事件循环(EventLoop)处理所有的I/O事件,而不是像原生的Java NIO那样在应用程序中频繁地进行轮询。这种基于事件驱动的模型能够更加高效地处理大量的并发连接,并且减少了CPU资源的浪费。
NIO Code
public class NioSelectorServer {
public static void main(String[] args) throws IOException {
// 创建NIO ServerSocketChannel
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9000));
// 设置ServerSocketChannel为非阻塞
serverSocket.configureBlocking(false);
// 打开Selector处理Channel,即创建epoll
Selector selector = Selector.open();
// 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣
SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务启动成功");
while (true) {
// 阻塞等待需要处理的事件发生
selector.select();
// 获取selector中注册的全部事件的 SelectionKey 实例
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍历SelectionKey对事件进行处理
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 如果是OP_ACCEPT事件,则进行连接获取和事件注册
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
// 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件
SelectionKey selKey = socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("客户端连接成功");
} else if (key.isReadable()) { // 如果是OP_READ事件,则进行读取和打印
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(128);
int len = socketChannel.read(byteBuffer);
// 如果有数据,把数据打印出来
if (len > 0) {
System.out.println("接收到消息:" + new String(byteBuffer.array()));
} else if (len == -1) { // 如果客户端断开连接,关闭Socket
System.out.println("客户端断开连接");
socketChannel.close();
}
}
//从事件集合里删除本次处理的key,防止下次select重复处理
iterator.remove();
}
}
}
}
正常情况下
// 阻塞等待需要处理的事件发生
selector.select();
这里在没有事件的情况下会阻塞的,但有些特殊的情况下不会阻塞住,导致整个while(true)
一直成立 , 嗷嗷叫 ,CPU 100%。
Netty是如何解决的?
当使用Java NIO进行网络编程时,通常会使用Selector来监听多个Channel的I/O事件。Selector会不断地轮询所有注册的Channel,以检查是否有就绪的事件需要处理。但是,在某些情况下,由于操作系统或者底层网络实现的限制,Selector可能会出现空轮询的情况,即Selector不断地被唤醒,但没有任何就绪的事件,这会导致CPU资源的浪费。
Netty针对这个问题采取了一系列的优化和解决方案:
-
事件驱动模型:Netty采用了基于事件驱动的模型,所有的I/O操作都被视为事件,由事件循环(EventLoop)负责处理。事件循环会将就绪的事件放入队列中,然后按照顺序处理这些事件,避免了空轮询。
-
选择合适的Selector策略:Netty在不同的操作系统上使用不同的Selector实现,以获得最佳的性能和可靠性。例如,在Linux系统上,Netty使用epoll作为默认的Selector实现,而不是传统的Selector。epoll具有更好的扩展性和性能,并且不容易出现空轮询的问题。
-
自适应阻塞:Netty引入了自适应阻塞的概念,可以根据当前的负载情况自动调整阻塞和非阻塞的策略。这样可以使得Netty在不同的网络环境和负载下都能够表现出良好的性能。
通过以上优化和解决方案,Netty能够有效地避免NIO空轮询的问题,提高了系统的性能和可靠性,特别是在高并发的网络应用场景下。
源码分析
入口
我们根据我们画的Netty线程模型源码图里 找到入口
源码分析
io.netty.channel.nio.NioEventLoop
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector; // 获取当前NioEventLoop的Selector
try {
int selectCnt = 0; // 记录select操作的次数
long currentTimeNanos = System.nanoTime(); // 当前时间(纳秒)
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // 计算选择器的超时时间
for (;;) { // 进入循环,不断执行select操作
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; // 计算超时时间(毫秒)
if (timeoutMillis <= 0) { // 如果超时时间小于等于0
if (selectCnt == 0) { // 如果select操作次数为0
selector.selectNow(); // 立即执行非阻塞的select操作
selectCnt = 1; // 将select操作次数设置为1
}
break; // 跳出循环
}
// 如果有任务且需要唤醒Selector,则立即执行非阻塞的select操作
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 执行阻塞的select操作,等待就绪事件的发生,超时时间为timeoutMillis
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++; // 增加select操作次数
// 如果选择到了就绪事件,或者已经被唤醒,或者有任务等待处理,或者有定时任务待执行,则跳出循环
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
// 如果线程被中断,则重置select操作次数并跳出循环
if (Thread.interrupted()) {
selectCnt = 1;
break;
}
long time = System.nanoTime();
// 如果超时时间已经过去,并且仍然没有选择到就绪事件,则将select操作次数设置为1
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 如果select操作次数达到了重建Selector的阈值,则重建Selector
selector = selectRebuildSelector(selectCnt);
selectCnt = 1; // 重置select操作次数
break;
}
currentTimeNanos = time; // 更新当前时间
}
// 如果select操作次数大于最小的不完整的select操作次数,则输出日志
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
// 忽略取消键异常,因为它不会对程序执行造成实质影响
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
}
}
这段代码主要实现了对Selector的select操作的调度和控制,确保了在不同的情况下都能够正常执行select操作,并且针对一些特殊情况进行了处理和优化。
selectCnt
我们来总结一下:
-
selectCnt
用于记录 select 操作的次数。在循环中,每次执行一次 select 操作,都会增加selectCnt
的值。它主要用于以下几个方面:- 控制是否执行阻塞的 select 操作。
- 在一些特殊情况下,如线程中断、超时等,重置
selectCnt
的值,以便重新执行 select 操作。
-
selectRebuildSelector
方法用于重建 Selector。当selectCnt
达到某个阈值(SELECTOR_AUTO_REBUILD_THRESHOLD
),表明连续多次 select 操作未返回任何事件,可能存在 Selector 内部状态异常。为了解决这个问题,会调用selectRebuildSelector
方法重建 Selector。重建 Selector 的目的是确保 Selector 内部状态的一致性和正确性,从而避免空轮询等问题的发生。
selectRebuildSelector
// 如果超时时间已经过去,并且仍然没有选择到就绪事件,则将select操作次数设置为1
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 如果select操作次数达到了重建Selector的阈值,则重建Selector
selector = selectRebuildSelector(selectCnt);
selectCnt = 1; // 重置select操作次数
break;
}
重建Selector,就意味着,要把旧的Selector上关注的Channel迁移到新的Selector上来.
下面这段代码是用于重建 Selector 的方法 selectRebuildSelector
。
private Selector selectRebuildSelector(int selectCnt) throws IOException {
// Selector 连续多次返回了空结果,可能存在问题,因此需要重建 Selector。
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
// 重新构建 Selector。
rebuildSelector();
// 获取重建后的 Selector。
Selector selector = this.selector;
// 再次执行 select 操作,以填充 selectedKeys。
selector.selectNow();
return selector;
}
这段代码
- 首先记录了 Selector 连续多次返回空结果的次数,并发出警告日志。
- 然后调用
rebuildSelector()
方法重新构建 Selector。 - 重建完成后,再次执行
selectNow()
方法进行一次非阻塞的 select 操作,以填充selectedKeys
集合,并返回重建后的 Selector。
这样做的目的是为了尽快恢复 Selector 的正常工作状态,避免因连续空轮询导致的性能问题。
这段代码实现了重建 Selector 的方法 rebuildSelector()
。
/**
* 用于替换当前事件循环的 Selector,以新创建的 Selector 来解决臭名昭著的 epoll 100% CPU bug。
*/
public void rebuildSelector() {
// 如果不在事件循环中,则提交任务到事件循环中执行 rebuildSelector0() 方法。
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
// 如果在事件循环中,则直接调用 rebuildSelector0() 方法。
rebuildSelector0();
}
这段代码首先判断当前线程是否在事件循环中。如果不在事件循环中,则通过 execute()
方法将任务提交到事件循环中执行,确保在事件循环线程中执行 rebuildSelector0()
方法。如果已经在事件循环中,则直接调用 rebuildSelector0()
方法进行 Selector 的重建。
这样做的目的是为了确保在事件循环线程中执行 Selector 的重建操作,避免多线程并发访问导致的线程安全问题。
这段代码实现了 Selector 的重建操作 rebuildSelector0()
。
private void rebuildSelector0() {
// 保存旧的 Selector 引用
final Selector oldSelector = selector;
// 新的 SelectorTuple 对象
final SelectorTuple newSelectorTuple;
// 如果旧的 Selector 为 null,则直接返回
if (oldSelector == null) {
return;
}
try {
// 创建一个新的 SelectorTuple 对象
newSelectorTuple = openSelector();
} catch (Exception e) {
// 如果创建新 Selector 失败,则记录日志并返回
logger.warn("Failed to create a new Selector.", e);
return;
}
// 记录迁移的 Channel 数量
int nChannels = 0;
// 遍历旧 Selector 的所有 SelectionKey
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
// 如果 SelectionKey 无效,或者对应的 Channel 已经注册到新的 Selector 上,则跳过
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
// 获取原来的 SelectionKey 的感兴趣事件,并取消旧的 SelectionKey
int interestOps = key.interestOps();
key.cancel();
// 将 Channel 注册到新的 Selector 上,并保持感兴趣事件不变
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// 如果是 AbstractNioChannel 类型的 Attachment,更新其 SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
// 如果注册失败,记录日志,并关闭对应的 Channel
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
// 更新当前 EventLoop 的 Selector 引用为新的 Selector
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// 关闭旧的 Selector
oldSelector.close();
} catch (Throwable t) {
// 如果关闭旧 Selector 失败,记录日志,但不影响继续执行
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
// 记录迁移完成的 Channel 数量
if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}
这段代码首先尝试创建一个新的 Selector,并遍历旧的 Selector 上的所有注册的 Channel。对于每个 Channel,取消其在旧 Selector 上的注册,然后重新在新的 Selector 上注册,并保持感兴趣的事件不变。如果注册失败,记录日志并关闭对应的 Channel。最后关闭旧的 Selector,更新当前 EventLoop 的 Selector 引用为新的 Selector。