0%

RPC gRPC框架

随着微服务和云原生相关技术的发展,应用程序的架构模式已从传统的单体架构或分层架构转向了分布式的计算架构。尽管分布式架构本身有一定的开发成本和运维成本,但它所带来的收益是显而易见的。正是在这样的背景下,gRPC应运而生并成为云原生计算基金会的孵化项目广泛应用于众多开源项目和企业级项目中。

进程间通信技术的演化

传统的RPC

在构建客户端–服务器端应用程序方面,RPC 是很流行的进程间通信技术。借助 RPC,客户端能够像调用本地方法那样远程调用某个方法的功能。早期有一些很流行的 RPC 实现,比如通用对象请求代理体系结构 (common object request broker architecture,CORBA) 和 Java 远程方法调用 (remote method invocation,RMI),它们都用来构建和连接服务或应用程序。但是,大多数传统的 RPC 实现极其复杂,因为它们构建在 TCP 这样的通信协议之上,而这会妨碍互操作性,并且它们还有大量的规范限制。

SOAP

鉴于 CORBA 等传统 RPC 实现的局限性,简单对象访问协议(simple object access protocol,SOAP)应运而生,并且得到了微软、IBM 等企业的大力推广。SOAP 是面向服务的架构(service-oriented architecture,SOA)中的标准通信技术,用于在服务(在 SOA 中通常叫作 Web 服务)之间交换基于 XML 的结构化数据,并且能够基于任意的底层通信协议进行通信,其中最常用的协议是 HTTP。

通过 SOAP,可以定义服务接口、服务的操作以及调用这些操作的 XML 消息格式。SOAP 曾是一项非常流行的技术,但其消息格式的复杂性以及围绕 SOAP 所构建的各种规范的复杂性,妨碍了构建分布式应用程序的敏捷性。因此,在现代分布式应用程序开发中,SOAP Web 服务被认为是一种遗留技术。大多数现有的分布式应用程序采用 REST 架构风格,而非 SOAP。

REST

描述性状态迁移(representational state transfer,REST)架构风格起源于 Roy Fielding 的博士论文。Fielding 是 HTTP 规范的主要作者之一,也是 REST 架构风格的创始人。REST 是面向资源的架构(resource-oriented architecture,ROA)的基础,在这种架构中,需要将分布式应用程序建模为资源集合,访问这些资源的客户端可以变更这些资源的状态(创建、读取、更新或删除)。

REST 的通用实现是 HTTP,通过 HTTP,可以将 RESTful Web 应用程序建模为能够通过唯一标识符访问的资源集合。应用于资源的状态变更操作会采用 HTTP 动词(GET、POST、PUT、DELETE、PATCH 等)的形式,资源的状态会以文本的格式来表述,如 JSON、XML、HTML、YAML 等。

实际上,通过 HTTP 和 JSON 将应用程序构建为 REST 架构风格已成为构建微服务的标准方法。但是,随着微服务的数量及其网络交互的激增,RESTful 服务已经无法满足现代化的需求了。下面介绍 RESTful 服务的 3 个主要局限性,这些局限性妨碍了其作为消息协议在现代微服务应用程序中的运用。

  1. 基于文本的低效消息协议

从本质上来讲,RESTful 服务建立在基于文本的传输协议(如 HTTP 1.x)之上,并且会使用人类可读的文本格式,如 JSON。但是,在进行服务与服务之间的通信时,通信双方都不需要这种人类可读的文本化格式,这时使用这种格式非常低效。

客户端应用程序(源)生成需要发送给服务器的二进制内容,然后需要将二进制结构转换成文本(如果使用 HTTP 1.x,就只能发送文本化消息),并通过网络以文本的形式(借助 HTTP)发送到另一台机器上,这台机器需要在服务器端(目标)解析文本并将其转换回二进制结构。其实,我们也可以很轻松地发送映射服务和消费者业务逻辑的二进制内容,采用 JSON 格式主要是因为它是“人类可读的”,相对来说易于使用。这涉及工具选择问题,而不是二进制协议问题。

  1. 应用程序之间缺乏强类型接口

随着越来越多的服务要通过网络进行交互,而且这些服务使用完全不同的语言来构建,缺乏明确定义和强类型的服务接口成了使用 RESTful 服务的主要阻碍。RESTful 中现有的各种服务定义技术 (如 OpenAPI/Swagger 等)都是事后的补救措施,并没有与底层的架构风格或消息协议紧密集成在一起。

在构建这种分散的应用程序时,会遇到很多的不兼容、运行时错误和互操作等问题。例如,在开发 RESTful 服务时,应用程序之间并不需要共享服务定义和类型定义的信息。但是,在开发 RESTful 应用程序时,我们要么通过网络查看文本格式,要么使用第三方 API 定义技术(如 OpenAPI)。因此,现在非常重要的任务就是拥有现代化的强类型服务定义技术以及框架,从而为多语言技术生成核心的服务器端代码和客户端代码。

  1. REST架构风格难以强制实施

REST 架构风格有很多“好的实践”,只有遵循这些实践,才能构建出真正的 RESTful 服务。但是,由于它们并没有作为实现协议(比如 HTTP)的一部分进行强制的要求,因此在实现阶段,这些实践很难实施。事实上,大多数自称 RESTful 的服务并没有遵循基础的 REST 架构风格,也就是说,这些所谓的 RESTful 服务不过是通过网络公开的 HTTP 服务。因此,开发团队必须花费大量时间来维护 RESTful 服务的一致性和纯度。

鉴于进程间通信技术在构建现代云原生应用程序时所存在的这些限制,人们开始寻求更好的消息协议。

gRPC

gRPC的起源

长期以来,谷歌有一个名为 Stubby 的通用 RPC 框架,用来连接成千上万的微服务,这些微服务跨多个数据中心并且使用完全不同的技术来构建。Stubby 的核心 RPC 层每秒能处理数百亿次的互联网请求。Stubby 有许多很棒的特性,但无法标准化为业界通用的框架,这是因为它与谷歌内部的基础设施耦合得过于紧密。

2015 年,谷歌发布了开源 RPC 框架 gRPC,这个 RPC 基础设施具有标准化、可通用和跨平台的特点,旨在提供类似 Stubby 的可扩展性、性能和功能,但它主要面向社区。

在此之后,gRPC 的受欢迎程度陡增,很多大型公司大规模采用了 gRPC,如 Netflix、Square、Lyft、Docker、CoreOS 和思科。接着,gRPC 加入了云原生计算基金会(Cloud Native Computing Foundation, CNCF),这是最受欢迎的开源软件基金会之一,它致力于让云原生计算具备通用性和可持续性。gRPC 从 CNCF 生态系统项目中获得了巨大的发展动力。

下面看一下相对于传统进程间通信协议,选择使用 gRPC 的一些关键原因。

选择gRPC的原因

gRPC 是一种支持互联网规模的进程间通信技术,可以弥补传统进程间通信技术的大多数缺点。鉴于 gRPC 所带来的收益,越来越多的现代应用程序和服务器将其进程间通信协议替换成了 gRPC。在面对如此众多的可选方案时,为什么选择 gRPC 作为通信协议呢?下面详细介绍 gRPC 的关键优势。

gRPC 的优势

gRPC 的优势是它被越来越多的人所采用的关键所在,主要有以下几个方面。

  1. 提供高效的进程间通信。

    gRPC 没有使用 JSON 或 XML 这样的文本化格式,而是使用一个基于 protocol buffers 的二进制协议与 gRPC 服务和客户端通信。同时,gRPC 在 HTTP/2 之上实现了 protocol buffers,从而能够更快地处理进程间通信。这样一来,gRPC 就变成了最高效的进程间通信技术之一。

  2. 具有简单且定义良好的服务接口和模式
    gRPC 为应用程序开发提供了一种契约优先的方式。也就是说,首先必须定义服务接口,然后才能去处理实现细节。因此,与 RESTful 服务定义中的 OpenAPI/Swagger 和 SOAP Web 服务中的 WSDL 不同,gRPC 提供了简单但一致、可靠且可扩展的应用程序开发体验。

  3. 属于强类型

    因为使用 protocol buffers 来定义 gRPC 服务,所以 gRPC 服务契约清晰定义了应用程序间进行通信所使用的类型。这样一来,在构建跨多个团队和技术类型的云原生应用程序时,对于其所产生的大多数运行时错误和互操作错误,可以通过静态类型来克服,因此分布式应用程序的开发更加稳定。

  4. 支持多语言

    gRPC 支持多种编程语言。基于 protocol buffers 的服务定义是语言中立的。因此,我们可以选择任意一种语言,它们都能与现有的 gRPC 服务或客户端进行互操作。

  5. 支持双工流

    gRPC 在客户端和服务器端都提供了对流的原生支持,这些功能都被整合到了服务定义本身之中。因此,开发流服务或流客户端变得非常容易。与传统的 RESTful 服务消息风格相比,gRPC 的关键优势就是能够同时构建传统的请求–响应风格的消息以及客户端流和服务器端流。

  6. 具备内置的商业化特性

    gRPC 提供了对商业化特性的内置支持,如认证、加密、弹性 (截止时间和超时)、元数据交换、压缩、负载均衡、服务发现等 。

  7. 与云原生生态系统进行了集成

gRPC 是 CNCF 的一部分,大多数现代框架和技术对 gRPC 提供了原生支持。例如,CNCF 下的很多项目(如 Envoy)支持使用 gRPC 作为通信协议。另外,对于横切性的特性,比如度量指标和监控,gRPC 也得到了大多数工具的支持,比如使用 Prometheus 来监控 gRPC 应用程序。

  1. 行业已成熟并被广泛采用

通过在谷歌进行的大量实战测试,gRPC 已发展成熟。许多大型科技公司采用了 gRPC,如 Square、Lyft、Netflix、Docker、CoreOS 和思科等。

与其他技术一样,gRPC 也存在一定的劣势。在开发应用程序时,了解这些方面非常有用。

gRPC的劣势

下面介绍 gRPC 的一些劣势,在选择它来构建应用程序时,需要注意以下 3 点。

  1. gRPC 可能不太适合面向外部的服务

    大多数的外部消费者可能对 gRPC、REST 或 HTTP 等协议很陌生。因此,如果希望将应用程序或服务通过互联网暴露给外部客户端,gRPC 可能不是最适合的协议。gRPC 服务具有契约驱动、强类型的特点,这可能会限制我们向外部暴露的服务的灵活性,同时消费者的控制权会削弱很多。按照设计,gRPC 网关将是克服该问题的解决方案。

  2. 巨大的服务定义变更是复杂的开发流程

    在现代的服务间通信场景中,模式修改很常见。如果出现巨大的 gRPC 服务定义变更,通常需要重新生成客户端代码和服务器端代码。这需要整合到现有的持续集成过程中,可能会让整个开发生命周期复杂化。但是,大多数 gRPC 服务定义的变更可以在不破坏服务契约的情况下完成,而且只要不引入破坏性的变更,gRPC 就可以与使用不同版本 proto 的客户端和服务器端进行交互。因此,大多数情况并不需要重新生成代码。

  3. gRPC 生态系统相对较小

    与传统的 REST 或 HTTP 等协议相比,gRPC 的生态系统依然相对较小。浏览器和移动应用程序对 gRPC 的支持依然处于初级阶段。

    在开发应用程序时,必须注意这些方面的问题。由此可以看到,gRPC 并不是适用于所有进程间通信需求的万能技术。相反,你需要评估业务场景和需求,选择适当的消息协议。

如前所述,目前有很多新兴的进程间通信技术。因此,有一点非常重要,那就是了解如何将 gRPC 与在现代应用程序开发中流行的类似技术进行对比,从而为服务选择最合适的协议。

gRPC与其他协议的对比:Thrift和GraphQL

前面详细讨论了 RESTful 服务的局限性,正是这些局限性为 gRPC 的诞生奠定了基础。无独有偶,还有很多新兴的进程间通信技术,它们的问世也是为了满足相同的需求。下面看一下目前较为流行的技术,并将之与 gRPC 进行对比。

Thrift

Apache Thrift(以下简称 Thrift)是与 gRPC 类似的 RPC 框架,最初由 Facebook 开发,后来被捐赠给了 Apache。它有自己的接口定义语言并提供了对多种编程语言的支持。Thrift 可以在定义文件中定义数据类型和服务接口。Thrift 编译器以服务定义作为输入,能够生成客户端代码和服务器端代码。Thrift 的传输层为网络 I/O 提供了抽象,并将 Thrift 从系统的其他组成部分中解耦出来,这意味着 Thrift 可以在任意传输实现上运行,如 TCP、HTTP 等。

如果将 Thrift 和 gRPC 进行对比,可以发现它们遵循相同的设计理念和使用目标。但是,两者之间也有一些重要的区别。

  1. 传输方面
    相对于 Thrift,gRPC 的倾向性更强,它为 HTTP/2 提供了一流的支持。gRPC 基于 HTTP/2 的实现充分利用了该协议的功能,从而实现了高效率并且能够支持像流这样的消息模式。

  2. 流方面

gRPC 服务定义原生支持双向流(客户端和服务器端),它本身便是服务定义的一部分。

  1. 采用情况和社区资源方面

从采用情况来看,gRPC 的势头似乎更好,它已围绕 CNCF 项目成功构建了一个良好的生态系统。同时,gRPC 的社区资源非常丰富,比如良好的文档、外部的演讲以及示例。因此,相对于 Thrift,采用 gRPC 会更顺利一些。

  1. 性能方面

虽然目前还没有 gRPC 和 Thrift 对比的官方结果,但一些在线资源对比了两者的性能,结果显示 Thrift 的数据表现更好。然而,gRPC 的绝大多数发布版本经过了大量的性能测试。因此,性能问题不太可能是选择 Thrift 而非 gRPC 的决定性因素。同时,一些其他 RPC 框架提供了类似的功能,但不管怎样,gRPC 是目前最标准、最具交互性和采用范围最广的 RPC 技术,处于领先地位。

GraphQL

GraphQL 是另一项越来越流行的进程间通信技术,该项目由 Facebook 发起并通过开源进行了标准化。它是一门针对 API 的查询语言,并且是基于既有数据满足这些查询的运行时。GraphQL 为传统的客户端–服务器端通信提供了一种完全不同的方法,该方法允许客户端定义希望获得的数据、获取数据的方式以及数据格式。gRPC 则有针对远程方法的特定契约,借此实现客户端和服务器端之间的通信。

GraphQL 更适合面向外部的服务或 API,它们被直接暴露给消费者。在这种情况下,消费者需要对来自服务器端的数据有更多的控制权。以在线零售应用程序场景为例,假设 ProductInfo 服务的

消费者只需要关于商品的特定信息,而不是商品属性的完整集合,而且他们希望能有一种方法来指定想要的信息,那么我们可以借助 GraphQL 来建模一个服务,允许消费者使用 GraphQL 查询语言来查询服务并获取想要的信息。

在 GraphQL 和 gRPC 的大多数使用场景中,GraphQL 用于面向外部的服务或 API,而支撑 API 的内部服务则使用 gRPC 来实现。

接下来看一些实际的 gRPC 采用者及其使用场景。

gRPC的通信模式

作为承载四种通信模式的 Stub,有三种分类:FutureStub、Stub、BlockingStub。其中 FutureStub 只支持普通的 gRPC 服务,不支持流的形式。BlockingStub 支持普通的和服务端流。Stub 支持所有的形式。

一元RPC模式

在一元 RPC 模式中,gRPC 服务器端和 gRPC 客户端的通信始终只涉及一个请求和一个响应。如下图所示,请求消息包含头信息,随后是以长度作为前缀的消息,该消息可以跨一个或多个数据帧。消息最后会添加一个 EOS 标记,方便客户端半关(half-close)连接,并标记请求消息的结束。在这里,“半关”指的是客户端在自己的一侧关闭连接,这样一来,客户端无法再向服务器端发送消息,但仍能够监听来自服务器端的消息。只有在接收到完整的消息之后,服务器端才生成响应。响应消息包含一个头信息帧,随后是以长度作为前缀的消息。当服务器端发送带有状态详情的 trailer 头信息之后,通信就会关闭。

创建服务定义

具体 protobuf 使用见《RPC Protobuf框架》

syntax = "proto3";

import "google/protobuf/wrappers.proto";

option java_multiple_files = true;
option java_package = "io.github.elijah.communication";
option java_outer_classname = "Message";


package communication;

service ComMsg {
  rpc get(google.protobuf.Int32Value) returns (Response) {}
  rpc set(Request) returns (Response) {}
}

message Request {
  int32 id = 1;
  string value = 2;
}

message Response {
  int32 id = 1;
  string value = 2;
}

实现gRPC服务端

首先先要实现 *ImplBase 抽象类,重载定义的 rpc 接口逻辑。

//继承插件ComMsgImplBase,重载服务端处理逻辑
class ComMsgServerImpl extends ComMsgGrpc.ComMsgImplBase {
    //重载方法
    @Override
    public void get(Int32Value request, StreamObserver<Response> responseObserver) {
        //responseObserver用来发送响应给客户端并关闭流
        int value = request.getValue();
        if (value > 0) {
            Response response = Response.newBuilder()
                    .setValue("OK").build();
            //发送响应给客户端
            responseObserver.onNext(response);
            //通过关闭流终结客户端调用
            responseObserver.onCompleted();
        } else {
            //发送错误给客户端
            responseObserver.onError(new IllegalArgumentException("无效参数"));
        }
    }

    @Override
    public void set(Request request, StreamObserver<Response> responseObserver) {
        super.set(request, responseObserver);
    }
}

启动 Server 并阻塞该线程。

public class ComMsgServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        //通过ServerBuilder构建Server服务端
        Server server = ServerBuilder
                .forPort(50051)
                .addService(new ComMsgServerImpl())
                .build()
                .start();
        System.out.println("Server start, listening on " + server.getPort());
        //添加运行时的Hook,使JVM在关闭时关闭gRpc服务器
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.err.println("Shutting down gRpc server");
            server.shutdown();
        }));
        //服务器线程会一直保持,直到线程终止
        server.awaitTermination();
    }
}

实现gRPC客户端

public class ComMsgClient {
    public static void main(String[] args) {
        //创建 gRPC 通道并指定希望连接的服务器地址和端口
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("127.0.0.1", 50059)
                //启用了明文(plaintext),这意味着在客户端和服务器端之间建立的连接不安全。
                .usePlaintext()
                .build();
        //创建Stub,Stub类型不同决定着接收响应的行为
        ComMsgGrpc.ComMsgBlockingStub stub = ComMsgGrpc.newBlockingStub(channel);
        Response response = stub.get(Int32Value.of(1));
        System.out.println(response);
        //关闭连接,回收相应资源
        channel.shutdown();
    }
}

服务端流RPC模式

在流 RPC 模式中,服务端在接收到客户端请求消息后,会发送一个响应序列。这种多个响应所组成的序列也被称为“流”。在将所有的服务器端响应发送完毕之后,服务器端会给客户端发送带有状态信息的 trailer 元数据,从而标记流的结束,随后通信就会被关闭。

创建服务定义

//...
service ComMsg {
  //为Response,也就是服务器返回数据类型加上 stream 关键字
  rpc get(google.protobuf.Int32Value) returns (stream Response) {}
  //...
}
//...

实现gRPC服务端

//继承插件ComMsgImplBase,重载服务端处理逻辑
class ComMsgServerImpl extends ComMsgGrpc.ComMsgImplBase {
  @Override
  public void get(Int32Value request, StreamObserver<Response> responseObserver) {
    //...
    //发送响应给客户端
    responseObserver.onNext(response);
    responseObserver.onNext(response);
    //通过关闭流终结客户端调用
    responseObserver.onCompleted();
    //...
  }
}

实现gRPC客户端

public class ComMsgClient {
    public static void main(String[] args) {
      //...
      //返回一个流进行读取(阻塞stub中表现形式为一个阻塞队列)
      ComMsgGrpc.ComMsgBlockingStub stub = ComMsgGrpc.newBlockingStub(channel);
      Iterator<Response> iterator = stub.get(Int32Value.of(1));
      iterator.forEachRemaining(System.out::println);
      //...
    }
}

客户端流RPC模式

在客户端流 RPC 模式中,客户端向服务器端发送多条消息,服务器端在响应时发送一条消息。注意,服务器端不一定要等到从客户端接收到所有消息后才发送响应。客户端首先通过发送头信息帧来与服务器端建立连接,然后以数据帧的形式,向服务器端发送多条以长度作为前缀的消息,如下图所示。最后,通过在末尾的数据帧中发送 EOS 标记,客户端将连接设置为半关的状态。与此同时,服务器端读取所接收到的来自客户端的消息。在接收到所有的消息之后,客户端发送一条响应消息和 trailer 头信息,并关闭连接。

创建服务定义

//...
service ComMsg {
  //为Request,也就是客户端输入数据类型加上 stream 关键字
  rpc get(stream google.protobuf.Int32Value) returns (Response) {}
  //...
}
//...

实现gRPC服务端

//继承插件ComMsgImplBase,重载服务端处理逻辑
class ComMsgServerImpl extends ComMsgGrpc.ComMsgImplBase {

    @Override
    public StreamObserver<Int32Value> get(StreamObserver<Response> responseObserver) {
        return new StreamObserver<Int32Value>() {
            @Override
            public void onNext(Int32Value value) {
                System.out.println(value.getValue());
            }
            
            @Override
            public void onError(Throwable t) {}
            
            //由于客户端会发送多个请求,但服务端只能回一个响应,因此onNext函数要放在onCompleted中调用
            @Override
            public void onCompleted() {
                responseObserver.onNext(Response.newBuilder().setValue("OK").build());
                responseObserver.onCompleted();
            }
        };
    }

    @Override
    public void set(Request request, StreamObserver<Response> responseObserver) {
        super.set(request, responseObserver);
    }
}

实现gRPC客户端

public class ComMsgClient {
    public static void main(String[] args) throws InterruptedException {
      	//...
        //创建Stub,必须创建异步Stub以保证消息正常传输
        ComMsgGrpc.ComMsgStub stub = ComMsgGrpc.newStub(channel);

      	//接收服务端响应
        StreamObserver<Response> responseStreamObserver = new StreamObserver<Response>() {
            //客户端流RPC,服务器只发送一个响应包,该函数只调用一次
            @Override
            public void onNext(Response value) {}

            @Override
            public void onError(Throwable t) {}

            @Override
            public void onCompleted() {
                System.out.println("complete");
            }
        };

        //发送客户端请求
        StreamObserver<Int32Value> requestStreamObserver = stub.get(responseStreamObserver);
        requestStreamObserver.onNext(Int32Value.of(1));
        requestStreamObserver.onNext(Int32Value.of(2));
        requestStreamObserver.onCompleted();

        Thread.sleep(Integer.MAX_VALUE);
    }
}

双向流RPC模式

在双向流 RPC 模式中,客户端以消息流的形式发送请求到服务器端,服务器端也以消息流的形式进行响应。调用必须由客户端发起,但在此之后,通信完全基于 gRPC 客户端和服务器端的应用程序逻辑并且无需等待对方结束。

创建服务定义

//...
service ComMsg {
  //为 Request 和 Response 加上 stream 关键字
  rpc get(stream google.protobuf.Int32Value) returns (stream Response) {}
  //...
}
//...

实现gRPC服务端

//继承插件ComMsgImplBase,重载服务端处理逻辑
class ComMsgServerImpl extends ComMsgGrpc.ComMsgImplBase {

    @Override
    public StreamObserver<Int32Value> get(StreamObserver<Response> responseObserver) {
        return new StreamObserver<Int32Value>() {
            //由于客户端会发送多个请求,服务端顺序响应,resp onNext 放在 req onNext 中
            @Override
            public void onNext(Int32Value value) {
                System.out.println(int32Value.getValue());
                Response response = Response.newBuilder().setValue(int32Value.getValue() + " OK").build();
                responseObserver.onNext(response);
            }
            
            @Override
            public void onError(Throwable t) {}
            
            @Override
            public void onCompleted() {
            	System.out.println("complete");
                responseObserver.onCompleted();
            }
        };
    }

    @Override
    public void set(Request request, StreamObserver<Response> responseObserver) {
        super.set(request, responseObserver);
    }
}

实现gRPC客户端

public class ComMsgClient {
    public static void main(String[] args) throws InterruptedException {
      	//...
        //创建Stub,必须创建异步Stub以保证消息正常传输
        ComMsgGrpc.ComMsgStub stub = ComMsgGrpc.newStub(channel);

      	//接收服务端响应
        StreamObserver<Response> responseStreamObserver = new StreamObserver<Response>() {
            @Override
            public void onNext(Response value) {
                System.out.println(response);
            }

            @Override
            public void onError(Throwable t) {}

            @Override
            public void onCompleted() {
                System.out.println("complete");
            }
        };

        //发送客户端请求
        StreamObserver<Int32Value> requestStreamObserver = stub.get(responseStreamObserver);
        requestStreamObserver.onNext(Int32Value.of(1));
        requestStreamObserver.onNext(Int32Value.of(2));
        requestStreamObserver.onCompleted();

        Thread.sleep(Integer.MAX_VALUE);
    }
}

gRPC高级用法

拦截器

在构建 gRPC 应用程序时,无论是客户端应用程序,还是服务器端应用程序,在远程方法执行之前或之后,都可能需要执行一些通用逻辑。在 gRPC 中,提供了可以拦截 RPC 执行的功能,来满足特定的需求,如日志、认证、性能度量指标等,这会使用一种名为拦截器的扩展机制。

服务器端拦截器

class MyInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
        //创建Call接收S->C回调内容
        ServerCall<ReqT, RespT> call = new MyServerCall<ReqT, RespT>(serverCall);
        System.out.println("Pre Handler");
        //创建Listener监听C->S内容
        MyServerCallListener<ReqT> listener = new MyServerCallListener<>(serverCallHandler.startCall(call, metadata));
        System.out.println("Back Handler");
        return listener;
    }
}
//接收S->C回调内容
class MyServerCall<ReqT, RespT> extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {

    protected MyServerCall(ServerCall delegate) {
        super(delegate);
    }

    @Override
    public void sendMessage(RespT message) {
        System.out.println("Pre Server -> Client");
        super.sendMessage(message);
        System.out.println("Back Server -> Client");
    }
}
//Listener监听C->S内容
class MyServerCallListener<R> extends ForwardingServerCallListener<R> {

    private final ServerCall.Listener<R> delegate;

    public MyServerCallListener(ServerCall.Listener<R> delegate){
        this.delegate = delegate;
    }

    @Override
    protected ServerCall.Listener<R> delegate() {
        return delegate;
    }

    @Override
    public void onMessage(R message) {
        System.out.println("Pre Client -> Server");
        super.onMessage(message);
        System.out.println("Back Client -> Server");
    }
}

public class ComMsgServer {

    public static void main(String[] args) throws Exception {
        Server server = ServerBuilder.forPort(50051)
          		  //绑定拦截器
                .addService(ServerInterceptors.intercept(new ComMsgImpl(), new MyInterceptor()))
                .build()
                .start();
        server.awaitTermination();
    }
}
//实现简单的服务器回调处理
class ComMsgImpl extends com.example.demo.ComMsgGrpc.ComMsgImplBase {
    @Override
    public void get(Int32Value request, StreamObserver<StringValue> responseObserver) {
        System.out.println("recv request");
        responseObserver.onNext(StringValue.of("response"));
        responseObserver.onCompleted();
        System.out.println("send response");
    }
}

客户端拦截器

class MyClientInterceptor implements ClientInterceptor {

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
        //Call 接收C->S的回调
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(methodDescriptor, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
//                对response内容进行修改
//                super.start(responseListener, headers);
                //Listener 监听S->C的内容
                super.start(new SimpleForwardingClientCallListener<RespT>(responseListener){
                    //修改内容
                }, headers);
            }
        };
    }
}

public class ComMsgClient {

    public static void main(String[] args) throws InterruptedException {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("127.0.0.1", 50051)
                .usePlaintext()
                .build();
        //绑定 interceptor
        Channel channel = ClientInterceptors.intercept(managedChannel, new MyClientInterceptor());
        ComMsgGrpc.ComMsgStub stub = ComMsgGrpc.newStub(channel);
        //...
    }
}

Stub扩展

截止时间

在分布式计算中,截止时间(deadline)和超时时间(timeout)是两个常用的模式。超时时间可以指定客户端应用程序等待 RPC 完成的时间 (之后会以 DEADLINE_EXCEEDED 错误结束),它通常会以持续时长的方式来指定,并且在每个客户端本地进行应用。例如,一个请求可能会由多个下游 RPC 组成,它们会将多个服务链接在一起。因此,可以在每个服务调用上,针对每个 RPC 都指定超时时间。这意味着超时时间不能直接应用于请求的整个生命周期,这时需要使用截止时间。

截止时间以请求开始的绝对时间来表示(即使 API 将它们表示为持续时间偏移),并且应用于多个服务调用。发起请求的应用程序设置截止时间,整个请求链需要在截止时间之前进行响应。gRPC API 支持为 RPC 使用截止时间,出于多种原因,在 gRPC 应用程序中使用截止时间始终是一种最佳实践。由于 gRPC 通信是在网络上发生的,因此在 RPC 和响应之间会有延迟。另外,在一些特定的场景中,gRPC 服务本身可能要花费更多的时间来响应,这取决于服务的业务逻辑。如果客户端应用程序在开发时没有指定截止时间,那么它们会无限期地等待自己所发起的 RPC 请求的响应,而资源都会被正在处理的请求所占用。这会让服务和客户端都面临资源耗尽的风险,增加服务的延迟,甚至可能导致整个 gRPC 服务崩溃。

在 Java 中,通过 Stub 来设置超时和截止时间。

public abstract class AbstractStub<S extends AbstractStub<S>> {
  /**
   * Returns a new stub with an absolute deadline.
   *
   * <p>This is mostly used for propagating an existing deadline. {@link #withDeadlineAfter} is the
   * recommended way of setting a new deadline,
   *
   * @since 1.0.0
   * @param deadline the deadline or {@code null} for unsetting the deadline.
   */
  public final S withDeadline(@Nullable Deadline deadline) {
    return build(channel, callOptions.withDeadline(deadline));
  }

  /**
   * Returns a new stub with a deadline that is after the given {@code duration} from now.
   *
   * @since 1.0.0
   * @see CallOptions#withDeadlineAfter
   */
  public final S withDeadlineAfter(long duration, TimeUnit unit) {
    return build(channel, callOptions.withDeadlineAfter(duration, unit));
  }
}

并在 Client 对其超时抛出的异常进行处理

try {
    // Add Order with a deadline
    StringValue result = stub.addOrder(order);
    logger.info("AddOrder Response -> : " + result.getValue());
} catch (StatusRuntimeException e) {
    if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) {
        logger.info("Deadline Exceeded. : " + e.getMessage());
    } else {
        logger.info("Unspecified error from the service -> " + e.getMessage());
    }
}

压缩

为了高效利用网络带宽,在执行客户端和服务之间的 RPC 时,可以使用压缩技术。如果要在客户端使用压缩技术,那么可以通过在发送 RPC 时设置一个压缩器来实现。

stub.withCompression("gzip")

在服务器端,已注册的压缩器会自动解码请求消息,并编码响应消息。在 Go 语言中,注册压缩器只需在 gRPC 服务器端应用程序中导入 GoDoc 网站上的 gzip 包即可(获取方式同上)。服务器端始终会使用客户端所指定的压缩方法。如果对应的压缩器没有注册,则会向客户端返回一个 Unimplemented 状态。

取消

Java 不支持

在客户端应用程序和服务器端应用程序之间的 gRPC 连接中,客户端和 服务器端都能够对调用是否成功在本地做出独立判断。例如,可以让同一个 RPC 在服务器端成功完成,但在客户端让其失败。类似地,在不同的情况下,客户端和服务器端可能会对同一个 RPC 得出不同的结论。但是,无论是客户端应用程序,还是服务器端应用程序,当希望终止 RPC 时,都可以通过取消该 RPC 来实现。一旦取消 RPC,就不能再进行与之相关的消息传递了,并且一方已经取消 RPC 的事实会传递到另一方。

异常处理

下面列出了异常状态的枚举

/**
 * The set of canonical status codes. If new codes are added over time they must choose
 * a numerical value that does not collide with any previously used value.
 */
public enum Code {
  /**
   * The operation completed successfully.
   */
    OK(0),

    /**
     * The operation was cancelled (typically by the caller).
     */
    CANCELLED(1),

    /**
     * Unknown error.  An example of where this error may be returned is
     * if a Status value received from another address space belongs to
     * an error-space that is not known in this address space.  Also
     * errors raised by APIs that do not return enough error information
     * may be converted to this error.
     */
    UNKNOWN(2),

    /**
     * Client specified an invalid argument.  Note that this differs
     * from FAILED_PRECONDITION.  INVALID_ARGUMENT indicates arguments
     * that are problematic regardless of the state of the system
     * (e.g., a malformed file name).
     */
    INVALID_ARGUMENT(3),

    /**
     * Deadline expired before operation could complete.  For operations
     * that change the state of the system, this error may be returned
     * even if the operation has completed successfully.  For example, a
     * successful response from a server could have been delayed long
     * enough for the deadline to expire.
     */
    DEADLINE_EXCEEDED(4),

    /**
     * Some requested entity (e.g., file or directory) was not found.
     */
    NOT_FOUND(5),

    /**
     * Some entity that we attempted to create (e.g., file or directory) already exists.
     */
    ALREADY_EXISTS(6),

    /**
     * The caller does not have permission to execute the specified
     * operation.  PERMISSION_DENIED must not be used for rejections
     * caused by exhausting some resource (use RESOURCE_EXHAUSTED
     * instead for those errors).  PERMISSION_DENIED must not be
     * used if the caller cannot be identified (use UNAUTHENTICATED
     * instead for those errors).
     */
    PERMISSION_DENIED(7),

    /**
     * Some resource has been exhausted, perhaps a per-user quota, or
     * perhaps the entire file system is out of space.
     */
    RESOURCE_EXHAUSTED(8),

    /**
     * Operation was rejected because the system is not in a state
     * required for the operation's execution.  For example, directory
     * to be deleted may be non-empty, an rmdir operation is applied to
     * a non-directory, etc.
     *
     * <p>A litmus test that may help a service implementor in deciding
     * between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE:
     * (a) Use UNAVAILABLE if the client can retry just the failing call.
     * (b) Use ABORTED if the client should retry at a higher-level
     * (e.g., restarting a read-modify-write sequence).
     * (c) Use FAILED_PRECONDITION if the client should not retry until
     * the system state has been explicitly fixed.  E.g., if an "rmdir"
     * fails because the directory is non-empty, FAILED_PRECONDITION
     * should be returned since the client should not retry unless
     * they have first fixed up the directory by deleting files from it.
     */
    FAILED_PRECONDITION(9),

    /**
     * The operation was aborted, typically due to a concurrency issue
     * like sequencer check failures, transaction aborts, etc.
     *
     * <p>See litmus test above for deciding between FAILED_PRECONDITION,
     * ABORTED, and UNAVAILABLE.
     */
    ABORTED(10),

    /**
     * Operation was attempted past the valid range.  E.g., seeking or
     * reading past end of file.
     *
     * <p>Unlike INVALID_ARGUMENT, this error indicates a problem that may
     * be fixed if the system state changes. For example, a 32-bit file
     * system will generate INVALID_ARGUMENT if asked to read at an
     * offset that is not in the range [0,2^32-1], but it will generate
     * OUT_OF_RANGE if asked to read from an offset past the current
     * file size.
     *
     * <p>There is a fair bit of overlap between FAILED_PRECONDITION and OUT_OF_RANGE.
     * We recommend using OUT_OF_RANGE (the more specific error) when it applies
     * so that callers who are iterating through
     * a space can easily look for an OUT_OF_RANGE error to detect when they are done.
     */
    OUT_OF_RANGE(11),

    /**
     * Operation is not implemented or not supported/enabled in this service.
     */
    UNIMPLEMENTED(12),

    /**
     * Internal errors.  Means some invariants expected by underlying
     * system has been broken.  If you see one of these errors,
     * something is very broken.
     */
    INTERNAL(13),

    /**
     * The service is currently unavailable.  This is a most likely a
     * transient condition and may be corrected by retrying with
     * a backoff. Note that it is not always safe to retry
     * non-idempotent operations.
     *
     * <p>See litmus test above for deciding between FAILED_PRECONDITION,
     * ABORTED, and UNAVAILABLE.
     */
    UNAVAILABLE(14),

    /**
     * Unrecoverable data loss or corruption.
     */
    DATA_LOSS(15),

    /**
     * The request does not have valid authentication credentials for the
     * operation.
     */
    UNAUTHENTICATED(16);
  }

以下是 Client 对其超时抛出的异常进行处理

try {
    // Add Order with a deadline
    StringValue result = stub.addOrder(order);
    logger.info("AddOrder Response -> : " + result.getValue());
} catch (StatusRuntimeException e) {
    if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) {
        logger.info("Deadline Exceeded. : " + e.getMessage());
    } else {
        logger.info("Unspecified error from the service -> " + e.getMessage());
    }
}

gRPC 所提供的“开箱即用”的错误模型非常有限,并且与底层的 gRPC 数据格式无关,其中最常见的格式就是 protocol buffers。如果使用 protocol buffers 作为数据格式,那么可以利用 google.rpc 包所提供的更丰富的错误模型。

gRPC 错误状态和详情通常会通过 trailer 头信息在传输层发送。

元数据

gRPC 应用程序通常会通过 gRPC 服务和消费者之间的 RPC 来共享信息。在大多数场景中,与服务业务逻辑和消费者直接相关的信息会作为远程方法调用参数的一部分,但在某些场景中,因为预期共享的关于 RPC 的信息可能与 RPC 业务上下文并没有关联,所以它们不应该作为 RPC 参数的一部分。在这样的场景中,可以使用 gRPC 元数据(gRPC metadata),元数据可以在 gRPC 服务或 gRPC 客户端发送和接收。如下图所示,在客户端或服务器端创建的元数据,可以通过 gRPC 头信息在客户端应用程序和服务器端应用程序之间进行交换。元数据的构造遵循键(字符串)–值对的形式。

元数据最常见的一个用途就是在 gRPC 应用程序之间交换安全头信息。与之类似,可以使用这种方式在 gRPC 应用程序之间交换任意类似信息。拦截器一般会大量使用 gRPC 元数据 API。下面将探讨 gRPC 如何支持在客户端和服务器端之间发送元数据。

metadata.put(Metadata.Key.of("Key_1", ASCII_STRING_MARSHALLER), "Value");
metadata.get(Metadata.Key.of("Key_1", ASCII_STRING_MARSHALLER));

多路复用

Java 无内置实现

除了之前介绍的在给定的 gRPC 服务器端上注册唯一的 gRPC 服务,并且由单个客户端存根使用 gRPC 客户端进行连接。但是,gRPC 还允许在同一个 gRPC 服务器端上运行多个 gRPC 服务,也允许多个客户端存根使用同一个 gRPC 客户端连接,这种功能叫作多路复用(multiplexing)。

对于多个服务或者多个存根使用相同的连接,这只涉及设计形式,与 gRPC 理念无关。在微服务等大多数日常使用场景中,通常并不会在两个服务间共享同一个 gRPC 服务器端。

在微服务架构中,gRPC 多路复用的一个强大的用途就是在同一个服务器端进程中托管同一个服务的多个主版本。这样做能够保证 API 在发生破坏性变更之后,依然能够适应遗留的客户端。一旦服务契约的旧版本不再有效,就可以在服务器端将其移除了。

gRPC安全连接

使用TLS认证gRPC通道

启用单向安全连接

在单向连接中,只有客户端会校验服务器端,以确保它所接收的数据来自预期的服务器。在建立连接时,服务器端会与客户端共享其公开证书,客户端则会校验接收到的证书。这是通过证书授权中心(certificate authority,CA)完成的,也就是 CA 签署的证书。证书校验之后,客户端会发送使用密钥加密的数据。

启用mTLS保护的连接

客户端和服务器端采用 mTLS 连接的主要目的是,控制能够连接服务器端的客户端。与单向安全连接不同,这种方式会将服务器配置为仅接受来自一组范围有限、已验证的客户端的连接。在这种方式中,双方彼此共享公开证书,并校验对方的身份。连接的基本流程如下所示。

  1. 客户端发送一个请求,试图访问服务器端受保护的信息。

  2. 服务器端发送它的 X.509 证书给客户端。

  3. 客户端通过CA对接收到的证书进行校验,判断是否为CA签名的

    证书。

  4. 如果校验成功,则客户端发送其自身的证书到服务器端。

  5. 服务器端也通过CA验证客户端证书。

  6. 验证成功之后,服务器端就允许客户端访问受保护的数据了。

对gRPC调用进行认证

使用 Basic 认证

basic 认证是最简单的认证机制。在这种机制中,客户端发送的请求带有 Authorization 头信息,该头信息的值以单词 Basic 开头,随后是一个空格和 base64 编码的字符串 < 用户名 >:< 密码 >。如果用户名和密码均为 admin,那么头信息将如下所示:

Authorization: Basic YWRtaW46YWRtaW4=

总体而言,gRPC 并不提倡使用用户名/密码来对服务进行认证。这是因为,相对于 JSON Web Token(JWT) 和 OAuth2 Access Token 等其他令牌,用户名/密码没有时间方面的限制。这意味着当生成一个令牌时,我们可以指定它的有效时间,但对于用户名/密码,则不能指定它的有效期。在我们更改密码之前,它始终是有效的。如果需要在应用程序中启用 basic 认证,建议在客户端和服务器端之间的安全连接中共享基本凭证。我们选择 basic 认证,是为了能更方便地阐述 gRPC 中的认证原理。

使用OAuth 2.0

OAuth 2.0 是一个用于访问委托的框架。它允许用户以自己的名义授予服务有限的访问权限,而不会像用户名和密码方式那样给予服务全部访问权限。

在 OAuth 2.0 的流程中,有 4 个主要的角色:客户端、授权服务器、资源服务器和资源所有者。客户端要访问资源服务器上的资源。为了访问资源,客户端需要获取一个来自授权服务器的令牌(这是任意的一个字符串)。这个令牌必须具备恰当的长度,并且应该是不可预知的。客户端接收到该令牌之后,就可以使用它向资源服务器发送请求了。随后,资源服务器会与对应的授权服务器通信,并校验该令牌。如果该资源所有者校验了它,那么客户端就可以访问该资源。

使用JWT

JWT 定义了一个在客户端和服务器端传输身份信息的容器。签名的 JWT 可用作自包含的访问令牌,这意味着资源服务器无须与授权服务器通信来验证客户端的令牌,它可以通过验证签名来校验令牌。客户端请求访问授权服务器,授权服务器校验客户端的凭证,创建 JWT 并将其发送给客户端。带有 JWT 的客户端应用程序就允许访问资源了。

gRPC 内置了对 JWT 的支持。如果具有来自授权服务器的 JWT 文件,则需要传递该文件并创建 JWT 凭证。

使用基于令牌的谷歌认证

识别用户,并决定是否允许他们访问部署在谷歌云平台上的服务,该平台是由可扩展服务代理(extensible service proxy,ESP)控制的。ESP 支持多种认证方法,包括 Firebase、Auth0 和 Google ID Token。不管使用哪种方法,客户端都需要在它们的请求中包含一个有效的 JWT。为了生成认证 JWT,我们必须为每个部署的服务创建一个服务账号。

获取到服务的 JWT 令牌之后,就可以通过和请求一起发送令牌来调用服务方法了。我们可以在创建通道时将凭证传递进来。

参考内容