0%

RPC gRPC异常处理流程设计

封装 gRPC 调用异常处理的流程。

gRPC 内部拥有简单的异常传递机制,通过以下方式以客户端可感知的方式生成异常。

// 实现在 proto 文件定义的 rpc 接口
private class GreeterImpl extends GreeterGrpc.GreeterImplBase {

  @Override
  public void sayHello(URB.HelloRequest req, StreamObserver<URB.HelloReply> responseObserver){
    StatusRuntimeException e = Status.INTERNAL
                                     .withCause(new RuntimeException())
                                     .withDescription("This is a description")
                                     .asRuntimeException(new Metadata())
    responseObserver.onError(e);
  }
}

可以看到 onError 可以传递四种信息:

  1. 代表该异常类型的 Status
  2. 代表抛出异常的 Cause
  3. 代表对异常的描述描述信息 Description
  4. 代表附加信息的元数据 Metadata

但是在客户端只能接收到 Status、Description 和 Metadata 三种信息,丢失了异常堆栈信息。

此时,对于业务来说有以下两个问题:

  1. Status 应用于 gRPC 内部,但对于业务消费来说,有可能对于异常状态有着层级和粒度更加细致的要求。比如业务的一级异常为参数错误,其还有下设的二级异常为整型溢出等等。
  2. Cause 是不会传递给客户端的,因此客户端并不能感知到哪里出问题了。是中间件报错?还是业务抛出的错误提示信息?如果对每一种异常(Exception)进行区分,在大多数业务系统中都会变成一种灾难。

针对上述两个问题,我们需要从两方面着手解决:

  1. 分门别类收敛繁杂的业务异常
  2. 将业务异常提示信息传递给客户端
  3. (可选)将异常堆栈传递给客户端

对于将业务异常提示信息传递给客户端也有两种方式:

  1. 文本信息传递的方式
  2. 抛异常传递的方式

收敛业务异常

详细的定义及异常理论浅析请看 《架构设计 异常处理方案》

简单来讲,层级式的异常(也就是通过继承的方式来组成异常的层级结构,如 Exception -> SQLException -> SQLTimeoutException)更适用于中间件的开发设计,因为中间件的调用方通常是代码,编程语言级别的异常能让使用者更方便的处理针对各个不同异常的情况。而业务代码通常在返回异常信息后,将其通过 TCP 或 UDP 传递给前端界面,显示的结果为字符串,因此以错误码的形式多级管理可能是一种更好的方式。并且业务代码通常比较灵活,并不能很好的抽象为某些固定的类型,因此错误码附加个性化描述的形式更能适应业务的表达及更好地收敛编码所用到的异常类型。

因此我们需要一个方便易用的模型来管理这些业务异常。

下面是异常通用错误码定义的类:

/**
 * {@link Exceptions} 异常处理流程通用错误码
 * 规则如下:
 * 1. 通过收集通用错误码的方式收敛错误码爆炸的问题
 * 2. 通过 Exceptions 工具中的 desc 和 meta 信息完善业务异常
 * @author wangzihao
 */
public enum Code {

    // COMMON 也是给USR返回的数据内容,
    // 如果同类别错误过多,则需要抽象更细化的错误类型
    COMMON_ERROR(100000, "通用错误"), //用于 Asserts 断言
    //////
    // USR
    //////
    INVALID_PARAMETER(100001, "非法参数"),
    //////
    // TPY
    //////
    TRANSACTION_EXECUTE_FAIL(100002, "事务执行失败"),
    //////
    // INR
    //////
    DATABASE_ROW_NOT_EXIST(100003, "数据库不存在查询内容"),
    CACHE_KEY_NOT_EXIST(100004, "缓存不存在查询内容"),
    DISTRIBUTED_LOCK_BLOCKING(100005, "分布式锁阻塞中");

    private int code;
    private String desc;
    private static final Map<Integer, Code> relationship = Arrays.stream(Code.values()).collect(Collectors.toMap(Code::code, Function.identity()));

    Code(int code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    public int code() {
        return code;
    }

    public String desc() {
        return desc;
    }

    // 各系统间 code 是通用的,但描述文案可以是不同的。
    public static Code parseFrom(int code) {
        return relationship.get(code);
    }
}

下面是异常的 Builder:

/**
 * Exception 异常处理流程
 * Exceptions.{Type}.{withXXX}.{asException} 的形式
 * Type 分为三种:
 * 1. USR 用户级,异常抛出可被用户看到
 * 2. INR 系统内部异常
 * 3. TPY 第三方系统异常(如中间件、其他服务等)
 * with可装填的信息有以下几种:
 * 1. Code 通用错误码,用于给各异常做个大体上的区分
 * 2. Description 附加错误信息,用于给 Code 细化其业务含义
 * 3. Meta 附加日志信息,用于给日志一些快速定位的条件
 * 4. cause 异常,如果异常来源于其他程序抛出的异常,由此统一引入调用栈
 * @author wangzihao
 */
public class Exceptions implements Serializable {

    public enum Type {
        USR, // User 用户角色异常,需要对异常进行页面上的正确提示
        INR, // Inner System 系统内部级别的异常,需要将关键信息记录入日志
        TPY; // Third Party System 第三方系统,涉及到调用 RPC MQ 等服务的失败

        public Exceptions toExceptions() {
            return EXCEPTIONS_MAP.get(this);
        }
    }

    private final static Map<Type, Exceptions> EXCEPTIONS_MAP = buildExceptionsMap();

    // 用单例生成填写Type的Exceptions优化生成速度,
    // 且方便做统一初始化
    private static Map<Type, Exceptions> buildExceptionsMap(){
        TreeMap<Type, Exceptions> canonicalizer = new TreeMap<>();
        for (Type type : Type.values()) {
            Exceptions replaced = canonicalizer.put(type, new Exceptions(type));
            if (replaced != null) {
                throw new IllegalStateException("Code value duplication between "
                        + replaced.type.name() + " & " + type.name());
            }
        }
        return Collections.unmodifiableMap(canonicalizer);
    }

    public static final Exceptions USR = Type.USR.toExceptions();
    public static final Exceptions INR = Type.INR.toExceptions();
    public static final Exceptions TPY = Type.TPY.toExceptions();

    private final Type type;
    private final Code code;
    private final String description;
    private final Map<String, String> meta;
    private final Throwable cause;

    Exceptions(Type type) {
        this(type, Code.COMMON_ERROR, null, null, null);
    }

    Exceptions(Type type, Code code, String description, Map<String, String> meta, Throwable cause) {
        this.type = type;
        this.code = code;
        this.description = description;
        this.meta = meta;
        this.cause = cause;
    }

    public Type type() {
        return type;
    }

    public Code code(){
        return code;
    }

    public String description() {
        return description;
    }

    public Map<String, String> meta() {
        return meta;
    }

    public Throwable cause() {
        return cause;
    }

    public Exceptions withCode(Code code) {
        if (code == null || code.equals(this.code))
            return this;
        return new Exceptions(this.type, code, this.description, this.meta, this.cause);
    }

    public Exceptions withCause(Throwable cause) {
        if (cause == null || cause.equals(this.cause))
            return this;
        return new Exceptions(this.type, this.code, this.description, this.meta, cause);
    }

    public Exceptions withDescription(String description) {
        if (description == null || description.equals(this.description))
            return this;
        if (this.description == null && description.isEmpty())
            return this;
        return new Exceptions(this.type, this.code, description, this.meta, this.cause);
    }

    public Exceptions withMeta(Map<String, String> meta) {
        if (meta == null || meta.equals(this.meta))
            return this;
        if (this.meta == null && meta.isEmpty())
            return this;
        return new Exceptions(this.type, this.code, description, meta, this.cause);
    }

    public FundxException asCheckedException() {
        validateParameters();
        return new FundxException(this);
    }

    public FundxException asCheckedException(ExceptionHandler... handler) {
        validateParameters();
        FundxException exception = new FundxException(this);
        Arrays.stream(handler).filter(Objects::nonNull).forEach(x -> x.handle(exception));
        return exception;
    }

    public FundxRuntimeException asUncheckedException() {
        validateParameters();
        return new FundxRuntimeException(this);
    }

    public FundxRuntimeException asUncheckedException(ExceptionHandler... handler) {
        validateParameters();
        FundxRuntimeException exception = new FundxRuntimeException(this);
        Arrays.stream(handler).filter(Objects::nonNull).forEach(x -> x.handle(exception));
        return exception;
    }

    private void validateParameters() {
        if (Objects.isNull(this.type))
            throw Exceptions.INR.withCode(Code.INVALID_PARAMETER).withDescription("Vertify type 参数不能为空.").asUncheckedException();
        if (Objects.isNull(this.code))
            throw Exceptions.INR.withCode(Code.INVALID_PARAMETER).withDescription("Vertify code 参数不能为空.").asUncheckedException();
    }
}

注意由于 Exceptions 的成员变量都是 final 类型,因此每次使用 withCode、withDescription、withCause、withMeta 都需要重新 new 一个对象。这通过 Immutable 的线程模式以一种较大的代价保证其线程安全,但由于 Exception 本身是低频的程序分支,因此在处理流程中也可以接受。

下面是依赖的两个 Exception 类及其函数定义的扩展接口:

public interface FundxVirtualException {
    Exceptions.Type getType();

    Code getCode();

    String getDescription();

    Map<String, String> getMeta();
}
public class FundxRuntimeException extends RuntimeException implements FundxVirtualException {

    private final Exceptions exceptions;

    FundxRuntimeException(Exceptions exceptions) {
        super(exceptions.description(), exceptions.cause());
        this.exceptions = exceptions;
    }

    @Override
    public Exceptions.Type getType() {
        return exceptions.type();
    }

    @Override
    public Code getCode() {
        return exceptions.code();
    }

    @Override
    public String getDescription() {
        return exceptions.description();
    }

    @Override
    public Map<String, String> getMeta() {
        return exceptions.meta();
    }

}
public class FundxException extends Exception implements FundxVirtualException {

    private final Exceptions exceptions;

    FundxException(Exceptions exceptions) {
        super(exceptions.description(), exceptions.cause());
        this.exceptions = exceptions;
    }

    @Override
    public Exceptions.Type getType() {
        return exceptions.type();
    }

    @Override
    public Code getCode() {
        return exceptions.code();
    }

    @Override
    public String getDescription() {
        return exceptions.description();
    }

    @Override
    public Map<String, String> getMeta() {
        return exceptions.meta();
    }
}

下面是依赖的 Exception 后处理器的定义接口:

public interface ExceptionHandler {

    void handle(FundxException exception);

    void handle(FundxRuntimeException exception);
}

使用后处理器可以很方便的集成功能插件,对抛出的异常做链式处理(如飞书报警、日志个性化打印、MQ 通知、短信通知等)。

传递异常

根据异常设计的理论,异常需要通过网的方式被拦截,最后依次被个性化处理、统一处理,不应有未处理的异常被抛出在主线程。因此我们需要实现以下功能:

  1. 客户端对服务端的异常是有感知的(✓)
  2. 客户端能从服务端获取特定的异常(✓)
  3. 客户端能从服务端获取自定义异常的文本信息(×)
  4. 客户端能从服务端获取自定义异常的堆栈信息(×)
  5. 客户端能抛出服务端自定义的异常(×)

以上五条 表示 gRPC 框架已实现, × 表示需要自己实现。

获取自定义异常文本信息

传递文本信息在 gRPC 框架中本身就有着较好的支持,只需要通过 gRPC 内部的 StatusRuntimeException 传递即可。

Status.INTERNAL
        .withDescription(t.getMessage())
        .withCause(t)
        .asRuntimeException()

也可以通过 Metadata 的方式传递序列化的信息。

message Error {
  string type = 1;
  int32 code = 2;
  string description = 3;
  string cause = 4;
  map<string, string> meta = 5;
}
 Metadata trailers = new Metadata();

Base.Error error = Base.Error.newBuilder()
  .setType(exception.getType().name())
  .setCode(exception.getCode().code())
  .setDescription(StringUtils.defaultIfBlank(exception.getDescription(), exception.getCode().desc()))
  .setCause(encode(exception))
  .putAllMeta(exception.getMeta())
  .build();
trailers.put(Metadata.Key.of("error-bin", Metadata.BINARY_BYTE_MARSHALLER), Metadata.BINARY_BYTE_MARSHALLER.toBytes(error.toByteArray()));
this.origin.onError(Status.INTERNAL.withCause(t).asRuntimeException(trailers));

解决了客户端和服务端异常时数据相互传递的问题,接下来的问题是解决客户端能从服务端获取自定义异常的堆栈信息。

获取自定义异常的堆栈信息

需要知道的是,通过 gRPC 提供的异常 StatusRuntimeException 并不会序列化它的异常堆栈信息(其实也并没有必要,客户端在大多数情况下并不需要知道服务端为什么返回一个未知的报错,唯一我能想到的场景就是在 CS 两端联调的时候),那我们如何传递这个信息呢?

其实市面上有非常多的序列化类的方式:protobuf、thrift、JDK 提供的序列化方式等等,这里就不再赘述。只是简单地采用 JDK 自带的序列化工具。作为对端调试的能力并不需要考虑性能以及带宽等。

private ByteString encode(FundxVirtualException exception) {

        if (exception == null)
            return null;

        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){
            ObjectOutputStream oo = new ObjectOutputStream(byteArrayOutputStream);
            oo.writeObject(exception);
            oo.flush();
            oo.close();
            return ByteString.copyFrom(byteArrayOutputStream.toByteArray());
        } catch (IOException e) {
            log.error("StreamObserverDelegate IO 异常, 内容:{}", exception, e);
            return null;
        }
    }
}
private Throwable decode(ByteString content) {

        if (content.isEmpty())
            return null;

        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content.toByteArray());
            ObjectInputStream ois = new ObjectInputStream(byteArrayInputStream);
            FundxVirtualException ex = (FundxVirtualException) ois.readObject();
            ois.close();
            byteArrayInputStream.close();
            return ex.getThrowable();
        } catch (IOException | ClassNotFoundException e) {
            log.error("Result IO 异常或 Cast 异常, 内容:{}", content, e);
            return null;
        }
    }
}

抛出服务端自定义的异常

我们有多个切入点去做这件事,如修改 GRPC 中 ServerCalls 和 ClientCalls 的源码让其支持这样一件事,但对于实现业务功能而修改中间件源码会带来更加复杂的问题。作者实现的思路:

  1. Server 端通过对 StreamObserver 的委托为其 onError 调用增加一条边路,通过 Metadata 传递自封装的业务异常信息。
  2. Client 端通过对 Stub 的调用封装,对不同结果进行处理。

以下是服务端封装思路:


public class StreamObserverDelegate<RespT extends Message> implements StreamObserver<RespT> {


    private StreamObserver<RespT> origin;

    public StreamObserverDelegate(StreamObserver<RespT> origin) {
        if (origin == null) {
            throw new IllegalArgumentException("StreamObserverDelegate parameter can't be null.");
        }
        this.origin = origin;
    }

    @Override
    public void onNext(RespT value) {
        this.origin.onNext(value);
    }

    @Override
    public void  onError(Throwable t) {
        if (t instanceof FundxException || t instanceof FundxRuntimeException) {

            // 业务异常,返回错误码和默认文案到客户端
            FundxVirtualException exception = (FundxVirtualException) t;

            Metadata trailers = new Metadata();

            Base.Error error = Base.Error.newBuilder()
                    .setType(exception.getType().name())
                    .setCode(exception.getCode().code())
                    .setDescription(StringUtils.defaultString(exception.getDescription(), exception.getCode().desc()))
                    .setCause(encode(exception))
                    .putAllMeta(exception.getMeta())
                    .build();
            trailers.put(Metadata.Key.of("error-bin", Metadata.BINARY_BYTE_MARSHALLER), Metadata.BINARY_BYTE_MARSHALLER.toBytes(error.toByteArray()));
            this.origin.onError(Status.INTERNAL.withCause(t).asRuntimeException(trailers));
        } else {
            // 非业务异常,返回异常详情到客户端。
            this.origin.onError(Status.INTERNAL
                    .withDescription(t.getMessage())
                    .withCause(t)
                    .asRuntimeException());
        }
    }

    @Override
    public void onCompleted() {
        origin.onCompleted();
    }

    private ByteString encode(FundxVirtualException exception) {

        if (exception == null)
            return null;

        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){
            ObjectOutputStream oo = new ObjectOutputStream(byteArrayOutputStream);
            oo.writeObject(exception);
            oo.flush();
            oo.close();
            return ByteString.copyFrom(byteArrayOutputStream.toByteArray());
        } catch (IOException e) {
            log.error("StreamObserverDelegate IO 异常, 内容:{}", exception, e);
            return null;
        }
    }
}

对结果调用的封装:


public class Result {

    private boolean isSuccess;
    private Object result;

    private Result(Object result, boolean isSuccess) {
        this.result = result;
        this.isSuccess = isSuccess;
    }

    public static <T> Result success(T result) {
        return new Result(result, true);
    }

    public static Result error(Base.Error result) {
        return new Result(result, false);
    }

    public boolean isSuccess() {
        return isSuccess;
    }

    public <T> T success() {
        return isSuccess() ? (T) result : null;
    }

    public Base.Error failure() {
        return isSuccess() ? null : (Base.Error) result;
    }

    public <T> T peek(Consumer<T> handler) {

        try {
            if (isSuccess()) {
                T succeed = (T) result;
                handler.accept(succeed);
                return succeed;
            }
        } catch (ClassCastException e) {
            throw new ClassCastException("Result.peek 类型转换错误.");
        }

        Base.Error error = (Base.Error) result;
        throw Exceptions.Type.valueOf(error.getType()).toExceptions()
                .withCode(Code.parseFrom(error.getCode()))
                .withDescription(error.getDescription())
                .withCause(decode(error.getCause()))
                .withMeta(error.getMetaMap())
                .asUncheckedException();
    }

    public <T, R> R map(Function<T, R> handler) {

        try {
            if (isSuccess()) {
                return handler.apply((T) result);
            }
        } catch (ClassCastException e) {
            throw new ClassCastException("Result.map 类型转换错误.");
        }

        Base.Error error = (Base.Error) result;
        throw Exceptions.Type.valueOf(error.getType()).toExceptions()
                .withCode(Code.parseFrom(error.getCode()))
                .withDescription(StringUtils.defaultIfBlank(error.getDescription(), null))
                .withCause(decode(error.getCause()))
                .withMeta(error.getMetaMap())
                .asUncheckedException();
    }

    private Throwable decode(ByteString content) {

        if (content.isEmpty())
            return null;

        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(content.toByteArray());
            ObjectInputStream ois = new ObjectInputStream(byteArrayInputStream);
            FundxVirtualException ex = (FundxVirtualException) ois.readObject();
            ois.close();
            byteArrayInputStream.close();
            return ex.getThrowable();
        } catch (IOException | ClassNotFoundException e) {
            log.error("Result IO 异常或 Cast 异常, 内容:{}", content, e);
            return null;
        }
    }
}

以下是客户端封装思路:

public class Rpcs {
    private static final Metadata.Key<byte[]> ERROR_KEY = Metadata.Key.of("error-bin", Metadata.BINARY_BYTE_MARSHALLER);

    public static <T, R> Result execute(T item, Function<T, R> function) {
        try {
            R r = function.apply(item);

            return Result.success(r);
        } catch (StatusRuntimeException e) {
            if (e.getTrailers() != null && e.getTrailers().containsKey(ERROR_KEY)) {
                try {
                    return Result.error(Base.Error.parseFrom(e.getTrailers().get(ERROR_KEY)));
                } catch (InvalidProtocolBufferException ipbe){
                    return Result.error(
                            Base.Error.newBuilder()
                                    .setType(Exceptions.Type.INR.name())
                                    .setCode(Code.INVALID_PARAMETER.code())
                                    .setDescription("未得到正确的序列化异常")
                                    .build());
                }
            } else {
                throw e;
            }
        }
    }
}

最终的实现效果

private class GreeterImpl extends GreeterGrpc.GreeterImplBase {
  @Override
  public void sayHello(Base.HelloRequest req, StreamObserver<Base.HelloReply> responseObserver){
    StreamObserverDelegate<Base.HelloReply> delegate = new StreamObserverDelegate<>(responseObserver);
    delegate.onError(
      Exceptions.USR
      .withCode(Code.COMMON_ERROR)
      .withDescription("测试业务描述")
      .withCause(new RuntimeException("测试Runtime异常"))
      .asUncheckedException());
  }
}

通过委托 StreamObserver 抛出封装的业务异常 FundxRuntimeException。

Base.HelloRequest request = Base.HelloRequest.newBuilder().setName(name).build();
Base.Error error = Rpcs.execute(request, blockingStub::sayHello).success();

客户端通过委托调用器获取最终的执行结果。

Exception in thread "main" exception.FundxRuntimeException: 测试业务描述
	at exception.Exceptions.asUncheckedException(Exceptions.java:132)
	at Result.peek(Result.java:62)
	at GrpcClient.greet(GrpcClient.java:35)
	at GrpcClient.main(GrpcClient.java:49)
Caused by: exception.FundxRuntimeException: 测试业务描述
	at exception.Exceptions.asUncheckedException(Exceptions.java:132)
	at HelloWorld_Server$GreeterImpl.sayHello(HelloWorld_Server.java:80)
	at com.elijah.grpc.GreeterGrpc$MethodHandlers.invoke(GreeterGrpc.java:236)
	at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
	at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
	at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: 测试Runtime异常
	at HelloWorld_Server$GreeterImpl.sayHello(HelloWorld_Server.java:78)
	... 9 more

日志获取到服务端的异常调用栈用于调试当前的程序。如果是非指定异常则与 gRPC 一般报错相同。

传递信息

但对于 gRPC 来说更好的实践应该是制定合理统一的错误码及返回文案,并且各调用方严格执行,但这种情况更适用于类似 IM 系统的产品,如石墨文档、聊天软件等。