Netty 的 ChannelPipeline 和 ChannelHandler 机制类似于 Servlet 和 Filter 过滤器,这类拦截器实际上是职责链模式的一种变形,主要是为了方便事件的拦截和用户业务逻辑的定制。
Netty 的 Channel 过滤器实现原理与 Servlet Filter 机制一致,它将 Channel 的数据管道抽象为 ChannelPipeline,消息在 ChannelPipeline 中流动和传递。ChannelPipeline 持有 IO 事件拦截器 ChannelHandler 的链表,由 ChannelHandler 对 IO 事件进行拦截和处理,可以方便地通过新增和删除 ChannelHandler 来实现不同的业务逻辑定制,不需要对已有的 ChannelHandler 进行修改,能够实现对修改封闭和对扩展的支持。
Netty逻辑架构
通信调度层 Reactor
它由一系列辅助类完成,包括 Reactor 线程 NioEventLoop 及其父类,NioSocketchanne / NioServerChannel 及其父类,ByteBuffer 以及由其衍生出来的各种 Buffer,Unsafe 以及其衍生出的各种内部类等。该层的主要职责就是监听网络的读写和连接操作,负责将网络层的数据读取到内存缓冲区中,然后触发各种网络事件,例如连接创建、连接激活、读事件、写事件等,将这些事件触发到 Pipeline 中,由 Pipeline 管理的职责链来进行后续的处理。
职责链 ChannelPipeline
它负责事件在职责链中的有序传播,同时负责动态地编排职责链。职责链可以选择监听和处理自己关心的事件,它可以拦截处理和向后间向前传播事件。不同应用的 Handler 节点的功能也不同,通常情况下,往往会开发编解码 Handler 用于消息的编解码,它可以将外部的协议消息转换成内部的POJO对象,这样上层业务则只需要关心处理业务逻辑即可,不需要感知底层的协议差异和线程模型差异,实现了架构层面的分层隔离。
业务逻辑编排层 ServiceChannelHandler
业务逻辑编排层通常有两类:一类是纯粹的业务逻辑编排,还有一类是其他的应用层协议插件,用于特定协议相关的会话和链路管理。例如CMPP协议,用于管理和中国移动短信系统的对接。
架构的不同层面,需要关心和处理的对象都不同,通常情况下,对于业务开发者,只需要关心职责链的拦截和业务 Handler 的编排。因为应用层协议栈往往是开发一次,到处运行,所以实际上对于业务开发者来说,只需要关心服务层的业务逻辑开发即可。各种应用协议以插件的形式提供,只有协议开发人员需要关注协议插件,对于其他业务开发人员来说,只需关心业务逻辑定制。这种分层的架构设计理念实现了 NIO 框架各层之间的解耦,便于上层业务协议栈的开发和业务逻辑的定制。
ChannelPipeline
ChannelPipeline 是 ChannelHandler 的容器,它负责 ChannelHandler 的管理和事件拦截与调度。其内部维护了一个 ChannelHandler 的链表和迭代器,方便进行管理。
ChannelPipeline的类继承关系图
ChannelPipeline的事件处理
上图展示了一个消息被 ChannelPipeline 的 ChannelHandler 链拦截和处理的全过程,消息的读取和发送处理全流程描述如下:
- 底层的 SocketChannelRead 方法读取 ByteBuf,触发 ChannelRead 事件,由 IO 线程 NioEventLoop 调用 ChannelPipeline 的
fireChannelRead(Object msg)
方法,将消息(ByteBuf)传输到 ChannelPipeline 中。 - 消息依次被 HeadHandler、ChannelHandler1、 ChannelHandler2…… TailHandler 拦截和处理,在这个过程中,任何 ChannelHandler 都可以中断当前的流程,结束消息的传递。
- 调用 ChannelHandlerContext 的 write 方法发送消息,消息从 TailHandler 开始,途经 ChannelHandlerN… ChannelHandler2、HeadHandler,最终被添加到消息发送缓冲区中等待刷新和发送,在此过程中也可以中断消息的传递,例如当编码失败时,就需要中断流程,构造异常的 Future 返回。
Netty 中的事件分为 inbound 事件和 outbound 事件。
inbound 事件通常由IO线程触发(上图左半部分)。触发 inbound 事件的方法如下:
ChannelHandlerContext.fireChannelRegistered()
:Channel注册事件ChannelHandlerContext.fireChannelActive()
:TCP链路建立成功,Channel激活事件ChannelHandlerContext.fireChannelRead(Object)
:读事件ChannelHandlerContext.fireChannelReadComplete()
:读操作完成通知事件ChannelHandlerContext.fireExceptionCaught(Throwable)
:异常通知事件ChannelHandlerContext.fireUserEventTriggered(Object)
:用户自定义事件ChannelHandlerContext.fireChannelWritabilityChanged
:Channel 的可写状态变化通知事件ChannelHandlerContext.fireChannellnactive()
:TCP连接关闭,链路不可用通知事件
outbound 事件通常是由用户主动发起的网络IO操作(上图右半部分)。触发 outbound 事件的方法如下:
ChannelHandlerContext.bind(SocketAddres, ChannelPromise)
:绑定本地地址事件ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
:连接服务端事件ChannelHandlerContext.write(Object, Channel Promise)
:发送事件ChannelHandlerContext.flush()
:刷新事件ChannelHandlerContext.read()
:读事件ChannelHandlerContext.disconnect(Channel Promise)
:断开连接事件ChannelHandlerContext.close(Channel, Promise)
:关闭当前 Channel 事件
调用过程
在 TailHandler 触发 Outbound 事件,最终会调用到 HeadHandler 的 Outbound 事件,并由 Unsafe 执行对应的方法,Pipieline 仅仅负责事件的调度
ChannelPipeline的主要特性
ChannelPipeline 支持运行态动态的添加或者删除 ChannelHandler,在某些场景下这个特性非常实用。例如当业务高峰期需要对系统做拥塞保护时,就可以根据当前的系统时间进行判断,如果处于业务高峰期,则动态地将系统拥塞保护 ChannelHandler 添加到当前的 ChannelPipeline中,当高峰期过去之后,就可以动态删除拥塞保护 ChannelHandler 了。
ChannelPipeline 是线程安全的,这意味着N个业务线程可以并发地操作 ChannelPipeline 而不存在多线程并发问题。但是,ChannelHandler 却不是线程安全的,这意味着尽管 ChannelPipeline 是线程安全的,但是用户仍然需要自己保证 ChannelHandler 的线程安全。
自定义拦截器使用
通过继承ChannelHandlerAdapter类,覆盖自己关注的事件,实现事件的拦截和处理。
public class InboundHandler extends ChannelHandlerAdapter {
// 拦截 ChannelActive事件,打印TCP链路建立成功日志
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("TCP CONNECTED!");
ctx.fireChannelActive();
}
}
public class OutboundHandler extends ChannelHandlerAdapter {
// 拦截 close事件,在链路关闭时释放资源
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
System.out.println("TCP CLOSED!");
//... Release resource
ctx.close();
}
}
ChannelHandler
ChannelHandler 类似于 Servlet 的 Filter 过滤器,负责对 l/O 事件或者 l/O 操作进行拦截和处理,它可以选择性地拦截和处理自已感兴趣的事件,也可以透传和终止事件的传递。
ChannelHandler支持注解,目前支持的注解有两种
- Sharable:多个 ChannelPipeline 共用同一个 ChannelHandler
- Skip:被Skip注解的方法不会被调用,直接被忽略
ChannelHandlerAdapter
对于大多数的 ChannelHandler 会选择性地拦截和处理某个或者某些事件,其他的事件会忽略,由下一个 ChannelHandler 进行拦截和处理。这就会导致一个问题:用户 ChannelHandler 必须要实现 ChannelHandler 的所有接口,包括它不关心的那些事件处理接口,这会导致用户代码的冗余和臃肿,代码的可维护性也会变差。
为了解决这个问题,Netty 提供了 ChannelHandlerAdapter 基类,它的所有接口实现都是事件透传,如果用户 ChannelHandler 关心某个事件,只需要覆盖 ChannelHandlerAdapter 对应的方法即可,对于不关心的,可以直接继承使用父类的方法,这样子类的代码就会非常简洁和清晰。
Encoder和Decoder
处理TCP粘包/拆包的解码器
TCP是个“流”协议,所谓流,就是没有界限的一串数据。大家可以想想河里的流水,它们是连成一片的,其间并没有分界线。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
TCP粘包/拆包问题说明
假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。
- 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包
- 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包
- 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包
- 服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D11,第二次读取到了D1包的剩余内容D12和D2包的整包。
如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第5种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。
TCP粘包/拆包发生的原因
问题产生的原因有三个:
- 应用程序 write 写入的字节大小大于套接口发送缓冲区大小
- 进行 MSS 大小的 TCP 分段
- 以太网帧的 payload 大于 MTU 进行 IP 分片
粘包问题代码复现
代码实现
服务端代码
public class TimeServerHandler extends ChannelHandlerAdapter {
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf request = (ByteBuf) msg;
byte[] req = new byte[request.readableBytes()];
request.readBytes(req);
String result = new String(req, "UTF-8")
.substring(0, req.length - System.getProperty("line.separator").length());
System.out.println("The time server receiver order : " + result + "; the counter is : "+ ++counter);
String resp = "Query".equalsIgnoreCase(result) ?
new Date().toString() : "Bad Argument";
resp += System.getProperty("line.separator");
ByteBuf response = Unpooled.copiedBuffer(resp.getBytes());
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//释放资源
ctx.close();
}
}
客户端代码
public class TimeClientHandler extends ChannelHandlerAdapter {
byte[] req;
private int counter;
public TimeClientHandler() {
req = ("Query" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message;
for (int i = 0; i < 100; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] resp = new byte[buf.readableBytes()];
buf.readBytes(resp);
String body = new String(resp, "UTF-8");
System.out.println("Now is : " + body + " ;the counter is " + ++counter);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
System.out.println(cause.getMessage());
}
}
结果分析
理想结果
客户端发送100条Query命令,服务端响应100条时间结果
服务器接收了共计100个Query,但只收到了两个包(92,8)
The time server receiver order : Query
省略 90 个 Query...
Query; the counter is : 1
The time server receiver order : Query
省略 6 个 Query
Query; the counter is : 2
客户端只收到了一个包,可以看出服务端对两个请求包做出了响应(即Bad Argument)
Now is : Bad Argument
Bad Argument
;the counter is 1
粘包问题的解决策略
由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下:
- 将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛
- 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
- 消息长度固定,累计读取到长度总和为定长LEN的报文后,就认为读取到了一个完整的消息:将计数器置位,重新开始读取下一个数据报
- 通过在消息头中定义长度字段来标识消息的总长度
方案一 LineBasedFrameDecoder
对应解决TCP粘包问题的第一项方案:将回车换行符作为消息结束符
服务端配置
pipeline()
//Arguments:
//1: 单条消息的最大长度(防止异常码流缺失分隔符导致的内存溢出)
.addLast(new LineBasedFrameDecoder(1024)) // 以回车换行符作为消息结束符组装报文
.addLast(new StringDecoder()) // 将二进制流的数据报转换为String类型
.addLast(new ServerHandler()); // 业务逻辑处理Handler
LineBasedFrameDecoder的工作原理
依次遍历 ByteBuf 中的可读字节,判断看是否有\n
或者\r\n
,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String result = (String) msg; //接收到的直接是String类型字符串--(StringDecoder)
...
//在消息结尾添加特殊标志符 -- \r\n
//同理,客户端在发送数据时也要添加该标志符作为消息的结束
result += System.getProperty("line.separator");
ctx.writeAndFlush(response); //组织好业务数据向客户端传输
}
客户端配置
pipeline()
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())
.addLast(new ClientHandler());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
...
}
方案二 DelimiterBasedFrameDecoder
对应解决TCP粘包问题的第二项方案:将特殊的分隔符作为消息的结束标志
服务端以及客户端配置
//自定义的分隔符
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
pipeline()
//Arguments:
//1: 单条消息的最大长度 2: 自定义的分隔符
.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
对应的Handler内容替换分隔符即可
System.getProperty("line.separator") -> "$_"
方案三 FixedLengthFrameDecoder
对应解决TCP粘包问题的第三项方案:消息长度固定
FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码。
服务端以及客户端配置
pipeline().addLast(new FixedLengthFrameDecoder(20));
方案四 LengthFieldBasedFrameDecoder和LengthFieldPrepender
对应解决TCP粘包问题的第四项方案:通过在消息头中定义长度字段来标识消息的总长度
pipeline()
.addLast(new LengthFieldBasedFrameDecoder(65536,0,2,0,2)) // 解码--长度可变流截取为报文
.addLast(new MsgPackDecoder()) // 解码--比特流解析为POJO
.addLast(new LengthFieldPrepender(2)) //编码--为比特流
.addLast(new MsgPackEncoder()); //编码--POJO编码为比特流
LengthFieldPrepender编码器
@Sharable
public class LengthFieldPrepender extends MessageToMessageEncoder //后面会介绍
工作原理:计算当前待发送消息的二进制字节长度,将该长度添加到 ByteBuf 的缓冲区头
LengthFieldBasedFrameDecoder解码器
该解码器通过以下四个参数进行解码:
- lengthFieldOffset 长度字段偏移量
- lengthFieldLength 长度字段所占byte数
- lengthAdjustment 长度内容调整量
- initialBytesToStrip 舍去消息的前X个Byte
initialBytesToStrip
舍弃一部分Message
lengthAdjustment
修正读取内容的长度
lengthFieldOffset
修正读取长度字段的偏移量
通过这四个参数的灵活组合可以达到不同的解码效果