Netty 封装了 JDK NIO 类库复杂而庞大的通信底层细节,使用户不再和 NIO 的 Selector、ServerSocketChannel、SocketChannel、ByteBuffer、SelectionKey 等组件打交道。而对于 Netty 来说,用户可以直接通过 ServerBootstrap(服务端启动辅助类)和 Bootstrap(客户端启动辅助类)来创建一个可通信的 C/S 端。 这无疑大大减轻了工作量、降低了开发难度。
NettyServer 的创建
NTSC-Server示例代码
下面我们以一个 NTSC(National Time Service Center,国家授时中心)举例
public class NettyTimeServer {
public static void main(String[] args) {
new NettyTimeServer().bind(8080);
}
private void bind(int port) {
//创建两个NIO线程组(Reactor线程组),用于网络事件的处理
//用于服务端接受客户端连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//用于SocketChannel网络读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
//Netty用于启动NIO服务端的辅助启动类,目的是降低服务端开发复杂度
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//IO事件处理类(此处先省略ChildChannelHandler的功能代码)
.childHandler(new ChildChannelHandler());
//绑定端口,同步等待成功
ChannelFuture f = bootstrap.bind(port).sync();
//等待服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放跟shutdownGracefully相关的资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端创建及绑定步骤详解
步骤1:创建 ServerBootstrap 实例
ServerBootstrap 是 Netty 服务端的启动辅助类,提供了一系列方法用于设置服务端启动相关的参数。底层通过外观模式对各种能力进行抽象和封装,尽量不需要用户跟过多的底层 API 打交道,以降低用户的开发难度。并且,NIO 过多的参数导致不能只依靠构造器来启动服务,此时引入 Builder 模式,使得整体结构更加清晰了。
步骤2:设置并绑定 Reactor 线程池
Netty 的 Reactor 线程池是 EventLoopGroup,它实际就是 EventLoop 数组。EventLoop 的职责是处理所有注册到本线程多路复用器 Selector 上的 Channel,Selector 的轮询操作由绑定的 EventLoop 线程的 run 方法驱动,在一个循环体内循环执行。对于 EventLoop 来说,他的职责不仅仅是处理网络IO事件。用户自定义的 Task 和定时任务 Task 也统一由 EventLoop 负责处理。这样线程模型就实现了统一。从调度层面看,也不存在从 EventLoop 线程中再启动其他类型的线程用于异步执行另外的任务,这样就避免了多线程并发操作和锁竞争,提升了IO线程的处理和调度性能。
步骤3:设置并绑定服务端 Channel
作为 NIO 服务端,需要创建 ServerSocketChannel,Netty 对原生的 NIO 类库进行了封装,对应实现是 NioServerSocketChannel。对于用户而言,不需要关心服务端 Channel 的底层实现细节和工作原理,只需要指定具体使用哪种服务端 Channel 即可。因此,Netty 的 ServerBootstrap 方法提供了 Channel 方法用于指定服务端 Channel 的类型。Netty 通过工厂类,利用反射创建 NioServerSocketChannel 对象。由于服务端监听端口往往只需要在系统启动时才会调用,因此反射对性能的影响并不大。
步骤4:链路建立的时候创建并初始化 ChannelPipeline
ChannelPipeline 并不是 NIO 服务端必需的,它本质就是一个负责处理网络事件的职责链,负责管理和执行 ChannelHandler。网络事件以事件流的形式在 ChannelPipeline 中流转,由 ChannelPipeline 根据 ChannelHandler 的执行策略调度 ChannelHandler 的执行。典型的网络事件如下:
- 链路注册
- 链路激活
- 链路断开
- 接收到请求消息
- 请求消息接收并处理完毕
- 发送应答消息
- 链路发生异常
- 发生用户自定义事件
步骤5:初始化 ChannelPipeline 完成之后,添加并设置 ChannelHandler
ChannelHandler 是 Netty 提供给用户定制和扩展的关键接口。利用 ChannelHandler 用户可以完成大多数的功能定制,例如消息编解码、心跳、安全认证、TSL/SSL认证、流量控制和流量整形等。
Netty 同时也提供了大量的系统 ChannelHandler 供用户使用,比较实用的系统 ChannelHandler 总结如下
- 系统编解码框架 —— ByteToMessageCodec
- 通用基于长度的半包解码器 —— LengthFieldBasedFrameDecoder
- 码流日志打印 Handler —— HandlerLoggingHandler
- SSL安全认证 Handler —— HandlerSsIHandler
- 链路空闲检测 Handler —— IdleStateHandler
- 流量整形 Handler —— ChannelTrafficShapingHandler
- Base64编解码 —— Base64Decoder 和 Base64Encoder
步骤6:绑定并启动监听端口
在绑定监听端口之前系统会做一系列的初始化和检测工作,完成之后,会启动监听端口,并将 ServerSocketChannel 注册到 Selector 上监听客户端连接。
步骤7:Selector 轮询
由 Reactor 线程 NioEventLoop 负责调度和执行 Selector 轮询操作,选择准备就绪的 Channel 集合
步骤8:轮询到准备就绪的 Channel 后,由 Reactor 线程 NioEventLoop 执行 ChannelPipeline 的相应方法,最终调度并执行 ChannelHandler
- fireChannelRegistered
- fireChannelActive
- fireChannelRead
- fireChannelReadComplete
- fireExceptionCaught
- fireUserEventTriggered
- fireChannelWritabilityChanged
- fireChannelInactive
步骤9:执行 Netty 系统 ChannelHandler 和用户添加定制的 ChannelHandler
ChannelPipeline 根据网络事件的类型,调度并执行 ChannelHandler
服务端创建及绑定源码分析
创建启动器
创建辅助启动器
首先通过构造函数创建 ServerBootstrap 实例
//Netty用于启动NIO服务端的辅助启动类,目的是降低服务端开发复杂度
ServerBootstrap bootstrap = new ServerBootstrap();
配置初始化参数
配置初始化参数:绑定线程组
创建两个 EventLoopGroup( 并不是必须要创建两个不同的 EventLoopGroup,也可以只创建一个并共享),代码如下
//创建两个NIO线程组(Reactor线程组),用于网络事件的处理
//用于服务端接受客户端连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//用于SocketChannel网络读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
NioEventLoopGroup 实际就是 Reactor 线程池,负责调度和执行客户端的接入、网络读写事件的处理、用户自定义任务和定时任务的执行。通过 ServerBootstrap 的 group 方法将两个 EventLoopGroup 实例传入,代码如下。
//Class ServerBootstrap(子类)
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
//父线程组被传入父类构造函数中
super.group(parentGroup);
this.childGroup = childGroup;
return this;
}
//Class AbstractBootstrap(父类)
public B group(EventLoopGroup group) {
this.group = group;
return (B) this;
}
事实上,无论是客户端还是服务端都需要设置工作IO线程,执行和调度网络事件的读写,因此,AbstractBootstrap 类中设置项是被客户端和服务端重用的。
配置初始化参数:配置端口监听和客户端链路接入的 Channel
Netty 通过 Channel 工厂类来创建不同类型的 Channel,对于服务端,需要创建 NioServerSocketChannel。所以,通过指定 Channel 类型的方式创建 Channel 工厂。
ServerBootstrapChannelFactory 是 ServerBootstrap 的内部静态类,职责是根据 Channel 的类型通过反射创建 Channel 的实例,服务端需要创建的是 NioServerSocketChannel 实例,代码如下。
public ServerBootstrap channel(Class<? extends ServerChannel> channelClass) {
return channelFactory(new ServerBootstrapChannelFactory<ServerChannel>(channelClass));
}
配置初始化参数:配置 Socket Option
指定 NioServerSocketChannel 后,需要设置 Server Socket 的一些参数。
SO_BACKLOG(指定请求等待队列长度)— Server
服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理。
内核要维护两个队列:未连接队列和已连接队列,根据 TCP 三次握手过程中三个分节来分隔这两个队列。服务器处于监听状态时,收到客户端
SYN
时在未连接队列中创建一个新的条目,然后以SYN/ACK
响应客户端,此条目在第三个分节到达前(客户端对服务器SYN/ACK
响应的ACK
)一直保留在未连接队列中。如果握手完成,该条目将从未连接队列搬到已连接队列尾部。当进程调用 accept 时,从已完成队列中的头部取出一个条目给进程,当已完成队列为空时进程将睡眠,直到有条目在已完成连接队列中才唤醒。backlog 被规定为两个队列总和的最大值,大多数实现默认值为5,但在高并发 Web 服务器中此值显然不够, Lighttpd 中此值达到 128×8,需要设置此值更大一些的原因是未完成连接队列的长度可能因为客户端 syn 的到达及等待三路握手第三个分节的到达延时而增大。SO_REUSEADDR(允许重复使用本地地址和端口)
比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用,比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置 SO_REUSEADDR 就无法正常使用该端口。
SO_KEEPALIVE(测试链接的状态)
当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
SO_SNDBUF 和 SO_RCVBUF(指定接受/发送缓冲区大小)
操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。
SO_LINGER
Linux内核默认的处理方式是当用户调用
close()
方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发生剩余的数据,造成了数据的不确定性,使用 SO_LINGER 可以阻塞close()
的调用时间,直到数据完全发送SO_TIMEOUT
控制读取操作将阻塞多少毫秒。如果返回值为0,计时器就被禁止了,该线程将无限期阻塞
TCP_NODELAY 和 TCP_CORK
该参数的使用与 Nagle 算法有关,Nagle 算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,而该参数的作用就是禁止使用 Nagle 算法,使用于小数据即时传输,于 TCP_NODELAY 相对应的是 TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。如果是时延敏感型的应用建议关闭。
CONNECT_TIMEOUT_MILLIS
客户端连接超时时间,由于NIO原生的客户端并不提供设置连接超时的接口,因此,Netty 用的是自定义连接超时定时器负责检测和超时控制
public <T> B option(ChannelOption<T> option, T value) {
synchronized (options) {
options.put(option, value);
}
return (B) this;
}
配置初始化参数:配置 Pipeline Handler
用户可以为启动辅助类和其父类分别指定 Handler。两类 Handler 的用途不同:子类中的 Handler 是 NioServerSocketChannel 对应的 ChannelPipeline 的 Handler;父类中的 Handler 是客户端新接入的连接 SocketChannel 对应的 ChannelPipeline 的 Handler。两者的区别可以通过下图来展示。
本质区别就是:ServerBootstrap 中的 Handler 是 NioServerSocketChannel 使用的,所有连接该监听端口的客户端都会执行它;父类 AbstractBootstrap 中的 Handler 是个工厂类,它为每个新接入的客户端都创建一个新的 Handler。
绑定端口
//为了简化展示的代码,进行了合并
public ChannelFuture bind(SocketAddress localAddress) {
//参数校验
validate();
//初始化NioServerSocketChannel并注册
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise;
if (regFuture.isDone()) {
promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(regFuture, channel, localAddress, promise);
}
});
}
return promise;
}
初始化NioServerSocketChannel
//为了方便对主线任务的理解,对异常处理进行了删减
final ChannelFuture initAndRegister() {
Channel channel;
////////////////////////////////////////////////////
// START ---------- 初始化Netty服务端监听的相关资源开始
////////////////////////////////////////////////////
channel = createChannel();
/*
//ServerBootstrap重载
@Override
Channel createChannel() {
EventLoop eventLoop = group().next();
//该工厂的作用只是通过反射的方式创建NioServerSocketChannel
//而这两个参数也分别对应了之前在group配置的两个线程组
//参数1是服务端用于监听和接收客户端连接的 Reactor 线程
//参数2是所谓的 WorkerGroup 线程池,它就是处理IO读写的 Reactor 线程组
return channelFactory().newChannel(eventLoop, childGroup);
}
*/
init(channel);
/*
只做了三件事
1.设置 Socket 参数和 NioServerSocketChannel 的附加属性
2.将 AbstractBootstrap 的 Handler 添加到 NioServerSocketChannel 的 ChannelPipeline 中
也就是设置辅助启动类父类的责任链
3.将用于服务端注册的 Handler ServerBootstrapAcceptor 添加到 ChannelPipeline中
也就是设置辅助启动类当前类的责任链
*/
//////////////////////////////////////////////////
// END ---------- 初始化Netty服务端监听的相关资源结束
//////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////
// START ---------- 注册Channel到Reactor线程的Selector,并轮询客户端连接事件
/////////////////////////////////////////////////////////////////////////
//ChannelPromise可写的Future
ChannelPromise regFuture = channel.newPromise();
channel.unsafe().register(regFuture);
//抛出异常后释放资源
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
/////////////////////////////////////////////////////////////////////////////////
// END ---------- 运行到这里有几种情况
// 1.当前线程在试图 bind() 或者 connect()时,发现已注册
// 2.其他线程在执行注册操作时,发现已注册
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled
// registration task is executed. because register(), bind(), and connect()
// are all bound to the same thread.
//////////////////////////////////////////////////////////////////////////////////
return regFuture;
}
注册NioServerSocketChannel
注册 NioServerSocketChannel 到 Reactor 线程的多路复用器上,然后轮询客户端连接事件
//Channel初始化完成之后,需要将它注册到Reactor线程的多路复用器上监听新客户端的接入
@Override
public final void register(final ChannelPromise promise) {
//判断是否为NioEventLoop自身发起的操作,是则不存在并发操作,直接执行注册
if (eventLoop.inEventLoop()) {
register0(promise);
}
//如果由其他线程发起,则封装成一个Task放入消息队列中异步执行
else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
//报异常就强行关闭Channel
}
}
首先判断是否是 NioEventLoop 自身发起的操作。如果是,则不存在并发操作,直接执行 Channel 注册;
此处,由于是由 ServerBootstrap 所在线程执行的注册操作,所以会将其封装成 Task 投递到 NioEventLoop 中执行,代码如下。
private void register0(ChannelPromise promise) {
//确保在注册时,Channel是打开的
if (!ensureOpen(promise)) {
return;
}
//将NioServerSocketChannel注册到NioEventLoop的Selector上
doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if (isActive()) {
pipeline.fireChannelActive();
}
//如注册失败,抛出异常并释放资源
}
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
/*
NIO SelectionKey状态
1. OP_READ = 1 << 0 (1)
2. OP_WRITE = 1 << 2 (4)
3. OP_CONNECT = 1 << 3 (8)
4. OP_ACCEPT = 1 << 4 (16)
*/
/*
应该注册 OP_ACCEPT 到多路复用器上,而 0 表示只注册,不监听任何网络操作。
这样做的原因如下:
1.注册方法是多态的,它既可以被 NioServerSocketChannel 用来监听客户端的连接接入,
也可以注册 SocketChannel 用来监听网络读或者写操作
2.通过 SelectionKey 的 interestOps(int ops) 方法可以方便地修改监听操作位。
所以,此处注册需要获取 SelectionKey 并给 AbstractNioChannel 的成员变量
selectionKey 赋值
*/
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
//由于事件被取消,但SelectionKey仍然缓存在队列中,需要调用Select方法去移除它
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
触发ChannelRegistered事件
//思路跳回到 register0 函数的讲解
//Class AbstractChannel
private void register0(ChannelPromise promise) {
...
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
...
}
//Class DefaultChannelPipeline
@Override
public ChannelPipeline fireChannelRegistered() {
head.fireChannelRegistered();
return this;
}
//Class DefaultChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRegistered() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
next.invoker.invokeChannelRegistered(next);
return this;
}
//Interface ChannelHandlerInvoker
void invokeChannelRegistered(ChannelHandlerContext ctx);
触发ChannelActive事件
ChannelRegistered事件传递完成后,判断 ServerSocketChannel 监听是否成功,如果成功,需要出发 NioServerSocketChannel 的 ChannelActive 事件。
//Class AbstractChannel
private void register0(ChannelPromise promise) {
...
if (isActive()) {
pipeline.fireChannelActive();
}
...
}
inActive() 也是个多态方法。如果是服务端,判断监听是否启动;如果是客户端,判断TCP连接是否完成。
依据条件触发Channel读操作
ChannelActive 事件在 ChannelPipeline 中传递,完成之后根据配置决定是否自动触发 Channel 的读操作,代码如下。
@Override
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read();
}
return this;
}
AbstractChannel 的读操作触发 ChannelPipeline 的读操作,最终调用到 HeadHandler 的 read 方法。
由于不同类型的 Channel 对读操作的准备工作不同,因此, beginRead 也是个多态方法,对于 NIO 通信,无论是客户端还是服务端,都是要修改网络监听操作位为自身感兴趣的,对于 NioServerSocketChannel 感兴趣的操作是 OP_ACCEPT(16),于是重新修改注册的操作位为 OP_ACCEPT,代码如下。
@Override
protected void doBeginRead() throws Exception {
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
以网络操作位来表示状态的好处:可以方便地通过位操作来进行网络操作位的状态判断和状态修改,从而提升操作性能。
由于创建 NioServerSocketChannel 将 readInterestOp 设置成了 OP_ACCEPT,所以,在服务端链路注册成功之后重新将操作位设置为监听客户端的网络连接操作,初始化 NioServerSocketChannel 的代码如下。
public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT);
config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
}
客户端接入源码分析
负责处理网络读写、连接和客户端请求接入的 Reactor 线程就是 NioEventLoop。
当多路复用器检测到新的准备就绪的 Channel 时,默认执行 processSelectedKeysOptimized 方法,代码如下。
@Override
protected void run() {
for (;;) {
//没有新的准备就绪的Channel就睡眠
oldWakenUp = wakenUp.getAndSet(false);
if (hasTasks()) {
selectNow();
} else {
select();
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
final long ioStartTime = System.nanoTime();
needsToSelectAgain = false;
//处理新的准备就绪的Channel
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
final long ioTime = System.nanoTime() - ioStartTime;
final int ioRatio = this.ioRatio;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
}
}
由于 Channel 的 Attachment 是 NioServerSocketChannel,所以执行 processSelectedKey 方法,根据就绪的操作位,执行不同的操作。此处,由于监听的是连接操作,所以执行 unsafe.read 方法。由于不同的 Channel 执行不同的操作,所以 NioUnsafe 被设计成接口,由不同的 Channel 内部的 NioUnsafe 实现类负责具体实现。我们发现 read 方法的实现有两个,分别是 NioByteUnsafe 和 NioMessageUnsafe。对于 NioServerSocketChannel,它使用的是 NioMessageUnsafe,它的 read 方法代码如下。
@Override
public void read() {
assert eventLoop().inEventLoop();
if (!config().isAutoRead()) {
removeReadOp();
}
final ChannelConfig config = config();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
final boolean autoRead = config.isAutoRead();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try {
for (;;) {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
if (readBuf.size() >= maxMessagesPerRead | !autoRead) {
break;
}
}
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null) {
if (exception instanceof IOException) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
}
pipeline.fireExceptionCaught(exception);
}
if (closed) {
if (isOpen()) {
close(voidPromise());
}
}
}
}
对 doReadMessages 方法进行分析,发现它实际就是接收新的客户端连接并创建 NioSocketChannel
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
if (ch != null) {
buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
return 1;
}
return 0;
}
接收到新的客户端连接后,触发 ChannelPipeline 的 ChannelRead 方法
执行 head ChannelHandlerContext 的 fireChannelRead 方法,事件在 ChannelPipeline 中传递,执行 ServerBootstrapAcceptor 的 channelRead 方法,代码如下。
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//将启动时传入的 childHandler 加入到客户端 SocketChannel 的 ChannelPipeline 中
Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
//设置客户端 SocketChannel 的 TCP 参数
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
//注册 SocketChannel 到多路复用器
child.unsafe().register(child.newPromise());
}
NioSocketChannel 的注册方法与 ServerSocketChannel 的一致,也是将 Channel 注册到 Reactor 线程的多路复用器上。由于注册的操作位是0,所以,此时 NioSocketChannel 还不能读取客户端发送的消息,那什么时候修改监听操作位为 OP_READ 呢。执行完注册操作之后,紧接着会触发 ChannelReadComplete 事件。我们继续分析 ChannelReadComplete 在 ChannelPipeline 中的处理流程:Netty 的 Header 和 Tail 本身不关注 ChannelReadComplete 事件就直接透传,执行完 ChannelReadComplete 后,接着执行 Pipeline 的 read 方法,最终执行 HeadHandler 的 read 方法。
HeadHandler read 方法的代码已经在之前的小节介绍过,用来将网络操作位修改为读操作。创建 NioSocketChannel 的时候已经将 AbstractNioChannel 的 readInterestOp 设置为 OP_READ,这样,执行 selectionKey.interestOps(interestOps | readInterestOp)
操作时就会把操作位设置为 OP_READ。
protected AbstractNioByteChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch) {
super(parent, eventLoop, ch, SelectionKey.OP_READ);
}
到此,新接入的客户端连接处理完成,可以进行网络读写等IO操作。
NettyClient 的创建
NTSC-Client示例代码
相对于服务端,Netty 客户端的创建更加复杂,除了要考虑线程模型、异步连接、客户端连接超时等因素外,还需要对连接过程中的各种异常进行考虑。
public class NettyTimeClient {
public static void main(String[] args) {
new NettyTimeClient().connect("127.0.0.1", 8080);
}
private void connect(String host, int port) {
//配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
//客户端辅助启动类
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelHandler());
//connect发起异步连接操作 sync同步方法等待连接成功
ChannelFuture f = bootstrap.connect(host, port).sync();
//等待客户端链路关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
}
客户端创建及连接步骤详解
步骤1
用户线程创建 Bootstrap实例,通过APl设置创建客户端相关的参数,异步发起客户端连接。
步骤2
创建处理客户端连接、IO 读写的 Reactor 线程组 NioEventLoopGroup。可以通过构造函数指定 IO 线程的个数,默认为 CPU 内核数的2倍
步骤3
通过 Bootstrap 的 ChannelFactory 和用户指定的 Channel 类型创建用于客户端连接的 NioSocketChannel,它的功能类似于 JDK NIO 类库提供的 SocketChannel
步骤4
创建默认的 ChannelHandlerPipeline,用于调度和执行网络事件
步骤5
异步发起 TCP 连接,判断连接是否成功。如果成功,则直接将 NioSocketChannel 注册到多路复用器上,监听读操作位,用于数据报读取和消息发送:如果没有立即连接成功,则注册连接监听位到多路复用器,等待连接结果;
步骤6
注册对应的网络监听状态位到多路复用器
步骤7
由多路复用器在 IO 现场中轮询各 Channel,处理连接结果
步骤8
如果连接成功,设置 Future 结果,发送连接成功事件,触发 ChannelPipeline 执行
步骤9
由 ChannelPipeline 调度执行系统和用户的 ChannelHandler,执行业务逻辑。
客户端创建及连接源码分析
创建及配置初始化参数
创建辅助启动器
//客户端辅助启动类
Bootstrap bootstrap = new Bootstrap();
配置初始化参数:绑定线程组
对于客户端来说,只需要一个处理IO读写的线程组即可。与服务端不同的是,服务端重载了group
函数,并增加了一个用于接收客户端连接的线程组。而这一点客户端显然不需要,因此直接采用了 AbstractBootstrap 的方法对线程组进行设置。
配置初始化参数:配置端口监听和客户端链路接入的 Channel
与服务端实现底层一样,相区别的是客户端使用 NioSocketChannel
配置初始化参数:配置 Socket Option
详见 服务端创建源码分析 -> 配置初始化参数 -> 配置 Socket Option
配置初始化参数:配置 Pipeline Handler
详见 服务端创建源码分析 -> 配置初始化参数 -> 配置 Pipeline Handler
值得注意的是,Handler的设置类需要实现 ChannelHandler 接口,但一般使用 ChannelInitializer 作为派生实现。它的继承关系如下:
interface ChannelHandler
-implement class ChannelHandlerAdapter
-extends abstract class ChannelInitializer<C extends Channel>
当TCP链路注册成功之后,会调用 initChannel 接口,用于设置用户 ChannelHandler。
//abstract class ChannelInitializer
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline pipeline = ctx.pipeline();
boolean success = false;
try {
//调用模板方法initChannel,设置用户ChannelHandler
initChannel((C) ctx.channel());
pipeline.remove(this);
ctx.fireChannelRegistered();
success = true;
} catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
} finally {
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
if (!success) {
ctx.close();
}
}
}
客户端连接操作
private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
//创建和初始化NioSocketChannel并注册到Selector上
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise = channel.newPromise();
if (regFuture.isDone()) {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
} else {
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
}
return promise;
}
private static void doConnect0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
if (localAddress == null) {
channel.connect(remoteAddress, promise);
} else {
channel.connect(remoteAddress, localAddress, promise);
}
promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
由上述代码可以看出,从 doConnect 操作开始,连接操作切换到了 Netty 的 NIO 线程 NioEventLoop 中进行,此时客户端返回,连接操作异步执行。doConnect 最终调用 HeadHandler 的 connect方法。
需要注意的是, SocketChannel 执行 connect 操作后有以下三种结果:
- 连接成功,返回True
- 暂时没有连接上,服务端没有返回 ACK 应答,连接结果不确定,返回 False
- 连接失败,直接抛出 IO 异常。
如果是第二种结果,需要将 NioSocketChannel 中的 selectionKey 设置为 OP_CONNECT,监听连接结果异步连接返回之后,需要判断连接结果,如果连接成功,则触发 ChannelActive 事件,代码如下。
//abstract class AbstractNioUnsafe
@Override
public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
promise.tryFailure(t);
closeIfClosed();
}
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
ChannelActive 事件处理最终会将 NioSocketChannel 中的 selectionKey 设置为 SelectionKey.OP_READ,用于监听网络读操作。
如果没有立即连接上服务端,则注册 SelectionKey.OP_CONNECT 到多路复用器,失败则关闭链路。
//class NioSocketChannel
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
javaChannel().socket().bind(localAddress);
}
boolean success = false;
try {
boolean connected = javaChannel().connect(remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
异步连接结果通知
NioEventLoop 的 Selector 轮询客户端连接 Channel,当服务端返回握手应答之后,对接结果进行判断,代码如下。
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
//处理连接
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException e) {
unsafe.close(unsafe.voidPromise());
}
}
对 finishConnect 进行源码分析
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
doFinishConnect 用于判断JDK的 SocketChannel 的连接结果,如果返回true表示连接成功,其他值或者发生异常表示连接失败。
@Override
protected void doFinishConnect() throws Exception {
if (!javaChannel().finishConnect()) {
throw new Error();
}
}
连接成功之后,调用 fulfilIConnectPromise 方法,触发链路激活事件,该事件由 ChannePipeline 进行传播,代码如下。
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
客户端连接超时机制
对于 SocketChannel 接口,JDK 并没有提供连接超时机制,需要 NIO 框架或者用户自已扩展实现。 Netty利用定时器提供了客户端连接超时控制功能。
首先,用户在创建Netty客户端的时候,可以通过 ChannelOption.CONNECT_TIMEOUT_MILLIS 配置项设置连接超时时间,代码如下。
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
发起连接的同时,启动连接超时检测定时器。一旦超时定时器执行,说明客户端连接超时,构造连接超时异常,将异常结果设置到connect Promise中,同时关闭客户端连接,释放句柄。如果在连接超时之前获取到连接结果,则删除连接超时定时器,防止其被触发。无论连接是否成功,只要获取到连接结果,之后就删除连接超时定时器。
//abstract class AbstractNioChannel finishConnect
finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
进阶:Netty服务端意外退出
通过阻塞方式绑定监听端口,启动服务端之后,没发生任何异常,程序退出。
try {
//Netty用于启动NIO服务端的辅助启动类,目的是降低服务端开发复杂度
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
//IO事件处理类
bootstrap.childHandler(new ChildChannelHandler());
//绑定端口,同步等待成功
ChannelFuture f = bootstrap.bind(port).sync();
//*这是跟NTSC-Server有区别的地方*
//等待服务端监听端口关闭
//f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放跟shutdownGracefully相关的资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
造成该结果的原因
- 调用
bind.sync
同步阻塞直到有结果返回,但线程很快执行完毕。 - 紧接着程序在 finally 里面执行了 shutdownGracefully 同时关闭服务端的TCP连接接入线程池和处理客户端网络IO读写的工作线程池,关闭之后,NioEventLoop 线程退出,整个系统的非守护线程就全部执行完成了,此时 main 函数主线程也早已执行完,因此 JVM 就会退出。因为调用的是 Netty 的优雅退出接口(
shutdown Gracefully
),所以整个退出过程并没有发生异常。
改进措施
- 以阻塞形式对关闭端口事件监听
//绑定端口,同步等待成功
ChannelFuture f = bootstrap.bind(port).sync();
//等待服务端监听端口关闭
f.channel().closeFuture().sync();
- 将
finally
块中的shutdownGracefully
切到监听器
f.channel().closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
//释放跟shutdownGracefully相关的资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
});
实际项目中的情况
业务往往是通过某种容器(例如 Tomcat、Spring Boot 等)拉起进程,然后通过容器启动来初始化各种业务资源。因此,不需要担心 Netty 服务端意外退出,启动 Netty 服务端比较容易犯的错误是釆用同步的方式调用Nety,导致初始化Nety服务端的业务线程被阻塞,举例如下。
错误用法:这种用法会导致调用方的线程一直被阻塞,直到服务端监听句柄关闭
- 初始化 Netty 服务端
- 同步阻塞等待服务端端口关闭。
- 释放 IO 线程资源和句柄等
- 调用方线程被释放
正确用法:服务端启动之后注册监听器监听服务端句柄关闭事件,待服务端关闭之后异步调用 shutdownGraceful 释放资源,这样调用方线程就可以快速返回,不会被阻塞。
- 初始化 Netty 服务端
- 绑定监听端口
- 向 CloseFuture 注册监听器,在监听器中释放资源
- 调用方线程返回
当系统退出时,建议通过调用 EventLoopGroup 的 shutdownGracefully 来完成内存队列中积压消息的处理、链路的关闭和 EventLoop 线程的退出,以实现停机不中断业务(备注:单靠 Netty 框架实际上无法 100% 保证,需要应用配合来实现)
进阶:优雅退出机制
在 Linux 上通常会通过kill -9 pid
的方式强制将某个进程杀掉,这种方式简单高效,因此很多程序的停止脚本经常会使用kill -9 pid
的方式。但强制进程退出,都会带来些副作用,对应用软件而言其效果等同于突然掉电,可能会导致如下问题:
- 缓存中的数据尚未持久化到磁盘中,导致数据丢失。
- 正在进行文件的写( write)操作,没有更新完成,突然退出,导致文件损坏
- 线程的消息队列中尚有接收到的请求消息还没来得及处理,导致请求消息丢失
- 数据库操作已经完成,例如账户余额更新,准备返回应答消息给客户端时,消息尚在通信线程的发送队列中排队等待发送,进程强制退出导致应答消息没有返回给客户端,客户端发起超时重试,会带来重复更新问题
- 句柄资源没有及时释放等其他问题
Java优雅退出机制
ShutdownHook
Java 的优雅停机通常通过注册 JDK 的 ShutdownHook 来实现,当系统接收到退出指令时,首先标记系统处于退出状态,不再接收新的消息,然后将积压的消息处理完,最后调用资源回收接口将资源销毁,各线程退出执行。
public static void main(String[] args) {
System.out.println("Main start");
Runtime.getRuntime().addShutdownHook(
new Thread(() -> {
System.out.println("ShutdownHook start performing");
System.out.println("ShutdownHook end performing");
})
);
System.out.println("Main end");
}
/*--Console--
Main start
Main end
Shutdown Hook start performing
Shutdown Hook end performing
Signal
也可以通过监听信号量并注册 SignalHandler 的方式实现优雅退出,它的工作原理如下图。
import sun.misc.Signal;
import sun.misc.SignalHandler;
public class TestSignalHandler implements sun.misc.SignalHandler {
@Override
public void handle(Signal sig) {
System.out.println("process successfully");
}
}
public static void main(String[] args) {
TestSignalHandler handler = new TestSignalHandler();
//判断是否是Windows操作系统,如果是则选择SIGINT,接收Ctrl+C中断的指令,
//否则选择TERM信号,接收 SIGTERM(等价于 kill pid)指令
String sgn = System.getProperty("os.name").toLowerCase().startsWith("win") ?
"INT" : "TERM";
Signal.handle(new Signal(sgn), handler);
for (; ; ) {
System.out.println("do something");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在 Linux 下支持的信号(具体信号kill -l命令查看)
SEGV, ILL, FPE, BUS, SYS, CPU, FSZ, ABRT, INT, TERM, HUP, USR1, USR2, QUIT, BREAK, TRAP, PIPE在 Windows 下支持的信号
SEGV, ILL, FPE, ABRT, INT, TERM, BREAK
需要注意的点
- ShutdownHook 在某些情况下并不会被执行,例如 JVM 崩溃、无法接收信号量和 kill -9 pid等
- 当存在多个 ShutdownHook 时,JVM无法保证它们的执行先后顺序
- 在 JVM 关闭期间不能动态添加或者去除 ShutdownHook
- 不能在 ShutdownHook 中调用 System.exit,它会卡住 JVM,导致进程无法退出
对于采用注册 SignalHandler 实现优雅退出的程序,在 handle 接口中一定要避免阻塞操作,否则它会导致已经注册的 ShutdownHook 无法执行,系统也无法退出。如果 SignalHandler 执行的操作比较耗时,最好异步或者放在 ShutdownHook 中去执行。
Netty 优雅退出机制
在实际项目中,Netty 作为高性能的异步 NIO 通信框架,往往作为基础通信框架负责各种协议的接入、解析和调度等,例如在 RPC 和分布式服务框架中,往往会使用 Netty 作为内部私有协议的基础通信框架。
Netty优雅退出主要进行三个操作:
- 把 NIO 线程的状态位设置成 ST_SHUTTING_DOWN,不再处理新的消息(不允许再对外发送消息)
- 退出前的预处理操作:把发送队列中尚未发送或者正在发送的消息发送完(备注:不保证能够发送完)、把已经到期或在退出超时之前到期的定时任务执行完成、把用户注册到 NIO 线程的退出 Hook 任务执行完成
- 资源的释放操作:所有 Channel 的释放、多路复用器的去注册和关闭、所有队列和定时任务的清空取消,最后是 EventLoop 线程的退出
//MultithreadEventExecutorGroup EventLoopGroup继承的抽象类
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
//遍历 EventLoop 优雅退出
for (EventExecutor l: children) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
return terminationFuture();
}
//SingleThreadEventExecutor EventLoop继承的抽象类
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
//加锁更改线程状态为 ST_SHUTTING_DOWN
...
synchronized (stateLock) {
switch (state) {
case ST_NOT_STARTED:
state = ST_SHUTTING_DOWN;
doStartThread();
break;
case ST_STARTED:
state = ST_SHUTTING_DOWN;
break;
default:
wakeup = false;
}
}
...
return terminationFuture();
}
//具体的 EventLoop 退出其实在 NioEventLoop 运行时的轮询上实现
protected void run() {
for (;;) {
...
if (isShuttingDown()) {
closeAll();
//扫尾处理,最后介绍
if (confirmShutdown()) {
break;
}
}
...
}
}
//释放资源继续处理
private void closeAll() {
//统计需要取消注册Channel的个数
selectAgain();
Set<SelectionKey> keys = selector.keys();
Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
for (SelectionKey k: keys) {
Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
channels.add((AbstractNioChannel) a);
} else {
k.cancel();
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, k, null);
}
}
//遍历取消 Channel
for (AbstractNioChannel ch: channels) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
}
@Override
public final void close(final ChannelPromise promise) {
//判断当前链路是否有消息正在发送,如果有则将SelectionKey的去注册操作封装成Task放到
//eventLoop中稍后再执行
if (inFlush0) {
invokeLater(new Runnable() {
@Override
public void run() {
close(promise);
}
});
return;
}
//将发送队列清空,不再允许发送新的消息
boolean wasActive = isActive();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
//调用NioSocketChannel的doClose方法关闭链路
doClose();
// Fail all the queued messages
try {
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
} finally {
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
//调用pipeline的fireChannelInactive,触发链路关闭事件
pipeline.fireChannelInactive();
}
});
}
//从多路复用器上取消selectionKey
deregister();
}
}
//NioEventLoop的扫尾处理
NioEventLoop 除了I/O读写,还负责定时任务执行、 ShutdownHook(备注:此处非 JDK 原生的 ShutdownHook)的执行等,如果此时有到期的定时任务,即使 Channel 已经关闭,但是仍然需要继续执行,线程不能退岀,下面继续分析 TaskQueue 的退出处理流程。
//上面 Run 轮询提到的扫尾工作
protected boolean confirmShutdown() {
//取消延时的任务
cancelDelayedTasks();
//执行剩余任务 运行ShutdownHook
if (runAllTasks() || runShutdownHooks()) {
if (isShutdown()) {
// Executor shut down - no new tasks anymore.
return true;
}
// There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period.
wakeup(true);
return false;
}
final long nanoTime = ScheduledFutureTask.nanoTime();
//判断是否到达优雅退出的指定超时时间,如果达到或者过了超时时间,则立即退出
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
}
//如果没到达指定的超时时间,暂时不退出,每隔100ms检测一下是否有新的任务加入,有新任务则继续执行
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
// Check if any tasks were added to the queue every 100ms.
// TODO: Change the behavior of takeTask() so that it returns on timeout.
wakeup(true);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
return false;
}
// No tasks were added for last quiet period - hopefully safe to shut down.
// (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
return true;
}
Netty优雅退出的一些误区
- 待发送的消息:调用优雅退出方法之后,不会立即关闭链路。ChannelOutboundBuffer 中的消息可以继续发送,本轮发送操作执行完成之后,无论是否还有消息尚未发送出去,在下一轮的 Selector 轮询中,链路都将被关闭,没有发送完成的消息将会被释放和丢弃。
- 需要发送的新消息:由于应用线程可以随时通过调用 Channel 的 write 系列接口发送消息,即便 ShutdownHook 触发了 Netty 的优雅退出方法,在 Netty 优雅退出方法执行期间,应用线程仍然有可能继续调用 Channel 发送消息,这些消息将发送失败。
应用注册在 NioEventLoop 线程上的普通 Task、ScheduledTask(定时任务)和 ShutdownHook,也无法保证被完全执行,这取决于优雅退出超时时间和任务的数量,以及执行速度。
因此,应用程序的正确性不能完全依赖 Netty 的优雅退出机制,需要在应用层面做容错设计和处理。例如,服务端在返回响应之前关闭了,导致响应没有发送给客户端,这可能会触发客户端的 IO 异常,或者恰好发生了超时异常,客户端需要对 IO 或超时异常做容错处理,采用 Failover 重试其他可用的服务端,而不能寄希望于服务端永远正确。Netty 优雅退出更重要的是保证资源、句柄和线程的快速释放,以及相关对象的清理。
Netty 优雅退出通常用于应用进程退出时,在应用的 ShutdownHook 中调用 EventLoopGroup.shutdownGracefully(long quietPeriod, long timeout, Time Unit unit)
接口,指定退出的超时时间,以防止因为一些任务执行被阻塞而无法正常退出。