0%

架构设计 MessageQueue方案

Message Queue 从最初的架构设计到现在的贴近云原生的架构风格已经演进的非常成熟,本文通过将 RocketMQ、Kafka、Pulsar 进行对比来整理当下 MQ 中间件对于存储、通信的实现。

消息获取模式

Push

Push 即服务端主动发送数据给客户端。在服务端收到消息之后立即推送给客户端。

Push 模型最大的好处就是实时性。因为服务端可以做到只要有消息就立即推送,所以消息的消费没有“额外”的延迟。

但是 Push 模式在消息中间件的场景中会面临以下一些问题:

  • Broker 需要维护 Consumer 的状态,不利于 Broker 去支持大量的 Consumer 的场景。
  • Consumer 的消费速度是不一致的,由 Broker 进行推送难以处理不同的 Consumer 的状况。
  • Broker 难以处理 Consumer 无法消费消息的情况(Broker 无法确定 Consumer 的故障是短暂的还是永久的)。
  • 大量的推送消息会加重 Consumer 的负载或者冲垮 Consumer。

Pull

Pull 模式可以很好的应对 Push 模式不擅长的场景。Pull 模式由 Consumer 主动从 Broker 获取消息。

这样带来了一些好处:

  • Broker 不再需要维护 Consumer 的状态(每一次 Pull 都包含了偏移量等必要的信息)。
  • Consumer 可以很容易的根据自身的负载等状态来决定从 Broker 获取消息的频率。
  • Consumer 可以批量获取一段时间内 Broker 接到的消息,节省带宽。

但是,和 Push 模式正好相反,Pull 就面临了实时性的问题。因为由 Consumer 主动来 Pull 消息,所以实时性和 Pull 的周期相关,这里就产生了“额外”延迟。如果为了降低延迟来提升 Pull 的执行频率,可能在没有消息的时候产生大量的 Pull 请求(消息中间件是完全解耦的,Broker 和 Consumer 无法预测下一条消息在什么时候产生);如果频率低了,那延迟自然就大了)。

另外,Pull 模式状态维护在 Consumer,所以多个 Consumer 之间需要相互协调,这里就需要引入 ZK 等来完成。

Long-Polling

参考:Kafka RocketMQ

Long-Polling 结合了 Push 和 Pull 的优势,是 Pull 模式的一个变种。Consumer 主动发起请求到 Broker,正常情况下 Broker 响应消息给 Consumer;在没有消息或者其他一些特殊场景下,可以将请求阻塞在服务端延迟返回。

那么:

  • 在 Broker 一直有可读消息的情况下,Long-Polling 就等价于 Pull 模式。
  • 在 Broker 没有可读消息的情况下,请求阻塞在了 Broker,在产生下一条消息或者请求“超时之前”响应请求给 Consumer。

以上两点避免了多余的 Pull 请求,同时也解决 Pull 请求的执行频率导致的“额外”的延迟。

请求返回和阻塞超时

注意上面有一个概念:“超时之前”。每一个请求都有超时时间,Pull 请求也是。“超时之前”的含义是在 Consumer 的“Pull”请求超时之前。

基于 Long-Polling 的模型,Broker 需要保证在请求超时之前返回一个结果给 Consumer,无论这个结果是读取到了消息或者没有可读消息。因为 Consumer 和 Broker 之间的时间是有偏差的,且请求从 Consumer 发送到 Broker 也是需要时间的,所以如果一个请求的超时时间是 5 秒,而这个请求在 Broker 端阻塞了 5 秒才返回,那么 Consumer 在收到 Broker 响应之前就会判定请求超时。所以 Broker 需要保证在 Consumer 判定请求超时之前返回一个结果。

通常的做法时在 Broker 端可以阻塞请求的时间总是小于 Long-Polling 请求的超时时间。比如 Long-Polling 请求的超时时间为 30 秒,那么 Broker 在收到请求后最迟在 25s 之后一定会返回一个结果。中间 5s 的差值来应对 Broker 和 Consumer 的始终存在偏差和网络存在延迟的情况。(可见 Long-Polling 模式的前提是 Broker 和 Consumer 之间的时间偏差没有“很大”)

消息消费延迟

在 Broker 一直有可读消息的情况下,Long-Polling 就等价于执行间隔为 0 的 Pull 模式(每次收到 Pull 结果就发起下一次 Pull 请求)。在这个情况下,一条消息如果在 Long-Polling 请求返回时到达服务端,那么它被 Consumer 消费到的延迟是:

假设 Broker 和 Consumer 之间的一次网络开销时间为 R 毫秒,那么这条消息需要经历 3R 才能到达Consumer

第一个R:消息已经到达 Broker,但是 Long-Polling 请求已经读完数据准备返回 Consumer,从 Broker 到 Consumer 消耗了 R
第二个R:Consumer 收到了 Broker 的响应,发起下一次 Long-Polling,这个请求到达 Broker 需要一个 R 的时间
第三个R:Broker 收到请求读取了这条数据,那么返回到 Consumer 需要一个 R 的时间

所以总共需要3R(不考虑读取的开销,只考虑网络开销)

在这种情况下 Broker 和 Consumer 之间一直在进行请求和响应。

img

Dynamic Push/Pull

参考:Reactive Stream Processing with Akka Streams

Dynamic Push/Pull 有 Long-Polling 的优势,同时能减少在有消息可读的情况下由 Broker 主动 Push 消息给 Consumer,减少不必要的请求。

Dynamic Push/Pull 通过在消息中间件的 Consumer 中新增一个 Buffer 来缓存从 Broker 获取的消息,而用户的消费线程从这个 Buffer 中获取消费来消息,获取消息的线程和消费线程通过这个 Buffer 进行数据传递。有了这个 Buffer 存在,当 Consumer 向 Broker 通过 Long-Polling 请求时,将 Buffer 剩余空间告知给 Broker,当 Consumer 容量对于 Broker 是透明时,Broker 就可以控制推送行为,可以避免过量的请求冲垮 Consumer,也可以避免过多的请求和时延。

这个时候滞后的 Buffer 容量信息,总会带来一些容量和性能上的浪费。

Broker 和 Consumer 协调消息发送的方式有两种:

  • Consumer 通过 Long-Polling 主动批量拉取数据,目的是减少请求次数,能更高效地处理在 Broker 中堆积的数据。
  • Consumer 向 Broker 发送定量 PUSH 命令,Broker 可以在 Consumer 容量允许的条件下实时推送消息。

这两种发送策略全部都是在 Consumer 加以控制,可以保证在 Consumer 不会因为上游数据源数据过大被冲垮。这两种方式也有一种统一的命名:pull based backpressure。

C 消息订阅模式

Consumer 订阅模式是用来定义我们的消息如何分配给不同的消费者,不同消息队列中间件都有自己的订阅模式,一般我们常见的订阅模式有:

  • 集群模式:一条消息只能被一个集群内的消费者所消费。
  • 广播模式:一条消息能被集群内所有的消费者消费。

Pulsar

在 Pulsar 中提供了 4 种消费者订阅模式,分别是独占,灾备,共享,键共享:

  • exclusive:消费组里有且仅有一个 Consumer 能够进行消费,其它的无法连接。适用于全局有序的消息。

  • failover:消费组每个节点都能连接到 Partition 所在的 Broker,但有且仅有一个 Consumer 能消费到数据。当消费节点崩溃,会选出一个新的接班。

  • shared:消费组每个节点都能消费 Topic 中所有的 Partition,消息以 round-robin 发放给消费者。

    RocketMQ 是以 Partition 维度,同一个 Partition 的数据都会被发到一个机器上;Pulsar 直接 round-robin 给所有消费者。

  • key-shared:消费组每个节点都能消费 Topic 中所有的 Partition,但 HASH 值相同的 Key 的消息会保证发送给同一个消费者。

    RocketMQ 中同一个 Key 的顺序消息都会被发送到一个 Partition;

    Pulsar 不会有 Partition 维度,是按照 Key 的 Hash 去分配到固定的 Consumer。

P 消息发送模式

Send 分为 Async 和 Sync 两种模式,但实际上在 Pulsar 内部 Sync 模式也是采用的 Async 模式,在 Sync 模式下模拟回调阻塞,达到同步的效果,这个在 Kafka 中也是采用的这个模式,但是在 RocketMQ 中,所有的 Send 都是真正的同步,都会直接请求到 Broker。

物理存储设计

Kafka

Kafka 中消息是以 Topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 Topic 的。Topic 是逻辑上的概念,而 Partition 是物理上的概念,每个 Partition 都对应一个 Log 文件,该 Log 文件中存储的就是 Producer 生产的数据。

在创建主题时,Kafka 首先会决定如何在 Broker 间分配分区。假设有 6 个 Broker,打算创建一个包含 10 个分区的主题,并且复制系数为 3(确保至少有 3 台 Broker)。那么 Kafka 就会有 30 个分区副本,它们可以被平均分配给 6 个 Broker 并且每个副本会尽可能分布在不同的 Broker 上。但是,当新的 Broker 加入并不会导致 Partition 的 rebalance,只会处理新的 Topic;如果需要分担旧 topic-partition 的压力,需要手动迁移 Partition,这时会占用大量集群带宽。

由于生产者生产的消息会不断追加到 Log 文件末端,为防止 Log 文件过大导致数据定位效率低,Kafka 采取了分片和索引机制,将每个 Partition 分为多个 Segment(逻辑上的概念,index+log 文件)

每个 Partition(目录)相当于一个巨型文件被平均分配到多个大小相等的 Segment(片段)数据文件中(每个 Segment 文件中消息数量不一定相等),这种特性也方便 Old Segment 的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个 Partition 只需要支持顺序读写就行,Segment 的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours} 等若干参数)决定。

由于上面物理储存逻辑的限制,可以轻易的得出以下不足:

  1. Broker 上 Partition 过多会使得磁盘顺序写几乎退化成随机写。
  2. Broker 不能弹性扩容,需要人工手动迁移将旧 Partition 迁移到新 Broker。

RocketMQ

RocketMQ 的消息存储采用的是混合型的存储结构,所谓混合型存储结构,就是 Broker 单个实例下所有的队列共用一个 CommitLog 来存储。Producer 生产任意的消息后,无论 Topic 是否相同,都会将消息内容存储到 CommitLog 中,当 CommitLog 存储满了,则会创建一个新的 CommitLog 文件继续存储新生成的消息。此外,在 Broker 端,会有一个后台服务线程——ReputMessageService 会不停地分发请求并异步构建 ConsumeQueue(逻辑消费队列)和 IndexFile(索引文件)。而 Consumer 端可以根据 ConsumeQueue 中的数据查找具体的消息。IndexFile 则为消息查询提供了一种通过 key 或时间区间来查询消息的方法。虽然混合型存储结构会导致查询的时候无法快速定位具体的某个消息,但由于 ConsumeQueue 和 Index 两个逻辑存储文件的存在可以一定程度上缓解这种查询复杂度。

RocketMQ 虽然将多个 Topic 对应一个写入文件,让写入变成了顺序写,但是我们的读取很容易导致我们的 Page Cache 被各种覆盖刷新,这对于我们的 IO 的影响是非常大的。

Pulsar

Pulsar 采用的是存储计算分离的架构,而存储端使用的是 BookKeeper 作为实现。

下图是 Bookie 的读写架构图,里面有一些名词需要先介绍一下

  • Entry:Entry 是存储到 bookkeeper 中的一条记录,其中包含 Entry ID,记录实体等。
  • Ledger:多个 Entry序列组成一个 Ledger。
  • Entry Log:存储 Entry 的文件,Ledger 是一个逻辑上的概念,Entry 会先按 Ledger 聚合,然后写入 Entry Log 文件中。Entry Log 会有一个最大值,达到最大值后会新起一个新的 Entry Log 文件
  • Index file:Ledger 的索引文件,Ledger 中的 Entry 被写入到了 Entry Log 文件中,索引文件用于 Entry Log 文件中每一个 Ledger 做索引,记录每个 Ledger 在 Entry Log 中的存储位置以及数据在 Entry Log 文件中的长度。
  • Journal:其实就是 Bookkeeper 的 WAL(write ahead log),用于存 Bookkeeper 的事务日志,Journal 文件有一个最大大小,达到这个大小后会新起一个 Journal 文件。
  • MetaData Storage:元数据存储,是用于存储 Bookie 相关的元数据,比如 Bookie 上有哪些 Ledger,Bookkeeper 目前使用的是 zk 存储,所以在部署 Bookkeeper 前,要先有 zk 集群。

针对 Kafka 和 RocketMQ 的问题,Pulsar 都对其进行了针对性的优化。

  • 写流程:顺序写 + Page Cache。在写流程中我们的所有的文件都是独立磁盘,并且同步刷盘的只有 Journal,Journal 是顺序写一个 journal-wal 文件,顺序写效率非常高。Ledger 和 index 虽然都会存在多个文件,但是我们只会写入 Page Cache,异步刷盘,所以随机写不会影响我们的性能。
  • 读流程:Broker Cache + Bookie Cache,在 Pulsar 中对于追尾读(tailing read)非常友好基本不会走 IO,一般情况下我们的 Consumer 是会立即去拿 Producer 发送的消息的,所以这部分在持久化之后依然在 Broker 中作为 Cache 存在,当然就算 Broker 没有 Cache(比如 Broker 是新建的),我们的 Bookie 也会在 memtable 中有自己的 Cache,通过多重 Cache 减少读流程走 IO。