0%

ZooKeeper 典型应用场景

ZooKeeper 是一个高可用的分布式数据管理与协调框架。基于对 ZAB 算法的实现,该框架能够很好地保证分布式环境中数据的一致性。也正是基于这样的特性,使得 ZooKeeper 成为了解决分布式一致性问题的利器。

典型应用场景及实现

数据发布订阅(配置中心)

数据发布订阅(Publish/Subscribe)系统,即所谓的配置中心,顾名思义就是发布者将数据发布到 ZooKeeper 的一个或一系列节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的,实现配置信息的集中式管理和数据的动态更新。

发布订阅系统一般有两种设计模式,分别是推(Push)模式和拉(Pull)模式。在推模式中,服务端主动将数据更新发送给所有订阅的客户端;而拉模式则是由客户端主动发起请求来获取最新数据,通常客户端都采用定时进行轮询拉取的方式。关于这两种模式更详细的讲解以及各自的优缺点,这里就不再赘述,读者可以自行到互联网上搜索相关的资料作进一步的了解。 ZooKeeper 采用的是推拉相结合的方式:客户端向服务端注册自己需要关注的节点,一旦该节点的数据发生变更,那么服务端就会向相应的客户端发送 Watcher 事件通知,客户端接收到这个消息通知之后,需要主动到服务端获取最新的数据。

实践:分布式日志收集系统

分布式日志收集系统的核心工作是收集分布在不同机器上的日志。收集器通常是按照应用来分配收集任务单元的,因此需要在 ZooKeeper 上创建一个以应用名作为 path 的节点 P,并将这个应用的所有机器的 IP 地址以子节点的形式注册到节点 P 上,这样一来就能够在机器变动的时候实时通知收集器,以调整任务分配。

实践:系统消息实时监控

在传统系统中,有些信息需要动态获取,实时修改,通常的办法是暴露出接口,例如暴露 JMX 接口来获取一些运行时的信息。在引入 ZooKeeper 之后就不用自己实现一套方案了,只要将这些信息存放到指定的 ZooKeeper 节点上即可。

需要注意的是,上面提到的应用场景有一个默认前提:数据量小但数据更新频繁。

负载均衡

这里说的负载均衡是指软负载均衡。在分布式环境中,为了保证高可用性,通常同一个应用或同一个服务的提供方都会部署多份,以集群方式对外提供服务。而消费者需要在这些对等的服务器中选择一个来执行相关的业务逻辑,其中比较典型的是消息中间件中的生产者负载均衡和消费者负载均衡。

一种动态的DNS服务

在实际开发中,往往使用本地 HOST 绑定来实现域名解析的工作。使用本地 HOST 绑定的方法,可以很容易解决域名紧张的问题,基本上每一个系统都可以自行确定系统的域名与目标 IP 地址。同时,这种方法对于开发人员最大的好处就是可以随时修改域名与 IP 的映射,大大提高了开发调试效率。

然而,这种看上去完美的方案,也有其致命的缺陷:当应用的机器规模在一定范围内,并且域名的变更不是特别频繁时,本地 HOST 绑定是非常高效且简单的方式。然而一旦机器规模变大后,就常常会碰到这样的情况:我们在应用上线的时候,需要在应用的每台机器上去绑定域名,但是在机器规模相当庞大的情况下,这种傚法就相当不方便。另外,如果想要临时更新域名,还需要到毎个机器上去逐个进行变更,要消耗大量时间,因此完全无法保证实时性。

下面来介绍一种基于 ZooKeeper 实现的动态 DNS 方案(以下简称该方案为“DDNS”, Dynamic DNS)

域名配置

和配置管理一样,我们首先需要在 ZooKeeper 上创建一个节点来进行域名配置,如下

域名解析

在传统的 DNS 解析中,我们都不需要关心域名的解析过程,所有这些工作都交给了操作系统的域名和 IP 地址映射机制(本地 HOST 绑定)或是专门的域名解析服务器(由域名注册服务商提供)。因此,在这点上,DDNS 方案和传统的域名解析有很大的区别 —— 在 DDNS 中,域名的解析过程都是由每一个应用自己负责的。通常应用都会首先从域名节点中获取一份 IP 地址和端口的配置,进行自行解析。同时,每个应用还会在域名节点上注册一个数据变更 Watcher 监听,以便及时收到域名变更的通知。

域名变更

在运行过程中,难免会碰上域名对应的 IP 地址或是端口变更,这个时候就需要进行域名变更操作。在 DDNS 中,我们只需要对指定的域名节点进行更新操作,ZooKeeper 就会向订阅的客户端发送这个事件通知,应用在接收到这个事件通知后,就会再次进行域名配置的获取。

域名探测

域名探测是指 DDNS 系统需要对域名下所有注册的 IP 地址和端口的可用性进行检测,俗称“健康度检测”。健康度检测一般有两种方式,第一种是服务端主动发起健康度心跳检测,这种方式一般需要在服务端和客户端之间建立起一个 TCP 长链接;第二种则是客户端主动向服务端发起健康度心跳检测。

自动化的DNS服务整体实现介绍

上面我们介绍了如何使用 ZooKeeper 来实现一种动态的 DNS 系统。通过 ZooKeeper 来实现动态 DNS 服务,一方面,可以避免域名数量无限增长带来的集中式维护的成本;另一方面,在域名变更的情况下,也能够避免因逐台机器更新本地 HOST 而带来的繁琐工作。

首先来介绍整个动态 DNS 系统的架构体系中几个比较重要的组件及其职责

  • Register 集群负责域名的动态注册
  • Dispatcher 集群负责域名解析
  • Scanner 集群负责检测以及维护服务状态(探测服务的可用性、屏蔽异常服务节点等)
  • SDK 提供各种语言的系统接入协议,提供服务注册以及査询接口
  • Monitor 负责收集服务信息以及对 DDNS 自身状态的监控
  • Controller 是一个后台管理的 Console,负责授权管理、流量控制、静态配置服务和手动屏蔽服务等功能,另外,系统的运维人员也可以在上面管理 Register Dispatcher 和 Scanner 等集群。

整个系统的核心当然是 ZooKeeper 集群,负责数据的存储以及一系列分布式协调。下面我们再来详细地看下整个系统是如何运行的。在这个架构模型中,我们将那些目标 IP 地址和端口抽象为服务的提供者,而那些需要使用域名解析的客户端则被抽象成服务的消费者。

在 DDNS 架构中的域名探测,使用的是服务提供者主动向 Scanner 进行状态汇报(即第二种健康度检测方式)的模式,即毎个服务提供者都会定时向 Scanner 汇报自己的状态。Scanner 会负责记录每个服务提供者最近一次的状态汇报时间,一旦超过 5 秒没有收到状态汇报,那么就认为该 IP 地址和端口已经不可用,于是开始进行域名清理过程。在域名清理过程中,Scanner 会在 ZooKeeper 中找到该域名对应的域名节点,然后将该 IP 地址和端口配置从节点内容中移除。

命名服务

命名服务(Name Service)是分布式系统中比较常见的一类场景。在分布式系统中,被命名的实体通常可以是集群中的机器、提供的服务地址或远程对象等 —— 这些我们都可以统称它们为名字(Name),其中较为常见的就是一些分布式服务框架(如 RPC 、RMI)中的服务地址列表,通过使用命名服务,客户端应用能够根据指定名字来获取资源的实体、服务地址和提供者的信息等。

注意:所有在 ZooKeeper 上注册的地址都是临时节点,这样就可以保证服务提供者和服务消费者能够自动感应资源的变化。

Java 语言中的 JNDI 便是一种典型的命名服务。JNDI 是 Java 命名与目录接口(Java Naming and Directory Interface)的缩写,是 J2EE 体系中重要的规范之一,标准的 J2EE 容器都提供了对 JNDI 规范的实现。因此,在实际开发中,开发人员常常使用应用服务器自带的 JNDI 实现来完成数据源的配置与管理一一使用 JNDI 方式后,开发人员可以完全不需要关心与数据库相关的任何信息,包括数据库类型、JDBC 驱动类型以及数据库账户等。

ZooKeeper 提供的命名服务功能与 JNDI 技术有相似的地方,都能够帮助应用系统通过一个资源引用的方式来实现对资源的定位与使用。另外,广义上命名服务的资源定位都不是真正意义的实体资源 —— 在分布式环境中,上层应用仅仅需要一个全局唯一的名字,类似于数据库中的唯一主键。下面我们来看看如何使用 ZooKeeper 来实现一套分布式全局唯 lD 的分配机制。

一说起全局唯一 ID ,相信读者都会联想到 UUID 。没错,UUID 是通用唯一识别码(Universally Unique Identifier)的简称,是一种在分布式系统中广泛使用的用于唯一标识元素的标准,最典型的实现是 GUID(Globally Unique Identifier, 全局唯一标识符),主流 ORM 框架 Hibernate 有对 UUID 的直接支持。

通过调用 ZooKeeper 节点创建的 API 接口可以创建一个顺序节点,并且在 APl 返回值中会返回这个节点的完整名字。利用这个特性,我们就可以借助 ZooKeeper 来生成全局唯一的 ID 了。

在 ZooKeeper 中, 每一个数据节点都能够维护一份子节点的顺序顺列,当客户端对其创建一个顺序子节点的时候 ZooKeeper 会自动以后缀的形式在其子节点上添加一个序号,在这个场景中就是利用了 ZooKeeper 的这个特性。

分布式协调/通知

分布式协调/通知服务是分布式系统中不可缺少的一个环节,是将不同的分布式组件有机结合起来的关键所在。对于一个在多台机器上部署运行的应用而言,通常需要一个协调者(Coordinator)来控制整个系统的运行流程, 例如分布式事务的处理、机器间的互相协调等。同时,引入这样一个协调者,便千将分布式协调的职责从应用中分离出来,从而可以大大减少系统之间的耦合性,而且能够显著提高系统的可扩展性。

ZooKeeper 中特有的 Watcher 注册与异步通知机制,能够很好地实现分布式环境下不同机器, 甚至是不同系统之间的协调与通知,从而实现对数据变更的实时处理。基于 ZooKeeper 实现分布式协调与通知功能,通常的做法是不同的客户端都对 ZooKeeper 上同一个数据节点进行 Watcher 注册,监听数据节点的变化(包括数据节点本身及其子节点),如果数据节点发生变化,那么所有订阅的客户端都能够接收到相应的Watcher 通知,并做出相应的处理。

除了上面介绍的方法,分布式协调还有另外几种实现方式,具体如下:

  • 心跳检测机制。检测系统和被检测系统之间并不直接关联,而是通过 ZooKeeper 上某个节点进行关联的,大大减少了系统耦合。
  • 系统调度模式。某系统由控制台和推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工作。管理人员在控制台做的一些操作实际上是修改了 ZooKeeper 上某些节点的状态,而 ZooKeeper 就把这些状态变化通知给注册 Watcher 的客户端,即推送系统,令其做出相应的推送处理。
  • 工作汇报模式。一些任务分发系统在子任务启动后,会到 ZooKeeper 中注册一个临时节点,并且定时将自己的进度进行汇报(将进度写回这个临时节点),这样任务管理者
    就能够实时知道任务的进度。

总之,使用 ZooKeeper 来进行分布式通知与协调,能够大大降低系统之间的耦合。

MySQL 数据复制总线: Mysql_Replicator

MySQL 数据复制总线(以下简称“复制总线”)是一个实时数据复制框架,用于在不同的 MySQL 数据库实例之间进行异步数据复制和数据变化通知。整个系统是一个由 MySQL 数据库集群、消息队列系统、任务管理监控平台以及 ZooKeeper 集群等组件共同构成的一个包含数据生产者、复制管道和数据消费者等部分的数据总线系统。下图所示是该系统的整体结构图。

在该系统中,ZooKeeper 主要负责进行一系列的分布式协调工作,在具体的实现上,根据功能将数据复制组件划分为三个核心子模块:Core、Server 和 Monitor,每个模块分别为一个单独的进程,通过 ZooKeeper 进行数据交换。

  • Core 实现了数据复制的核心逻辑,其将数据复制封装成管道,并抽象出生产者和消费者两个概念,其中生产者通常是 MySQL 数据库的 Binlog 日志。
  • Server 负责启动和停止复制任务。
  • Monitor 负责监控任务的运行状态,如果在数据复制期间发生异常或出现故障会进行告警。

三个子模块之间的关系如图所示。

每个模块作为独立的进程运行在服务端,运行时的数据和配置信息均保存在 ZooKeeper 上,Web 控制台通过 ZooKeeper 上的数据获取到后台进程的数据,同时发布控制信息。

任务注册

Core 进程在启动的时候,首先会向/mysql_replicator/tasks节点(以下简称“任务列表节点”)注册任务。例如,对于一个“复制热门商品”的任务,Task 所在机器在启动的时候,会首先在任务列表节点上创建一个子节点,例如/mysql_replicator/tasks/copy_hot_item(以下简称“任务节点”),如下图所示。如果在注册过程中发现该子节点已经存在,说明已经有其他 Task 机器注册了该任务,因此自己不需要再创建该节点了。

任务热备份

为了应对复制任务故障或者复制任务所在主机故障,复制组件采用“热备份”的容灾方式,即将同一个复制任务部署在不同的主机上, 我们称这样的机器为“任务机器”,主备任务机器通过 ZooKeeper 互相检测运行健康状况。

为了实现上述热备方案,无论在第一步中是否创建了任务节点,每台任务机器都需要在/mysql_replicator/tasks/copy_hot_item/instances节点上将自己的主机名注册上去。注意,这里注册的节点类型很特殊,是一个临时的顺序节点。在注册完这个子节点后,通常一个完整的节点名如下:/mysql/replicator/tasks/copy_hot_item/instances/[Hostname]-[id],其中最后的序列号是通过临时顺序节点生成的。

在完成该子节点的创建后,每台任务机器都可以获取到自己创建的节点的完成节点名以及所有子节点的列表,然后通过对比判断自己是否是所有子节点中序号最小的。如果自已是序号最小的子节点,那么就将自己的运行状态设置为 RUNNING,其余的任务机器则将自己设置为 STANDBY —— 我们将这样的热备份策略称为“小序号优先”策略。

热备切换

完成运行状态的标识后,任务的客户端机器就能够正常工作了,其中标记为 RUNNING 的客户端机器进行正常的数据复制,而标记为 STANDBY 的客户端机器则进入待命状态。这里所谓待命状态,就是说一且标记为 RUNNING 的机器出现故障停止了任务执行,那么就需要在所有标记为 STANDBY 的客户端机器中再次按照“小序号优先”策略来选出 RUNNING 机器来执行,具体的做法就是标记为 STANDBY 的机器都需要在/mysql_replicator/tasks/copy_hot_item/instances节点上注册一个“子节点列表变更”的 Watcher
监听,用来订阅所有任务执行机器的变化情况 —— 一旦 RUNNTNG 机器宕机与 ZooKeeper 断开连接后,对应的节点就会消失,于是其他机器也就接收到了这个变更通知,从而开始新一轮的 RUNNING 选举。

记录执行状态

既然使用了热备份,那么 RUNNING 任务机器就需要将运行时的上下文状态保留给 STANDBY 任务机器。在这个场景中,最主要的上下文状态就是数据复制过程中的一些进度信息,例如 Binlog 日志的消费位点,因此需要将这些信息保存到 ZooKeeper 上以便共享。在Mysql_Replicator的设计中,选择了/mysql_replicator/tasks/copy_hot_item/lastCommit作为 Binlog 日志消费位点的存储节点,RUNNING 任务机器会定时向这个节点写入当前的 Binlog 日志消费位点。

控制台协调

在上文中我们主要讲解了 Core 组件是如何进行分布式任务协调的,接下来我们再看看 Server 是如何来管理 Core 组件的。在Mysql_Replicator中,Server 主要的工作就是进行任务的控制,通过 ZooKeeper 来对不同的任务进行控制与协调。Server 会将每个复制任务对应生产者的元数据,即库名、表名、用户名与密码等数据库信息以及消费者的相关信息以配置的形式写入任务节点/mysql_replicator/tasks/copy_hot_item 中去,以便该任务的所有任务机器都能够共享该复制任务的配置。

冷备切换

到目前为止我们已经基本了解了Mysql_ Replicator的工作原理,现在再回过头来看上面提到的热备份。在该热备份方案中,针对一个任务,都会至少分配两台任务机器来进行热备份,但是在一定规模的大型互联网公司中,往往有许多 MySQL 实例需要进行数据复制,每个数据库实例都会对应一个复制任务,如果每个任务都进行双机热备份的话,那么显然需要消耗太多的机器。

因此我们同时设计了一种冷备份的方案,它和热备份方案最大的不同点在于,对所有任务进行分组,如下图所示。

和热备份中比较大的区别在于,Core 进程被配置了所属 Group(组)。举个例子来说,假如一个 Core 进程被标记了 group1,那么在 Core 进程启动后,会到对应的 ZooKeeper group1 节点下面获取所有的 Task 列表,假如找到了任务”copy_hot_item” 之后,就会遍历这个 Task 列表的 instances 节点,但凡还没有子节点的,则会创建一个临时的顺序节点:/mysql_replicator/task-groups/group1/copy_hot_item/instances/[Hostname]-1 —— 当然,在这个过程中,其他 Core 进程也会在这个 instances 节点下创建类似的子节点。和热备份中的“小序号优先”策略一样,顺序小的 Core 进程将自己标记为 RUNNING,不同之处在于,其他 Core 进程则会自动将自己创建的子节点删除,然后继续遍历下一个
Task 节点 一一 我们将这样的过程称为“冷备份扫描”。就这样,所有 Core 进程在一个扫描周期内不断地对相应的 Group 下面的 Task 进行冷备份扫描。

一种通用的分布式系统机器间通信方式

在绝大部分的分布式系统中,系统机器间的通信无外乎心跳检测、工作进度汇报和系统调度这三种类型。接下来,我们将围绕这三种类型的机器通信来讲解如何基于 ZooKeeper 去实现一种分布式系统间的通信方式。

心跳检测

机器间的心跳检测机制是指在分布式环境中,不同机器之间需要检测到彼此是否在正常运行,例如 A 机器需要知道 B 机器是否正常运行。在传统的开发中,我们通常是通过主机之间是否可以相互 PING 通来判断,更复杂一点的话, 则会通过在机器之间建立长连接,通过 TCP 连接固有的心跳检测机制来实现上层机器的心跳检测,这些确实都是一些非常常见的心跳检测方法。

基于 ZooKeeper 的临时节点特性,可以让不同的机器都在 ZooKeeper 的一个指定节点下创建临时子节点,不同的机器之间可以根据这个临时节点来判断对应的客户端机器是否存活。通过这种方式,检测系统和被检测系统之间并不需要直接相关联,而是通过 ZooKeeper 上的某个节点进行关联,大大减少了系统耦合。

工作进度汇报

在一个常见的任务分发系统中,通常任务被分发到不同的机器上执行后,需要实时地将自己的任务执行进度汇报给分发系统。这个时候就可以通过 ZooKeeper 来实现。在 ZooKeeper 上选择一个节点,每个任务客户端都在这个节点下面创建临时子节点,这样便可以实现两个功能:

  • 通过判断临时节点是否存在来确定任务机器是否存活
  • 各个任务机器会实时地将自己的任务执行进度写到这个临时节点上去,以便中心系统能够实时地获取到任务的执行进度。

系统调度

使用 ZooKeeper,能够实现另一种系统调度模式: 一个分布式系统由控制台和一些客户端系统两部分组成,控制台的职责就是需要将一些指令信息发送给所有的客户端,以控制它们进行相应的业务逻辑。后台管理人员在控制台上做的一些操作,实际上就是修改了 ZooKeeper 上某些节点的数据,而 ZooKeeper 进一步把这些数据变更以事件通知的形式发送给了对应的订阅客户端。

总之,使用 ZooKeeper 来实现分布式系统机器间的通信,不仅能省去大量底层网络通信和协议设计上重复的工作,更为重要的一点是大大降低了系统之间的耦合,能够非常方便地实现异构系统之间的灵活通信。

集群管理

随着分布式系统规模的日益扩大,集群中的机器规模也随之变大,因此,如何更好地进行集群管理也显得越来越重要了。

所谓集群管理,包括集群监控与集群控制两大块,前者侧重对集群运行时状态的收集,后者则是对集群进行操作与控制。在日常开发和运维过程中,我们经常会有类似于如下的需求。

  • 希望知道当前集群中究竟有多少机器在工作。
  • 对集群中每台机器的运行时状态进行数据收集。
  • 对集群中机器进行上下线操作。

在传统的基于 Agent 的分布式集群管理体系中, 都是通过在集群中的每台机器上部署一个 Agent,由这个 Agent 负责主动向指定的一个监控中心系统(监控中心系统负责将所有数据进行集中处理,形成一系列报表,并负责实时报警,以下简称“监控中心”)汇报自己所在机器的状态。在集群规模适中的场景下,这确实是一种在生产实践中广泛使用的解决方案,能够快速有效地实现分布式环境集群监控,但是一且系统的业务场景增
多,集群规模变大之后,该解决方案的弊端也就显现出来了。

大规模升级困难

以客户端形式存在的 Agent,在大规模使用后,一且遇上需要大规模升级的情况,就非常麻烦,在升级成本和升级进度的控制上面临巨大的挑战。

统一的 Agent 无法满足多样的需求

对于机器的 CPU 使用率、负载(Load)、内存使用率、网络吞吐以及磁盘容址等机器基本的物理状态,使用统一的 Agent 来进行监控或许都可以满足。但是,如果需要深入应用内部,对一些业务状态进行监控,例如,在一个分布式消息中间件中,希望监控到每个消费者对消息的消费状态;或者在一个分布式任务调度系统中, 需要对每个机器上任务的执行情况进行监控。很显然,对干这些业务耦合紧密的监控需求,不适合由一个统一的 Agent 来提供。

编程语言多样性

随着越来越多编程语言的出现,各种异构系统层出不穷。如果使用传统的 Agent 方式,那么需要提供各种语言的 Agent 客户端。另一方面,”监控中心” 在对异构系统的数据进行整合上面临巨大挑战。

ZooKeeper 具有以下两大特性

  • 客户端如果对 ZooKeeper 的一个数据节点注册 Watcher 监听,那么当该数据节点的内容或是其子节点列表发生变更时,ZooKeeper 服务器就会向订阅的客户端发送变更通知。
  • 对在 ZooKeeper 上创建的临时节点,一旦客户端与服务器之间的会话失效,那么该临时节点也就被自动清除。

利用 ZooKeeper 的这两大特性,就可以实现另一种集群机器存活性监控的系统。例如,监控系统在/clusterServers节点上注册一个 Watcher 监听,那么但凡进行动态添加机器的操作,就会在/clusterServers节点下创建一个临时节点:/clusterServers/[Hostname]。这样一来,监控系统就能够实时检测到机器的变动情况,至于后续处理就是监控系统的业务了。

分布式日志收集系统

分布式日志收集系统的核心工作就是收集分布在不同机器上的系统日志,在这里我们重点来看分布式日志系统(以下简称“日志系统”)的收集器模块。

在一个典型的日志系统的架构设计中,整个日志系统会把所有需要收集的日志机器(下文我们以“日志源机器”代表此类机器)分为多个组别,每个组别对应一个收集器,这个收集器其实就是一个后台机器(下文我们以“收集器机器”代表此类机器),用于收集日志。对于大规模的分布式日志收集系统场景,通常需要解决如下两个问题。

变化的日志源机器

在生产环境中,伴随着机器的变动,每个应用的机器几乎每天都是在变化的(机器硬件问题、扩容、机房迁移或是网络问题等都会导致一个应用的机器变化),也就是说每个组别中的日志源机器通常是在不断变化的。

变化的收集器机器

日志收集系统自身也会有机器的变更或扩容,于是会出现新的收集器机器加或是老的收集器机器退出的情况。

上面两个问题,无论是日志源机器还是收集器机器的变更,最终都归结为一点: 如何快速、合理、动态地为每个收集器分配对应的日志源机器,这也成为了整个日志系统正确稳定运转的前提,也是日志收集过程中最大的技术挑战之一。在这种情况下,引入 ZooKeeper 是个不错的选择,下面我们就来看Zoo Keeper 在这个场景中的使用。

注册收集器机器

使用 ZooKeeper 来进行日志系统收集器的注册,典型做法是在 ZooKeeper 上创建一个节点作为收集器的根节点,例如/logs/collector (下文我们以“收集器节点”代表该数据节点),每个收集器机器在启动的时候,都会在收集器节点下创建自己的节点,例如/logs/lcollector/[Hostname]

任务分发

待所有收集器机器都创建好自己对应的节点后,系统根据收集器节点下子节点的个数,将所有日志源机器分成对应的若干组,然后将分组后的机器列表分别写到这些收集器机器创建的子节点(例如logs/collector/host1)上去。这样一来,毎个收集器机器都能够从自己对应的收集器节点上获取日志源机器列表,进而开始进行日志收集工作。

状态汇报

完成收集器机器的注册以及任务分发后,我们还要考虑到这些机器随时都有挂掉的可能。因此,针对这个问题,我们需要有一个收集器的状态汇报机制:每个收集器机器在创建完自己的专属节点后,还需要在对应的子节点上创建一个状态子节点,例如/logs/collector/host1/status,每个收集器机器都需要定期向该节点写入自己的状态信息。我们可以把这种策略看作是一种心跳检测机制,通常收集器机器都会在这个节点中写入日志收集进度信息。日志系统根据该状态子节点的最后更新时间来判断对应的收集器机器是否存活。

动态分配

如果收集器机器挂掉或是扩容了,就需要动态地进行收集任务的分配。在运行过程中,日志系统始终关注着/logs/collector这个节点下所有子节点的变更,一旦检测到有收集器机器停止汇报或是有新的收集器机器加入,就要开始进行任务的重新分配。无论是针对收集器机器停止汇报还是新机器加入的情况,日志系统都需要将之前分配给该收集器的所有任务进行转移。为了解决这个问题,通常有两种做法。

全局动态分配

这是一种简单粗暴的做法,在出现收集器机器挂掉或是新机器加入的时候,日志系统需要根据新的收集器机器列表,立即对所有的日志源机器重新进行一次分组,然后将其分配给剩下的收集器机器。

局部动态分配

全局动态分配方式虽然策略简单,但是存在一个问题:一个或部分收集器机器的变更,就会导致全局动态任务的分配,影响面比较大,因此风险也就比较大。所谓局部动态分配,顾名思义就是在小范围内进行任务的动态分配。在这种策略中,每个收集器机器在汇报自己日志收集状态的同时,也会把自己的负载汇报上去。请注意,这里提到的负载并不仅仅只是简单地指机器 CPU 负载(Load),而是一个对当前收集器任务执行的综合评估,这个评估算法和 ZooKeeper 本身并没有太大的关系,这里不再赘述。

在这种策略中,如果一个收集器机器挂了,那么日志系统就会把之前分配给这个机器的任务重新分配到那些负载较低的机器上去。同样,如果有新的收集器机器加入,会从那些负载高的机器上转移部分任务给这个新加入的机器。

注意事项

在上面的介绍中,我们已经了解了 ZooKeeper 是如何协调一个分布式日志收集系统工作的,接下来再来看看一些细节问题。

节点类型

我们首先来看/logs/collector这个节点下面子节点的节点类型。在上面已经提到,logs/collector节点下面的所有子节点都代表了每个收集器机器,那么初步认为这些子节点必须选择临时节点,原因是日志系统可以根据这些临时节点来判断收集器机器的存活性。但是,同时还需要注意的一点是:在分布式日志收集这个场景中,收集器节点上还会存放所有已经分配给该收集器机器的日志源机器列表,如果只是简单地依靠 ZooKeeper 自身的临时节点机制,那么当一个收集器机器挂掉或是当这个收集器机器中断“心跳汇报”的时候,待该收集器节点的会话失效后,ZooKeeper 就会立即删除该节点,于是,记录在该节点上的所有日志源机器列表也就随之被清除掉了。

从上面的描述中可以知道,临时节点显然无法满足这里的业务需求,所以我们选择了使用持久节点来标识毎一个收集器机器,同时在这个持久节点下面分别创建logs/collector/[Hostname]/status节点来表征每一个收集器机器的状态。这样一来,既能实现日志系统对所有收集器的监控,同时在收集器机器挂掉后,依然能够准确地将分配于其中的任务还原。

日志系统节点监听

在实际生产运行过程中,每一个收集器机器更改自己状态节点的频率可能非常高(如每秒1次或更短),而且收集器的数量可能非常大,如果日志系统监听所有这些节点变化,那么通知的消息量可能会非常大。另一方面,在收集器机器正常工作的情况下,日志系统没有必要去实时地接收每次节点状态变更,因此大部分这些状态变更通知都是无用的。因此我们考虑放弃监听设置,而是采用日志系统主动轮询收集器节点的策略,这样就节省了不少网卡流量,唯一的缺陷就是有一定的延时(考虑到分布式日志收集系统的定位,这个延时是可以接受的)。

在线云主机管理

在线云主机管理通常出现在那些虚拟主机提供商的应用场景中。在这类集群管理中,有很重要的一块就是集群机器的监控。这个场景通常对于集群中的机器状态,尤其是机器在线率的统计有较高的要求,同时需要能够快速地对集群中机器的变更做岀响应。

在传统的实现方案中,监控系统通过某种手段(比如检测主机的指定端口)来对每台机器进行定时检测,或者每台机器自己定时向监控系统汇报“我还活着”。但是这种方式需要毎一个业务系统的开发人员自己来处理网络通信、协议设计、调度和容灾等诸多琐碎的问题。下面来看看使用 ZooKeeper 实现的另一种集群机器存活性监控系统。针对这个系统,我们的需求点通常如下。

  • 如何快速地统计出当前生产环境一共有多少台机器?
  • 如何快速地获取到机器上/下线的情况?
  • 如何实时监控集群中每台主机的运行时状态?

机器上/下线

为了实现自动化的线上运维,我们必须对机器的上/下线情况有一个全局的监控。通常在新增机器的时候,需要首先将指定的 Agent 部署到这些机器上去。Agent 部署启动之后,会首先向 ZooKeeper 的指定节点进行注册,具体的做法就是在机器列表节点下面创建一个临时子节点,例如/XAE/machine/[Hostname](下文我们以“主机节点”代表这个节点),如下图所示。

当 Agent 在 ZooKeeper 上创建完这个临时子节点后,对/XAE/machines节点关注的监控中心就会接收到“子节点变更”事件,即上线通知,于是就可以对这个新加入的机器开启相应的后台管理逻辑。另一方面,监控中心同样可以获取到机器下线的通知,这样便实现了对机器上/下线的检测,同时能够很容易地获取到在线的机器列表,对于大规模的扩容和容量评估都有很大的帮助。

机器监控对于一个在线云主机系统,不仅要对机器的在线状态进行检测,还需要对机器的运行时状态进行监控。在运行的过程中,Agent 会定时将主机的运行状态信息写入 ZooKeeper上的主机节点,监控中心通过订阅这些节点的数据变更通知来间接地获取主机的运行时信息。

随着分布式系统规模变得越来越庞大,对集群机器的监控和管理显得越来越重要。上面提到的这种借助 ZooKeeper 来实现的方式,不仅能够实时地检测到集群中机器的上/下线情况,而且能够实时地获取到主机的运行时信息,从而能够构建岀一个大规模集群的主机图谱。

Master选举

Master 选举是一个在分布式系统中非常常见的应用场景。分布式最核心的特性就是能够将具有独立计算能力的系统单元部署在不同的机器上,构成一个完整的分布式系统。而与此同时,实际场景中往往也需要在这些分布在不同机器上的独立系统单元中选出一个所谓的“老大”,在计算机科学中,我们称之为 Master。

在分布式系统中,Master 往往用来协调集群中其他系统单元,具有对分布式系统状态变更的决定权。例如,在一些读写分离的应用场景中,客户端的写请求往往是由 Master 来处理的;而在另一些场景中,Master 则常常负责处理一些复杂的逻辑,并将处理结果同步给集群中其他系统单元。Master 选举可以说是 ZooKeeper 最典型的应用场景了,

在分布式环境中,经常会碰到这样的应用场景:集群中的所有系统单元需要对前端业务提供数据,比如一个商品 ID,或者是一个网站轮播广告的广告 ID(通常出现在一些广告投放系统中)等,而这些商品 ID 或是广告 ID 往往需要从一系列的海量数据处理中计算得到 —— 这通常是一个非常耗费 IO 和 CPU 资源的过程。鉴于该计算过程的复杂性,如果让集群中的所有机器都执行这个计算逻辑的话,那么将耗费非常多的资源。一种比较好的方法就是只让集群中的部分,甚至只让其中的一台机器去处理数据计算,一旦计算出数据结果,就可以共享给整个集群中的其他所有客户端机器,这样可以大大减少重复劳动,提升性能。

这里我们以一个简单的广告投放系统后台场景为例来讲解这个模型。整个系统大体上可以分成客户端集群、分布式缓存系统、海量数据处理总线和 ZooKeeper 四个部分,如下图所示。

首先我们来看整个系统的运行机制。上图中的 Client 集群每天定时会通过 ZooKeeper 来实现 Master 选举。选举产生 Master 客户端之后,这个 Master 就会负责进行一系列的海量数据处理,最终计算得到一个数据结果,并将其放置在一个内存数据库中。同时,Master 还需要通知集群中其他所有的客户端从这个内存/数据库中共享计算结果。

接下去,我们将重点来看 Master 选举的过程,首先来明确下 Master 选举的需求:在集群的所有机器中选举出一台机器作为 Master。针对这个需求,通常情况下,我们可以选择常见的关系型数据库中的主键特性来实现:集群中的所有机器都向数据库中插入一条相同主键 ID 的记录,数据库会帮助我们自动进行主键冲突检査,也就是说,所有进行插入操作的客户端机器中,只有一台机器能够成功 —— 那么,我们就认为向数据库中成功插入数据的客户端机器成为 Master。

乍一看,这个方案确实可行,依靠关系型数据库的主键特性能够很好地保证在集群中选举出唯一的一个 Master。但是我们需要考虑的另一个问题是,如果当前选举出的 Master 挂了,那么该如何处理?谁来告诉我 Master 挂了呢?显然,关系型数据库没法通知我们这个事件。那么,如果使用 ZooKeeper 是否可以做到这一点呢?

ZooKeeper 创建节点的 API 接口,其中提到的一个重要特性便是:利用 ZooKeeper 的强一致性,能够很好地保证在分布式高并发情况下节点的创建定能够保证全局唯一性,即 ZooKeeper 将会保证客户端无法重复创建一个已经存在的数据节点。也就是说,如果同时有多个客户端请求创建同一个节点,那么最终一定只有个客户端请求能够创建成功。利用这个特性,就能很容易地在分布式环境中进行 Master 选举了。

在这个系统中,首先会在 ZooKeeper 上创建一个日期节点,例如“2013-09-20”,

客户端集群每天都会定时往 ZooKeeper 上创建一个临时节点,例如/master/election/2013-09-20/binding。在这个过程中,只有一个客户端能够成功创建这个节点,那么这个客户端所在的机器就成为了 Master。同时,其他没有在 ZooKeeper 上成功创建节点的客户端,都会在节点/master_election/2013-09-20上注册一个子节点变更的 Watcher,用于监控当前的 Master 机器是否存活,一旦发现当前的 Master 挂了,那么其余的客户端将会重新进行 Master 选举。

从上面的讲解中,我们可以看到,如果仅仅只是想实现 Master 选举的话,那么其实只需要有一个能够保证数据唯一性的组件即可,例如关系型数据库的主键模型就是非常不错的选择。但是,如果希望能够快速地进行集群 Master 动态选举,那么基于 ZooKeeper 来实现是一个不错的新思路。

分布式锁

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要通过一些互斥手段来防止彼此之间的干扰,以保证一致性,在这种情况下,就需要使用分布式锁了。

在平时的实际项目开发中,我们往往很少会去在意分布式锁,而是依赖于关系型数据库固有的排他性来实现不同进程之间的互斥。这确实是一种非常简便且被广泛使用的分布式锁实现方式。然而有一个不争的事实是,目前绝大多数大型分布式系统的性能瓶颈都集中在数据库操作上。因此,如果上层业务再给数据库添加一些额外的锁,例如行锁、表锁甚至是繁重的事务处理,那么是不是会让数据库更加不堪重负呢?

排他锁

排他锁(Exclusive Locks,简称 X 锁),又称为写锁或独占锁,是一种基本的锁类型。如果事务 T1 对数据对象 O1 加上了排他锁,那么在整个加锁期间,只允许事务 T1 对 O1 进行读取和更新操作,其他任何事务都不能再对这个数据对象进行任何类型的操作一直到 T1 释放了排他锁。

从上面讲解的排他锁的基本概念中,我们可以看到,排他锁的核心是如何保证当前有且仅有一个事务获得锁,并且锁被释放后,所有正在等待获取锁的事务都能够被通知到。

定义锁

在通常的 Java 开发编程中,有两种常见的方式可以用来定义锁,分别是 synchronized 机制和 ReentrantLock。然而,在 ZooKeeper 中,没有类似于这样的 API 可以直接使用,而是通过 ZooKeeper 上的数据节点来表示一个锁,例如/exclusive_lock/lock节点就可以被定义为一个锁,如下图所示。

获取锁

在需要获取排他锁时,所有的客户端都会试图通过调用create()接口,在/exclusive_lock节点下创建临时子节点/exclusive_lock/lock。在前面几节中我们也介绍了,ZooKeeper 会保证在所有的客户端中,最终只有一个客户端能够创建成功,那么就可以认为该客户端获取了锁。同时,所有没有获取到锁的客户端就需要到/exclusive_lock节点上注册一个子节点变更的 Watcher 监听,以便实时监听到 lock 节点的变更情况。

释放锁

在“定义锁”部分,我们已经提到,/exclusive_lock/lock是一个临时节点,因此在以下两种情况下,都有可能释放锁。

  • 当前获取锁的客户端机器发生宕机,那么 ZooKeeper 上的这个临时节点就会被移除。
  • 正常执行完业务逻辑后,客户端就会主动将自己创建的临时节点删除。

无论在什么情况下移除了 lock 节点,ZooKeeper 都会通知所有在/exclusive_lock节点上注册了子节点变更 Watcher 监听的客户端。这些客户端在接收到通知后,再次重新发起分布式锁获取,即重复“获取锁”过程。整个排他锁的获取和释放流程,如下图所示。

共享锁

共享锁(Shared Locks,简称 S 锁),又称为读锁,同样是一种基本的锁类型。如果事务 T1 对数据对象 O1 加上了共享锁,那么当前事务只能对 O1 进行读取操作,其他事务也只能对这个数据对象加共享锁 —— 直到该数据对象上的所有共享锁都被释放。

共享锁和排他锁最根本的区别在于,加上排他锁后,数据对象只对一个事务可见,而加上共享锁后,数据对所有事务都可见。

一般实现

定义锁

和排他锁一样,同样是通过 ZooKeeper 上的数据节点来表示一个锁,是一个类似于/shared_lock/[Hostname}-请求类型-请求序号的临时顺序节点,例如/shared_lock/192.168.0.1-R-000000001,那么,这个节点就代表了一个共享锁,如下图所示。

获取锁

在需要获取共享锁时,所有客户端都会到/shared_lock这个节点下面创建一个临时顺序节点,如果当前是读请求,那么就创建例如/shared_lock/192.168.0.1-R-000000006的节点;如果是写请求,那么就创建例如/shared_lock/192.168.0.1-W-000000007的节点。

判断读写顺序

根据共享锁的定义,不同的事务都可以同时对同一个数据对象进行读取操作,而更新操作必须在当前没有任何事务进行读写操作的情况下进行。基于这个原则,我们来看看如何通过 ZooKeeper 的节点来确定分布式读写顺序,大致可以分为如下 4 个步骤。

  1. 创建完节点后,获取/shared_lock节点下的所有子节点,并对该节点注册子节点变更的 Watcher 监听。

  2. 确定自己的节点序号在所有子节点中的顺序。

  3. 节点处理

    对于读请求:如果没有比自己序号小的子节点,或是所有比自己序号小的子节点都是读请求,那么表明自己已经成功获取到了共享锁,同时开始执行读取逻辑。如果比自己序号小的子节点中有写请求,那么就需要进入等待。

    对于写请求:如果自己不是序号最小的子节点,那么就需要进入等待。

  4. 接收到 Watcher 通知后,重复步骤 1。

释放锁

释放锁的逻辑和排他锁是一致的。

羊群效应

上面讲解的这个共享锁实现,大体上能够满足一般的分布式集群竞争锁的需求,并且性能都还可以 —— 这里说的一般场景是指集群规模不是特别大,一般是在 10 台机器以内。但是如果机器规模扩大之后,会有什么问题呢?我们着重来看上面“判断读写顺序”过程的步骤 3,结合图给出的实例,看看实际运行中的情况。

针对上图中的实际情况,我们看看会发生什么事情。

  1. 192.168.0.1 这台机器首先进行读操作,完成读操作后将节点/192.168.0.1-R-0000000001删除。
  2. 余下的 4 台机器均收到了这个节点被移除的通知,然后重新从/shared_lock节点上获取一份新的子节点列表。
  3. 每个机器判断自己的读写顺序。其中 192.168.0.2 这台机器检测到自己已经是序号最小的机器了,于是开始进行写操作,而余下的其他机器发现没有轮到自己进行读取或更新操作,于是继续等待。
  4. 继续上面这个过程就是共享锁在实际运行中最主要的步骤了,我们着重看下上面步骤 3 中提到的:“而余下的其他机器发现没有轮到自己进行读取或更新操作,于是继续等待。”很明显,我们看到,192.168.0.1这个客户端在移除自己的共享锁后,ZooKeeper 发送了子节点变更 Watcher 通知给所有机器,然而这个通知除了给 192.168.0.2 这台机器产生实际影响外,对于余下的其他所有机器都没有任何作用。

相信读者也已经意识到了,在这整个分布式锁的竞争过程中,大量的“Watcher通知”和“子节点列表获取”两个操作重复运行,并且绝大多数的运行结果都是判断出自己并非是序号最小的节点,从而继续等待下一次通知 —— 这个看起来显然不怎么科学。客户端无端地接收到过多和自己并不相关的事件通知,如果在集群规模比较大的情况下,不仅会对 ZooKeeper 服务器造成巨大的性能影响和网络冲击,更为严重的是,如果同一时间有多个节点对应的客户端完成事务或是事务中断引起节点消失,ZooKeeper 服务器就会在短时间内向其余客户端发送大量的事件通知 —— 这就是所谓的羊群效应。

上面这个 ZooKeeper 分布式共享锁实现中出现羊群效应的根源在于,没有找准客户端真正的关注点。我们再来回顾一下上面的分布式锁竞争过程,它的核心逻辑在于:判断自己是否是所有子节点中序号最小的。于是,很容易可以联想到,每个节点对应的客户端只需要关注比自己序号小的那个相关节点的变更情况就可以了—而不需要关注全局的子列表变更情况。

改进的实现

现在我们来看看如何改进上面的分布式锁实现。首先,我们需要肯定的一点是,上面提到的共享锁实现,从整体思路上来说完全正确。这里主要的改动在于:每个锁竞争者,只需要关注/shared_lock节点下序号比自己小的那个节点是否存在即可,具体实现如下。

  1. 客户端调用create()方法创建一个类似于/shared_lock/[Hostname]-请求类型-序号的临时顺序节点。

  2. 客户端调用getChildren()接口来获取所有已经创建的子节点列表,注意,这里不注册任何 Watcher

  3. 如果无法获取共享锁,那么就调用exist()来对比自己小的那个节点注册 Watcher。注意,这里“比自己小的节点”只是一个笼统的说法,具体对于读请求和写请求不一样

    读请求:向比自己序号小的最后一个写请求节点注册 Watcher 监听。
    写请求:向比自己序号小的最后一个节点注册 Watcher监听。

  4. 等待 Watcher 通知,继续进入步骤 2。

改进后的分布式锁流程如下图所示。

分布式队列

业界有不少分布式队列产品,不过绝大多数都是类似于 ActiveMQ、Metamorphosis、Kafka 和 HornetQ 等的消息中间件(或称为消息队列)。在本节中,我们主要介绍基于 ZooKeeper 实现的分布式队列。分布式队列,简单地讲分为两大类,一种是常规的先入先出队列,另一种则是要等到队列元素集聚之后才统一安排执行的 Barrier 模型。

FIFO:先入先出

FIFO(First Input First Output,先入先出)的算法思想,以其简单明了的特点,广泛应用于计算机科学的各个方面。而 FIFO 队列也是一种非常典型且应用广泛的按序执行的队列模型:先进入队列的请求操作先完成后,才会开始处理后面的请求。

使用 ZooKeeper 实现 FIFO 队列,和共享锁的实现非常类似。FIFO 队列就类似于一个全写的共享锁模型,大体的设计思路其实非常简单:所有客户端都会到/queue_fifo这个节点下面创建一个临时顺序节点,例如/queue_fifo/192.168.01-000000001,如下图所示。

创建完节点之后,根据如下 4 个步骤来确定执行顺序。

  1. 通过调用getChildren()接口来获取/queue_fifo节点下的所有子节点,即获取队列中所有的元素。
  2. 确定自己的节点序号在所有子节点中的顺序。
  3. 如果自己不是序号最小的子节点,那么就需要进入等待,同时向比自己序号小的最后一个节点注册 Watcher 监听。
  4. 接收到 Watcher 通知后,重复步骤 1。

整个 FIFO 队列的工作流程,可以用下图来表示。

Barrier:分布式屏障

Barrier 原意是指障碍物、屏障,而在分布式系统中,特指系统之间的一个协调条件,规定了一个队列的元素必须都集聚后才能统一进行安排,否则一直等待。这往往出现在那些大规模分布式并行计算的应用场景上:最终的合并计算需要基于很多并行计算的子结果来进行。这些队列其实是在 FIFO 队列的基础上进行了增强,大致的设计思想如下开始时,/queue_barrier节点是一个已经存在的默认节点,并且将其节点的数据内容赋值为一个数字 n 来代表 Barrier 值,例如n = 10表示只有当/queue_barrier节点下的子节点个数达到 10 后,才会打开 Barrier。之后,所有的客户端都会到/queue_barrier节点下创建一个临时节点,例如/queue_barrier/192.168.0.1,如下图所示。

创建完节点之后,根据如下 5 个步骤来确定执行顺序。

  1. 通过调用getData()接口获取/queue_barrier节点的数据内容:10。
  2. 通过调用getchildren()接口获取/queue_barrier节点下的所有子节点,即获取队列中的所有元素,同时注册对子节点列表变更的 Watcher 监听。
  3. 统计子节点的个数。
  4. 如果子节点个数还不足 10 个,那么就需要进入等待。
  5. 接收到 Watcher 通知后,重复步骤 2。

整个 Barrier 队列的工作流程,可以用下图来表示。

在大型分布式系统中的应用

Hadoop

Hadoop 是 Apache 开源的一个大型分布式计算框架,由 Lucene 创始人 Doug Cutting 牵头创建,其定义了一种能够开发和运行处理海量数据的软件规范,用来实现一个在大规模集群中对海量数据进行分布式计算的软件平台。Hadoop 的核心是 HDFS 和 MapReduce 分别提供了对海量数据的存储和计算能力,自 0.23.0 版本开始,Hadoop 又引入了全新一代 MapReduce 框架 YARN。

在海量数据存储及处理领域,Hadoop 是目前业界公认的最成熟也是最卓越的开源解决方案。本书不会去过多地介绍 Hadoop 技术本身,感兴趣的读者可以访问 Hadoop 的官方网站了解更多关于这一分布式计算框架的内容。本书主要讨论 ZooKeeper 在 Hadoop 中的使用场景。

在 Hadoop 中,ZooKeeper 主要用于实现 HA(High Availability),这部分逻辑主要集中在 Hadoop Common 的 HA 模块中,HDFS 的 NameNode 与 YARN 的 ResourceManager 都是基于此 HA 模块来实现自己的 HA 功能的。同时,在 YARN 中又特别提供了 ZooKeeper 来存储应用的运行状态。下面将以 Cloudera 的 5.0 发布版本为例,围绕 YARN 中 ZooKeeper 的使用场景来讲解。

YARN介绍

YARN 是 Hadoop 为了提高计算节点 Master(JT)的扩展性,同时为了支持多计算模型和提供资源的细粒度调度而引入的全新一代分布式调度框架。其上可以支持 MapReduce 计算引擎,也支持其他的一些计算引擎,如 Tez、Spark、Storm、Imlala 和 Open MPI 等。

其架构体系如下图所示。YARN 主要由 ResourceManager(RM)、NodeManager(NM)、ApplicationMaster(AM)和 Container 四部分组成。其中最为核心的就是 ResourceManager,它作为全局的资源管理器,负责整个系统的资源管理和分配。

Resource Manager

单点问题看完 YARN 的架构体系之后,相信细心的读者也已经看出了上述架构体系中存在的一个明显的缺陷:ResourceManager 的单点问题。ResourceManager 是 YARN 中非常复杂的个组件,负责集群中所有资源的统一管理和分配,同时接收来自各个节点(NodeManager)的资源汇报信息,并把这些信息按照一定的策略分配给各个应用程序(Application Manager),其内部维护了各个应用程序的 Appliction Master 信息、NodeManager 信息以及资源使用信息等。因此,ResourceManager 的工作状况直接决定了整个 YARN 框架是否可以正常运转。

Resource Manager HA

为了解决 ResourceManager 的这个单点问题,YARN 设计了一套 Active/Standby 模式的 ResourceManager HA 架构,如下图所示。

从图中可以看出,在运行期间,会有多个 ResourceManager 并存,并且其中只有个 ResourceManager 处于 Active 状态,另外的一些(允许一个或者多个)则是处于 Standby 状态,当 Active 节点无法正常工作(如机器挂掉或重启等)时,其余处于 Standby 状态的节点则会通过竞争选举产生新的 Active 节点。

主备切换

下面我们就来看看 YARN 是如何实现多个 ResourceManager 之间的主备切换的。ResourceManager 使用基于 ZooKeeper 实现的 ActiveStandbyElector 组件来确定 ResourceManager 的状态:Active 或 Standby。具体做法如下。

  1. 创建锁节点

    在 ZooKeeper 上会有一个类似于/yarn-leader-election/pseudo-yarn-rm-cluster的锁节点,所有的 ResourceManager 在启动的时候,都会去竞争写一个 Lock 子节点:/yarn-leader-election/pseudo-yarn-rm-cluster/ActiveStandbyElectorLock,同时需要注意的是,该子节点的类型是临时节点。ZooKeeper 能够为我们保证最终只有一个 ResourceManager 能够创建成功。创建成功的那个 ResourceManager 就切换为 Active 状态,没有成功的那些 ResourceManager 则切换为 Standby 状态。

  2. 注册 Watcher 监听

    所有 Standby 状态的 ResourceManager 都会向/yarn-leader-election/pseudo-yarn-rm-cluster/ActiveStandbyElectorLock节点注册一个节点变更的 Watcher 监听,利用临时节点的特性,能够快速感知到 Active 状态的 ResourceManager 的运行情况。

  3. 主备切换

    当 Active 状态的 ResourceManager 出现诸如重启或挂掉的异常情况时,其在 ZooKeeper 上创建的 Lock 节点也会随之被删除。此时其余各个 Standby 状态的 ResourceManager 都会接收到来自 ZooKeeper 服务端的 Watcher 事件通知,然后会重复进行步骤 1 的操作。

以上就是利用 ZooKeeper 来实现 ResourceManager 的主备切换的过程。ActiveStandbyElector 组件位于 Hadoop-Common 工程的 org.apache.hadoop.ha 包中,其封装了 ResourceManager 和 ZooKeeper 之间的通信与交互过程。

HDFS 中的 NameNode 和 ResourceManager 模块都是使用该组件来实现各自的 HA 的,感兴趣的读者可以结合其源代码做进一步的详细了解。

Fencing(隔离)

在分布式环境中,经常会出现诸如单机“假死”的情况。所谓的“假死”是指机器由于网络闪断或是其自身由于负载过高(常见的有 GC 占用时间过长或 CPU 的负载过高等)而导致无法正常地对外进行及时响应。在上述主备切换过程中,我们假设 RM 集群由 ResourceManager1 和 ResourceManager2 两台机器组成,且 ResourceManager1 为 Active 状态,ResourceManager2 为 Standby 状态。某一时刻,ResourceManager1 发生了“假死”现象,此时 ZooKeeper 认为 ResourceManager 挂了,根据上述主备切换逻辑,ResourceManager2 就会成为 Active状态。但是在随后,ResourceManagerI恢复了正常,其依然认为自己还处于 Active 状态。这就是我们常说的分布式“脑裂”(Brain-Split)现象,即存在了多个处于 Active 状态的 ResourceManager 各司其职。那么该如何解决这样的问题呢?

YARN 中引入了 Fencing 机制,借助 ZooKeeper 数据节点的 ACL 权限控制机制来实现不同 RM 之间的隔离。具体做法其实非常简单,在上文的“主备切换”部分中我们讲到,多个 RM 之间通过竞争创建锁节点来实现主备状态的确定。这个地方需要改进的一点是,创建的根节点必须携带 ZooKeeper 的 ACL 信息,目的是为了独占该根节点,以防止其他 RM 对该节点进行更新。

经过上述改进后,我们再回过头来看,在主备切换过程中,Fencing 机制是如何避免“脑裂”现象出现的。延续上述提到的实例,RM1 出现假死后,ZooKeeper 就会将其创建的锁节点移除掉,此时 RM2 会创建相应的锁节点,并切换为 Active 状态。RM 恢复之后,会试图去更新 ZooKeeper 的相关数据,但是此时发现其没有权限更新 ZooKeeper 的相关节点数据,也就是说,RMI 发现 ZooKeeper 上的相关节点不是自己创建的,于是就自动切换为 Standby 状态,这样就避免了“脑裂”现象的出现。

ResourceManager状态存储

在 ResourceManager 中,RMStateStore 能够存储一些 RM 的内部状态信息,包括 Application 以及它们的 Attempts 信息、Delegation Token 及 Version Information 等。需要注意的是,RMState Store 中的绝大多数状态信息都是不需要持久化存储的,因为很容易从上下文信息中将其重构出来,如资源的使用情况。在存储的设计方案中,提供了三种可能的实现,分别如下。

  • 基于内存实现,一般是用于日常开发测试。
  • 基于文件系统的实现,如 HDFS。
  • 基于 ZooKeeper 的实现。

由于这些状态信息的数据量都不是特别大,因此 Hadoop 官方建议基于 ZooKeeper 来实现状态信息的存储。在 ZooKeeper 上,ResourceManager 的状态信息都被存储在/restore这个根节点下面,其数据节点的组织结构如下图所示。

通过上图我们可以大致了解 RMStateStore 状态信息在 ZooKeeper 上的存储结构,其中 RMAppRoot 节点下存储的是与各个 Application 相关的信息,RMDTSecretManagerRoot 存储的是与安全相关的 Token 等信息。每个 Active 状态的 ResourceManager 在初始化阶段都会从 ZooKeeper 上读取到这些状态信息,并根据这些状态信息继续进行相应的处理。

HBase

HBase,全称 Hadoop Database,是 Google Bigtable 的开源实现,是一个基于 Hadoop 文件系统设计的面向海量数据的高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用 HBase 技术可以在廉价的 PC 服务器上搭建起大规模结构化的存储集群。

与大部分分布式 NOSQL 数据库不同的是,HBase 针对数据写入具有强一致性的特性,甚至包括索引列也都实现了强一致性,因此受到了很多互联网企业的青睞。根据公开报道的数据,Facebook 和阿里集团都分别拥有数千台的 HBase 服务器,存储和使用了数以 PB 计的在线数据。面对如此海量的数据以及如此大规模的服务器集群,如何更好地进行分布式状态协调成为了整个 HBase 系统正常运转的关键所在。

HBase 在实现上严格遵守了 Google BigTable 论文的设计思想。BigTable 使用 Chubby 来负责分布式状态的协调。Chubby 是 Google 实现的一种基于 Paxos 算法的分布式锁服务,而 HBase 则采用了开源的 ZooKeeper 服务来完成对整个系统的分布式协调工作。下图展示了整个 HBase 架构及其与 ZooKeeper 之间的结构关系。

从上图可以看到,在 HBase 的整个架构体系中,ZooKeeper 是串联起 HBase 集群与 Client 的关键所在。有趣的是,在 2009 年以前的 HBase 代码中,还看不到 ZooKeeper 的影子,因为当时 HBase 的定位是离线数据库。随着 HBase 逐步向在线分布式存储方向发展,出现了一系列难以解决的问题,例如开发者发现如果有 RegionServer 服务器挂掉时,系统无法及时得知信息,客户端也无法知晓,因此服务难以快速迁移至其他 RegionServer 服务器上 —— 类似问题都是因为缺少相应的分布式协调组件,于是后来 ZooKeeper 被加入到 HBase 的技术体系中。直到今天,ZooKeeper 依然是 HBase 的核心组件,而且 ZooKeeper 在 HBase 中的应用场景范围也己经得到了进一步的拓展。下面我们从系统冗错、RootRegion 管理、Region 状态管理、分布式 SplitLog 任务管理和 Replication 管理五大方面来讲解 ZooKeeper 在 HBase 中的应用场景。

系统冗错

当 HBase 启动的时候,每个 RegionServer 服务器都会到 ZooKeeper 的/hbase/rs节点下创建一个信息节点(下文中,我们称该节点为“rs状态节点”),例如/hbase/rs/[Hostname],同时,HMaster 会对这个节点注册监听。当某个 RegionServer 挂掉的时候,ZooKeeper 会因为在一段时间内无法接收其心跳信息(即 Session 失效),而删除掉该 RegionServer 服务器对应的 rs 状态节点。与此同时,Master 则会接收到 ZooKeeper 的 NodeDelete 通知,从而感知到某个节点断开,并立即开始冗错工作 —— 在 HBase 的实现中,HMaster 会将该 RegionServer 所处理的数据分片(Region)重新路由到其他节点上,并记录到 Meta 信息中供客户端査询。

讲到这里,可能有的读者会发问:HBase 为什么不直接让 Master 来负责进行 RegionServer 的监控呢?HBase 之所以不使用 Master 直接通过心跳机制等来管理 RegionServer 状态,是因为在这种方式下,随着系统容量的不断增加,Master 的管理负担会越来越重,另外它自身也有挂掉的可能,因此数据还需要有持久化的必要。在这种情况下,ZooKeeper 就成为了理想的选择。

RootRegion管理

对于 HBase 集群来说,数据存储的位置信息是记录在元数据分片,也就是 RootRegion 上的。每次客户端发起新的请求,需要知道数据的位置,就会去查询 RootRegion,而 RootRegion 自身的位置则是记录在 ZooKeeper 上的(默认情况下,是记录在 ZooKeeper 的/hbase/root-region-server节点中)当 RootRegion 发生变化,比如 Region 的手工移动、Balance 或者是 RootRegion 所在服务器发生了故障等时,就能够通过 ZooKeeper 来感知到这一变化并做出一系列相应的容灾措施,从而保障客户端总是能拿到正确的 RootRegion 信息。

Region 状态管理 Region 是 HBase 中数据的物理切片,每个 Region 中记录了全局数据的一小部分,并且不同的 Region 之间的数据是相互不重复的。但对于一个分布式系统来说,Region 是会经常发生变更的,这些变更的原因来自于系统故障、负载均衡、配置修改、Region 分裂与合并等。一旦 Region 发生移动,它必然会经历 Offline 和重新 Online 的过程。

在 Offline 期间数据是不能被访问的,并且 Region 的这个状态变化必须让全局知晓,否则可能会出现某些事务性的异常。而对于 HBase 集群来说,Region 的数量可能会多达 10 万级别,甚至更多,因此这样规模的 Region 状态管理也只有依靠 ZooKeeper 这样的系统才能做到。

分布式SplitLog任务管理

当某台 RegionServer 服务器挂掉时,由于总有一部分新写入的数据还没有持久化到 HFile 中,因此在迁移该 RegionServer 的服务时,一个重要的工作就是从 HLog 中恢复这部分还在內存中的数据,而这部分工作最关键的一步就是 SplitLog,即 Master 需要遍历该 RegionServer 服务器的 HLog,并按 Region 切分成小块移动到新的地址下,并进行数据的 Replay。

由于单个 RegionServer 的日志量相对庞大(可能有数千个 Region,上 GB 的日志),而用户又往往希望系统能够快速完成日志的恢复工作。因此一个可行的方案是将这个处理 HLog 的任务分配给多台 RegionServer 服务器来共同处理,而这就又需要一个持久化组件来辅助 HMaster 完成任务的分配。当前的做法是,HMaster 会在 ZooKeeper 上创建一个 splitinglog 的节点(默认情况下,是/hbase/splitlog节点),将“哪个 RegionServer 处理哪个 Region”这样的信息以列表的形式存放到该节点上,然后由各个 RegionServer 服务器自行到该节点上去领取任务并在任务执行成功或失败后再更新该节点的信息,以通知 HMaster 继续进行后面的步骤。ZooKeeper 在这里担负起了分布式集群中相互通知和信息持久化的角色。

Replication管理

Replication 是实现 HBase 中主备集群间的实时同步的重要模块。有了 Replication,HBase 就能实现实时的主备同步,从而拥有了容灾和分流等关系型数据库才拥有的功能,从而大大加强了 HBase 的可用性,同时也拓展了其应用场景。和传统关系型数据库的 Replication 功能所不同的是,HBase 作为分布式系统,它的 Replication 是多对多的,且每个节点随时都有可能挂掉,因此在这样的场景下做 Replication 要比普通数据库复杂得多。

HBase 同样借助 ZooKeeper 来完成 Replication 功能。做法是在 ZooKeeper 上记录一个 replication 节点(默认情况下,是/base/replication节点),然后把不同的 RegionServer 服务器对应的 HLog 文件名称记录到相应的节点上,HMaster 集群会将新增的数据推送给 Slave 集群,并同时将推送信息记录到 ZooKeeper 上(我们将这个信息称为“断点记录”),然后再重复以上过程。当服务器挂掉时,由于 ZooKeeper 上己经保存了断点信息,因此只要有 HMaster 能够根据这些断点信息来协调用来推送 HLog 数据的主节点服务器,就可以继续复制了。

ZooKeeper部署

下面我们再来看下 HBase 中是如何进行 ZooKeeper 部署的。HBase 的启动脚本(hbase-emsh)中可以选择是由 HBase 启动其自带的默认 ZooKeeper,还是使用一个已有的外部 ZooKeeper 集群。一般的建议是使用第二种方式,因为这样就可以使得多个 HBase 集群复用同一套 ZooKeeper 集群,从而大大节省机器成本。当然,如果一个 ZooKeeper 集群需要被几个 HBase 复用的话,那么务必为每一个 HBase 集群明确指明对应的 ZooKeeper 根节点配置(对应的配置项是 zookeeper.znode.parent),以确保各个 HBase 集群间互不干扰。而对于 HBase 的客户端来说,只需要指明 ZooKeeper 的集群地址以及对应的 HBase 根节点配置即可,不需要任何其他配置。当 HBase 集群启动的时候,会在 ZooKeeper 上逐个添加相应的初始化节点,并在 HMaster 以及 RegionServer 进程中进行相应节点的 Watcher 注册。

小结

以上就是一些 HBase 系统中依赖 ZooKeeper 完成分布式协调功能的典型场景。但事实上,HBase 对于 ZooKeeper 的依赖还不止这些,比如 HMaster 依赖 ZooKeeper 来完成 ActiveMaster 的选举、BackupMaster 的实时接管、Table 的 enable/disable 状态记录,以及 HBase 中几乎所有的元数据存储都是放在 ZooKeeper 上的。有趣的是,HBase 甚至还通过 ZooKeeper 来实现 DrainingServer 这样的增强功能(相当于降级标志)。事实上,由于 ZooKeeper 出色的分布式协调能力以及良好的通知机制,HBase 在各版本的演进过程中越来越多地增加了 ZooKeeper 的应用场景,从趋势上来看两者的交集越来越多。HBase 中所有对 ZooKeeper 的操作都封装在了org.apache.hadoop.hbase.zookeeper这个包中,感兴趣的读者可以自行研究。

Kafka

Kafka 是知名社交网络公司 LinkedIn 于 2010 年 12 月份开源的分布式消息系统,主要由 Scala 语言开发,于 2012 年成为 Apache 的顶级项目,目前被广泛应用在包括 Twitter、Netflix 和 Tumblr 等在内的一系列大型互联网站点上。Kafka 主要用于实现低延迟的发送和收集大量的事件和日志数据 —— 这些数据通常都是活跃的数据。所谓活跃数据,在互联网大型的 Web 网站应用中非常常见,通常是指网站的 PV 数和用户访问记录等。这些数据通常以日志的形式记录下来,然后由一个专门的系统来进行日志的收集与统计。

Kafka 是一个吞吐量极高的分布式消息系统,其整体设计是典型的发布与订阅模式系统。在 Kafka 集群中,没有“中心主节点”的概念,集群中所有的服务器都是对等的,因此,可以在不做任何配置更改的情况下实现服务器的添加与删除,同样,消息的生产者和消费者也能够做到随意重启和机器的上下线。Kafka 服务器及消息生产者和消费者之间的部署关系如图所示。

相关术语

  • 消息生产者,即 Producer,是消息产生的源头,负责生成消息并发送到 Kafka 服务器上。
  • 消息消费者,即 Consumer,是消息的使用方,负责消费 Kafka 服务器上的消息。
  • 主题,即 Topic,由用户定义并配置在 Kafka 服务端,用于建立生产者和消费者之间的订阅关系:生产者发送消息到指定 Topic 下,消费者从这个 Topic 下消费消息
  • 分区,即 Partition,一个 Topic 下面会分为多个分区,例如“kafka-test”这个 Topic 可以分为 10 个分区,分别由两台服务器提供,那么通常可以配置为让每台服务器提供 5 个分区,假设服务器 ID 分别为 0 和 1,则所有分区为 0-0、0-1、0-2、0-3、0-4 和 1-0、1-1、1-2、1-3、1-4。消息分区机制和分区的数量与消费者的负载均衡机制有很大关系,后面将会重点展开讲解。
  • Broker,即 Kafka 的服务器,用于存储消息,在消息中间件中通常被称为 Broker。
  • 消费者分组,即 Group,用于归组同类消费者。在 Kafka 中,多个消费者可以共同消费一个 Topic 下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
  • Offset,消息存储在 Kafka 的 Broker 上,消费者拉取消息数据的过程中需要知道消息在文件中的偏移量,这个偏移量就是所谓的 Offset。

Broker注册

Kafka 是一个分布式的消息系统,这也体现在其 Broker、Producer 和 Consumer 的分布式部署上。虽然 Broker 是分布式部署并且相互之间是独立运行的,但还是需要有一个注册系统能够将整个集群中的 Broker 服务器都管理起来。在 Kafka 的设计中,选择了使用 ZooKeeper 来进行所有 Broker的管理。

在 ZooKeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点,下文中我们称之为”Broker节点”,其节点路径为/brokers/ids

每个 Broker 服务器在启动时,都会到 ZooKeeper 上进行注册,即到 Broker 节点下创建属于自己的节点,其节点路径为/brokers/ids/[0..N]

从上面的节点路径中,我们可以看出,在 Kafka 中,我们使用一个全局唯一的数字来指代每一个 Broker 服务器,可以称其为”Broker Id”,不同的 Broker 必须使用不同的 Broker ID 进行注册,例如/brokers/ids/1/brokers/ids/2分别代表了两个 Broker 服务器。创建完 Broker 节点后,毎个 Broker 就会将自己的 IP 地址和端口等信息写入到该节点中去。

请注意,Broker 创建的节点是一个临时节点,也就是说,一旦这个 Broker 服务器宕机或是下线后,那么对应的 Broker 节点也就被删除了。因此我们可以通过 ZooKeeper 上 Broker 节点的变化情况来动态表征 Broker 服务器的可用性。

Topic注册

在 Kafka 中,会将同一个 Topic 的消息分成多个分区并将其分布到多个 Broker 上,而这些分区信息以及与 Broker 的对应关系也都是由 ZooKeeper 维护的,由专门的节点来记录,其节点路径为/brokers/topics。下文中我们将这个节点称为”Topic节点”。Kafka 中的每一个 Topic,都会以/brokers/topics/[topIc]的形式记录在这个节点下,例如/brokers/topics/login/brokers/topics/search等。

Broker 服务器在启动后,会到对应的 Topic 节点下注册自己的 Broker Id,并写入针对该 Topic 的分区总数。例如,/brokers/topics/login/3→2这个节点表明 Broker Id 为 3 的一个 Broker 服务器,对于“login”这个 Topic 的消息,提供了 2 个分区进行消息存储。同样,这个分区数节点也是一个临时节点。

生产者负载均衡

在上面的内容中,我们讲解了 Kafka 是分布式部署 Broker 服务器的,会对同一个 Topic 的消息进行分区并将其分布到不同的 Broker 服务器上。因此,生产者需要将消息合理地发送到这些分布式的 Broker 上 —— 这就面临一个问题:如何进行生产者的负载均衡。对于生产者的负载均衡,Kafka 支持传统的四层负载均衡,同时也支持使用 ZooKeeper 方式来实现负载均衡,这里我们首先来看使用四层负载均衡的方案。

四层负载均衡

四层负载均衡方案在设计上比较简单,一般就是根据生产者的 IP 地址和端口来为其确定一个相关联的 Broker。通常一个生产者只会对应单个 Broker,然后该生产者生成的所有消息都发送给这个 Broker。从设计上,我们可以很容易发现这种方式的优缺点:好处是整体逻辑简单,不需要引入其他三方系统,同时毎个生产者也不需要同其他系统建立额外的 TCP 链接,只需要和 Broker 维护单个 TCP 链接即可。

但这种方案的弊端也是显而易见的,事实上该方案无法做到真正的负载均衡。因为在系统实际运行过程中,每个生产者生成的消息量,以及每个 Broker 的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的 Broker 接收到的消息总数非常不均匀。另一方面,生产者也无法实时感知到 Broker 的新增与删除,因此,这种负载均衡方式无法做到动态的负载均衡。

使用ZooKeeper进行负载均衡

在 Kafka 中,客户端使用了基于 ZooKeeper 的负载均衡策略来解决生产者的负载均衡问题。在前面内容中也已经提到,每当一个 Broker 启动时,会首先完成 Broker 注册过程,并注册一些诸如“有哪些可订阅的 Topic”的元数据信息。生产者就能够通过这个节点的变化来动态地感知到 Broker 服务器列表的变更。在实现上,Kafka 的生产者会对 ZooKeeper 上的“Broker 的新增与减少”、“Topic 的新增与减少”和“Broker 与 Topic 关联关系的变化”等事件注册 Watcher 监听,这样就可以实现一种动态的负载均衡机制了。

此外,在这种模式下,还能够允许开发人员控制生产者根据一定的规则(例如根据消费者的消费行为)来进行数据分区,而不仅仅是随机算法而已 —— Kafka 将这种特定的分区策略称为“语义分区”。显然,ZooKeeper 在整个生产者负载均衡的过程中扮演了非常重要的角色,通过 ZooKeeper 的 Watcher 通知能够让生产者动态地获取 Broker 和 Topic 的变化情况。

消费者负载均衡

与生产者类似,Kafka 中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的 Broker 服务器上接收消息。Kafka 有消费者分组的概念,每个消费者分组中都包含了若干个消费者,每一条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定 Topic 下面的消息,互不干扰,也不需要互相进行协调。因此消费者的负载均衡也可以看做是同一个消费者分组内部的消息消费策略。

消息分区与消费者关系

对于每个消费者分组,Kafka 都会为其分配一个全局唯一的 Group ID,同一个消费者分组内部的所有消费者都共享该 ID。同时,Kafka 也会为每个消费者分配一个 Consumer Id,通常采用Hostname:UUID的形式来表示。在 Kafka 的设计中,规定了每个消息分区有且只能同时有一个消费者进行消息的消费,因此,需要在 ZooKeeper 上记录下消息分区与消费者之间的对应关系。每个消费者一旦确定了对一个消息分区的消费权利,那么需要将其 Consumer ID 写入到对应消息分区的临时节点上,例如/consumers/[group_id]/owners/[topic]/[broker_id-partition_id],其中[broker_id-partition_id]就是一个消息分区的标识,节点内容就是消费该分区上消息的消费者的 Consumer Id。

消息消费进度Offset

记录在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度,即 Offset 记录到 ZooKeeper 上去,以便在该消费者进行重启或是其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息的消费。Offset 在 ZooKeeper 上的记录由一个专门的节点负责,其节点路径为/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id],其节点内容就是 Offset 值。

消费者注册

下面我们再来看看消费者服务器在初始化启动时加入消费者分组的过程。

  1. 注册到消费者分组。

    每个消费者服务器在启动的时候,都会到 ZooKeeper 的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id]

    完成节点创建后,消费者就会将自己订阅的 Topic 信息写入该节点。注意,该节点也是一个临时节点,也就是说,一旦消费者服务器出现故障或是下线后,其对应的消费者节点就会被删除掉。

  2. 对消费者分组中消费者的变化注册监听。

    每个消费者都需要关注所属消费者分组中消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的 Watcher 监听。一旦发现消费者新增或减少,就会触发消费者的负载均衡。

  3. 对 Broker 服务器的变化注册监听。
    消费者需要对/broker/ids/[0…N]中的节点进行监听的注册,如果发现 Broker 服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者的负载均衡。

  4. 进行消费者负载均衡。

    所谓消费者负载均衡,是指为了能够让同一个 Topic 下不同分区的消息尽量均衡地被多个消费者消费而进行的一个消费者与消息分区分配的过程。通常,对于一个消费者分组,如果组内的消费者服务器发生变更或 Broker 服务器发生变更,会触发消费者负载均衡。

负载均衡

Kafka 借助 ZooKeeper 上记录的 Broker 和消费者信息,采用了一套特殊的消费者负载均衡算法。由于该算法和 ZooKeeper 本身关系并不是特别大,因此这里只是结合官方文档来对该算法进行简单的陈述,不做详细讲解。

我们将一个消费者分组的每个消费者记为 C1, C2, · · · , CG,那么对于一个消费者Ci,其对应的消息分区分配策略如下。

  1. 设置 PT 为指定 Topic 所有的消息分区。
  2. 设置 CG 为同一个消费者分组中的所有消费者。
  3. 对 PT 进行排序,使分布在同一个 Broker 服务器上的分区尽量靠在一起。
  4. 对 CG 进行排序。
  5. 设置 i 为 Ci 在 CG 中位置的索引值,同时设置N=size(PT)/size(CG)
  6. 将编号为i × N ~ (i + 1) × N - 1 的消息分区分配给消费者 Ci。
  7. 重新更新 ZooKeeper 上消息分区与消费者 Ci 的关系。

关干 Kafka 消费者的负载均衡算法,读者可以访问其官方网站进行更深入的了解。

小结

Kafka 从设计之初就是一个大规模的分布式消息中间件,其服务端存在多个 Broker,同时为了达到负载均衡,将每个 Topic 的消息分成了多个分区,并分布在不同的 Broker 上,多个生产者和消费者能够同时发送和接收消息。Kafka 使用 ZooKeeper 作为其分布式协调框架,很好地将消息生产、消息存储和消息消费的过程有机地结合起来。同时借助 ZooKeeper,Kafka 能够在保持包括生产者、消费者和 Broker 在内的所有组件无状态的情况下,建立起生产者和消费者之间的订阅关系,并实现了生产者和消费者的负载均衡。

在阿里巴巴的实践与应用

消息中间件Metamorphosis

Metamorphosis 是一个高性能、高可用、可扩展的分布式消息中间件,其思路起源于 Kafka,但并不是 Kafka 的一个简单复制。Metamorphosis 具有消息存储顺序写、吞吐量大和支持本地 XA 事务等特性,适用于大吞吐量、顺序消息、消息广播和日志数据传输等分布式应用场景,目前在淘宝和支付宝都有着广泛的应用,其系统整体部署结构如下图所示。

和传统的消息中间件采用推(Push)模型所不同的是,Metamorphosis 是基于拉(Pull)模型构建的,由消费者主动从 Metamorphosis 服务器拉取数据并解析成消息来进行消费,同时大量依赖 ZooKeeper 来实现负载均衡和 Offset 的存储。

生产者的负载均衡

和 Kafka 系统一样,Metamorphosis 假定生产者、Broker 和消费者都是分布式的集群系统。生产者可以是一个集群,多台机器上的生产者可以向相同的 Topic 发送消息。而服务器 Broker 通常也是一个集群,多台 Broker 组成一个集群对外提供一系列的 Topic 消息服务,生产者按照一定的路由规则向集群里某台 Broker 发送消息,消费者按照一定的路由规则拉取某台 Broker 上的消息。每个 Broker 都可以配置一个 Topic 的多个分区,但是在生产者看来,会将一个 Topic 在所有 Broker 上的所有分区组成一个完整的分区列表来使用。

在创建生产者的时候,客户端会从 ZooKeeper 上获取已经配置好的 Topic 对应的 Broker 和分区列表,生产者在发送消息的时候必须选择一台 Broker 上的一个分区来发送消息,默认的策略是一个轮询的路由规则,如下图所示。

生产者在通过 ZooKeeper 获取分区列表之后,会按照 Broker Id 和 Partition 的顺序排列组织成一个有序的分区列表,发送的时候按照从头到尾循环往复的方式选择一个分区来发送消息。考虑到我们的 Broker 服务器软硬件配置基本一致,因此默认的轮询策略已然足够。

在 Broker 因为重启或者故障等因素无法提供服务时,Producer 能够通过 ZooKeeper 感知到这个变化,同时将失效的分区从列表中移除,从而做到 Fail Over 需要注意的是,因为从故障到生产者感知到这个变化有一定的延迟,因此可能在那一瞬间会有部分的消息发送失败。

消费者的负载均衡

消费者的负载均衡则会相对复杂一些,我们这里讨论的是单个分组内的消费者集群的负载均衡,不同分组的负载均衡互不干扰。消费者的负载均衡跟Topi的分区数目和消费者的个数紧密相关,我们分几个场景来讨论。

消费者数和 Topic 分区数一致

如果单个分组内的消费者数目和 Topic 总的分区数目相同,那么每个消费者负责消费一个分区中的消息,一一对应,如下图所示。

消费者数大于 Topic 分区数

如果单个分组内的消费者数目比 Topic 总的分区数目多,则多出来的消费者不参与消费,如下图所示。

消费者数小于 Topic 分区数

如果分组内的消费者数目比 Topic 总的分区数目小,则有部分消费者需要额外承担消息的消费任务,具体如下图所示。

当分区数目(n)大于单个 Group 的消费者数目(m)的时候,则有n % m个消费者需要额外承担1/n的消费任务,我们假设 n 无限大,那么这种策略还是能够达到负载均衡的目的的。

综上所述,单个分组内的消费者集群的负载均衡策略如下。

  • 毎个分区针对同一个 Group 只能挂载一个消费者,即每个分区至多同时允许被一个消费者进行消费。
  • 如果同一个 Group 的消费者数目大于分区数目,则多出来的消费者将不参与消费。
  • 如果同一个 Group 的消费者数目小于分区数目,则有部分消费者需要额外承担消费任务。

Metamorphosis 的客户端会自动处理消费者的负载均衡,将消费者列表和分区列表分别排序,然后按照上述规则做合理的挂载。

从上述内容来看,合理地设置分区数目至关重要。如果分区数目太小,则有部分消费者可能闲置;如果分区数目太大,则对服务器的性能有影响。

在某个消费者发生故障或者发生重启等情况时,其他消费者会感知到这一变化(通过 ZooKeeper 的“节点变化”通知),然后重新进行负载均衡,以保证所有的分区都有消费者进行消费。

消息消费位点Offset存储

为了保证生产者和消费者在进行消息发送与接收过程中的可靠性和顺序性,同时也是为了尽可能地保证避免出现消息的重复发送和接收,Metamorphosis 会将消息的消费记录 Offset 记录到 ZooKeeper 上去,以尽可能地确保在消费者进行负载均衡的时候,能够正确地识别出指定分区的消息进度。

RPC服务框架Dubbo

Dubbo 是阿里巴巴于 2011 年 10 月正式开源的一个由 Java 语言编写的分布式服务框架,致力于提供高性能和透明化的远程服务调用方案和基于服务框架展开的完整 SOA 服务治理方案。

Dubbo 的核心部分包含以下三块:

  • 远程通信:提供对多种基于长连接的 NIO 框架抽象封装,包括多种线程模型、序列化,以及“请求-响应”模式的信息交换方式。
  • 集群容错:提供基于接口方法的远程过程透明调用,包括对多协议的支持,以及对软负载均衡、失败容错、地址路由和动态配置等集群特性的支持。
  • 自动发现:提供基于注册中心的目录服务,使服务消费方能动态地查找服务提供方,使地址透明,使服务提供方可以平滑地增加或减少机器。

此外,Dubbo 框架还包括负责服务对象序列化的 Serialize 组件、网络传输组件 Transport、协议层 Protocol 以及服务注册中心 Registry 等,其整体模块组成和协作方式如下图所示。在本节中,我们将主要关注 Dubbo 中基于 ZooKeeper 实现的服务注册中心。

注册中心是 RPC 框架最核心的模块之一,用于服务的注册和订阅。在 Dubbo 的实现中,对注册中心模块进行了抽象封装,因此可以基于其提供的外部接口来实现各种不同类型的注册中心,例如数据库、ZooKeeper 和 Redis 等。在本书前面部分我们已经多次提到,ZooKeeper 是一个树形结构的目录服务,支持变更推送,因此非常适合作为 Dubbo 服务的注册中心,下面我们着重来看基于 ZooKeeper 实现的 Dubbo 注册中心。

在 Dubbo 注册中心的整体架构设计中,ZooKeeper 上服务的节点设计如下图所示。

  • /dubbo:这是 Dubbo 在 ZooKeeper 上创建的根节点
  • /dubbo/com.foo.BarService:这是服务节点,代表了 Dubbo 的一个服务。
  • /dubbo/com.foo.BarService/providers:这是服务提供者的根节点,其子节点代表了每一个服务的真正提供者。
  • /dubbo/com.foo.BarService/consumers:这是服务消费者的根节点,其子节点代表了每一个服务的真正消费者。

服务提供者

服务提供者在初始化启动的时候,会首先在 ZooKeeper 的/dubbo/com.foo.BarService/providers节点下创建一个子节点,并写入自己的 URL 地址,这就代表了com.foo.BarService这个服务的一个提供者。

服务消费者

服务消费者会在启动的时候,读取并订阅 ZooKeeper 上/dubbo/com.foo.BarService/providers节点下的所有子节点,并解析出所有提供者的 URL 地址来作为该服务地址列表,然后开始发起正常调用。

同时,服务消费者还会在 ZooKeeper 的/dubbo/com.foo.BarService/consumers节点下创建一个临时节点,并写入自己的 URL 地址,这就代表了com.foo.BarService这个服务的一个消费者。

监控中心

监控中心是 Dubbo 中服务治理体系的重要一部分,其需要知道一个服务的所有提供者和订阅者,及其变化情况。因此,监控中心在启动的时候,会通过 ZooKeeper 的/dubbo/com.foo.BarService节点来获取所有提供者和消费者的 URL 地址,并注册 Watcher 来监听其子节点变化。

另外需要注意的是,所有提供者在 ZooKeeper 上创建的节点都是临时节点,利用的是临时节点的生命周期和客户端会话相关的特性,因此一旦提供者所在的机器出现故障导致该提供者无法对外提供服务时,该临时节点就会自动从 ZooKeeper 上删除,这样服务的消费者和监控中心都能感知到服务提供者的变化。

在 ZooKeeper 节点结构设计上,以服务名和类型作为节点路径,符合 Dubbo 订阅和通知的需求,这样保证了以服务为粒度的变更通知,通知范围易于控制,即使在服务的提供者和消费者变更频繁的情况下,也不会对 ZooKeeper 造成太大的性能影响。

基于MySQL Binlog的增量订阅和消费组件:Canal

Canal 是阿里巴巴于 2013 年 1 月正式开源的一个由纯 Java 语言编写的基于 MySQL 数据库 Binlog 实现的增量订阅和消费组件。

项目名 Canal 取自“管道”的英文单词,寓意数据的流转,是一个定位为基于 MySQL 数据库的 Binlog 增量日志来实现数据库镜像、实时备份和增量数据消费的通用组件早期的数据库同步业务,大多都是使用 MySQL 数据库的触发器机制(即 Trigger)来获取数据库的增量变更。不过从 2010 年开始,阿里系下属各公司开始逐步尝试基于数据库的日志解析来获取增量变更,并在此基础上实现数据的同步,由此衍生出了数据库的增量订阅和消费业务 —— Canal 项目也由此诞生了 Canal 的工作原理相对比较简单,其核心思想就是模拟 MySQL SIave 的交互协议,将自己伪装成一个 MySQL 的 Slave 机器,然后不断地向 Master 服务器发送 Dump 请求 Master 收到 Dump 请求后,就会开始推送相应的 Binary log 给该 Slave(也就是 Canal )。Canal 收到 Binary Log,解析岀相应的 Binary log 对象后就可以进行二次消费了,其基本工作原理如下图所示。

Canal Server主备切换设计

在 Canal 的设计中,基于对容灾的考虑,往往会配置两个或更多个 Canal Server 来负责一个 MySQL 数据库实例的数据增量复制。另一方面,为了减少 Canal Server 的 Dump 请求对 MySQL Master 所带来的性能影响,就要求不同的 Canal Server 上的 instance 在同一时刻只能有一个处于 Running 状态,其他的 instance 都处于 Standby 状态,这就使得 Canal 必须具备主备自动切换的能力。在 Canal 中,整个主备切换过程控制主要是依赖于 ZooKeeper 来完成的,如下图所示。

  1. 尝试启动

    每个 CanalServer 在启动某个 Canal instance 的时候都会首先向 ZooKeeper 进行一次尝试启动判断。具体的做法是向 ZooKeeper 创建一个相同的临时节点,哪个 CanalServer 创建成功了,那么就让哪个 Server 启动。

    以 example 这个 Instance 为例来说明,所有的 CanalServer 在启动的时候,都会去创建/otter/canal/destinations/example/running节点,并且无论有多少个 CanalServer 同时并发启动,ZooKeeper 都会保证最终只有一个 CanalServer 能够成功创建该节点。

  2. 启动 instance

    假设最终 IP 地址为10.20.144.51的 CanalServer 成功创建了该节点,那么它就会将自己的机器信息写入到该节点中去:

    {"active", true, "address": "10.20.144.51:111", "cid": 1}

    并同时启动 instance。而其他 Canal Server 由于没有成功创建节点,于是就会将自己的状态置为 Standby,同时对/otter/canal/destinations/example/running节点注册 Watcher 监听,以监听该节点的变化情况。

  3. 主备切换
    Canal Server 在运行过程中,难免会发生一些异常情况导致其无法正常工作,这个时候就需要进行主备切换了。基于 ZooKeeper 临时节点的特性,当原本处于 Running 状态的 Canal Server 因为挂掉或网络等原因断开了与 ZooKeeper 的连接,那么/otter/canal/destinations/example/running节点就会在一段时间后消失。

    由于之前处于 Standby 状态的所有 Canal Server 已经对该节点进行了监听,因此它们在接收到 ZooKeeper 发送过来的节点消失通知后,会重复进行步骤 1 —— 以此实现主备切换。

下面我们再来看看在主备切换设计过程中最容易碰到的一个问题,就是“假死”。所谓假死状态是指,Canal Server 所在服务器的网络出现闪断,导致 ZooKeeper 认为其会话失效,从而释放了 Running 节点 —— 但此时 Canal Server 对应的 JVM 并未退出,其工作状态是正常的。

在 Canal 的设计中,为了保护假死状态的 Canal Server,避免因瞬间 Running 节点失效导致 Instance 重新分布带来的资源消耗,所以设计了一个策略:

状态为 Standby 的 Canal Server 在收到 Running 节点释放的通知后,会延迟一段时间抢占 Running 节点,而原本处于 Running 状态的 Instance,即 Running 节点的拥有者可以不需要等待延迟,直接取得 Running 节点。

这样就可以尽可能地保证假死状态下一些无谓的资源释放和重新分配了。目前延迟时间的默认值为 5 秒,即 Running 节点针对假死状态的保护期为 5 秒。

Canal Client的HA设计

Canal Client 在进行数据消费前,首先当然需要找到当前正在提供服务的 Canal Server,即 Master。在上面“主备切换”部分中我们已经讲到,针对每一个数据复制实例,例如 example,都会在/otter/canal/destinations/example/running节点中记录下当前正在运行的 Canal Server。因此,Canal Client 只需要连接 ZooKeeper,并从对应的节点上读取 Canal Server 信息即可。

  1. 从 ZooKeeper 中读取出当前处于 Running 状态的 Server。

    Canal Client 在启动的时候,会首先从otter/canal/destinations/example/running节点上读取出当前处于 Running 状态的 Server。同时,客户端也会将自己的信息注册到 ZooKeeper 的/otter/canal/destinations/example/1001/running节点上,其中“1001”代表了该客户端的唯一标识,其节点内容如下

    {"active": true, "address": "10.12.48.171:50544", "clientId": 1001}
  2. 注册 Running 节点数据变化的监听。
    由于 Canal Server 存在挂掉的风险,因此 Canal Client 还会对/otter/canal/destinations/example/running节点注册一个节点变化的监听,这样一旦发生 Server 的主备切换,Client 就可以随时感知到。

  3. 连接对应的 Running Server 进行数据消费。

数据消费位点记录

由于存在 Canal Client 的重启或其他变化,为了避免数据消费的重复性和顺序错乱,Canal 必须对数据消费的位点进行实时记录。数据消费成功后,Canal Server 会在 ZooKeeper 中记录下当前最后一次消费成功的 Binary Log 位点,一旦发生 Client 重启,只需要从这最后一个位点继续进行消费即可。具体的做法是在 ZooKeeper 的/otter/canal/destinations/example/1001/cursor节点中记录下客户端消费的详细位点信息:

{
    "type": "com.alibaba.otter.canal.protocol.position.LogPosition",
    "identity": {
        "slaveId":-l, 
        "sourceAddress": {"address": "10.20.144.15", "port": 3306}},
    "position": {
        "included": false, 
        "journalName": "mysql-bin.002253", 
        "position": 2574756,
        "timestamp":1363688722600
    }
}

分布式数据库同步系统:Otter

Otter 是阿里巴巴于 2013 年 8 月正式开源的一个由纯 Java 语言编写的分布式数据库同步系统,主要用于异地双 A 机房的数据库数据同步,致力于解决长距离机房的数据同步及双 A 机房架构下的数据一致性问题。

项目名 otter 取自“水獭”的英文单词,寓意数据搬运工,是一个定位为基于数据库增量日志解析,在本机房或异地机房的 MySQL/Oracle 数据库之间进行准实时同步的分布式数据库同步系统。Otter 的第一个版本可以追溯到 2004 年,初衷是为了解决阿里巴巴中美机房之间的数据同步问题,从 4.0 版本开始开源,并逐渐演变成一个通用的分布式数据库同步系统。其基本架构如下图所示。在 Otter 中也是使用 ZooKeeper 来实现一些与分布式协调相关的功能。

分布式SEDA模型调度

SEDA(Staged Event-Driven Architecture)是阶段事件驱动架构的简称,也称为阶段式服务器模型。为了更好地提高整个系统的扩展性和灵活性,在 Otter 中将整个数据同步流程抽象为类似于 ETL 的处理模型,具体分为四个阶段(Stage)。

  • Select:数据接入。
  • Extract:数据提取。
  • Transform:数据转换。
  • Load:数据载入。

其中 Select 阶段是为了解决数据来源的差异性,比如可以接入来自 Canal 的增量数据,也可以接入其他系统的数据源。Extract/Transform/Load 阶段则类似于数据仓库的 ETL 模型,具体可分为数据 Join、数据转化和数据 Load 等过程。同时,为了保证系统的高可用性,SEDA 的每个阶段都会有多个节点进行协同处理。如下图所示是该 SEDA 模型的示意图。

整个模型分为 Stage 管理和 Schedule 调度两部分。

Stage管理

Stage管理主要就是维护一组工作线程,在接收到 Schedule 的 Event 任务信号后,分配个工作线程来进行任务处理,并在任务处理完成后,反馈信息到 Schedule。

Schedule调度

Schedule 调度主要是指基于 ZooKeeper 来管理 Stage 之间的任务消息传递,其具体实现逻辑如下。

  1. 创建节点Otter

    首先会为每个 Stage 在 ZooKeeper 上创建一个节点,例如/seda/stage/s1,其中 s1 即为该 Stage 的名称,每个任务事件都会对应于该节点下的一个子节点,例如/seda/stage/s1/RequestA

  2. 任务分配

    当 s1 的上一级 Stage 完成 RequestA 任务后,就会通知 Schedule 调度器其已完成了该请求。根据预先定义的 Stage 流程,Schedule 调度器便会在 Stage s1 的目录下创建一个 RequestA 的子节点,告知 s1 有一个新的请求需要其处理 —— 以此完成一次任务的分配。

  3. 任务通知

    每个 Stage 都会有一个 Schedule 监听线程,利用 Zookeeper 的 Watcher 机制来关注 ZooKeeper 中对应 Stage 节点的子节点变化,比如关注 s1 就是关注/seda/stage/s1的子节点的变化情况。此时,如果步骤 2 中调度器在 s1 的节点下创建了一个 RequestA,那么 ZooKeeper 就会通过 Watcher 机制通知到该 Schedule 线程,然后 Schedule 就会通知 Stage 进行任务处理 —— 以此完成一次任务的通知。

  4. 任务完成
    当 s1 完成了 RequestA 任务后,会删除 s1 目录下的 RequestA 任务,代表处理完成。然后继续步骤 2,分配下一个 Stage 的任务。

在上面的步骤 3 中,还有一个需要注意的细节是,在真正的生产环境部署中,往往都会由多台机器共同组成一个 Stage 来处理 Request,因此就涉及多个机器节点之间的分布式协调。

如果 s1 有多个节点协同处理,每个节点都会有该 Stage 的一个 Shedule 线程,其在 s1 目录变化时都会收到通知。在这种情况下,往往可以采取抢占式的模式,尝试在 RequestA 目录下创建一个 lock 节点,谁创建成功就可以代表当前谁抢到了任务,而没抢到该任务的节点,便会关注该 lock 节点的变化(因为一旦该 lock 节点消失,那么代表当前抢到任务的节点可能出现了异常退出,没有完成任务),然后继续抢占模型。

中美跨机房ZooKeeper集群的部署

由于 Otter 主要用于异地双 A 机房的数据库同步,致力于解决长距离机房的数据同步及双 A 机房架构下的数据一致性问题,因此其本身就有面向中美机房服务的需求,也就会有每个机房都要对 ZooKeeper 进行读写操作的需求。于是,希望可以部署一个面向全球机房服务的 ZooKeeper 集群,保证读写数据一致性。

这里就需要使用 ZooKeeper 的 Observer 功能了。从3.3.0版本开始, ZooKeeper新增了 Observer 模式,该角色提供只读服务,且不参与事务请求的投票,主要用来提升整个 ZooKeeper 集群对非事务请求的处理能力。

因此,借助 ZooKeeper 的 Observer 特性,Otter 将 ZooKeeper 集群进行了三地部署。

  • 杭州机房部署 Leader/Follower 集群,为了保障系统高可用,可以部署 3 个机房。每个机房的部署实例可为 1/1/1 或者 3/2/2 的模式
  • 美国机房部署 Observer 集群,为了保证系统高可用,可以部署 2 个机房,每个机房的部署实例可以为 1/1。
  • 青岛机房部署 Observer 集群。

当美国机房的客户端发起一个非事务请求时,就直接从部署在美国机房的 Observer ZooKeeper 读取数据即可,这将大大减少中美机房之间网络延迟对 ZooKeeper 操作的影响。而如果是事务请求,那么美国机房的 Observer 就会将该事务请求转发到杭州机房的 Leader/Follower 集群上进行投票处理,然后再通知美国机房的 Observer,最后再由美国机房的 Observer 负责响应客户端。

上面这个部署结构,不仅大大提升了 ZooKeeper 集群对美国机房客户端的非事务请求处理能力,同时,由于对事务请求的投票处理都是在杭州机房内部完成,因此也大大提升了集群对事务请求的处理能力。

轻量级分布式通用搜索平台:终搜

终搜(Terminator)是阿里早期的一款产品,最早应用在淘江湖,基于 Lucene、Solr、ZooKeeper 和 Hadoop 等开源技术构建,全方位支持各种检索需求,是一款实时性高、接入成本低、支持个性化检索定制的分布式全文检索系统。历经发展,终搜目前已成为服务于阿里集团内部各大业务线的通用搜索平台,截止2014年4月,已经有 200 多个不同规模、不同查询特征的应用接入使用。

终搜系统主要由前端业务查询处理、后台索引构建、数据存储和后台管理四大部分组成,其整体架构如图所示。

CenterNode

该节点收集和监控整个集群平台所有检索节点机器和引擎 SolrCore 的状态,并且根据这些状态信息来决定业务对应的引擎是否需要进行发布、变更、删除、容灾恢复和在线扩容等操作。

CoreNode

该节点负责从 CenterNode 的任务池领取任务指令,并对相应的业务引擎 SolrCore 进行创建、变更和删除等动作,同时在引擎 SolrCore 对象正常创建后提供检索服务。

JobNode

该节点接收 CoreNode 提交的业务对应的全量任务指令,并根据 TaskNode 当时的空闲程度将任务分配给最空闲的 TaskNode 节点进行全量索引构建任务。

TaskNode

该节点接收来自 JobNode 节点分配的全量任务,根据 JobNode 提交的任务配置项启动全量任务,将 HDFS 上对应业务的全量源数据构建成 Lucene 的索引文件,构建索引完毕后再回流到 HDFS。

TriggerNode

该节点根据每个业务所配置的时间表达式定时触发业务方的 ClientNode 客户端的增量和全量任务。

ClientNode

该节点是业务方发起查询请求的节点,如果本节点从 ZooKeeper 上抢到执行导入的锁,那么该节点将会接收到 TriggerNode 的定时触发指令,然后会根据分库分表规则将数据库的源数据通过增量和全量模式导入到 HDFS。

ManagerNode

该节点是整个引擎平台的后台管理节点,负责所有接入业务的发布、扩容和配置变更等指令的触发,并提供整个引擎平台所有业务状态信息的可视化查询。

ZooKeeper

该平台负责整个引擎平台所有业务引擎所需要的源数据和索引数据的存储。终搜系统大量依赖 ZooKeeper 来实现分布式协调和分布式锁功能,接下来我们就从元数据管理、中心节点架构、应用配置文件管理和全量任务执行等方面来讲解 ZooKeeper 在终搜中的使用。

Hadoop

该平台负责整个引擎平台所有业务引擎所需要的源数据和索引数据的存储终搜系统大量依赖 ZooKeeper 来实现分布式协调和分布式锁功能。

元数据管理

为了对所有业务实例的生命周期进行全局的管理,必须对所有业务实例元数据信息进行结构化的管理。通过各种技术调研,最终选择了 ZooKeeper 来进行元数据管理——准确地讲,在终搜中并不是直接简单地拿 ZooKeeper 来做这件事,而是开发了一个封装了 ZooKeeper 内核的中心节点(Center Node)集群来负责引擎状态数据收集和搜索业务实例元数据保存。具体来讲就是 CenterNode 内部关于搜索业务实例持久化的工作统一交给了 ZooKeeper。之所以选择 ZooKeeper,主要考虑以下两个因素。

  • 元数据信息属于目录型的轻量级数据,而 ZooKeeper 对目录型的轻量级数据的存储有天然的优势。
  • 元数据的信息非常重要,需要副本容灾,而 ZooKeeper 是用来解决分布式数据多副本存储及数据一致性问题的。

下面我们就来看看如何利用 ZooKeeper 对业务实例进行元数据信息的持久化,核心的实现思路是让 CenterNode 掌控整个搜索集群平台所有业务的客户端机器视图和机器状态等信息,同时监控各个 CoreNode 节点(在这里我们将承载搜索实例的节点称为 CoreNode)的健康状态,CenterNode 节点主要收集的内容如下:

  • 机器状态信息收集,包括:
    • 机器操作系统版本
    • 机器磁盘使用率
    • 机器内存使用率
    • 机器 CPU Load 情况
    • JVM 版本信息
    • JVM 内存使用率
  • 检索服务状态收集,包括:
    • 素引构建时间和容量大小
    • 每秒响应请求次数
    • 索引数据总量
    • 请求平均响应时间

这些状态信息收集后需要和具体的 CoreNode 一一对应起来,在 CenterNode 内存中 CoreNode 状态信息的视图关系如下图所示。

在 CenterNode 中,主要包括两种数据结构。

  • NameSpaceFile 中的静态 Core
  • CorenodeDescriptor 中的动态 DynCore

DynCore 不在本书讨论范围内,这里主要介绍下 NameSpaceFile 中的静态 Core。NameSpaceFile 是在创建搜索业务时就会在 CenterNode 中生成的一个元数据结构,是搜索业务在 CenterNode 中的一个管理抽象和业务抽象,主要内容包括该业务 Shard 的数量副本数量以及涉及的配置文件名称等。例如,某个业务存在 3 个 Shard 分片,那么就会在 NameSpaceFile 中存在 3 个 Core 的抽象。这些信息一旦发布基本都不大会改变,除非出现扩容情况。

CenterNode 对于每个业务的管理和操作都是基于 NameSpaceFile 进行的,例如扩容和容灾等。同时,这些信息是需要持久化存储的,所以在这里使用 ZooKeeper 来做持久化,其在 ZooKeeper 上的数据节点结构如下:

  • /tsearchercenternode/namespace/search4A/seq
  • /searcher/centernode/namespace/search4B/seq
  • /tsearchercenternode/namespace/search4C/seq

其中每个 seq 节点中保存的都是一个序列化的 NameSpaceFile 数据。

Leader/Follower模式的中心节点架构

在上文中我们已经提到,中心节点(Center Node)在整个终搜平台中起到了中心调度的作用,是终搜系统完成信息收集、汇总和分发的中转节点,是把整个系统串联在一起的一个重要组成部分。因此,中心节点是整个终搜的核心,如果中心节点机器宕机导致无法对外服务的话,那么终搜所有业务机器的状态信息将全部丢失。于是,如何处理好中心节点的容灾问题成为了终搜中最关键也是最棘手的一环。

旧版本终搜的中心节点采用的是类似于 HDFS 的 NameNode 处理方式:使用两台机器来保证中心节点的稳定性,一台用来部署中心节点的组件,另一台用来同步中心节点的数据文件到本地,实现中心节点中元数据文件的远程备份,该节点称为 ImageNode。ImageNode 对中心节点进行数据冗余备份,负责对中心节点中业务元数据信息(NameSpaceFile 信息)的定期快照,如下图所示。

利用 HDFS 的 ImageNode 解决中心节点单点失败的方式虽然可以在一定程度上恢复宕机之前的业务元数据信息,但是还是会存在一些问题。

  • 该方案必须通过人工手动处理的方式寻找并复制在远程 ImageNode 机器中保存的快照文件,然后手工重启中心节点 —— 无法自动化完成在宕机之后的数据复制和机器重启,从而自动完成中心节点的恢复。
  • 在中心节点失败期间,无法收集机器的状态信息,也无法对业务进行操作,系统不可用时间完全取决于人工恢复中心节点的时间长短。
  • 中心节点和 ImageNode 之间的异步化的数据同步,在一些极端情况下会出现数据丢失的情况。

正是由于以上三个问题的存在,使得虽然可以在中心节点失败后利用 ImageNode 中保存的快照文件对业务进行恢复,但还是会存在一些不足之处,所以考虑采用 ZooKeeper 多机器副本原理改造中心节点,使得中心节点能具备多副本概念,当其中一台主节点宕机的时候,能够自动地从其余从节点中选举出主节点来,再重新提供服务,如下图所示。

选主机制

终搜的中心节点都是基于 ZooKeeper 架构的。事实上,中心节点就是在 ZooKeeper 基础上进行二次开发的,其中 Leader 选举完全使用 ZooKeeper 的底层实现,这样就能很好地在多台中心节点中选举出一台主节点来。
当某一个中心节点失败时,如果该中心节点是 Leader 节点,那么就从其他 Follower 节点中重新进行选举(这些都是依靠底层 ZooKeeper 原生支持的),选举出来的 Leader 节点充当主节点作用;如果挂掉的中心节点不是 Leader 节点,则不用进行选举,同时所有和这台失败的机器连接的 Search 节点会自动重连到其他中心节点机器,对于后续的读写请求,则依然交给 Leader 节点进行处理,这对于用户来说是透明的,可以说是完成了个平滑的恢复。

CenterNode基于Zookeeper版本的二次开发工作

CenterNode 是基于 3.4.5 版本的 ZooKeeper 进行二次开发的,其核心改造点如下图所示。

中心节点对 ZooKeeper 进行的二次开发,主要集中在 CenterNodePeer 和 CenterNodeCnxn 这两个类上。

CenterNodePeer

CenterNodePeer 类继承自 ZooKeeper 中原生的 QuorumPeer 类。QuorumPeer 是一个线程类,继承自 Thread,主要负责检测 ZooKeeper 服务器状态并触发 Leader 选举。一个 QuorumPeer 代表了一个 ZooKeeper 节点,或者说一个 ZooKeeper 进程。

QuorumPeer 线程启动之后,首先会进行 Leader 选举。在运行期间,QuorumPeer 共有 4 种可能的状态,分别是 LOOKING、FOLLOWING、LEADING 和 OBSERVING。启动时的初始状态是 LOOKING,表示正在寻找确定新的 Leader 服务器。在 ZooKeeper 中,Leader 选举的默认算法是基于 TCP 实现的 FastLeaderElection。

当某一台 ZooKeeper 服务器被选举成为 Leader 节点后,会调用被 CenterNodePeer 重写了的 setLeader 方法,来初始化 CenterNodeCnxn 服务,这样就完成了正常的调用逻辑。同样道理,被选举成为 Follower 节点或是 Observer 节点的 CenterNode,也会调用对应的 Set 方法来完成相关逻辑。

当出现因为某些机器宕机了而造成集群需要重新选举 Leader 的情况时,首先会调用对应的 Set 方法,通过传递 NULL 参数的方式来标识当前服务要重新选举 Leader,服务需要暂停,CenterNodeCnxn 就会处理一系列的逻辑故障从而恢复逻辑。

所以,在 CenterNodePeer 类中,终搜只是重写了 QuorumPeer 的setLeader(Leader leader)setFollower(Follower follwer)以及setObserver(Observer observer)这 3 个方法,加上终搜服务对应的处理逻辑,就能完成 CenterNode 基于 ZooKeeper 的二次开发。

CeterNodeCnxn

CeterNodeCnxn 类主要就是中心节点对外提供服务的入口类,所有 Search 节点的请求都会先发送到 CenterNodeCnxn 类,然后 CenterNode 会根据自己是否是 Leader 节点来对请求做出相应的处理逻辑。

请求处理

中心节点的请求处理也是参考 ZooKeeper 的请求实现的,即 Leader 节点负责请求处理,Follower 节点负责转发。具体当用户发起一个业务创建请求的时候,处理过程如下:

中心节点收到业务请求,首先会检查自身是否是 Leader 角色,如果是 Leader 角色,则进行正常的业务处理;否则把该请求发送到 Leader 节点上去,然后等待 Leader 节点返回操作结果,如果 Leader 节点长时间未响应或者请求失败,则给请求方返回异常信息,否则返回正常的业务响应。

应用配置文件管理

在终搜构建索引的过程中,会使用到的关键配置包括 schema.xml 和 solrconfig.xml 两个文件,分别定义了索引结构和查询入口,是串联应用和索引之间的桥梁,因此需要为每个应用定制特有的配置。在终搜中,使用 ZooKeeper 对这些配置文件进行了管理,基本步骤如下。

  1. 配置初始化

    例如,对于某应用 App1,首先会在本地根据该应用的结构化特征数据和查询特性配置好 schema.xml 和 solrconfig.xml 两个配置文件,然后将这两份配置分别写入 ZooKeeper 指定数据节点:

    /terminator/terminator-node/[Hostname]/search4App1-0/schema.xml
    /terminator/terminator-node/[Hostname]/search4App1-0/solrconfig.xml

    其中的[Hostname]是指该应用的数据内容所在的终搜机器。

  2. 动态更新配置。
    上述配置文件初始化完毕后,应用 App1 会到 ZooKeeper 指定节点(即上述两个节点)上获取相关配置,同时注册对这两个节点的“数据变更” Watcher 监听 —— 这样,一旦配置文件发生变化,应用就可以实时获取到最新的配置了。

使用 ZooKeeper 来实现应用配置文件的管理,能够做到配置的实时性和全局的一致性,同时解除了应用系统和终搜后台索引系统的耦合,但同时受限于 ZooKeeper 数据节点数据大小的限制,配置文件的配置需要非常精简。

选举机器执行全量任务

在分布式系统中,有些特别耗费资源(包括网络、CPU 和内存等)的任务,通常只需要选举集群中的一台机器来执行,然后再将执行结果同步给集群中的其他机器,这样能够大大提高集群对外的整体服务能力 —— 在终搜中,数据的定时全量 DUMP 就是这样一个典型的任务。

通常应用会被部署在多台机器上,如果每台机器都进行增量和全量数据导入,那么会存在多份重复数据,如果只让其中一台机器进行导入操作,那么该机器出现宕机后,导入任务将会终止。因为,基于对容灾的考虑,我们需要解决如下问题:在保证全局执行导入的机器只有一台的同时,还要在该台机器出现宕机后,保证将有其他机器能够继续执行下一次的增量和全量导入任务。而解决该问题最好的实现方式便是利用 ZooKeeper 的分布式锁。

  1. 注册节点

    我们还是以应用 App1 为例,在应用启动初始化的时候,会检查 ZooKeeper 的指定节点(该节点是临时节点,下文中我们称该节点为“Master节点”)是否存在:

    /terminator/terminator-node/search4App1-0/full-dump/master

    • 如果节点不存在,那么就创建该临时节点,同时将自己所在的服务器 IP 地址写入该节点。
    • 如果节点已经存在,或者是在上述创建过程中出现“被其他机器抢先创建导致节点创建失败”的现象,那么就对已经存在的节点注册“节点变更”的 Watcher 监听。
  2. 执行任务

    在应用集群开始执行定时全量任务时,会首先访问 ZooKeeper 上的 Master 节点,读取出节点的 IP 信息,如果该 IP 信息和自身服务器地址一致,则说明自己有执行全量任务的权限;如果和自身服务器地址不一致,则不进行全量任务。

  3. Master选举

    在整个系统运行过程中,会出现 Master 节点上 IP 信息对应的服务器出现问题导致 Master 节点也随之消失的情况。由于我们在步骤 1 中已经注册了对该 Master 节点的“节点变更” Watcher 监听,因此所有其他机器都会收到通知,于是再次按照步骤 1 的逻辑进行节点注册

服务路由

应用在使用终搜的过程中,初始化阶段需要找到“查询服务”的提供方,我们称这个过程为服务路由。在传统的方案中,可以使用域名的方式来实现 —— 通过分配不同的域名,为其配置不同的 IP,而在终搜中是使用 ZooKeeper 来完成服务器路由的功能。在一个应用申请接入的过程中,终搜后台会为其分配查询服务的分组,对应一个集群的机器,并将这个集群的机器配置到指定节点:

/Terminator/terminator-node/search4App1-0/query-group

应用服务器在启动的过程中,会首先从 ZooKeeper 集群上读取出查询服务的分组信息,并同时对该节点注册“数据变化”通知。另外,客户端还会将从 ZooKeeper 上获取到的数据信息持久化存储到本地文件系统中,以便在出现多次尝试连接 ZooKeeper 服务器失败时,能够使用这份本地的信息。通过这种方式,每个应用就可以动态获取查询服务的分组信息,完成服务路由,同时也便于终搜的运维人员进行全局运维,提高了实时性。

索引分区

在传统的关系型数据库中,随着数据库数据量的不断增加,单台数捃库的存储空间査询性能已经不能满足业务需求,这时候就需要进行分库操作。在终搜中也同样面临这样的问题,主要体现在索引上,不断增长的索引量成为了制约査询性能的瓶颈,针对这个问题,约定俗成的解决方案通常就是将索引进行分区 —— 终搜基于 ZooKeeper 配置来进行索引分区。

在终搜中,每次完成全量索引构建后,都会将当前应用的索引分区同步到 ZooKeeper 上,如下图所示。

该应用的全量索引被分成了 0、1 和 2 三个分区,同时每个分区里面又分配了两台机器来存储索引副本。应用在启动的时候,会首先到 ZooKeeper 节点上获取相应的索引分区,以及每个分区索引副本的服务器地址。

垂直扩容

所谓垂直扩容,是指为每一个索引分区添加更多的机器以保证分区数据的安全性。假如一次垂直扩容,添加了一台 P3 的机器,那么垂直扩容后 ZooKeeper 上的分区如下图所示。

水平扩容

水平扩容和垂直扩容非常相近,只是水平扩容是对分区的扩容,因此改动的是 ZooKeeper 上对应的分区节点,如下图所示。

上图就是在原来索引分区的基础上,进行了分区扩容,添加了新的分区:3。

实时计算引擎:JStorm

随着互联网大数据技术的不断发展,人们对数据实时性的要求越来越高,传统 Hadoop 的 Map Reduce 技术已经逐渐无法满足这些需求,因此实时计算成为了眼下大数据领域最热门的硏究方向之一,出现了诸如 Storm 和 JStorm 这样的实时计算引擎。Storm 是 Twitter 开源的一个高容错的分布式实时计算系统,而 Storm 是阿里巴巴集团中间件团队在 Storm 基础上改造和优化的一个分布式实时计算引擎,使用 Java 语言编写,于2013年9月正式开源。相较于 Storm,Storm 在功能上更强大,在稳定性和性能上有更卓越的表现,目前广泛应用于日志分析、消息转化器和统计分析器等一系列无状态的实时计算系统上。

JStorm 是一个类似于 Hadoop MapReduce 的分布式任务调度系统,用户按照指定的接口编写一个任务程序,然后将这个任务程序提交给 JStorm 系统,JSTorm 会负责7×24小时运行并调度该任务。在运行过程中如果某个任务执行器(Worker)发生意外情况或其他故障,调度器会立即分配一个新的 Worker 替换这个失效的 Worker 来继续执行任务。

JStorm 是一个典型的分布式调度系统,其系统整体架构如图所示。

其核心部分由 Nimbus、Supervisor、Worker、Task 和 ZooKeeper 五部分组成。

  • Nimbus 是任务的中央调度器。
  • Supervisor 作为 Worker 的代理角色,负责管理 Worker 的生命周期。
  • Worker 是 Task 的容器。
  • Task 对应每一个任务的真正执行体。
  • ZooKeeper 是整个系统中的协调者。

无论是 Storm 还是 JStorm,都高度依赖 ZooKeeper 来实现诸如同步心跳、同步任务配置和调度器选举等功能,可以说,如果脱离了 ZooKeeper,这两个实时计算系统都无法正常工作。

同步心跳

在 JStorm 中,需要在集群内部实时同步三种心跳检测。

  • Worker 向 Supervisor 汇报心跳。
  • Supervisor 向 Nimbus 汇报心跳。
  • Task 向 Nimbus 汇报心跳。

其中后两种心跳检测机制都是通过 ZooKeeper 来实现的。

在 JStorm 的实现中,Supervisor 每隔 10 秒就会将自己拥有的资源数同步到 ZooKeeper 的/supervisors节点上,Nimbυs 就可以通过査询这些节点来检测有哪些机器是活着的,并且能够清楚地知道这些机器上有哪些资源。

而每个 Task 同样会每隔10秒就将自己的心跳和运行状态同步到 ZooKeeper 的/tasks节点上,这样 Nimbus 就能够检测到哪些 Task 是活着的。同时,一旦检测到某个 Task 的心跳超时,则会触发 Nimbus 对该 Task 执行 Reassign 动作(重新分配任务)。

同步任务配置

在上文中已经提到,JStorm 是一个类似于 Hadoop MapReduce 的分布式任务调度系统,用户按照指定的接口编写一个任务程序,然后将这个任务程序提交给 JStorm 系统,由 JStorm 来负责运行并调度该任务,因此同步任务配置是 JStorm 的一大核心功能。整个同步任务配置过程大体可以分为提交任务和同步 Topology 状态两大环节。

提交任务

提交任务的过程如下。

  1. 客户端提交一个 JAR 包到 Nimbus。
  2. Nimbus 扫描 ZooKeeper 上的/supervisors节点,来获取本集群中的所有资源信息。
  3. Nimbus 还会扫描 ZooKeeper 上的/assignments节点,来获取已经分配的任务的资源占用情况。
  4. Nimbυs 根据平衡算法,将 Task 分配到每台机器上,同时确定 Task 绑定的端口和资源占用情况(CPU SIot、Memory Slot 和 Disk Slot)。
  5. 完成任务分配后,Nimbus 会将任务的分配结果写入 ZooKeeper 的/assignments节点
  6. Nimbus 还需要设置 Topology 的状态为 Active,做法就是在 ZooKeeper 上的/topolog点下找到以该 Topology 的 topology-id 命名的对应子节点,并将其设置为 Active。
  7. 重新分配任务。每个 Supervisor 都会监听 ZooKeeper 上的/assignments节点,当检测到节点发生变更时,就会立即获取本机的任务配置,然后启动或杀死对应的 Worker。

同步Topology状态

JStorm 提供了一系列的命令来控制 Storm 服务,这里以客户端的 deactivate 命令为例来说明 JStorm 是如何借助 ZooKeeper 来同步 Topology 状态的。

  1. 客户端发出 deactivate 命令
  2. Nimbus 在接收到该命令后,会设置 ZooKeeper 中的 Stormbase 节点对应的 Topology 的状态为 deactivate
  3. 同时,Worker 进程会对 Zookeeper 中的/StormBase节点注册监听,当节点发生变更时,立即设置 Worker 的状态为 deactivate

  4. Worker 内部的 Task 每执行一个 batch 操作后,就会检查 Worker 的状态,如果状态变更为 deactivate,那么 Task 就会立即将自己置为挂起状态。

调度器选举

和 JStorm 相比,JStorm 中增加了调度器的 HA 机制,用于实现调度器的动态选举。每一个 Nimbus 在启动的时候,都会试图到 ZooKeeper 上创建一个临时节点/nimbus_master。在创建的过程中,如果发现该节点已经存在,则表示 Nimbus 的 Master 已经存在,那么当前 Nimbus 就会在 ZooKeeper 的/nimbus_slave节点下创建一个临时子节点,并将自己的机器名和端口号写入到该节点中,同时注册对/nimbus_master节点的监听。

在运行过程中,该 Nimbus(这里指创建/nimbus_slave节点对应的机器)还会启动一个 Follower 线程,用于

  • 反复扫描/nimbus_master是否存在。
  • 如果/nimbus_master节点存在,则同步/nimbus_master的 Topology 到本机中。
  • 如果/nimbus_master节点已经消失,则会触发调度器的重新选举,具体流程和上面提到的初始化流程是一致的,简单地讲,就是集群中所有机器都去创建/nimbus_master节点,如果节点创建成功,那么该机器就是 Master,创建失败,那么就是 Slave。

ZooKeeper使用优化

JStorm 是从 Storm 中改造而来的,在使用 ZooKeeper 方面也进行了大量的改进与优化。

减少对 Zookeeper 的全量扫描

在 Storm 中,判断一个 Task 是否存活的方法非常复杂,首先会通过扫描/StormBase节点来获取 Topology 列表,将存活的 Topology 提取出来;然后扫描/assignments节点,获取每一个 Task 的任务配置,然后以此来判断 Task 是否存活。相信读者很容易发现,在整个过程中,几乎扫描了整个 ZooKeeper 上的数据节点,这显然增加了 ZooKeeper 的压力。

而在 JStorm 中,判断一个 Task 是否存活的方法只需要扫描/tasks中该 Topology 的节点,通过对心跳时间进行判断即可。

减少无用的 Watcher 操作

在 JStorm 中,Nimbus 取消了对 supervisors 节点的 Watcher 操作,因为增加或减少 Supervisor 没有必要触发 Rebalance 动作,而 Storm 的设计却画蛇添足地触发了 Rebalance 动作,直到 0.9.0 版本后,Storm 官方才取消了该 Rebalance 动作。

在 JStorm 中,Supervisor 取消了对 Stormbase 节点的 Watcher 操作,Supervisor 只需监听/assignments节点即可,没有必要重复性地监听 Stormbase 节点 —— 举个例子,假如有 200 台机器,那么后者至少额外增加了 200 多次的 Watcher 通知。

延长心跳设置

  • Task 的心跳频率,由原来的 3 秒改为了 10 秒。这个改动使得 JStorm 对 ZooKeeper 的压力减轻了许多。

    在 JStorm 中,毎一台机器上通常会运行 20 多个 Worker,假设当集群的规模上升到 200 台时,整个集群可能运行着 5000 个以上的 Task,这样就会造成对 ZooKeeper 每秒至少 1600 次的心跳请求。同时,每一个 Task 的心跳包大小为 200 多个字节,因此,将 Task 的心跳频率延长到 10 秒,可以明显减轻对 ZooKeeper 的压力。

  • 增加 ZooKeeper 的 Timeout 重连次数。

    在 Strom 中,当失去与 ZooKeeper 连接的时候会进行 5 次重连操作。但在实际运行过程中,ZooKeeper 很容易在某个瞬间处于无应答的状态,一旦 Storm 连续 5 次请求连接 ZooKeeper 失败后,Nimbus、Supervisor 和 Worker 就会自动退出,而如果 Nimbus 自动退出,就很容易导致集群丧失中央调度器功能。而在大部分的情况下,ZooKeeper 只是短暂地处于无应答状态,一段时间后就会恢复正常。因此,增加重试次数,可以明显降低 Supervisor、 Nimbus和 Worker的自动退出概率。