0%

Netty ChannelPipeline和ChannelHandler

Netty 的 ChannelPipeline 和 ChannelHandler 机制类似于 Servlet 和 Filter 过滤器,这类拦截器实际上是职责链模式的一种变形,主要是为了方便事件的拦截和用户业务逻辑的定制。

Netty 的 Channel 过滤器实现原理与 Servlet Filter 机制一致,它将 Channel 的数据管道抽象为 ChannelPipeline,消息在 ChannelPipeline 中流动和传递。ChannelPipeline 持有 IO 事件拦截器 ChannelHandler 的链表,由 ChannelHandler 对 IO 事件进行拦截和处理,可以方便地通过新增和删除 ChannelHandler 来实现不同的业务逻辑定制,不需要对已有的 ChannelHandler 进行修改,能够实现对修改封闭和对扩展的支持。

Netty逻辑架构

通信调度层 Reactor

它由一系列辅助类完成,包括 Reactor 线程 NioEventLoop 及其父类,NioSocketchanne / NioServerChannel 及其父类,ByteBuffer 以及由其衍生出来的各种 Buffer,Unsafe 以及其衍生出的各种内部类等。该层的主要职责就是监听网络的读写和连接操作,负责将网络层的数据读取到内存缓冲区中,然后触发各种网络事件,例如连接创建、连接激活、读事件、写事件等,将这些事件触发到 Pipeline 中,由 Pipeline 管理的职责链来进行后续的处理。

职责链 ChannelPipeline

它负责事件在职责链中的有序传播,同时负责动态地编排职责链。职责链可以选择监听和处理自己关心的事件,它可以拦截处理和向后间向前传播事件。不同应用的 Handler 节点的功能也不同,通常情况下,往往会开发编解码 Handler 用于消息的编解码,它可以将外部的协议消息转换成内部的POJO对象,这样上层业务则只需要关心处理业务逻辑即可,不需要感知底层的协议差异和线程模型差异,实现了架构层面的分层隔离。

业务逻辑编排层 ServiceChannelHandler

业务逻辑编排层通常有两类:一类是纯粹的业务逻辑编排,还有一类是其他的应用层协议插件,用于特定协议相关的会话和链路管理。例如CMPP协议,用于管理和中国移动短信系统的对接。

架构的不同层面,需要关心和处理的对象都不同,通常情况下,对于业务开发者,只需要关心职责链的拦截和业务 Handler 的编排。因为应用层协议栈往往是开发一次,到处运行,所以实际上对于业务开发者来说,只需要关心服务层的业务逻辑开发即可。各种应用协议以插件的形式提供,只有协议开发人员需要关注协议插件,对于其他业务开发人员来说,只需关心业务逻辑定制。这种分层的架构设计理念实现了 NIO 框架各层之间的解耦,便于上层业务协议栈的开发和业务逻辑的定制。

ChannelPipeline

ChannelPipeline 是 ChannelHandler 的容器,它负责 ChannelHandler 的管理和事件拦截与调度。其内部维护了一个 ChannelHandler 的链表和迭代器,方便进行管理。

ChannelPipeline的类继承关系图

ChannelPipeline的事件处理

上图展示了一个消息被 ChannelPipeline 的 ChannelHandler 链拦截和处理的全过程,消息的读取和发送处理全流程描述如下:

  1. 底层的 SocketChannelRead 方法读取 ByteBuf,触发 ChannelRead 事件,由 IO 线程 NioEventLoop 调用 ChannelPipeline 的fireChannelRead(Object msg)方法,将消息(ByteBuf)传输到 ChannelPipeline 中。
  2. 消息依次被 HeadHandler、ChannelHandler1、 ChannelHandler2…… TailHandler 拦截和处理,在这个过程中,任何 ChannelHandler 都可以中断当前的流程,结束消息的传递。
  3. 调用 ChannelHandlerContext 的 write 方法发送消息,消息从 TailHandler 开始,途经 ChannelHandlerN… ChannelHandler2、HeadHandler,最终被添加到消息发送缓冲区中等待刷新和发送,在此过程中也可以中断消息的传递,例如当编码失败时,就需要中断流程,构造异常的 Future 返回。

Netty 中的事件分为 inbound 事件和 outbound 事件。

inbound 事件通常由IO线程触发(上图左半部分)。触发 inbound 事件的方法如下:

  1. ChannelHandlerContext.fireChannelRegistered():Channel注册事件
  2. ChannelHandlerContext.fireChannelActive():TCP链路建立成功,Channel激活事件
  3. ChannelHandlerContext.fireChannelRead(Object):读事件
  4. ChannelHandlerContext.fireChannelReadComplete():读操作完成通知事件
  5. ChannelHandlerContext.fireExceptionCaught(Throwable):异常通知事件
  6. ChannelHandlerContext.fireUserEventTriggered(Object):用户自定义事件
  7. ChannelHandlerContext.fireChannelWritabilityChanged:Channel 的可写状态变化通知事件
  8. ChannelHandlerContext.fireChannellnactive():TCP连接关闭,链路不可用通知事件

outbound 事件通常是由用户主动发起的网络IO操作(上图右半部分)。触发 outbound 事件的方法如下:

  1. ChannelHandlerContext.bind(SocketAddres, ChannelPromise):绑定本地地址事件
  2. ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise):连接服务端事件
  3. ChannelHandlerContext.write(Object, Channel Promise):发送事件
  4. ChannelHandlerContext.flush():刷新事件
  5. ChannelHandlerContext.read():读事件
  6. ChannelHandlerContext.disconnect(Channel Promise):断开连接事件
  7. ChannelHandlerContext.close(Channel, Promise):关闭当前 Channel 事件

调用过程
在 TailHandler 触发 Outbound 事件,最终会调用到 HeadHandler 的 Outbound 事件,并由 Unsafe 执行对应的方法,Pipieline 仅仅负责事件的调度

ChannelPipeline的主要特性

ChannelPipeline 支持运行态动态的添加或者删除 ChannelHandler,在某些场景下这个特性非常实用。例如当业务高峰期需要对系统做拥塞保护时,就可以根据当前的系统时间进行判断,如果处于业务高峰期,则动态地将系统拥塞保护 ChannelHandler 添加到当前的 ChannelPipeline中,当高峰期过去之后,就可以动态删除拥塞保护 ChannelHandler 了。

ChannelPipeline 是线程安全的,这意味着N个业务线程可以并发地操作 ChannelPipeline 而不存在多线程并发问题。但是,ChannelHandler 却不是线程安全的,这意味着尽管 ChannelPipeline 是线程安全的,但是用户仍然需要自己保证 ChannelHandler 的线程安全。

自定义拦截器使用

通过继承ChannelHandlerAdapter类,覆盖自己关注的事件,实现事件的拦截和处理。

public class InboundHandler extends ChannelHandlerAdapter {

    // 拦截 ChannelActive事件,打印TCP链路建立成功日志
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("TCP CONNECTED!");
        ctx.fireChannelActive();
    }
}
public class OutboundHandler extends ChannelHandlerAdapter {

    // 拦截 close事件,在链路关闭时释放资源
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        System.out.println("TCP CLOSED!");
        //... Release resource
        ctx.close();
    }
}

ChannelHandler

ChannelHandler 类似于 Servlet 的 Filter 过滤器,负责对 l/O 事件或者 l/O 操作进行拦截和处理,它可以选择性地拦截和处理自已感兴趣的事件,也可以透传和终止事件的传递。

ChannelHandler支持注解,目前支持的注解有两种

  • Sharable:多个 ChannelPipeline 共用同一个 ChannelHandler
  • Skip:被Skip注解的方法不会被调用,直接被忽略

ChannelHandlerAdapter

对于大多数的 ChannelHandler 会选择性地拦截和处理某个或者某些事件,其他的事件会忽略,由下一个 ChannelHandler 进行拦截和处理。这就会导致一个问题:用户 ChannelHandler 必须要实现 ChannelHandler 的所有接口,包括它不关心的那些事件处理接口,这会导致用户代码的冗余和臃肿,代码的可维护性也会变差。

为了解决这个问题,Netty 提供了 ChannelHandlerAdapter 基类,它的所有接口实现都是事件透传,如果用户 ChannelHandler 关心某个事件,只需要覆盖 ChannelHandlerAdapter 对应的方法即可,对于不关心的,可以直接继承使用父类的方法,这样子类的代码就会非常简洁和清晰。

Encoder和Decoder

处理TCP粘包/拆包的解码器

TCP是个“流”协议,所谓流,就是没有界限的一串数据。大家可以想想河里的流水,它们是连成一片的,其间并没有分界线。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

TCP粘包/拆包问题说明

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。

  1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包
  2. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包
  3. 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包
  4. 服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D11,第二次读取到了D1包的剩余内容D12和D2包的整包。

如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第5种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。

TCP粘包/拆包发生的原因

问题产生的原因有三个:

  1. 应用程序 write 写入的字节大小大于套接口发送缓冲区大小
  2. 进行 MSS 大小的 TCP 分段
  3. 以太网帧的 payload 大于 MTU 进行 IP 分片

粘包问题代码复现

代码实现

服务端代码

public class TimeServerHandler extends ChannelHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf request = (ByteBuf) msg;
        byte[] req = new byte[request.readableBytes()];
        request.readBytes(req);

        String result = new String(req, "UTF-8")
                .substring(0, req.length - System.getProperty("line.separator").length());
        System.out.println("The time server receiver order : " + result + "; the counter is : "+ ++counter);
        String resp = "Query".equalsIgnoreCase(result) ?
                new Date().toString() : "Bad Argument";
        resp += System.getProperty("line.separator");
        ByteBuf response = Unpooled.copiedBuffer(resp.getBytes());
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //释放资源
        ctx.close();
    }
}

客户端代码

public class TimeClientHandler extends ChannelHandlerAdapter {

    byte[] req;
    private int counter;

    public TimeClientHandler() {
        req = ("Query" + System.getProperty("line.separator")).getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] resp = new byte[buf.readableBytes()];
        buf.readBytes(resp);
        String body = new String(resp, "UTF-8");
        System.out.println("Now is : " + body + " ;the counter is " + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        System.out.println(cause.getMessage());
    }
}

结果分析

理想结果

客户端发送100条Query命令,服务端响应100条时间结果

服务器接收了共计100个Query,但只收到了两个包(92,8)

The time server receiver order : Query
省略 90 个 Query...
Query; the counter is : 1
The time server receiver order : Query
省略 6 个 Query
Query; the counter is : 2

客户端只收到了一个包,可以看出服务端对两个请求包做出了响应(即Bad Argument)

Now is : Bad Argument
Bad Argument
 ;the counter is 1

粘包问题的解决策略

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下:

  1. 将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛
  2. 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
  3. 消息长度固定,累计读取到长度总和为定长LEN的报文后,就认为读取到了一个完整的消息:将计数器置位,重新开始读取下一个数据报
  4. 通过在消息头中定义长度字段来标识消息的总长度

方案一 LineBasedFrameDecoder

对应解决TCP粘包问题的第一项方案:将回车换行符作为消息结束符

服务端配置

pipeline()
    //Arguments: 
    //1: 单条消息的最大长度(防止异常码流缺失分隔符导致的内存溢出)
    .addLast(new LineBasedFrameDecoder(1024)) // 以回车换行符作为消息结束符组装报文
    .addLast(new StringDecoder()) // 将二进制流的数据报转换为String类型
    .addLast(new ServerHandler()); // 业务逻辑处理Handler

LineBasedFrameDecoder的工作原理
依次遍历 ByteBuf 中的可读字节,判断看是否有\n或者\r\n,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String result = (String) msg; //接收到的直接是String类型字符串--(StringDecoder)
    ...
    //在消息结尾添加特殊标志符 -- \r\n
    //同理,客户端在发送数据时也要添加该标志符作为消息的结束
    result += System.getProperty("line.separator");
    ctx.writeAndFlush(response); //组织好业务数据向客户端传输
}

客户端配置

pipeline()
    .addLast(new LineBasedFrameDecoder(1024))
    .addLast(new StringDecoder())
    .addLast(new ClientHandler());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String body = (String) msg;
    ...
}

方案二 DelimiterBasedFrameDecoder

对应解决TCP粘包问题的第二项方案:将特殊的分隔符作为消息的结束标志

服务端以及客户端配置

//自定义的分隔符
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
pipeline()
    //Arguments: 
    //1: 单条消息的最大长度 2: 自定义的分隔符
    .addLast(new DelimiterBasedFrameDecoder(1024, delimiter));

对应的Handler内容替换分隔符即可

System.getProperty("line.separator") -> "$_"

方案三 FixedLengthFrameDecoder

对应解决TCP粘包问题的第三项方案:消息长度固定

FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码。

服务端以及客户端配置

pipeline().addLast(new FixedLengthFrameDecoder(20));

方案四 LengthFieldBasedFrameDecoder和LengthFieldPrepender

对应解决TCP粘包问题的第四项方案:通过在消息头中定义长度字段来标识消息的总长度

pipeline()
    .addLast(new LengthFieldBasedFrameDecoder(65536,0,2,0,2)) // 解码--长度可变流截取为报文
    .addLast(new MsgPackDecoder()) // 解码--比特流解析为POJO
    .addLast(new LengthFieldPrepender(2)) //编码--为比特流
    .addLast(new MsgPackEncoder()); //编码--POJO编码为比特流

LengthFieldPrepender编码器

@Sharable
public class LengthFieldPrepender extends MessageToMessageEncoder //后面会介绍

工作原理:计算当前待发送消息的二进制字节长度,将该长度添加到 ByteBuf 的缓冲区头

LengthFieldBasedFrameDecoder解码器

该解码器通过以下四个参数进行解码:

  • lengthFieldOffset 长度字段偏移量
  • lengthFieldLength 长度字段所占byte数
  • lengthAdjustment 长度内容调整量
  • initialBytesToStrip 舍去消息的前X个Byte

initialBytesToStrip舍弃一部分Message


lengthAdjustment修正读取内容的长度

lengthFieldOffset修正读取长度字段的偏移量

通过这四个参数的灵活组合可以达到不同的解码效果