提起 Channel,可以联想到java.nio.SocketChannel
和java.nio.ServerSocketChannel
,它们用于非阻塞的IO操作。类似于 NIO 的 Channel,Netty 提供了自己的 Channel 和其子类实现,用于异步 IO 操作和其他相关的操作。
Unsafe 是个内部接口,聚合在 Channel 中协助进行网络读写相关的操作,因为它的设计初衷就是 Channel 的内部辅助类,不应该被 Netty 框架的上层使用者调用,所以被命名为 Unsafe。这里不能仅从字面理解认为它是不安全的操作,而要从整个架构的设计层面体会它的设计初衷和职责。
Channel功能说明
io.netty.channel.Channel
是 Netty 网络操作抽象类,它聚合了一组功能,包括但不限于网路的读、写,客户端发起连接,主动关闭连接,链路关闭,获取通信双方的网络地址等。它也包含了 Netty 框架相关的一些功能,包括获取该 Channel 的 EventLoop,获取缓冲分配器 ByteBufAllocator 和 pipeline等。
Channel的工作原理
Channel 是 Netty 抽象出来的网络I/O读写相关的接口,为什么不使用 JDK NIO 原生的 Channel 而要另起炉灶呢,主要原因如下:
- JDK 的 SocketChannel 和 ServerSocketChannel 没有统一的 Channel 接口供业务开发者使用,对于用户而言,没有统一的操作视图,使用起来并不方便。
- JDK 的 SocketChannel 和 ServerSocketChannel 的主要职责就是网络 I/O 操作,由于它们是 SPI 类接口,由具体的虚拟机厂家来提供,所以通过继承 SPI 功能类来扩展其功能的难度很大;直接实现 ServerSocketChannel 和 SocketChannel 抽象类,其工作量和重新开发一个新的 Channel 功能类是差不多的。
- Netty 的 Channel 需要能够跟 Netty 的整体架构融合在一起,例如 IO 模型、基于 ChannelPipeline 的定制模型,以及基于元数据描述配置化的 TCP 参数等,这些 JDK 的 SocketChannel 和 ServerSocketChannel 都没有提供,需要重新封装。
- 自定义的 Channel,功能实现更加灵活。
基于上述4个原因,Netty 重新设计了 Channel 接口,并且给予了很多不同的实现。它的设计原理比较简单,但是功能却比较繁杂,主要的设计理念如下:
- 在 Channel 接口层,采用 Facade 模式进行统一封装,将网络IO操作、网络IO相关联的其他操作封装起来,统一对外提供
- Channel 接口的定义尽量大而全,为 SocketChannel 和 ServerSocketChannel 提供统一的视图,由不同子类实现不同的功能,公共功能在抽象父类中实现,最大程度地实现功能和接口的重用。
- 具体实现采用聚合而非包含的方式,将相关的功能类聚合在 Channel 中,由 Channel 统一负责分配和调度,功能实现更加灵活。
Channel功能介绍
网络IO操作
1. read()
Channel read();
从当前的 Channel 中读取数据到第一个 inbound 缓冲区中,如果数据被成功读取,触发channelRead()事件,读取操作API调用完成之后,紧接着会触发channelReadComplete()事件,这样业务的 ChannelHandler 可以决定是否需要继续读取数据。如果已经有读操作请求被挂起,则后续的读操作会被忽略。
2. write()
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelFuture write(Object msg);
请求将当前的参数 msg 通过 ChannelPipeline 写入到目标 Channel 中。
注意
write 操作只是将消息存入到消息发送环形数组中,并没有真正被发送,只有调用 flush 操作才会被写入到 Channel 中,发送给对方。
3. flush()
Channel flush();
将之前写入到发送环形数组中的消息全部写入到目标 Channel 中,发送给通信对方。
4. writeAndFlush()
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
write 和 flush 的聚合操作
5. bind()
ChannelFuture bind(SocketAddress localAddress);
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
绑定指定的本地 Socket 地址 localAddress,该方法会级联触发 bind 事件。
6. connect()
ChannelFuture connect(SocketAddress remoteAddress);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
客户端使用指定的服务端地址 Remote Address 发起连接请求,如果连接因为应答超时而失败,ChannelFuture 中的操作结果就是 ConnectTimeoutException 异常;如果连接被拒绝,操作结果为 ConnectException。该方法会级联触发 ChannelPipeline 中所有 ChannelHandler 的 connect 事件。该方法也可以携带一个 ChannelPromise 参数用于写入操作结果。
7. disconnect()
ChannelFuture disconnect();
ChannelFuture disconnect(ChannelPromise promise);
请求断开与远程通信对端的连接并使用 ChannelPromise 来获取操作结果的通知消息。该方法会级联触发 ChannelPipeline 中所有 ChannelHandler 的 disconnect 事件。
8. close()
ChannelFuture close();
ChannelFuture close(ChannelPromise promise);
主动关闭当前连接,通过参数 ChannelPromise 设置操作结果并进行结果通知,无论操作是否成功,都可以通过 ChannelPromise 获取操作结果。该操作会级联触发 ChannelPipeline 中所有 ChannelHandler 的 close 事件。
9. config() & metadata()
//Channel配置信息
ChannelConfig config();
//对应连接的Channel的TCP参数配置(每个连接都有自己的TCP参数配置)
ChannelMetadata metadata();
其他常用API
1. eventLoop()
Channel 需要注册到 EventLoop 的多路复用器上,用于处理IO事件,通过 eventLoop() 方法可以获取到 Channel 注册的 EventLoop。EventLoop本质上就是处理网络读写事件的 Reactor 线程。在 Netty 中,它不仅仅用来处理网络事件,也可以用来执行定时任务和用户自定义 NioTask 等任务。
2. parent()
对于服务端 Channel 而言,它的父 Channel 为空;对于客户端 Channel,它的父 Channel 就是创建它的 ServerSocketChannel。
3. id()
获取 Channel 标识的id,它返回 ChannelId 对象, ChannelID 是 Channel 的唯一标识,它的可能生成策略:
- 机器的MAC地址(EU-48或者EUI-64)等可以代表全局唯一的信息
- 当前的进程ID
- 当前系统时间的毫秒 —— System.currentTimeMillis
- 当前系统时间纳秒数 —— Systen.nanoTime
- 32位的随机整型数
- 32位自增的序列数
Channel源码分析
Channel 的实现子类非常多,我们只对 NioSocketChannel 和 NioServerSocketChannel 进行重点分析
下图为 NioServerSocketChannel 继承关系
下图为 NioSocketChannel 继承关系
AbstractChannel源码分析
AbstractChannel 聚合了所有 Channel 使用到的能力对象,并提供初始化和统一封装,如果功能和子类强相关,则定义成抽象方法由子类具体实现。
成员变量定义
//estimatorHandle用于预测下一个报文的大小,它基于之前数据的采样进行分析预测
private MessageSizeEstimator.Handle estimatorHandle;
核心API源码分析
Netty 基于事件驱动,当 Channel 进行 IO 操作时会产生对应的 IO 事件,然后驱动事件在 ChannelPipeline 中传播,由对应的 ChannelHandler 对事件进行拦截和处理,不关心的事件可以直接忽略。采用事件驱动的方式可以非常轻松地通过事件定义来划分事件拦截切面,方便业务的定制和功能扩展,相比AOP,其性能更高,但是功能却基本等价。
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture disconnect() {
return pipeline.disconnect();
}
@Override
public ChannelFuture close() {
return pipeline.close();
}
@Override
public Channel flush() {
pipeline.flush();
return this;
}
AbstractNioChannel源码分析
成员变量定义
//由于 NIO Channel、NioSocketChannel 和 NioServerSocketChannel 需要共用,
//所以定义了一个`java.nio.SocketChannel` 和`java.nio.ServerSocketChannel`的
//公共父类 SelectableChannel,用于设置 SelectableChannel 参数和进行IO操作。
private final SelectableChannel ch;
//代表了JDK SelectionKey的OP_READ
protected final int readInterestOp;
//SelectionKey是Channel注册到EventLoop后返回的选择键
//由于Channel会面临多个业务线程的并发写操作,当SelectionKey由SelectionKey修改之后,
//为了能让其他业务线程感知到变化,所以需要使用volatile保证修改的可见性
private volatile SelectionKey selectionKey;
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
//代表连接操作结果的ChannelPromise
private ChannelPromise connectPromise;
//连接超时定时器ScheduledFuture
private ScheduledFuture<?> connectTimeoutFuture;
//请求的通信地址信息
private SocketAddress requestedRemoteAddress;
核心API源码分析
Channel的注册
定义一个布尔类型的局部变量 selected 来标识注册操作是否成功,调用 SelectableChannel 的 register 方法,将当前的 Channel 注册到 EventLoop 的多路复用器上。
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//注册值为0,说明对任何操作都不感兴趣,仅完成注册
//此处将AbstractNioChannel的实现子类自身当作附件注册
//如果注册成功则返回selectionKey,通过它可以从Selector中获取Channel对象
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
}
//当前注册返回的selectionKey如被取消捕获CancelledKeyException异常
catch (CancelledKeyException e) {
if (!selected) {
//如果是第一次处理该异常,调用多路复用器的selectNow方法将已经取消的
//selectionKey从多路复用器中删除掉。操作成功之后,将selected置为true,
//说明之前失效的 selectionKey 已经被删除掉。
eventLoop().selectNow();
selected = true;
} else {
//继续发起下一次注册操作,如果仍然发生CancelledKeyException异常,
//说明我们无法删除已经被取消的selectionKey,按照JDK的API说明,
//这种意外不应该发生。如果发生这种问题,则说明可能NIO的相关类库存在不可恢复
//的BUG,直接抛出CancelledKeyException异常到上层进行统一处理
throw e;
}
}
}
}
注册 Channel 的时候需要指定监听的网络操作位来表示 Channel 对哪几类网络事件感兴趣,具体的定义如下。
// -- Operation bits and bit-testing convenience methods --
/*
Operation-set bit for read operations.
Suppose that a selection key's interest set contains OP_READ at the start of a selection operation. If the selector detects that the corresponding channel is ready for reading, has reached end-of-stream, has been remotely shut down for further reading, or has an error pending, then it will add OP_READ to the key's ready-operation set and add the key to its selected-key set.
*/
public static final int OP_READ = 1 << 0;
/*
Operation-set bit for write operations.
Suppose that a selection key's interest set contains OP_WRITE at the start of a selection operation. If the selector detects that the corresponding channel is ready for writing, has been remotely shut down for further writing, or has an error pending, then it will add OP_WRITE to the key's ready set and add the key to its selected-key set.
*/
public static final int OP_WRITE = 1 << 2;
/*
Operation-set bit for socket-connect operations.
Suppose that a selection key's interest set contains OP_CONNECT at the start of a selection operation. If the selector detects that the corresponding socket channel is ready to complete its connection sequence, or has an error pending, then it will add OP_CONNECT to the key's ready set and add the key to its selected-key set.
*/
// C connect S
public static final int OP_CONNECT = 1 << 3;
/*
Operation-set bit for socket-accept operations.
Suppose that a selection key's interest set contains OP_ACCEPT at the start of a selection operation. If the selector detects that the corresponding server-socket channel is ready to accept another connection, or has an error pending, then it will add OP_ACCEPT to the key's ready set and add the key to its selected-key set.
*/
// S accept C
public static final int OP_ACCEPT = 1 << 4;
Channel读取前处理
@Override
protected void doBeginRead() throws Exception {
//判断Channel是否关闭,如果处于关闭中,则直接返回
if (inputShutdown) {
return;
}
//获取当前的 SelectionKey进行判断
final SelectionKey selectionKey = this.selectionKey;
//如果可用,说明Channel当前状态正常,则可以进行正常的操作位修改
if (!selectionKey.isValid()) {
return;
}
//将SelectionKey当前的操作位与读操作位进行按位与操作
final int interestOps = selectionKey.interestOps();
//如果等于0,说明目前并没有设置读操作位,随后设置
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
AbstractNioByteChannel源码分析
成员变量定义
//负责继续写半包消息
private Runnable flushTask;
核心API源码分析
Channel的写操作
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
for (;;) {
//从发送消息环形数组ChannelOutboundBuffer弹出一条消息
//首先,获取需要发送的消息,如果消息为ByteBuf且它分配的是JDK的非堆内存,则直接返回。
Object msg = in.current(true);
//对返回的消息进行判断,如果为空,说明消息发送数组中所有待发送的消息都已经发送完成,
//清除半包标识
if (msg == null) {
//清除半包标识,也就是清除写操作位
clearOpWrite();
break;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
//可读字节为0,丢弃
if (readableBytes == 0) {
in.remove();
continue;
}
//声明消息发送相关的成员变量
//写半包标识
boolean setOpWrite = false;
//消息是否全部发送标识
boolean done = false;
//发送的总消息字节数
long flushedAmount = 0;
//对循环发送次数进行判断,如果为-1,则从Channel配置对象中获取循环发送次数。
//循环发送次数是指当一次发送没有完成时(写半包),继续循环发送的次数。
//设置写半包最大循环次数的原因是当循环发送的时候,IO线程会一直尝试进行写操作,
//此时IO线程无法处理其他的IO操作,例如读新的消息或者执行定时任务和 NioTask等,
//如果网络IO阻塞或者对方接收消息太慢,可能会导致线程假死。
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
//发送数据(抽象方法)
int localFlushedAmount = doWriteBytes(buf);
//如本次发送的字节数为0,说明发送TCP缓冲区己满,发生了ZERO_WINDOW。
//此时再次发送仍然可能出现写0字节,空循环会占用CPU的资源,导致I/O线程
//无法处理其他IO操作,所以将写半包标识setOpWrite设置为true,退出循环,
//释放IO线程。
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
//如果发送的字节数大于0,则对发送总数进行计数。判断当前消息是否已经发送成功
//(缓冲区没有可读字节),如果发送成功则设置done为true,退出当前循环。
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
//消息发送操作完成之后调用ChannelOutboundBuffer更新发送进度信息,
in.progress(flushedAmount);
//对发送结果进行判断。如果发送成功,则将已经发送的消息从发送数组中删除;
//否则调用incompleteWrite方法,设置写半包标识,启动刷新线程继续发送之前没有发送
//完全的半包消息(写半包)
if (done) {
in.remove();
} else {
incompleteWrite(setOpWrite);
break;
}
}
//文件传输
else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
boolean setOpWrite = false;
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (region.transfered() >= region.count()) {
done = true;
break;
}
}
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
incompleteWrite(setOpWrite);
break;
}
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
}
}
}
处理半包发送任务的方法
protected final void incompleteWrite(boolean setOpWrite) {
//判断是否需要设置写半包标识,如果需要则调用setOpWrite设置写半包标识
//如果SelectionKey的OP_WRITE被设置,多路复用器会不断轮询对应的Channel,
//用于处理没有发送完成的半包消息,直到清除SelectionKey的OP_WRITE操作位
//设置了OP_WRITE操作位后,就不需要启动独立的Runnable来负责发送半包消息了
//没有设置OP_WRITE操作位,需要启动独立的Runnable,将其加入到EventLoop中执行,
//由Runnable负责半包消息的发送
if (setOpWrite) {
setOpWrite();
} else {
//调用flush方法来发送缓冲数组中的消息
Runnable flushTask = this.flushTask;
if (flushTask == null) {
flushTask = this.flushTask = new Runnable() {
@Override
public void run() {
flush();
}
};
}
eventLoop().execute(flushTask);
}
}
AbstractNioMessageChannel源码分析
//主要实现方法只有这一个
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
break;
}
boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
if (doWriteMessage(msg, in)) {
done = true;
break;
}
}
if (done) {
in.remove();
} else {
// Did not write all messages.
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
break;
}
}
}
在循环体内对消息进行发送,从 ChannelOutboundBuffer 中弹出一条消息进行处理,如果消息为空,说明发送缓冲区为空,所有消息都已经被发送完成。清除写半包标识,退出循环。与 AbstractNioByteChannel 的循环发送类似,利用 writeSpinCount 对单条消息进行发送,调用 doWriteMessage 判断消息是否发送成功。如果成功,则将发送标识 done 设置为 true,退出循环;否则继续执行循环,直到执行 writeSpinCount 次。
发送操作完成之后,判断发送结果,如果当前的消息被完全发送出去,则将该消息从缓冲数组中删除;否则设置半包标识,注册 SelectionKey.OP_WRITE 到多路复用器上,由多路复用器轮询对应的 Channel 重新发送尚未发送完全的半包消息。
通过代码分析我们发现, AbstractNioMessageChannel 和 AbstractNioByteChannel 的消息发送实现比较相似,不同之处在于:一个发送的是 ByteBuf 或者 FileRegion,它们可以直接被发送:另一个发送的则是 POJO 对象。
AbstractNioMessageServerChannel源码分析
AbstractNioMessageServerChannel 的实现非常简单,它定义了一个 EventLoopGroup 类型的 childGroup,用于给新接入的客户端 NioSocketChannel 分配 EventLoop。
public abstract class AbstractNioMessageServerChannel extends AbstractNioMessageChannel implements ServerChannel {
private final EventLoopGroup childGroup;
protected AbstractNioMessageServerChannel(
Channel parent, EventLoop eventLoop, EventLoopGroup childGroup, SelectableChannel ch, int readInterestOp) {
super(parent, eventLoop, ch, readInterestOp);
this.childGroup = childGroup;
}
@Override
public EventLoopGroup childEventLoopGroup() {
return childGroup;
}
}
每当服务端接入一个新的客户端连接 NioSocketChannel 时,都会调用 childEventLoopGroup 方法获取 EventLoopGroup 线程组,用于给 NioSocketChannel 分配 Reactor 线程 EventLoop。
//class NioServerSocketChannel
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
if (ch != null) {
buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
return 1;
}
return 0;
}
NioServerSocketChannel源码分析
成员变量定义
//metadate & config
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private final ServerSocketChannelConfig config;
//静态创建了 ServerSocketChannel
private static ServerSocketChannel newSocket() {
return ServerSocketChannel.open();
}
核心API源码分析
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
if (ch != null) {
buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
return 1;
}
return 0;
}
首先通过 ServerSocketChannel 的 accept 接收新的客户端连接,如果 SocketChannel 不为空,则利用当前的 NioServerSocketChannel、EventLoop 和 SocketChannel 创建新的 NioSocketChannel,并将其加入到 List<object> buf
中,最后返回1,表示服务端消息读取成功。
对于 NioServerSocketChannel,它的读取操作就是接收客户端的连接,创建 NioSocketChannel 对象。
对于与服务端 Channel 无关的接口定义,由于这些方法是客户端 Channel 相关的,因此,对于服务端 Channel 无须实现。如果这些方法被误调,则返回 UnsupportedOperationException 异常。
// Unnecessary stuff
@Override
protected boolean doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doFinishConnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
NioSocketChannel源码分析
连接操作
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
//参数检验
if (localAddress != null) {
//绑定该地址
javaChannel().socket().bind(localAddress);
}
boolean success = false;
try {
//发起TCP连接,有三个可能的结果
//1. 连接成功,返回true
//2. 暂时没有连接上,服务端没有返回ACK应答,连接结果不确定,返回 false
//3. 连接失败,直接抛出IO异常
boolean connected = javaChannel().connect(remoteAddress);
//如果是2,需要将NioSocketChannel中的selectionKey设置为OP_CONNECT,监听连接网络操作位
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
//如果抛出了IO异常,说明客户端的TCP握手请求直接被REST或者被拒绝,此时需要关闭客户端连接
return connected;
} finally {
if (!success) {
doClose();
}
}
}
写半包
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
// Do non-gathering write for a single buffer case.
//获取待发送的ByteBuf个数,如果小于等于1,则调用父类 AbstractNioByteChannel 的
//doWrite方法,操作完成之后退出。
final int msgCount = in.size();
if (msgCount <= 1) {
super.doWrite(in);
return;
}
ByteBuffer[] nioBuffers = in.nioBuffers();
if (nioBuffers == null) {
super.doWrite(in);
return;
}
//在批量发送缓冲区的消息之前,先对一系列的局部变量进行赋值
//需要发送的ByteBuffer数组个数nioBufferCnt
int nioBufferCnt = in.nioBufferCount();
//从ChannelOutboundBuffer中获取需要发送的总字节数
long expectedWrittenBytes = in.nioBufferSize();
//从NioSocketChannel中获取NIO的SocketChannel
final SocketChannel ch = javaChannel();
long writtenBytes = 0;
//是否发送完成标识
boolean done = false;
//是否有写半包标识
boolean setOpWrite = false;
//循环发送
//就像循环读一样,我们必须对一次Selector轮询的写操作次数进行上限控制,
//因为如果TCP的发送缓冲区满,TCP处于KEEP-ALIVE状态,消息会无法发送出去,
//如果不对上限进行控制,就会长时间地处于发送状态,Reactor线程无法及时读取
//其他消息和执行排队的Task。
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
//调用NIO SocketChannel的write方法,它有三个参数:
//第一个是需要发送的ByteBuffer数组
//第二个是数组的偏移量
//第三个参数是发送的ByteBuffer个数
//返回值是写入SocketChannel的字节个数
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
//下面对写入的字节进行判断,如果为0,说明TCP发送缓冲区已满,很有可能无法再写进去,
//因此从循环中跳出,同时将写半包标识设置为true,用于向多路复用器注册写操作位,
//告诉多路复用器有没发完的半包消息,需要轮询出就绪的SocketChannel继续发送。
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
//发送操作完成后进行两个计算:
//1. 需要发送的字节数要减去已经发送的字节数
//2. 发送的字节总数+已经发送的字节数
//更新完这两个变量后,判断缓冲区中所有的消息是否已经发送完成。
//如果是,则把发送完成标识设置为true同时退出循环;
//如果没有发送完成,则继续循环。
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
//从循环发送中退出之后,首先对发送完成标识done进行判断,
//如果发送完成,则循环释放己经发送的消息。
//环形数组的发送缓冲区释放完成后,取消半包标识,告诉多路复用器消息已经全部发送完成
if (done) {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
in.remove();
}
// Finish the write loop if no new messages were flushed by in.remove().
if (in.isEmpty()) {
clearOpWrite();
break;
}
} else {
//出现写半包的处理方式
//首先,循环遍历发送缓冲区,对消息的发送结果进行判断,下面具体展开进行说明
//1.从ChannelOutboundBuffer弹出第一条发送的ByteBuf,然后获取该ByteBuf的
//读索引和可读字节数。
//2.对可读字节数和发送的总字节数进行比较,如果发送的字节数大于可读的字节数,
//说明当前的ByteBuf已经被完全发送出去,更新ChannelOutboundBuffer的发送进度信息,
//将已经发送的ByteBuf删除,释放相关资源。
//最后,发送的字节数要减去第一条发送的字节数,得到后续消息发送的总字节数,
//然后继续循环判断第二条消息、第三条消息……
//3.如果可读的消息大于已经发送的总字节数,说明这条消息没有被完整地发送出去,
//仅仅发送了部分数据报,也就是出现了所谓的“写半包”问题。此时,需要更新可读的索引
//为当前索引+已经发送的总字节数,然后更新ChannelOutboundBuffer的发送进度信息,
//退出循环。
//4.如果可读字节数等于已经发送的总字节数,则说明最后一次发送的消息是个整包消息,
//没有剩余的半包消息待发送。更新发送进度信息,将最后一条已发送的消息从缓冲区中删除,
//最后退出循环。
//循环发送操作完成之后,更新SocketChannel的操作位为OP_WRITE,由多路复用器在下一次轮
//询中触发 SocketChannel,继续处理没有发送完成的半包消息。
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < writtenBytes) {
in.progress(readableBytes);
in.remove();
writtenBytes -= readableBytes;
} else if (readableBytes > writtenBytes) {
buf.readerIndex(readerIndex + (int) writtenBytes);
in.progress(writtenBytes);
break;
} else { // readableBytes == writtenBytes
in.progress(readableBytes);
in.remove();
break;
}
}
incompleteWrite(setOpWrite);
break;
}
}
}
读写操作
NioSocketChannel 的读写操作实际上是基于 NIO 的 SocketChannel 和 Netty 的 ByteBuf 封装而成。
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
}
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
ensureWritable(length);
//分析setBytes在UnpooledHeapByteBuf实现的内容
int writtenBytes = setBytes(writerIndex, in, length);
if (writtenBytes > 0) {
writerIndex += writtenBytes;
}
return writtenBytes;
}
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
ensureAccessible();
try {
return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length));
} catch (ClosedChannelException e) {
return -1;
}
}
从 SocketChannel 中读取字节数组到缓冲区java.nio.ByteBuffer
中,它的起始 position 为 writeIndex,limit 为 writeIndex + length。
Unsafe功能说明
Unsafe 接口实际上是 Channel 接口的辅助接口,它不应该被用户代码直接调用。实际的IO读写操作都是由 Unsafe 接口负责完成的。
下图是 Unsafe API
Unsafe源码分析
实际的网络IO操作基本都是由 Unsafe 功能类负责实现的,下面我们一起看下它的主要功能子类和重要的API实现。
AbstractUnsafe源码分析
register方法
register 方法主要用于将当前 Unsafe 对应的 Channel 注册到 EventLoop 的多路复用器上并 Trigger 相应的事件
@Override
public final void register(final ChannelPromise promise) {
//判断当前所在的线程是否是 Channel 对应的 NioEventLoop 线程
if (eventLoop.inEventLoop()) {
//如果是同一个线程,则不存在多线程并发操作问题,直接调用 register0 进行注册
register0(promise);
} else {
//如果是由用户线程或者其他线程发起的注册操作,则将注册操作封装成 Runnable,
//放到 NioEventLoop 任务队列中执行
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}
//如果直接执行 register0 方法,会存在多线程并发操作 Channel 的问题
private void register0(ChannelPromise promise) {
//调用ensureOpen方法判断当前Channel是否打开
if (!ensureOpen(promise)) {
//Channel未打开,直接返回
return;
}
//抽象方法,导出类负责实现(由AbstractNioUnsafe对应的AbstractNioChannel实现)
doRegister();
registered = true;
promise.setSuccess();
//调取注册事件
pipeline.fireChannelRegistered();
//调取Channel激活事件
if (isActive()) {
pipeline.fireChannelActive();
}
}
bind方法
bind 方法主要用于绑定指定的端口,对于服务端,用于绑定监听端口,可以设置 backlog 参数;对于客户端,主要用于指定客户端 Channel 的本地绑定 Socket 地址。
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (!ensureOpen(promise)) {
return;
}
boolean wasActive = isActive();
//导出类实现(NIO 绑定方法的CS端调用是分离的)
try {
doBind(localAddress);
} catch (Throwable t) {
promise.setFailure(t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
promise.setSuccess();
}
disconnect方法
disconnect 用于客户端或者服务端主动关闭连接
@Override
public final void disconnect(final ChannelPromise promise) {
boolean wasActive = isActive();
try {
doDisconnect();
} catch (Throwable t) {
promise.setFailure(t);
closeIfClosed();
return;
}
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
promise.setSuccess();
closeIfClosed(); // doDisconnect() might have closed the channel
}
close方法
在链路关闭之前需要首先判断是否处于刷新状态,如果处于刷新状态说明还有消息尚未发送出去,需要等到所有消息发送完成再关闭链路,因此,将关闭操作封装成 Runnable 稍后再执行。
如果链路没有处于刷新状态,需要从 closeFuture 中判断关闭操作是否完成,如果已经完成,不需要重复关闭链路,设置 ChannelPromise 的操作结果为成功并返回。
@Override
public final void close(final ChannelPromise promise) {
if (inFlush0) {
invokeLater(new Runnable() {
@Override
public void run() {
close(promise);
}
});
return;
}
if (closeFuture.isDone()) {
// Closed already.
promise.setSuccess();
return;
}
执行关闭操作,将消息发送缓冲数组设置为空,通知JVM进行内存回收。调用抽象方法 doClose 关闭链路。如果关闭操作成功,设置 ChannelPromise 结果为成功。如果操作失败,则设置异常对象到 ChannelPromise 中。
boolean wasActive = isActive();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
try {
doClose();
closeFuture.setClosed();
promise.setSuccess();
} catch (Throwable t) {
closeFuture.setClosed();
promise.setFailure(t);
}
调用 ChannelOutboundBuffer 的 close 方法释放缓冲区的消息,随后构造链路关闭通知 Runnable 放到 NioEventLoop 中执行。
// Fail all the queued messages
try {
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
} finally {
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
deregister();
}
}
最后,调用 deregister 方法,将 Channel 从多路复用器上取消注册。
write方法
write 方法实际上将消息添加到环形发送数组中,并不是真正的写 Channel
@Override
public void write(Object msg, ChannelPromise promise) {
if (!isActive()) {
// Mark the write request as failure if the channel is inactive.
if (isOpen()) {
promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION);
} else {
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
}
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
} else {
outboundBuffer.addMessage(msg, promise);
}
}
如果 Channel 没有处于激活状态,说明 TCP 链路还没有真正建立成功,当前 Channel 存在以下两种状态。
- Channel 打开,但是 TCP 链路尚未建立成功:NOT_YET_CONNECTED_EXCEPTION
- Channel 已经关闭:CLOSED_CHANNEL_EXCEPTION
对链路状态进行判断,给 ChannelPromise 设置对应的异常,然后调用 ReferenceCountUtil 的 release 方法释放发送的 msg 对象。
如果链路状态正常,则将需要发送的msg和 promise放入发送缓冲区中(环形数组)。
flush方法
flush 方法负责将发送缓冲区中待发送的消息全部写入到 Channel 中,并发送给通信对方。
@Override
public void flush() {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
//将发送环形数组的unflushed指针修改为tail,标识本次要发送消息的缓冲区范围
outboundBuffer.addFlush();
//调用flush0进行发送
flush0();
}
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
} else {
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
outboundBuffer.failFlushed(t);
} finally {
inFlush0 = false;
}
}
doWrite方法
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
//计算需要发送的消息个数(unflushed-flush),如果只有1个消息需要发送,则调用父类的写操作
final int msgCount = in.size();
if (msgCount <= 1) {
super.doWrite(in);
return;
}
// Ensure the pending writes are made of ByteBufs only.
ByteBuffer[] nioBuffers = in.nioBuffers();
if (nioBuffers == null) {
super.doWrite(in);
return;
}
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
final SocketChannel ch = javaChannel();
long writtenBytes = 0;
boolean done = false;
boolean setOpWrite = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
if (done) {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
in.remove();
}
// Finish the write loop if no new messages were flushed by in.remove().
if (in.isEmpty()) {
clearOpWrite();
break;
}
} else {
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < writtenBytes) {
in.progress(readableBytes);
in.remove();
writtenBytes -= readableBytes;
} else if (readableBytes > writtenBytes) {
buf.readerIndex(readerIndex + (int) writtenBytes);
in.progress(writtenBytes);
break;
} else { // readableBytes == writtenBytes
in.progress(readableBytes);
in.remove();
break;
}
}
incompleteWrite(setOpWrite);
break;
}
}
}
AbstractNioUnsafe源码分析
AbstractNioUnsafe 是 AbstractUnsafe 类的 NIO 实现,它主要实现了 connect 操作
@Override
public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
promise.tryFailure(t);
closeIfClosed();
}
}
首先获取当前的连接状态进行缓存,然后发起连接操作,连接操作(doConnect)有三种可能产生的结果:
连接成功,返回true
通过 fulfillConnectPromise 触发 ChannelActive 事件
暂时没有连接上,服务端没有返回ACK应答,连接结果不确定,返回 false
将 NioSocketChannel 中的 selectionKey 设置为 OP_CONNECT,继续监听连接应答消息。异步连接返回之后,需要判断连接结果,如果连接成功,则 触发 ChannelActive 事件
连接失败,直接抛出IO异常。
而 ChannelActive 事件最终会将 NioSocketChannel 中的 selectionKey 设置为 SelectionKey.OP_READ,用于监听网络读操作位
下图是连接操作的执行流程
NioByteUnsafe源码分析
@Override
public void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
//如果首次调用,则先初始化
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
if (!config.isAutoRead()) {
removeReadOp();
}
ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try {
//通过接收缓冲区分配器的Handler计算获得下次预分配的缓冲区容量ByteBufCapacity
//紧接着根据缓冲区容量进行缓冲区分配
int byteBufCapacity = allocHandle.guess();
int totalReadAmount = 0;
do {
byteBuf = allocator.ioBuffer(byteBufCapacity);
int writable = byteBuf.writableBytes();
//接收缓冲区ByteBuf分配完成后,进行消息的异步读取
int localReadAmount = doReadBytes(byteBuf);
//完成消息的异步读取后,需要对本次读取的字节数进行判断,有以下三种可能:
//1.返回0,表示没有就绪的消息可读
//2.返回值大于0,读到了消息
//3.返回值-1,表示发生了I/O异常,读取失败
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
close = localReadAmount < 0;
break;
}
//完成一次异步读之后,就会触发一次ChannelRead事件
//但这里要特别提醒大家的是:完成一次读操作,并不意味着读到了一条完整的消息,
//因为TCP底层存在组包和粘包,所以,一次读操作可能包含多条消息,也可能是一条
//不完整的消息。因此不要把它跟读取的消息个数等同起来。在没有做任何半包处理的情况下,
//以ChannelRead的触发次数做计数器来进行性能分析和统计,是完全错误的。当然,
//如果你使用了半包解码器或者处理了半包,就能够实现一次ChannelRead对应一条完整的消息。
//触发和完成ChannelRead事件调用之后,将接收缓冲区释放。
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
//因为一次读操作未必能够完成TCP缓冲区的全部读取工作,所以,读操作在循环体
//中进行,每次读取操作完成之后,会对读取的字节数进行累加
//在累加之前,需要对长度上限做保护,如果累计读取的字节数已经发生溢出,
//则将读取到的字节数设置为整型的最大值,然后退出循环。原因是本次循环已经读取过多
//的字节,需要退出,否则会影响后面排队的Task任务和写操作的执行。
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
// Avoid overflow.
totalReadAmount = Integer.MAX_VALUE;
break;
}
totalReadAmount += localReadAmount;
//如果没有溢出,则执行累加操作。
if (localReadAmount < writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
}
//最后,对本次读取的字节数进行判断,如果小于缓冲区可写的容量,说明TCP缓冲区已经没有
//就绪的字节可读,读取操作已经完成,需要退出循环。如果仍然有未读的消息,则继续执行
//读操作。连续的读操作会阻塞排在后面的任务队列中待执行的Task,以及写操作,所以,
//要对连续读操作做上限控制,默认值为16次,无论TCP缓冲区有多少码流需要读取,只要连续
//16次没有读完,都需要强制退出,等待下次selector轮询周期再执行
while (++ messages < maxMessagesPerRead);
//完成多路复用器本轮读操作之后,触发ChannelReadComplete事件,随后调用接收缓冲区容量
//分配器的Hanlder的记录方法,将本次读取的总字节数传入到 record 方法中进行缓冲区的
//动态分配,为下一次读取选取更加合适的缓冲区容量。
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
//如果读到的返回值为-1,表明发生了1O异常,需要关闭连接,释放资源
if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
}
}
RecvByteBufAllocator分析
RecvByteBufAllocator默认有两种实现,分别是 AdaptiveRecvByteBufAllocator 和 FixedRecvByteBufAllocator。下面介绍的是 AdaptiveRecvByteBufAllocator(缓冲区大小可以动态调整的 ByteBuf 分配器)。
成员变量
static final int DEFAULT_MINIMUM = 64;
static final int DEFAULT_INITIAL = 1024;
static final int DEFAULT_MAXIMUM = 65536;
private static final int INDEX_INCREMENT = 4;
private static final int INDEX_DECREMENT = 1;
private static final int[] SIZE_TABLE;
static {
List<Integer> sizeTable = new ArrayList<Integer>();
for (int i = 16; i < 512; i += 16) {
sizeTable.add(i);
}
for (int i = 512; i > 0; i <<= 1) {
sizeTable.add(i);
}
SIZE_TABLE = new int[sizeTable.size()];
for (int i = 0; i < SIZE_TABLE.length; i ++) {
SIZE_TABLE[i] = sizeTable.get(i);
}
}
它分别定义了三个系统默认值:最小缓冲区长度64字节、初始容量1024字节、最大容量65536字节。还定义了两个动态调整容量时的步进参数:扩张的步进索引为4、收缩的步进索引为1。最后,定义了长度的向量表 SIZE_TABLE 并初始化它。
向量数组的每个值都对应一个 Buffer 容量,当容量小于512的时候,由于缓冲区已经比较小,需要降低步进值,容量每次下调的幅度要小些;当大于 512 时,说明需要解码的消息码流比较大,这时采用调大步进幅度的方式减少动态扩张的频率,所以它采用 512 的倍数进行扩张。
HandleImpl
private static final class HandleImpl implements Handle {
//向量表的最小索引
private final int minIndex;
//向量表的最大索引
private final int maxIndex;
//向量表的当前索引
private int index;
//下一次预分配的Buffer大小
private int nextReceiveBufferSize;
//是否立即执行容量收缩操作
private boolean decreaseNow;
//ByteBuf动态伸缩和扩张
@Override
public void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
if (decreaseNow) {
index = Math.max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
index = Math.min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
}
使用动态缓冲区分配器的优点如下:
- Netty 作为一个通用的 NIO 框架,并不对用户的应用场景进行假设,可以使用它做流媒体传输,也可以用它做聊天工具。不同的应用场景,传输的码流大小千差万别,无论初始化分配的是 32KB 还是 1MB,都会随着应用场景的变化而变得不适应。因此,Netty 根据上次实际读取的码流大小对下次的接收 Buffer 缓冲区进行预测和调整,能够最大限度地满足不同行业的应用场景。
- 性能更高,容量过大会导致内存占用开销增加,后续的 Buffer 处理性能会下降容量过小时需要频繁地内存扩张来接收大的请求消息,同样会导致性能下降。
- 更节约内存。假如通常情况下请求消息平均值为1MB左右,接收缓冲区大小为1.2MB。
突然某个客户发送了一个10MB的流媒体附件,接收缓冲区扩张为10MB以接纳该附件,如果缓冲区不能收缩,每次缓冲区创建都会分配10MB的内存,但是后续所有的消息都是1MB左右,这样会导致内存的浪费,如果并发客户端过多,可能会发生内存溢出,最终宕机。