0%

Netty EventLoop和EventLoopGroup

Netty 框架的主要线程就是 IO 线程,线程模型设计的好坏,决定了系统的吞吐量、并发性和安全性等架构质量属性。Netty 的线程模型(也就是 Reactor 模型)被精心地设计,既提升了框架的并发性能,又能在很大程度避免锁,局部实现了无锁化设计。

总体介绍

我们先看一下 NioEventLoop & NioEventLoopGroup 的继承层次

作为一个高性能的网络框架,Netty 采用了 Reactor 线程模型以及多路复用的 IO模型,而这两者在 NioEventLoop & NioEventLoopGroup 配合 Selector 来进行实现。

EventLoop

Netty 通过事件循环机制(EventLoop)处理 IO 事件和异步任务,简单来说,就是通过一个死循环,不断处理当前已发生的 IO 事件和待处理的异步任务。这种事件循环机制也是一种常用的 IO 事件处理机制,包括 Redis、Mysql 都使用了类似的机制。

启动

//SingleThreadEventExecutor
@Override
public void execute(Runnable task) {
	boolean inEventLoop = inEventLoop();
    //EventLoop存在就直接加入任务队列
    if (inEventLoop) {
        addTask(task);
    } 
    //EventLoop不存在就创建该线程再加入队列
    else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }
}

Selector初始化

未开启优化选项

private Selector openSelector() {
    final Selector selector = provider.openSelector();
    return selector;
}

开启优化选项

private Selector openSelector() {
    SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    //通过反射的方式从Selector实例中获取selectedKeys和publicSelectedKeys
    //使用Netty构造的selectedKeys包装类selectedKeySet将原JDK的selectedKeys替换掉
    
    //////////////////////////////////////////////////////////
    //优化主要是将SelectionKey储存的方式以Set方式转变为以数组方式
    //////////////////////////////////////////////////////////
    
    selectedKeys = selectedKeySet;
    logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
    return selector;
}

执行EventLoop轮询

protected void run() {
    for (;;) {
        //将wakeUp还原为false并将之前的wakeUp状态保存到oldWakeUp变量中
        oldWakenUp = wakenUp.getAndSet(false);
	    //通过hasTasks方法判断当前的消息队列中是否有消息尚未处理
        if (hasTasks()) {
            //立即触发Selector的选择操作,如果有准备就绪的Channel,则返回就绪Channel的集合
            selectNow();
        } 
        
        else {
            //如果消息队列中没有消息需要处理,则执行select()方法,
            //由Selector多路复用器轮询,看是否有准备就绪的Channel
            select();
            //选择完成之后,再次判断用户是否调用了Selector的wakeup方法,
            //如果调用,则执行selector.wakeup()操作。
            if (wakenUp.get()) {
                selector.wakeup();
            }
        }

        cancelledKeys = 0;

        ///////////
        //处理IO事件
        ///////////
        
        final long ioStartTime = System.nanoTime();
        needsToSelectAgain = false;
        //processSelectedKeys方法处理IO事件
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
        
        /////////////
        //处理非IO事件
        /////////////
        
        final long ioTime = System.nanoTime() - ioStartTime;
        final int ioRatio = this.ioRatio;
        //通过ioTime,ioRatio计算处理任务的CPU时间
        final long cpuTime = ioTime * (100 - ioRatio) / ioRatio;
        //runAllTasks方法处理非IO任务
        runAllTasks(cpuTime);

        //如果是正在关闭状态,则要关闭所有的Channel
        if (isShuttingDown()) {
            closeAll();
            if (confirmShutdown()) {
                break;
            }
        }
    }
}

调用NIO的多路选择器进行轮询接收Channel

private void select() throws IOException {
    Selector selector = this.selector;
    
    for (;;) {
        //计算下一个定时任务的触发时间并为超时时间增加0.5ms的调整值
        long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
        if (timeoutMillis <= 0) {
            if (selectCnt == 0) {
                selector.selectNow();
                selectCnt = 1;
            }
            break;
        }

        //阻塞等待轮询
        int selectedKeys = selector.select(timeoutMillis);
        //每完成一次select操作,对select计数器selectCnt加1(涉及到 epoll bug)
        selectCnt ++;

        //轮询结束,需要对结果进行判断,判断要退出的条件:
        //有Channel处于就绪状态,selectedKeys不为0,说明有读写事件需要处理
        //oldWakenUp为true
        //系统或者用户调用了wakeup操作,唤醒当前的多路复用器
        //消息队列中有新的任务需要处理
        if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
            break;
        }

        //如果本次Selector的轮询结果为空,也没有wakeup操作或是新的消息需要处理,
        //则说明是个空轮询,有可能触发了JDK的epoll bug,导致Selector空轮询,使IO线程占用100%
        if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            //修复策略
            //对Selector的select操作周期进行统计,每完成一次空的select操作进行一次计数
            //在某个周期内如果连续发生N次空轮询,说明触发了JDK NIO的epoll死循环bug
            
            //确定bug被触发后,重建Selector
            rebuildSelector();
            selector = this.selector;

            // Select again to populate selectedKeys.
            selector.selectNow();
            selectCnt = 1;
            break;
        }

        currentTimeNanos = System.nanoTime();
    }

    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
        if (logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
        }
    }
   
}

处理Netty未优化的Selector实现

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    //对SelectionKey进行保护性判断,如果为空则返回
    if (selectedKeys.isEmpty()) {
        return;
    }
	//获取SelectionKey的迭代器进行循环操作,通过迭代器获取SelectionKey和SocketChannel
    //的附件对象,将已选择的选择键从迭代器中删除,防止下次被重复选择和处理
    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        //对SocketChannel的附件类型进行判读
        
        if (a instanceof AbstractNioChannel) {
            //附件为NioServerSocketChannel或NioSocketChannel,需要进行I/O读写相关的操作
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            //如果它是NioTask,则对其进行类型转换,调用processSelectedKey进行处理
            
            //由于Netty自身没实现NioTask接口,所以通常情况下系统不会执行该分支,
            //除非用户自行注册该Task到多路复用器
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }

        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();

            // Create the iterator again to avoid ConcurrentModificationException
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}

执行IO任务

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    //如果选择键不可用,释放连接资源
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    //选择键可用,继续对网络操作位进行判断
    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException e) {
        unsafe.close(unsafe.voidPromise());
    }
}

执行NioTask接口任务

private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
    int state = 0;
    try {
        task.channelReady(k.channel(), k);
        state = 1;
    } catch (Exception e) {
        k.cancel();
        invokeChannelUnregistered(task, k, e);
        state = 2;
    } finally {
        switch (state) {
        case 0:
            k.cancel();
            invokeChannelUnregistered(task, k, null);
            break;
        case 1:
            if (!k.isValid()) { // Cancelled by channelReady()
                invokeChannelUnregistered(task, k, null);
            }
            break;
        }
    }
}

执行非IO任务

由于 NioEventLoop 需要同时处理 IO 事件和非 IO 任务,为了保证两者都能得到足够的 CPU 时间被执行,Netty 提供了 lO 比例供用户定制。如果 IO 操作多于定时任务和 Task,则可以将 IO 比例调大,反之则调小,默认值为 50%。

protected boolean runAllTasks(long timeoutNanos) {
    //首先从定时任务消息队列中弹出消息进行处理,如果消息队列为空,则退出循环
    fetchFromDelayedQueue();
    Runnable task = pollTask();
    if (task == null) {
        return false;
    }

    //据当前时间进行判断,如果该定时任务已经超时,将其加入TaskQueue中,同时从延时队列中删除;
    //如果定时任务如果没有超时,说明本轮循环不需要处理,直接退出即可
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception.", t);
        }

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    this.lastExecutionTime = lastExecutionTime;
    return true;
}

由于获取系统纳秒时间是个耗时的操作,每次循环都获取当前系统纳秒时间进行超时判断会降低性能。为了提升性能,每执行 60 次循环判断一次,如果当前系统时间已经到了分配给非 IO 操作的超时时间,则退出循环。这是为了防止由于非 IO 任务过多导致 IO 操作被长时间阻塞。

private void fetchFromDelayedQueue() {
    long nanoTime = 0L;
    for (;;) {
        ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
        if (delayedTask == null) {
            break;
        }

        if (nanoTime == 0L) {
            nanoTime = ScheduledFutureTask.nanoTime();
        }

        if (delayedTask.deadlineNanos() <= nanoTime) {
            delayedTaskQueue.remove();
            taskQueue.add(delayedTask);
        } else {
            break;
        }
    }
}

EventLoopGroup

对于 NioEventLoopGroup 来说,主要是用来维护一个 EventLoop 链表,并且在需要时新建 EventLoop。也就是说他仅仅是负责管理 EventLoop 资源的。它配合 Netty Bootstrap 启动模式可以调整不同的 Reactor 模型。

具体的Reactor线程模型的内容在 Network I/O模型与线程模型 有详细讲解

Netty服务端对应的Reactor单线程模型创建方式

EventLoopGroup reactorGroup = new NioEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(reactorGroup, reactorGroup);

Netty服务端对应的Reactor多线程模型创建方式

EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(acceptorGroup, workerGroup);

Netty服务端对应的主从Reactor线程模型创建方式

EventLoopGroup acceptorGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(acceptorGroup, workerGroup);