0%

Netty Future-Listener机制

由于 Netty 消息通知都采用了异步的方式,因此需要引入 Future-Listener 机制,通过观察者模式将消息返还给结果验证方进行下一步事件的触发。

Future-Listener机制

Channel 通过事件驱动来执行 Flow,异步发起 Task(如超时断连),Task 执行完成后调用 Listener 来执行对 Task 执行结果的后续处理。

层次继承结构

Netty 重新封装了 jdk 原生的 Future 类,并根据业务需求附加了相应的功能特性,这个后续会详细介绍。

Future(java.util.concurrent)

Future 最早来源于 JDK 的java.util.concurrent.Future,它用于代表异步操作的结果。

函数 说明
get() / get(timeout) 可以通过 get 方法获取操作结果,如果操作尚未完成,则会同步阻塞当前调用的线程;如果不允许阻塞太长时间或者无限期阻塞,可以通过带超时时间的 get 方法获取结果
isDone() 可以判断当前的异步操作是否完成,如果完成,无论成功与否,都返回 true,否则返回 false
cancel() 通过 cancel 可以尝试取消异步操作,它的结果是未知的,如果操作已经完成,或者发生其他未知的原因拒绝取消,取消操作将会失败。
isCancelled() 可以判断当前的异步操作是否在它完成前被取消

Future(io.netty.util.concurrent)

Netty 对 Future 进行了进一步的扩展,添加了 Future-Listener 机制。简单的来说,coder 可以通过添加 Listener 的方式异步获取 Future 的异步操作结果信息。

ChannelFuture

ChannelFuture 解决的问题:异步IO调用都立即返回,如何取得异步操作的结果的问题。以及获取相对应的 Channel 的操作。

下图为 ChannelFuture 状态迁移图

Netty 推荐使用 {@link #addListener(GenericFutureListener)} 获取异步操作结果。同时 Netty 也提供了对 Listener 的其他管理方式

当 IO 操作完成之后,IO线程会回调 ChannelFuture 中 GenericFutureListener 的 operationComplete 方法,并把 ChannelFuture 对象当作方法的入参。如果用户需要做上下文相关的操作,需要将上下文信息保存到对应的 ChannelFuture 中。

推荐通过 GenericFutureListener 代替 ChannelFuture 的 get 等方法的原因是:当我们进行异步 IO 操作时,完成的时间是无法预测的,如果不设置超时时间,它会导致调用线程长时间被阻塞,甚至挂死。而设置超时时间,时间又无法精确预测。利用异步通知机制回调 GenericFutureListener 是最佳的解决方案,它的性能最优。

// BAD - NEVER DO THIS
@Override
public void channelRead(ChannelHandlerContext ctx, GoodByeMessage msg) {
    ChannelFuture future = ctx.channel().close();
    future.awaitUninterruptibly();
    // Perform post-closure operation
    // ...
}

// GOOD
@Override
public void channelRead(ChannelHandlerContext ctx,  GoodByeMessage msg) {
    ChannelFuture future = ctx.channel().close();
    future.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            // Perform post-closure operation
            // ...
        }
    });
}

异步 I/O 操作有两类超时:一个是 TCP 层面的 I/O 超时,另一个是业务逻辑层面的操作超时。两者没有必然的联系,但是通常情况下业务逻辑超时时间应该大于 I/O 超时时间,它们两者是包含的关系。

需要指出的是:ChannelFuture 超时并不代表 I/O 超时,这意味着 ChannelFuture 超时后,如果没有关闭连接资源,随后连接依旧可能会成功,这会导致严重的问题。所以通常情况下,必须要考虑究竟是设置 IO 超时还是 ChannelFuture 超时。

现在我们可以总结两种获取IO操作结果的能力:

  1. sync() 同步阻塞等待结果,直到有结果返回
  2. addListener() 添加监听器对异步返回结果进行处理

Promise

Promise 是可写的 Future,Future 自身并没有写操作相关的接口,Netty 通过 Promise 对 Future 进行扩展,当 I/O 操作发生异常或者完成时,用于设置 IO 操作的结果。

Netty 发起 I/O 操作的时候,会创建一个新的 Promise 对象,例如调用 ChannelHandlerContext 的 write(Object object)方法时,会创建一个新的 ChannelPromise。

DefaultPromise 对 setSuccess 方法的实现

@Override
public Promise<V> setSuccess(V result) {
   ` //调用setSuccess方法并对其操作结果进行判断
    if (setSuccess0(result)) {
        //如果成功,调用notifyListeners方法通知Listener
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
    //判断当前Promise的操作结果是否已经被设置,如果已经被设置,则不允许重复设置,返回设置失败
    if (isDone()) {
        return false;
    }

    synchronized (this) {
        // Allow only once.
        if (isDone()) {
            return false;
        }
        //如果操作结果为空,说明仅仅需要notify在等待的业务线程,不包含具体的业务逻辑对象
        if (result == null) {
            //将result设置为系统默认的SUCCESS
            this.result = SUCCESS;
        } else {
            this.result = result;
        }
        if (hasWaiters()) {
            notifyAll();
        }
    }
    return true;
}

由于可能存在IO线程和用户线程同时操作 Promise,所以设置操作结果的时候需要加锁保护,防止并发操作对操作结果是否被设置进行二次判断(为了提升并发性能的二次判断),如果已经被设置,则返回操作失败。

DefaultPromise 对 await 方法的实现

@Override
public Promise<V> await() throws InterruptedException {
    //如果当前Promise已经被设置直接返回
    if (isDone()) {
        return this;
    }
	//如果线程被中断则抛出中断一场
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }
	//sync锁定当前Promise对象
    synchronized (this) {
        //通过循环判断对isDone结果进行判断
        //循环判断就是如果程序执行到while体内部,就必须满足条件才可能退出循环体
        //如果使用if来判断,则会直接退出循环体
        while (!isDone()) {
            checkDeadLock();
            incWaiters();
            try {
                wait();
            } finally {
                decWaiters();
            }
        }
    }
    return this;
}

由于在 I/O 线程中调用 Promise 的 await 或者 sync 方法会导致死锁,所以在循环体中需要对死锁进行保护性校验,防止 I/O 线程被挂死,最后调用java.lang.Object.wait()方法进行无限期等待,直到 I/O 线程调用 setSuccess 方法、trySuccess 方法、setFailure 或者 tryFailure 方法。