Netty 框架的主要线程就是 IO 线程,线程模型设计的好坏,决定了系统的吞吐量、并发性和安全性等架构质量属性。Netty 的线程模型(也就是 Reactor 模型)被精心地设计,既提升了框架的并发性能,又能在很大程度避免锁,局部实现了无锁化设计。
总体介绍
我们先看一下 NioEventLoop & NioEventLoopGroup 的继承层次
作为一个高性能的网络框架,Netty 采用了 Reactor 线程模型以及多路复用的 IO模型,而这两者在 NioEventLoop & NioEventLoopGroup 配合 Selector 来进行实现。
EventLoop
Netty 通过事件循环机制(EventLoop)处理 IO 事件和异步任务,简单来说,就是通过一个死循环,不断处理当前已发生的 IO 事件和待处理的异步任务。这种事件循环机制也是一种常用的 IO 事件处理机制,包括 Redis、Mysql 都使用了类似的机制。
启动
//SingleThreadEventExecutor
@Override
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
//EventLoop存在就直接加入任务队列
if (inEventLoop) {
addTask(task);
}
//EventLoop不存在就创建该线程再加入队列
else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
}
Selector初始化
未开启优化选项
private Selector openSelector() {
final Selector selector = provider.openSelector();
return selector;
}
开启优化选项
private Selector openSelector() {
SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
//通过反射的方式从Selector实例中获取selectedKeys和publicSelectedKeys
//使用Netty构造的selectedKeys包装类selectedKeySet将原JDK的selectedKeys替换掉
//////////////////////////////////////////////////////////
//优化主要是将SelectionKey储存的方式以Set方式转变为以数组方式
//////////////////////////////////////////////////////////
selectedKeys = selectedKeySet;
logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
return selector;
}
执行EventLoop轮询
protected void run() {
for (;;) {
//将wakeUp还原为false并将之前的wakeUp状态保存到oldWakeUp变量中
oldWakenUp = wakenUp.getAndSet(false);
//通过hasTasks方法判断当前的消息队列中是否有消息尚未处理
if (hasTasks()) {
//立即触发Selector的选择操作,如果有准备就绪的Channel,则返回就绪Channel的集合
selectNow();
}
else {
//如果消息队列中没有消息需要处理,则执行select()方法,
//由Selector多路复用器轮询,看是否有准备就绪的Channel
select();
//选择完成之后,再次判断用户是否调用了Selector的wakeup方法,
//如果调用,则执行selector.wakeup()操作。
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
///////////
//处理IO事件
///////////
final long ioStartTime = System.nanoTime();
needsToSelectAgain = false;
//processSelectedKeys方法处理IO事件
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
/////////////
//处理非IO事件
/////////////
final long ioTime = System.nanoTime() - ioStartTime;
final int ioRatio = this.ioRatio;
//通过ioTime,ioRatio计算处理任务的CPU时间
final long cpuTime = ioTime * (100 - ioRatio) / ioRatio;
//runAllTasks方法处理非IO任务
runAllTasks(cpuTime);
//如果是正在关闭状态,则要关闭所有的Channel
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
}
}
调用NIO的多路选择器进行轮询接收Channel
private void select() throws IOException {
Selector selector = this.selector;
for (;;) {
//计算下一个定时任务的触发时间并为超时时间增加0.5ms的调整值
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
//阻塞等待轮询
int selectedKeys = selector.select(timeoutMillis);
//每完成一次select操作,对select计数器selectCnt加1(涉及到 epoll bug)
selectCnt ++;
//轮询结束,需要对结果进行判断,判断要退出的条件:
//有Channel处于就绪状态,selectedKeys不为0,说明有读写事件需要处理
//oldWakenUp为true
//系统或者用户调用了wakeup操作,唤醒当前的多路复用器
//消息队列中有新的任务需要处理
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
break;
}
//如果本次Selector的轮询结果为空,也没有wakeup操作或是新的消息需要处理,
//则说明是个空轮询,有可能触发了JDK的epoll bug,导致Selector空轮询,使IO线程占用100%
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
//修复策略
//对Selector的select操作周期进行统计,每完成一次空的select操作进行一次计数
//在某个周期内如果连续发生N次空轮询,说明触发了JDK NIO的epoll死循环bug
//确定bug被触发后,重建Selector
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = System.nanoTime();
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
}
}
}
处理Netty未优化的Selector实现
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
//对SelectionKey进行保护性判断,如果为空则返回
if (selectedKeys.isEmpty()) {
return;
}
//获取SelectionKey的迭代器进行循环操作,通过迭代器获取SelectionKey和SocketChannel
//的附件对象,将已选择的选择键从迭代器中删除,防止下次被重复选择和处理
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
//对SocketChannel的附件类型进行判读
if (a instanceof AbstractNioChannel) {
//附件为NioServerSocketChannel或NioSocketChannel,需要进行I/O读写相关的操作
processSelectedKey(k, (AbstractNioChannel) a);
} else {
//如果它是NioTask,则对其进行类型转换,调用processSelectedKey进行处理
//由于Netty自身没实现NioTask接口,所以通常情况下系统不会执行该分支,
//除非用户自行注册该Task到多路复用器
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
执行IO任务
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
//如果选择键不可用,释放连接资源
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
//选择键可用,继续对网络操作位进行判断
try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException e) {
unsafe.close(unsafe.voidPromise());
}
}
执行NioTask接口任务
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
int state = 0;
try {
task.channelReady(k.channel(), k);
state = 1;
} catch (Exception e) {
k.cancel();
invokeChannelUnregistered(task, k, e);
state = 2;
} finally {
switch (state) {
case 0:
k.cancel();
invokeChannelUnregistered(task, k, null);
break;
case 1:
if (!k.isValid()) { // Cancelled by channelReady()
invokeChannelUnregistered(task, k, null);
}
break;
}
}
}
执行非IO任务
由于 NioEventLoop 需要同时处理 IO 事件和非 IO 任务,为了保证两者都能得到足够的 CPU 时间被执行,Netty 提供了 lO 比例供用户定制。如果 IO 操作多于定时任务和 Task,则可以将 IO 比例调大,反之则调小,默认值为 50%。
protected boolean runAllTasks(long timeoutNanos) {
//首先从定时任务消息队列中弹出消息进行处理,如果消息队列为空,则退出循环
fetchFromDelayedQueue();
Runnable task = pollTask();
if (task == null) {
return false;
}
//据当前时间进行判断,如果该定时任务已经超时,将其加入TaskQueue中,同时从延时队列中删除;
//如果定时任务如果没有超时,说明本轮循环不需要处理,直接退出即可
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
this.lastExecutionTime = lastExecutionTime;
return true;
}
由于获取系统纳秒时间是个耗时的操作,每次循环都获取当前系统纳秒时间进行超时判断会降低性能。为了提升性能,每执行 60 次循环判断一次,如果当前系统时间已经到了分配给非 IO 操作的超时时间,则退出循环。这是为了防止由于非 IO 任务过多导致 IO 操作被长时间阻塞。
private void fetchFromDelayedQueue() {
long nanoTime = 0L;
for (;;) {
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
if (delayedTask == null) {
break;
}
if (nanoTime == 0L) {
nanoTime = ScheduledFutureTask.nanoTime();
}
if (delayedTask.deadlineNanos() <= nanoTime) {
delayedTaskQueue.remove();
taskQueue.add(delayedTask);
} else {
break;
}
}
}
EventLoopGroup
对于 NioEventLoopGroup 来说,主要是用来维护一个 EventLoop 链表,并且在需要时新建 EventLoop。也就是说他仅仅是负责管理 EventLoop 资源的。它配合 Netty Bootstrap 启动模式可以调整不同的 Reactor 模型。
具体的Reactor线程模型的内容在 Network I/O模型与线程模型 有详细讲解
Netty服务端对应的Reactor单线程模型创建方式
EventLoopGroup reactorGroup = new NioEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(reactorGroup, reactorGroup);
Netty服务端对应的Reactor多线程模型创建方式
EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(acceptorGroup, workerGroup);
Netty服务端对应的主从Reactor线程模型创建方式
EventLoopGroup acceptorGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(acceptorGroup, workerGroup);