0%

Netty 私有协议栈开发

对自定义协议栈的网络拓扑图、通信模型、消息定义、链路的建立关闭、可靠性设计、安全性设计、可扩展性设计七个方面进行介绍,并通过 Netty 实现一套相应的简单的的通信协议,加强对网络传输和 Netty 的了解。

私有协议介绍

私有协议本质上是厂商内部发展和采用的标准,除非授权,其他厂商一般无权使用该协议。私有协议也称非标准协议,就是未经国际或国家标准化组织采纳或批准,由某个企业自己制订,协议实现细节不愿公开,只在企业自己生产的设备之间使用的协议。私有协议具有封闭性、垄断性、排他性等特点。如果网上大量存在私有(非标准)协议,现行网络或用户一旦使用了它,后进入的厂家设备就必须跟着使用这种非标准协议,才能够互连互通,否则根本不可能进入现行网络。这样,使用非标准协议的厂家就实现了垄断市场的愿望。
尽管私有协议具有垄断性的特征,但并非所有的私有协议设计者的初衷就是为了垄断。由于现代软件系统的复杂性,一个大型软件系统往往会被人为地拆分成多个模块,另外随着移动互联网的兴起,网站的规模也越来越大,业务的功能越来越多,为了能够支撑业务的发展,往往需要集群和分布式部署,这样,各个模块之间就要进行跨节点通信。

在传统的Java应用中,通常使用以下4种方式进行跨节点通信

  1. 通过RMI进行远程服务调用
  2. 通过 Java 的 Socket+Java 序列化的方式进行跨节点调用
  3. 利用一些开源的RPC框架进行远程服务调用,例如 Facebook 的 Thrift、 Apache 的 Avro 等
  4. 利用标准的公有协议进行跨节点服务调用,例如 HTTP+XML、RESTful+JSON或者 WebService

跨节点的远程服务调用,除了链路层的物理连接外,还需要对请求和响应消息进行编解码。在请求和应答消息本身以外,也需要携带一些其他控制和管理类指令,例如链路建立的握手请求和响应消息、链路检测的心跳消息等。当这些功能组合到一起之后,就会形成私有协议。

事实上,私有协议并没有标准的定义,只要是能够用于跨进程、跨主机数据交换的非标准协议,都可以称为私有协议。通常情况下,正规的私有协议都有具体的协议规范文档,类似于《XXXX协议VXX规范》,但是在实际的项目中,内部使用的私有协议往往是口头约定的规范,由于并不需要对外呈现或者被外部调用,所以一般不会单独写相关的内部私有协议规范文档。

自定义协议栈功能设计

Netty 协议栈用于内部各模块之间的通信,它基于 TCP/IP 协议栈,是一个类 HTTP 协议的应用层协议栈,相比于传统的标准协议栈,它更加轻巧、灵活和实用。

网络拓扑图

在分布式组网环境下,每个 Netty 节点(Netty 进程)之间建立长连接,使用 Netty 协议进行通信。Netty 节点并没有服务端和客户端的区分,谁首先发起连接,谁就作为客户端,另一方自然就成为服务端。一个 Netty 节点既可以作为客户端连接另外的 Netty 节点,也可以作为 Netty 服务端被其他 Netty 节点连接,这完全取决于使用者的业务场景和具体的业务组网。

协议栈功能描述

Netty 协议栈承载了业务内部各模块之间的消息交互和服务调用,它的主要功能如下。

  1. 基于 Netty 的 NlO 通信框架,提供高性能的异步通信能力
  2. 提供消息的编解码框架,可以实现 POJO 的序列化和反序列化
  3. 提供基于 IP 地址的白名单接入认证机制
  4. 链路的有效性校验机制
  5. 链路的断连重连机制

通信模型

具体步骤如下:

  1. Netty 协议栈客户端发送握手请求消息,携带节点ID等有效身份认证信息
  2. Netty 协议栈服务端对握手请求消息进行合法性校验,包括节点 ID 有效性校验、节点重复登录校验和 IP 地址合法性校验,校验通过后,返回登录成功的握手应答消息
  3. 链路建立成功之后,客户端发送业务消息
  4. 链路建立成功之后,服务端发送心跳消息
  5. 链路建立成功之后,客户端发送心跳消息
  6. 链路建立成功之后,服务端发送业务消息
  7. 服务端退出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接

备注:需要指出的是,Netty 协议通信双方链路建立成功之后,双方可以进行全双工通信,无论客户端还是服务端,都可以主动发送请求消息給对方,通信方式可以是 TWO WAY 或者 ONE WAY。双方之间的心跳采用 Ping-Pong 机制,当链路处于空闲状态时,客户端主动发送 Ping 消息給服务端,服务端接收到 Ping 消息后发送应答消息 Pong 给客户端,如果客户端连续发送 N 条 Ping 消息都没有接收到服务端返回的 Pong 消息,说明链路已经挂死或者对方处于异常状态,客户端主动关闭连接,间隔周期 T 后发起重连操作,直到重连成功。

消息定义

Netty 协议栈消息定义包含两部分:消息头、消息体

下图为Netty协议消息头定义

Netty协议支持的字段类型

Netty协议的编解码规范

1. Netty协议的编码

  1. crcCode:java.nio.ByteBuffer.putInt(int value),如果采用其他缓冲区实现,必须与其等价

  2. length:java.nio.ByteBuffer.putInt(int value),如果采用其他缓冲区实现,必须与其等价

  3. sessionid:java.nio.ByteBuffer.putLong(long value),如果采用其他缓冲区实现必须与其等价

  4. type:java.nio.ByteBuffer.put(byte b),如果采用其他缓冲区实现,必须与其等价

  5. priority:java.nio.ByteBuffer.put(byte b),如果釆用其他缓冲区实现,必须与其等价

  6. attachment:它的编码规则为——如果 attachment 长度为0,表示没有可选附件,则将长度编码设为0。如果大于0,说明有附件需要编码,具体的编码规则:首先对附件的个数进行编码,然后对 Key 和 Value 进行编码,先编码长度,再将它转换成 byte 数组之后的编码内容

  7. body的编码:通过编解码器将其序列化为byte数组,然后将其写入 ByteBuffer 缓冲区中。

    由于整个消息的长度必须等全部字段都编码完成之后才能确认,所以最后需要更新消息头中的 length 字段,将其重新写入 ByteBuffer 中。

2. Netty协议的解码

相对于 Netty Message 的编码,给出 Netty 协议的解码规范

  1. crcCode:通过java.nio.ByteBuffer.getInt()获取校验码字段,其他缓冲区需要与其等价
  2. length:通过java.nio.ByteBuffer.getInt()获取Nety消息的长度,其他缓冲区需要与其等价
  3. sessionId:通过java.nio.ByteBuffer.putLong(long value)获取会话ID,其他缓冲区实现必须与其等价
  4. type:通过java.nio.ByteBuffer.get()获取消息类型,其他缓冲区需要与其等价
  5. priority:通过java.nio.ByteBuffer.get()获取消息优先级,其他缓冲区需要与其等价
  6. attachment:它的解码规则为——首先创建一个新的 attachment 对象,调用java.nio.ByteBuffer.getInt()获取附件的长度,如果为0,说明附件为空,解码结束,继续解消息体;如果非空,则根据长度通过for循环进行解码。
  7. body:通过解码器进行解码

链路的建立

Netty 协议栈支持服务端和客户端,对于使用 Netty 协议栈的应用程序而言,不需要刻意区分到底是客户端还是服务端,在分布式组网环境中,一个节点可能既是服务端也是客户端,这个依据具体的用户场景而定。

Netty 协议栈对客户端的说明如下:如果A节点需要调用B节点的服务,但是A和B之间还没有建立物理链路,则由调用方主动发起连接,此时,调用方为客户端,被调用方为服务端。

考虑到安全,链路建立需要通过基于IP地址或者号段的黑白名单安全认证机制,作为样例,本协议使用基于IP地址的安全认证,如果有多个IP,通过逗号进行分割。在实际商用项目中,安全认证机制会更加严格,例如通过密钥对用户名和密码进行安全认证。

客户端与服务端链路建立成功之后,由客户端发送握手请求消息,握手请求消息的定义如下。

  1. 消息头的type字段值为3

  2. 可选附件为个数为0

  3. 消息体为空

  4. 握手消息的长度为22个字节

    服务端接收到客户端的握手请求消息之后,如果IP校验通过,返回握手成功应答消息给客户端,应用层链路建立成功。握手应答消息定义如下

  5. 消息头的type字段值为4

  6. 可选附件个数为0

  7. 消息体为byte类型的结果,“0”表示认证成功:“-1”表示认证失败。

链路的关闭

由于采用长连接通信,在正常的业务运行期间,双方通过心跳和业务消息维持链路,任何一方都不需要主动关闭连接。
但是,在以下情况下,客户端和服务端需要关闭连接。

  1. 当对方宕机或者重启时,会主动关闭链路,另一方读取到操作系统的通知信号,得知对方REST链路,需要关闭连接,释放自身的句柄等资源。由于采用TCP全双工通信,通信双方都需要关闭连接,释放资源
  2. 消息读写过程中,发生了IO异常,需要主动关闭连接
  3. 心跳消息读写过程中发生了IO异常,需要主动关闭连接
  4. 心跳超时,需要主动关闭连接
  5. 发生编码异常等不可恢复错误时,需要主动关闭连接。

可靠性设计

Netty 协议栈可能会运行在非常恶劣的网络环境中,网络超时、闪断、对方进程僵死或者处理缓慢等情况都有可能发生。为了保证在这些极端异常场景下 Netty 协议栈仍能够正常工作或者自动恢复,需要对它的可靠性进行统一规划和设计。

1. 心跳机制

在凌晨等业务低谷期时段,如果发生网络闪断、连接被Hang住等网络问题时,由于没有业务消息,应用进程很难发现。到了白天业务高峰期时,会发生大量的网络通信失败,严重的会导致一段时间进程内无法处理业务消息。为了解决这个问题,在网络空闲时采用心跳机制来检测链路的互通性,一旦发现网络故障,立即关闭链路,主动重连。

具体的设计思路如下

  1. 当网络处于空闲状态持续时间达到T(连续周期T没有读写消息)时,客户端主动发送Ping心跳消息给服务端。
  2. 如果在下一个周期T到来时客户端没有收到对方发送的Pong心跳应答消息或者读取到服务端发送的其他业务消息,则心跳失败计数器加1。
  3. 每当客户端接收到服务的业务消息或者Pong应答消息时,将心跳失败计数器清零:连续N次没有接收到服务端的Pong消息或者业务消息,则关闭链路,间隔 INTERVAL 时间后发起重连操作
  4. 服务端网络空闲状态持续时间达到T后,服务端将心跳失败计数器加1;只要接收到客户端发送的Ping消息或者其他业务消息,计数器清零
  5. 服务端连续N次没有接收到客户端的Ping消息或者其他业务消息,则关闭链路,释放资源,等待客户端重连。

通过 Ping-Pong 双向心跳机制,可以保证无论通信哪一方出现网络故障,都能被及时地检测出来。为了防止由于对方短时间内繁忙没有及时返回应答造成的误判,只有连续N次心跳检测都失败才认定链路已经损害,需要关闭链路并重建链路。

当读或者写心跳消息发生IO异常的时候,说明链路已经中断,此时需要立即关闭链路,如果是客户端,需要重新发起连接。如果是服务端,需要淸空缓存的半包信息,等待客户端重连。

2. 重连机制

如果链路中断,等待 INTERVAL 时间后,由客户端发起重连操作,如果重连失败,间隔周期 INTERVAL 后再次发起重连,直到重连成功。

为了保证服务端能够有充足的时间释放句柄资源,在首次断连时客户端需要等待INTERVAL时间之后再发起重连,而不是失败后就立即重连。

为了保证句柄资源能够及时释放,无论什么场景下的重连失败,客户端都必须保证自身的资源被及时释放,包括但不限于 SocketChannel、 Socket等

重连失败后,需要打印异常堆栈信息,方便后续的问题定位

3. 重复登陆保护

当客户端握手成功之后,在链路处于正常状态下,不允许客户端重复登录,以防止客户端在异常状态下反复重连导致句柄资源被耗尽。

服务端接收到客户端的握手请求消息之后,首先对IP地址进行合法性检验,如果校验成功,在缓存的地址表中查看客户端是否已经登录,如果已经登录,则拒绝重复登录,返回错误码-1,同时关闭TCP链路,并在服务端的日志中打印握手失败的原因。

客户端接收到握手失败的应答消息之后,关闭客户端的TCP连接,等待 INTERVAL 时间之后,再次发起TCP连接,直到认证成功。

为了防止由服务端和客户端对链路状态理解不一致导致的客户端无法握手成功的问题,当服务端连续N次心跳超时之后需要主动关闭链路,清空该客户端的地址缓存信息,以保证后续该客户端可以重连成功,防止被重复登录保护机制拒绝掉。

4. 消息缓存重发

无论客户端还是服务端,当发生链路中断之后,在链路恢复之前,缓存在消息队列中待发送的消息不能丢失,等链路恢复之后,重新发送这些消息,保证链路中断期间消息不丢失。

考虑到内存溢出的风险,建议消息缓存队列设置上限,当达到上限之后,应该拒绝继续向该队列添加新的消息。

安全性设计

安全性设计为了保证整个集群环境的安全,内部长连接采用基于IP地址的安全认证机制,服务端对握手请求消息的IP地址进行合法性校验:如果在白名单之内,则校验通过:否则,拒绝对方连接。

如果将 Netty协议栈放到公网中使用,需要采用更加严格的安全认证机制,例如基于密钥和AES加密的用户名+密码认证机制,也可以采用SSL/TSL安全传输。

作为示例程序, Netty协议栈采用最简单的基于IP地址的白名单安全认证机制。

可扩展性设计

Netty 协议需要具备一定的扩展能力,业务可以在消息头中自定义业务域字段,例如消息流水号、业务自定义消息头等。通过 Netty 消息头中的可选附件 attachment 字段,业务可以方便地进行自定义扩展。

Netty 协议栈架构需要具备一定的扩展能力,例如统一的消息拦截、接口日志、安全、加解密等可以被方便地添加和删除,不需要修改之前的逻辑代码,类似 Servlet 的 FilterChain 和 AOP,但考虑到性能因素,不推荐通过 AOP 来实现功能的扩展。

Netty协议栈开发

数据结构定义

/**
 * Netty 协议栈消息体
 * 业务消息、心跳消息、握手请求/应答消息都由 NettyMessage 承载
 */
public final class NettyMessage {
    //请求头
    private Header header;
    //请求体
    private Object body;

    //省略 Getter Setter ToString 方法 
}
/**
 * Netty 协议栈消息头
 */
public class Header {
    /** 校验码 */
    private int crcCode;
    /** 消息长度 */
    private int length;
    /** 会话ID */
    private long sessionID;
    /** 消息类型 */
    private byte type;
    /** 消息优先级 */
    private byte priority;
    /** 附件 */
    private Map<String, Object> attachment;

    public Header() {
        this.attachment = new HashMap<String, Object>();
    }

    //省略 Getter Setter ToString 方法 
}
/**
 * 常规Message生成器,全部种类的信息都由NettyMessage承载
 */
public class MessageBuilder {
    public static NettyMessage loginResp(String result) {
        NettyMessage message = new NettyMessage();
        message.getHeader().setType(MessageType.LOGIN_RESP);
        message.setBody(result);
        return message;
    }

    public static NettyMessage loginReq() {
        NettyMessage message = new NettyMessage();
        message.getHeader().setType(MessageType.LOGIN_REQ);
        return message;
    }

    public static NettyMessage heartbeatReq() {
        NettyMessage message = new NettyMessage();
        message.getHeader().setType(MessageType.HEARTBEAT_REQ);
        return message;
    }

    public static NettyMessage heartbeatResp() {
        NettyMessage message = new NettyMessage();
        message.getHeader().setType(MessageType.HEARTBEAT_RESP);
        return message;
    }
}

常量类型定义

public interface Configuration {
    String REMOTE_IP = "127.0.0.1";
    int REMOTE_PORT = 8080;
    String LOCAL_IP = "127.0.0.1";
    int LOCAL_PORT = 8081;
}
public interface MessageType {
    byte LOGIN_REQ = (byte) 1;
    byte LOGIN_RESP = (byte) 2;
    byte HEARTBEAT_REQ = (byte) 3;
    byte HEARTBEAT_RESP = (byte) 4;
}

消息编解码

public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {

    @Override
    protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf out) throws Exception {
        if (msg == null || msg.getHeader() == null)
            throw new Exception("The encode message is null");
        int length = 22;
        out.writeInt(msg.getHeader().getCrcCode());
        out.writeInt(msg.getHeader().getLength());
        out.writeLong(msg.getHeader().getSessionID());
        out.writeByte(msg.getHeader().getType());
        out.writeByte(msg.getHeader().getPriority());
        out.writeInt(msg.getHeader().getAttachment().size());

        //Attachment
        for (Map.Entry<String, Object> param : msg.getHeader().getAttachment().entrySet()) {
            String key = param.getKey();
            byte[] keyArray = key.getBytes();
            out.writeInt(keyArray.length);
            out.writeBytes(keyArray);
            byte[] valueArray = EDUtil.encodeObject(param.getValue());
            out.writeInt(valueArray.length);
            out.writeBytes(valueArray);
            length += 8 + keyArray.length + valueArray.length;
        }
        //Body
        if (msg.getBody() != null) {
            byte[] bodyArray = EDUtil.encodeObject(msg.getBody());
            out.writeInt(bodyArray.length);
            out.writeBytes(bodyArray);
            length += 4 + bodyArray.length;
        }
        out.setInt(4, length);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}
public class NettyMessageDecoder extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf revBuf = (ByteBuf) msg;
        NettyMessage message = new NettyMessage();
        message.getHeader().setCrcCode(revBuf.readInt());
        message.getHeader().setLength(revBuf.readInt());
        message.getHeader().setSessionID(revBuf.readLong());
        message.getHeader().setType(revBuf.readByte());
        message.getHeader().setPriority(revBuf.readByte());
        //Attachment
        for (int i = 0; i < revBuf.readInt(); i++) {
            byte[] key = new byte[revBuf.readInt()];
            revBuf.readBytes(key);
            byte[] value = new byte[revBuf.readInt()];
            revBuf.readBytes(value);
            message.getHeader().getAttachment().put(
                    new String(key),
                    EDUtil.decodeObject(value)
            );
        }
        //Body
        if (revBuf.readableBytes() > 0) {
            byte[] body = new byte[revBuf.readInt()];
            revBuf.readBytes(body);
            message.setBody(EDUtil.decodeObject(body));
        }
        System.out.println(message);
        ctx.fireChannelRead(message);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

编解码POJO使用的工具类

public class EDUtil {

    /**
     * 对象转数组
     */
    public static byte[] encodeObject(Object obj) {
        byte[] bytes = null;
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(obj);
            oos.flush();
            bytes = bos.toByteArray();
            oos.close();
            bos.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
        return bytes;
    }

    /**
     * 数组转对象
     */
    public static Object decodeObject(byte[] bytes) {
        Object obj = null;
        try {
            ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
            ObjectInputStream ois = new ObjectInputStream(bis);
            obj = ois.readObject();
            ois.close();
            bis.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        } catch (ClassNotFoundException ex) {
            ex.printStackTrace();
        }
        return obj;
    }
}

握手和安全认证

握手的发起是在客户端和服务端TCP链路建立成功通道激活时,握手消息的接入和安全认证在服务端处理。握手认证客户端,用于在通道激活时发起握手请求。

public class LoginAuthReqHandler extends ChannelHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(MessageBuilder.loginReq());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;

        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_RESP) {
            String result = (String) message.getBody();
            if (!"Connection allowed".equals(result)) {
                //握手失败,关闭连接
                ctx.close();
            } else {
                System.out.println(result);
                ctx.fireChannelRead(msg);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}
public class HeartBeatRespHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.HEARTBEAT_REQ) {
            System.out.println("Receive client heart beat message: " + message);
            NettyMessage heartbeat = MessageBuilder.heartbeatResp();
            System.out.println("Send heart beat response message to client: " + heartbeat);
            ctx.writeAndFlush(heartbeat);
        }else {
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

心跳检测机制

握手成功之后,由客户端主动发送心跳消息,服务端接收到心跳消息之后,返回心跳应答消息。由于心跳消息的目的是为了检测链路的可用性,因此不需要携带消息体。

public class HeartBeatReqHandler extends ChannelHandlerAdapter {

    private volatile ScheduledFuture<?> heartbeat;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_RESP) {
            heartbeat = ctx.executor().scheduleAtFixedRate(
                    new HeartBeatTask(ctx), 0, 10000, TimeUnit.MILLISECONDS
            );
        } else if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_RESP) {
            System.out.println("Client send heart beat message to server: " + heartbeat);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }

    private class HeartBeatTask implements Runnable {

        private final ChannelHandlerContext ctx;

        private HeartBeatTask(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        public void run() {
            NettyMessage message = MessageBuilder.heartbeatReq();
            System.out.println("Client send heart beat message to server: " + heartbeat);
            ctx.writeAndFlush(message);
        }
    }
}
public class HeartBeatRespHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.HEARTBEAT_REQ) {
            System.out.println("Receive client heart beat message: " + message);
            NettyMessage heartbeat = MessageBuilder.heartbeatResp();
            System.out.println("Send heart beat response message to client: " + heartbeat);
            ctx.writeAndFlush(heartbeat);
        }else {
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

服务端的心跳 Handle r非常简单,接收到心跳请求消息之后,构造心跳应答消息返回,并打印接收和发送的心跳消息。

心跳超时的实现非常简单,直接利用 Netty 的 ReadTimeoutHandler 机制,当一定周期内(默认值50s)没有读取到对方任何消息时,需要主动关闭链路。如果是客户端,重新发起连接;如果是服务端,释放资源,清除客户端登录缓存信息,等待服务端重连。

断连重连

当客户端感知断连事件之后,释放资源,重新发起连接。

首先监听网络断连事件,如果 Channel 关闭,则执行后续的重连任务,通过 Bootstrap 重新发起连接,客户端挂在 closeFuture 上监听链路关闭信号,一旦关闭,则创建重连定时器,10s 之后重新发起连接,直到重连成功。

executor.execute(() -> {
    try {
        TimeUnit.SECONDS.sleep(10);
        connect(Configuration.REMOTE_IP, Configuration.REMOTE_PORT);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

服务端感知到断连事件之后,需要清空缓存的登录认证注册信息,以保证后续客户端能够正常重连。

服务端与客户端启动器

public class NettyServer {
    public static void main(String[] args) {
        new NettyServer().bind(Configuration.REMOTE_PORT);
    }

    private void bind(int port) {
        //创建两个NIO线程组(Reactor线程组),用于网络事件的处理
        //用于服务端接受客户端连接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //用于SocketChannel网络读写
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            //Netty用于启动NIO服务端的辅助启动类,目的是降低服务端开发复杂度
            ServerBootstrap bootstrap = new ServerBootstrap();

            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            //IO事件处理类
            bootstrap.childHandler(new ChildChannelHandler());
            //绑定端口,同步等待成功
            ChannelFuture f = bootstrap.bind(port).sync();
            //等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //释放跟shutdownGracefully相关的资源
//            bossGroup.shutdownGracefully();
//            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer {
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline()
                    .addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 4, 4, -8, 0))
                    .addLast(new NettyMessageDecoder())
                    .addLast(new NettyMessageEncoder())
                    .addLast(new ReadTimeoutHandler(30))
                    .addLast(new LoginAuthRespHandler())
                    .addLast(new HeartBeatRespHandler());
        }
    }
}
public class NettyClient {
    public static void main(String[] args) {
        new NettyClient().connect(Configuration.REMOTE_IP, Configuration.REMOTE_PORT);
    }

    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

    private void connect(String host, int port) {
        System.out.println("Connect host:" + host + " port:" + port);
        //配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //客户端辅助启动类
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                    .handler(new ChannelHandler());
            //connect发起异步连接操作 sync同步方法等待连接成功
            ChannelFuture f = bootstrap.connect(host, port).sync();
            //等待客户端链路关闭
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executor.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(10);
                    connect(Configuration.REMOTE_IP, Configuration.REMOTE_PORT);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

    class ChannelHandler extends ChannelInitializer {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                    .addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 4, 4, -8, 0))
                    .addLast(new NettyMessageDecoder())
                    .addLast(new NettyMessageEncoder())
                    .addLast(new ReadTimeoutHandler(30))
                    .addLast(new LoginAuthReqHandler())
                    .addLast(new HeartBeatReqHandler());
        }
    }
}

运行协议栈

正常场景

待服务端启动成功后,启动客户端,成功建立链路后,双方互发心跳。

异常场景:服务器宕机重启

假设服务端宕机一段时间重启,检验如下功能是否正常。

  1. 客户端是否能够正常发起重连
  2. 重连成功之后,不再重连
  3. 断连期间,心跳定时器停止工作,不再发送心跳请求消息
  4. 服务端重启成功之后,允许客户端重新登录
  5. 服务端重启成功之后,客户端能够重连和握手成功
  6. 重连成功之后,双方的心跳能够正常互发
  7. 性能指标:重连期间,客户端资源得到了正常回收,不会导致句柄等资源泄漏(堆内存指标、线程资源指标)

异常场景:客户端宕机重启

客户端宕机重启之后,服务端需要能够清除缓存信息,允许客户端重新登录。