作为一个基础的 NIO 通信框架,Netty 被广泛应用于大数据处理、互联网消息中间件、游戏和金融行业等。不同的行业对软件的可靠性需求不同,例如对通信软件的可靠性要求往往需要达到 5 个 9。
可靠性需求
Netty主要应用场景
RPC框架的基础网络通信框架
主要用于分布式节点之间的通信和数据交换,在各个业务领域均有典型的应用,例如阿里的分布式服务框架 Dubbo、消息队列 RocketMQ、大数据处理 Hadoop 的基础通信和序列化框架 Avro
私有协议的基础通信框架:例如 Thrift 协议、Dubbo 协议等
公有协议的基础通信框架:例如 HTTP 协议、SMPP 协议等。
Netty的运行环境
- 手游服务运行的 GSM/3G/WIFI 网络环境可靠性差,偶尔会出现闪断、网络单通等问题。
- 互联网应用在业务高峰期会出现网络拥堵,而且各地用户的网络环境差别也很大,部分地区网速和网络质量不高。
Netty故障的后果
Netty 是基础的通信框架,一旦出现 Bug,轻则需要重启应用,重则可能导致整个业务中断。它的可靠性会影响整个业务集群的数据通信和交换,在当今以分布式为主的软件架构体系中,通信中断就意味着整个业务中断,分布式架构下对通信的可靠性要求非常高。
可靠性设计
网络通信类故障
客户端连接超时
在 BIO 编程模式下,客户端 Socket 发起网络连接,需要指定连接超时时间,主要目的是:
- 避免客户端 IO 线程被长时间阻塞,这会导致系统可用IO线程数的减少
- 业务层需要:大多数系统都会对业务流程执行时间有限制。客户端设置连接超时时间是为了实现业务层的超时。
对于 BIO 的 Socket 来说,调用 connect 方法将被阻塞,直到连接成功或者发生连接超时等异常。
对于 NIO 的 SocketChannel 来说,在非阻塞模式下,它会直接返回连接结果。如果没有连接成功,也没有发生 IO 异常,则需要将 SocketChannel 注册到 Selector 上监听连接结果。所以,异步连接的超时无法在 API 层面直接设置,而是需要通过定时器来主动监测。
在 《Netty Channel和Unsafe》文章 AbstractNioUnsafe 源码分析 章节中对连接过程的超时有详细过程介绍
这样的超时设计既满足了用户的个性化需求,又实现了故障的分层隔离,即上层用户不用关心底层的超时实现机制。
通信对端强制关闭连接
在客户端和服务端正常通信过程中,如果发生网络闪断、对方进程突然宕机或者其他非正常关闭链路事件时,TCP 链路就会发生异常。由于 TCP 是全双工的,通信双方都需要关闭和释放 Socket 句柄才不会发生句柄的泄漏。
在实际的 NIO 编程过程中,我们经常会发现由于句柄没有被及时关闭导致的功能和可靠性问题。究其原因总结如下:
- IO 的读写等操作并非仅仅集中在 Reactor 线程内部,用户上层的一些定制行为可能会导致 IO 操作的外逸,例如业务自定义心跳机制。这些定制行为加大了统一异常处理的难度,IO 操作越发散,故障发生的概率就越大
- 一些异常分支没有考虑到,由于外部环境诱因导致程序进入这些分支,就会引起故障
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}
在调用 Socketchannel的read方法时发生了 IOException,从 Channel中读取数据报道缓冲区中的代码如下:
/**
* Transfers the specified source array's data to this buffer starting at
* the specified absolute {@code index}.
* This method does not modify {@code readerIndex} or {@code writerIndex} of
* this buffer.
*
* @throws IndexOutOfBoundsException
* if the specified {@code index} is less than {@code 0},
* if the specified {@code srcIndex} is less than {@code 0},
* if {@code index + length} is greater than
* {@code this.capacity}, or
* if {@code srcIndex + length} is greater than {@code src.length}
*/
public abstract ByteBuf setBytes(int index, byte[] src, int srcIndex, int length);
为了保证 IO 异常被统一处理,该异常应该向上抛,由 NioByteUnsafe 进行统一异常处理
private void closeOnRead(ChannelPipeline pipeline) {
SelectionKey key = selectionKey();
setInputShutdown();
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
key.interestOps(key.interestOps() & ~readInterestOp);
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
}
}
}
链路关闭
对于短连接协议,例如 HTTP 协议,通信双方数据交互完成之后,通常按照双方的约定由服务端关闭连接,客户端获得 TCP 连接关闭请求之后,关闭自身的 Socket 连接,双方正式断开连接。
在实际的 NIO 编程过程中,经常存在一种误区:认为只要是对方关闭连接,就会发生 IO 异常,捕获 IO 异常之后再关闭连接即可。实际上,连接的合法关闭不会发生 IO 异常,它是一种正常场景,如果遗漏了该场景的判断和处理就会导致连接句柄泄漏。
定制IO故障
在大多数场景下,当底层网络发生故障的时候,应该由底层的 NIO 框架负责释放资源,处理异常等。上层的业务应用不需要关心底层的处理细节。但是,在一些特殊的场景下,用户可能需要感知这些异常,并针对这些异常进行定制处理,例如:
- 客户端的断连重连机制
- 消息的缓存重发
- 接口日志中详细记录故障细节
- 运维相关功能,例如告警、触发邮件/短信等
Netty 的处理策略是发生 IO 异常,底层的资源由它负责释放,同时将异常堆栈信息以事件的形式通知给上层用户,由用户对异常进行定制。这种处理机制既保证了异常处理的安全性,也向上层提供了灵活的定制能力。
具体接口定义以及默认实现(ChannelHandlerAdapter)如下:
/**
* Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward
* to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*
* @deprecated is part of {@link ChannelInboundHandler}
*/
@Skip
@Override
@Deprecated
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
链路的有效性检测
当网络发生单通、连接被防火墙 Hang 住、长时间 GC 或者通信线程发生非预期异常时,会导致链路不可用且不易被及时发现。特别是异常发生在凌晨业务低谷期间,当早晨业务高峰期到来时,由于链路不可用会导致瞬间的大批量业务失败或者超时,这将对系统的可靠性产生重大的威胁。
从技术层面看,要解决链路的可靠性问题,必须周期性的对链路进行有效性检测。目前最流行和通用的做法就是心跳检测。
心跳检测机制分为三个层面:
- TCP 层面的心跳检测,即 TCP 的 Keep-Alive 机制,它的作用域是整个 TCP 协议栈
- 协议层的心跳检测,主要存在于长连接协议中。例如 SMPP 协议
- 应用层的心跳检测,它主要由各业务产品通过约定方式定时给对方发送心跳消息实现
心跳检测的目的就是确认当前链路可用,对方活着并且能够正常接收和发送消息。做为高可靠的 NIO 框架,Netty 也提供了心跳检测机制
不同的协议,心跳检测机制也存在差异,归纳起来主要分为两类
- Ping-Pong 型心跳:由通信一方定时发送 Ping 消息,对方接收到 Ping 消息之后,立即返回 Pong 应答消息给对方,属于请求-响应型心跳。
- Ping-Ping 型心跳:不区分心跳请求和应答,由通信双方按照约定定时向对方发送心跳 Ping 消息,它属于双向心跳。
心跳检测策略如下
- 连续 N 次心跳检测都没有收到对方的 Pong 应答消息或者 Ping 请求消息,则认为链路已经发生逻辑失效,这被称作心跳超时。
- 读取和发送心跳消息的时候如何直接发生了 IO 异常,说明链路已经失效,这被称为心跳失败。
无论发生心跳超时还是心跳失败,都需要关闭链路,由客户端发起重连操作,保证链路能够恢复正常。
Netty 的心跳检测实际上是利用了链路空闲检测机制实现的,而 Netty 对该机制实现了三种方式:WriteTimeoutHandler
、ReadTimeoutHandler
、IdleStateHandler
//WriteTimeoutHandler 写超时接口
protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
if (!closed) {
ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
ctx.close();
closed = true;
}
}
//ReadTimeoutHandler 读超时接口
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
if (!closed) {
ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
ctx.close();
closed = true;
}
}
//IdleStateHandler 链路空闲超时接口
//链路空闲的时候并没有关闭链路,而是触发IdleStateEvent事件,用户订阅IdleStateEvent事件,
//用于自定义逻辑处理,例如关闭链路、客户端发起重新连接、告警和打印日志等。
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
利用 Netty 提供的链路空闲检测机制,可以非常灵活的实现协议层的心跳检测
Reactor线程的保护
Reactor 线程作为 IO 操作的核心,一旦出现故障则导致挂在其上面的多路复用器和多个链路无法正常工作。
异常处理
尽管 Reactor 线程主要处理 IO 操作,发生的异常通常是 lO 异常,但是,实际上在些特殊场景下会发生非 IO 异常,如果仅仅捕获 IO 异常可能就会导致 Reactor 线程跑飞为了防止发生这种意外,在循环体内一定要捕获 Throwable,而不是 IO 异常或者 Exception
//NioEventLoop
@Override
protected void run() {
for (;;) {
oldWakenUp = wakenUp.getAndSet(false);
try {
// Select操作...
} catch (Throwable t) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
捕获 Throwable 之后,即便发生了意外未知对异常,线程也不会跑飞,它休眠1S,防止死循环导致的异常绕接,然后继续恢复执行。
这样处理的核心理念就是:
- 某个消息的异常不应该导致整条链路不可用
- 某条链路不可用不应该导致其他链路不可用
- 某个进程不可用不应该导致其他集群节点不可用
规避 NIO Bug
通常情况下,死循环是可检测、可预防但是无法完全避免的。Reactor 线程通常处理的都是 IO 相关的操作,因此我们重点关注 IO 层面的死循环。
JDK NIO 类库最著名的就是 epoll bug 了,它会导致 Selector 空轮询,IO 线程 CPU 100%,严重影响系统的安全性和可靠性。
Netty 的解决策略:
- 根据该 BUG 的特征,首先侦测该 BUG 是否发生
- 将问题 Selector 上注册的 Channel 转移到新建的 Selector 上
- 老的问题 Selector 关闭,使用新建的 Selector 替换。
//NioEventLoop select
private void select() throws IOException {
Selector selector = this.selector;
try {
for (;;) {
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
}
} catch (CancelledKeyException e) {
}
}
内存保护
NIO 通信的内存保护主要集中在如下几点:
- 链路总数的控制:每条链路都包含接收和发送缓冲区,链路个数太多容易导致内存溢出
- 单个缓冲区的上限控制:防止非法长度或者消息过大导致内存溢出
- 缓冲区内存释放:防止因为缓冲区使用不当导致的内存泄露
- NIO消息发送队列的长度上限控制
缓冲区的内存泄漏保护
为了提升内存的利用率,Netty 提供了内存池和对象池。但是,基于缓存池实现以后需要对内存的申请和释放进行严格的管理,否则很容易导致内存泄漏。
如果不采用内存池技术实现,每次对象都是以方法的局部变量形式被创建,使用完成之后,只要不再继续引用它,JVM 会自动释放。但是,一旦引入内存池机制,对象的生命周期将由内存池负责管理,这通常是个全局引用,如果不显式释放 JVM 是不会回收这部分内存的。
对于 Netty 的用户而言,使用者的技术水平差异很大,一些对 JVM 内存模型和内存泄漏机制不了解的用户,可能只记得申请内存,忘记主动释放内存,特别是 JAVA 程序员为了防止因为用户遗漏导致内存泄漏,Netty 在 Pipeline 的尾 Handler 中自动对内存进行释放。
//TailHandler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
//ReferenceCountUtil
public static boolean release(Object msg) {
if (msg instanceof ReferenceCounted) {
return ((ReferenceCounted) msg).release();
}
return false;
}
//PooledByteBuf extends AbstractReferenceCountedByteBuf 释放内存
//对于实现了AbstractReferenceCountedByteBuf的ByteBuf,内存申请、使用和释放的时候
//Netty都会自动进行引用计数检测,防止非法使用内存
@Override
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
chunk.arena.free(chunk, handle);
recycle();
}
}
缓冲区溢出保护
做过协议栈的读者都知道,当我们对消息进行解码的时候,需要创建缓冲区。缓冲区的创建方式通常有两种:
- 容量预分配,在实际读写过程中如果不够再扩展
- 根据协议消息长度创建缓冲区
在实际的商用环境中,如果遇到畸形码流攻击、协议消息编码异常、消息丢包等问题时,可能会解析到一个超长的长度字段。笔者曾经遇到过类似问题,报文长度字段值竟然是 2G 多,由于代码的一个分支没有对长度上限做有效保护,结果导致内存溢出。系统重启后几秒内再次内存溢出,幸好及时定位出问题根因,险些酿成严重的事故。
Netty 提供了编解码框架,因此对于解码缓冲区的上限保护就显得非常重要。下面,我们看下 Netty 是如何对缓冲区进行上限保护的:
首先,在内存分配的时候指定缓冲区长度上限:
/**
* Allocate a {@link ByteBuf} with the given initial capacity and the given
* maximal capacity. If it is a direct or heap buffer depends on the actual
* implementation.
*/
ByteBuf buffer(int initialCapacity, int maxCapacity);
其次,在对缓冲区进行写入操作的时候,如果缓冲区容量不足需要扩展,首先对最大容量进行判断,如果扩展后的容量超过上限,则拒绝扩展;
@Override
public final ByteBuf capacity(int newCapacity) {
if (newCapacity == length) {
ensureAccessible();
return this;
}
}
在消息解码的时候,对消息长度进行判断,如果超过最大容量上限,则抛岀解码异常,拒绝分配内存。
流量整形
大多数的商用系统都有多个网元或者部件组成,例如参与短信互动,会涉及手机、基站、短信中心、短信网关、SP/CP 等网元。不同网元或者部件的处理性能不同。为了防止因为浪涌业务或者下游网元性能低导致下游网元被压垮,有时候需要系统提供流量整形功能。
下面我们一起看下流量整形的定义;流量整形(Traffic Shaping)是一种主动调整流量输出速率的措施。一个典型应用是基于下游网络结点的 TP 指标来控制本地流量的输岀。流量整形与流量监管的主要区别在于,流量整形对流量监管中需要丢弃的报文进行缓存——通常是将它们放入缓冲区或队列内,也称流量整形( Traffic Shaping,简称TS)。当令牌桶有足够的令牌时,再均匀的向外发送这些被缓存的报文。流量整形与流量监管的另一区别是,整形可能会增加延迟,而监管几乎不引入额外的延迟。
流量整形的原理示意图如下图所示。
作为高性能的 NIO 框架,Netty 的流量整形有两个作用:
- 防止由于上下游网元性能不均衡导致下游网元被压垮,业务流程中断
- 防止由于通信模块接收消息过快,后端业务线程处理不及时导致的“撑死”问题
全局级流量整形
全局流量整形的作用范围是进程级的,无论你创建了多少个 Channel,它的作用域针对所有的 Channel。
用户可以通过参数设置:报文的接收速率、报文的发送速率、整形周期 GlobalTrafficShapingHandler 的接口定义如下所示:
public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
/**
* Create a new instance.
*
* @param executor
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
* @param maxTime
* The maximum delay to wait in case of traffic excess.
*/
public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit,
long checkInterval, long maxTime) {
super(writeLimit, readLimit, checkInterval, maxTime);
createGlobalTrafficCounter(executor);
}
Netty 流量整形的原理是:对每次读取到的 ByteBuf 可写字节数进行计算,获取当前的报文流量,然后与流量整形阈值对比。如果已经达到或者超过了阈值。则计算等待时间 delay,将当前的 ByteBuf 放到定时任务 Task 中缓存,由定时任务线程池在延迟 delay 之后继续处理该 ByteBuf。如果达到整形阈值,则对新接收的 ByteBuf 进行缓存,放入线程池的消息队列中,稍后处理。定时任务的延时时间根据检测周期 T 和流量整形阈值计算得来。
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0) {
// compute the number of ms to wait before reopening the channel
long waitGlobal = trafficCounter.readTimeToWait(size, getReadLimit(), maxTime, now);
Integer key = ctx.channel().hashCode();
PerChannel perChannel = channelQueues.get(key);
long wait = 0;
if (perChannel != null) {
wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now);
if (readDeviationActive) {
// now try to balance between the channels
long maxLocalRead;
maxLocalRead = perChannel.channelTrafficCounter.cumulativeReadBytes();
long maxGlobalRead = cumulativeReadBytes.get();
if (maxLocalRead <= 0) {
maxLocalRead = 0;
}
if (maxGlobalRead < maxLocalRead) {
maxGlobalRead = maxLocalRead;
}
wait = computeBalancedWait(maxLocalRead, maxGlobalRead, wait);
}
}
if (wait < waitGlobal) {
wait = waitGlobal;
}
wait = checkWaitReadTime(ctx, wait, now);
if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
// time in order to try to limit the traffic
// Only AutoRead AND HandlerActive True means Context Active
Channel channel = ctx.channel();
ChannelConfig config = channel.config();
if (logger.isDebugEnabled()) {
logger.debug("Read Suspend: " + wait + ':' + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
if (config.isAutoRead() && isHandlerActive(ctx)) {
config.setAutoRead(false);
channel.attr(READ_SUSPENDED).set(true);
// Create a Runnable to reactive the read if needed. If one was create before it will just be
// reused to limit object creation
Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
Runnable reopenTask = attr.get();
if (reopenTask == null) {
reopenTask = new ReopenReadTimerTask(ctx);
attr.set(reopenTask);
}
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
if (logger.isDebugEnabled()) {
logger.debug("Suspend final status => " + config.isAutoRead() + ':'
+ isHandlerActive(ctx) + " will reopened at: " + wait);
}
}
}
}
informReadOperation(ctx, now);
ctx.fireChannelRead(msg);
}
需要指出的是,流量整形的阈值 limit 越大,流量整形的精度越高,流量整形功能是可靠性的一种保障,它无法做到 100% 的精确。这个跟后端的编解码以及缓冲区的处理策略相关,此处不再赘述。感兴趣的朋友可以思考下,Netty 为什么不做到 100% 的精确。
流量整形与流控的最大区别在于流控会拒绝消息,流量整形不拒绝和丢弃消息,无论接收量多大,它总能以近似恒定的速度下发消息,跟变压器的原理和功能类似。
链路级流量整形
除了全局流量整形,Netty 也支持链路级的流量整形,ChannelTrafficShapingHandler 接口定义如下:
public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
/**
* Create a new instance.
*
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
* @param maxTime
* The maximum delay to wait in case of traffic excess.
*/
public ChannelTrafficShapingHandler(long writeLimit, long readLimit,
long checkInterval, long maxTime) {
super(writeLimit, readLimit, checkInterval, maxTime);
}
单链路流量整形与全局流量整形的最大区别就是它以单个链路为作用域,可以对不同的链路设置不同的整形策略。它的实现原理与全局流量整形类似,我们不再赘述。值得说明的是,Netty 支持用户自定义流量整形策略,通过继承 AbstractTrafficShapingHandler 的 doAccounting 方法可以定制整形策略。
优雅停机接口
具体的优雅停机接口的内容在《Netty C/S创建与初始化》有详细讲解
优化建议
发送队列容量上限控制
Netty 的 NIO 消息发送队列 ChannelOutboundBuffer 并没有容量上限控制,它会随着消息的积压自动扩展,直到达到0x7fffffff
。
如果网络对方处理速度比较慢,导致 TCP 滑窗长时间为 0;或者消息发送方发送速度过快,或者一次批量发送消息量过大,都可能会导致 ChannelOutboundBuffer 的内存膨胀,这可能会导致系统的内存溢出。
建议优化方式如下:在启动客户端或者服务端的时候,通过启动项的 ChannelOption 设置发送队列的长度,或者通过 -D 启动参数配置该长度。
回推发送失败的消息
当网络发生故障的时候,Netty 会关闭链路,然后循环释放待未发送的消息,最后通知监听 listener。
这样的处理策略值得商榷,对于大多数用户而言,并不关心底层的网络 IO 异常,他们希望链路恢复之后可以自动将尚未发送的消息重新发送给对方,而不是简单的销毁 Netty 销毁尚未发送的消息,用户可以通过监听器来得到消息发送异常通知,但是却无法获取原始待发送的消息。如果要实现重发,需要自己缓存消息,如果发送成功,自己删除,如果发送失败,重新发送。这对于大多数用户而言,非常麻烦,用户在开发业务代码的同时,还需要考虑网络 IO 层的异常并为之做特殊的业务逻辑处理。
下面我们看下 Mina 的实现,当发生链路异常之后,Mina 会将尚未发送的整包消息队列封装到异常对象中,然后推送给用户 Handler,由用户来决定后续的处理策略。相比于 Netty 的“野蛮”销毁策略,Mina 的策略更灵活和合理,由用户自己决定发送失败消息的后续处理策略大多数场景下,业务用户会使用 RPC 框架,他们通常不需要直接针对 Netty 编程,如果 Netty 提供了发送失败消息的回推功能,RPC 框架就可以进行封装,提供不同的策略给业务用户使用,例如:
- 缓存重发策略:当链路发生异常之后,尚未发送成功的消息自动缓存,待链路恢复正常之后重发失败的消息
- 失败删除策略:当链路发生异常之后,尚未发送成功的消息自动销毁,它可能是非重要消息,例如日志消息,也可能是由业务直接监听异常并做特殊处理
- 其他策略