0%

RocketMQ 原理及应用场景

MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。

消息队列功能

应用解耦

各个子系统之间的耦合性太高,整体系统的可用性就会大幅降低。多个低错误率的子系统强耦合在一起,得到的是一个高错误率的整体系统。如果通过消息队列实现一个缓冲,会使各个系统即使在故障后恢复的情况下,也能正常的处理相应的数据。值得庆幸的是,这个故障对用户是无感知的。

流量削峰

将瞬时的高流量通过消息队列进行分散消费处理,主要的目的是通过分散的方式应对瞬时高流量,在不影响用户体验和系统功能的情况下提升对数据的总体处理能力。

消息分发

各个微系统根据需求订阅消息,并不需要耦合到数据生产的项目代码中。支持广播模式(不同的微系统互相独立的消费模式)或者集群模式(多个相同服务的微系统共同消费一组消息的消费模式)。

系统模块

RocketMQ 可宏观的分为四个模块:负责发送信息的 Producer、负责接收信息的 Consumer、负责中转信息的 Broker 以及协调各方的 NameServer。为了消除单点故障,增加可靠性或增大吞吐量,可以在多台机器上部署多个 NameServer 和 Broker,为每个 Broker 部署一个或多个 Slave。

无论是 Consumer 还是 Producer,都是根据 Topic 在 NameServer 中找到持有该 Topic 的 Broker。然后在发送消息时,轮询 Topic 下的 MessageQueue。

模块之间的对应关系:

在 Producer -> Topic -> Consumer 的消费逻辑链路中,他们是多对多的关系。例如:多个 Producer 可以向同一个 Topic 写数据,一个 Producer 也可以向多个 Topic 写数据。

在 NameServer -> Broker -> Topic -> MessageQueue的包含逻辑链路中,Broker 需要在全部的 NameServer 上注册,但 Broker 和 Topic 是多对多的关系。每一个 Topic 都可以包含多个 MessageQueue,它们可以分散在不同的 Broker 中。但需要注意的是,每一个 MessageQueue 都只能属于一个 Topic。Topic 是一个逻辑概念,MessageQueue 是其逻辑概念的物理实现。

Consumer

消息推送方式

在 RocketMQ 中,有 PUSH(DefaultMQPushConsumer) 和 PULL(DefaultMQPullConsumer) 两种推送方式,但主动权全部都在 Consumer 身上。 PUSH 方法和 PULL 方式各有优劣,具体看使用场景进行取舍。

PULL 方式以从服务端向客户端拉取消息的形式进行工作。因此,客户端可以依据自己的消费能力进行消费,即使传输失败了再次请求即可。但主动到服务端拉取消息需要定义科学的拉取间隔时间。如果间隔时间太短,对服务器请求压力过大。间隔时间过长,那必然会造成一部分数据的延迟。

当然拉取消息时导致的数据延迟有以下解决方案:

  1. 间隔时间指数增加。即当无可消费数据时,PULL 间隔时间指数级增长,例如间隔时间为 5ms、10ms、20ms、40ms、80ms…… 然后再回到 5ms。但是如果在 41ms 时来了数据,那么到 80ms 就有 40ms 左右的时间延迟。

  2. 长轮询的解决方案。取数据时要是没有数据可消费,不是直接返回而是连接等待,一直有数据来了再返回。长轮询方式的局限性是在 HOLD 住 Consumer 请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中。长轮询方式的局限性,是在 HOLD 住 Consumer 请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中 。

PUSH 方式相比 PULL 方式可以从服务端主动向客户端推送消息,比 PULL 方式的及时性更强。但是如果客户端消费能力低于服务端推送速度时,会造成消息堆积(严重时会造成服务崩溃),这也是消息队列需要流控功能限制消费数量的原因。服务端对每一条消息都进行推送,不仅增大了服务端压力,也会影响服务端性能。服务端还需要维护每次传输状态,以防消息传递失败进行重试。

RocketMQ长轮训实现方式

需要注意的是,RocketMQ 实现的 Push 推送方式通过 Pull 长轮训的方式来实现。从 Broker 的源码中可以看出,服务端接到新消息请求后,如果队列里没有新消息,并不急于返回,通过一个循环不断查看状态,每次 waitForRunning 一段时间(默认是5秒),然后后再 Check。默认情况下当 Broker 一直没有新消息,第三次 Check 的时候,等待时间超过 Request 里面的 Broker­SuspendMaxTimeMillis(默认 15s),就返回空结果。在等待的过程中,Broker 收到了新的消息后会直接调用 notifyMessageArriving 函数返回请求结果。“长轮询”的核心是,Broker 端 HOLD 住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给 Consumer。“长轮询”的主动权还是掌握在 Consumer 手中,Broker 即使有大量消息积压,也不会主动推送给 Consumer。

消息消费方式

  • Clustering 模式

    同一个 ConsumerGroup(GroupName 相同)里的每个 Consumer 只消费所订阅消息的一部分内容,同一个 ConsumerGroup 里所有的 Consumer 消费的内容合起来才是所订阅 Topic 内容的整体,从而达到负载均衡的目的。

  • Broadcasting 模式

    同一个 ConsumerGroup 里的每个 Consumer 都能消费到所订阅 Topic 的全部消息,也就是一个消息会被多次分发,被多个 Consumer 消费。

监控与控制

在 RocketMQ 中,Pull 获得的消息,如果直接提交到线程池里执行,很难监控和控制,比如,如何得知当前消息堆积的数量?如何重复处理某些消息?如何延迟处理某些消息?

RocketMQ 定义了一个快照类 ProcessQueue 来解决这些问题,在 PushConsumer 运行的时候,每个 Message Queue 都会有个对应的 ProcessQueue 对象,保存了这个 Message Queue 消息处理状态的快照 。

ProcessQueue 对象里主要的内容是一个 TreeMap 和一个读写锁。 TreeMap 里以 Message Queue 的 Offset 作为 Key,以消息内容的引用为 Value,保存了 所有从 MessageQueue 获取到,但是还未被处理的消息;读写锁控制着多个线程对 TreeMap 对象的并发访问。

有了 ProcessQueue 对象,可以随时停止、启动消息的消费,同时也可用于帮助实现顺序消费消息。顺序消息是通过 ConsumeMessageOrderlyService 类实现的 ,主要流程和 ConsumeMessageConcurrentlyService类似 ,区别只是在对并发消费的控制上。因此也可以很轻松的通过它实现流量控制。

消息开始读取的位置

了解 OffsetStore 的存储机制以后,我们 看看如何 。 类里有个函数用来设置从哪儿开始消费 消 息:比如

Consumer.setConsumeFromWhere()可以设置 Consumer 读取消息的初始位置。如果从队列开始到感兴趣的消息之间有很大的范围,用 CONSUME_FROM_FIRST_OFFSET 参数并不合适,可以设置从某个时间开始消费消息,比如 Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP)Consumer. setConsumeTimestamp("0131223171201”),时间戳格式是精确到秒的。

注意:设置读取位置不是每次都有效,它的优先级默认在 OffsetStore 后面,比如在 DefaultMQPushConsumer 的 BROADCASTING 方式 下,默认是从 Broker 里读取某个 Topic 对应 ConsumerGroup 的 Offset,当读取不到 Offset 的时候,ConsumeFromWhere 的设置才生效。大部分情况下这个设置在 ConsumerGroup 初次启动时有效。如果 Consumer 正常运行后被停止,然后再启动,会接着上次的 Offset开始消费,ConsumeFromWhere 的设置无效。

Producer

写入策略

消息的发送有同步和异步两种方式。

自定义消息发送规则

一个 Topic 会有多个 Message Queue,如果使用 Producer 的默认配置,这个 Producer 会轮流向各个 Message Queue 发送消息。Consumer 在消费消息的时候,会根据负载均衡策略,消费被分配到的 Message Queue,如果不经过特定的设置,某条消息被发往哪个 Message Queue,被哪个 Consumer 消费是未知的。

public class SimpleSelector implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int id = Integer.parseInt(arg.toString());
        int idMainIndex = id / 100;
        int size = mqs.size();
        int index = idMainIndex % size;
        return mqs.get(index);
    }
}

发送消息的时候,把 MessageQueueSelector 的对象作为参数,使用 public SendResult send ( Message msg, MessageQueueSelector selector, Object arg) 函数发送消息即可。在 MessageQueueSelector 的实现中,根据传入的 Object 参数,或者根据 Message 消息内容确定把消息发往那个 Message Queue,返回被选中的 Message Queue。

对事务的支持

RocketMQ 采用两阶段提交的方式实现事务消息,TransactionMQProducer 处理的流程如下:

  1. 发送方向 RocketMQ 发送“待确认”消息。
  2. RocketMQ 将收到的“待确认” 消息持久化成功后,向发送方回复消息。已经发送成功,此时第一阶段消息发送完成。
  3. 发送方开始执行本地事件逻辑。
  4. 发送方根据本地事件执行结果向 RocketMQ 发送二次确认(Commit 或 Rollback)消息,RocketMQ 收到 Commit 状态则将第一阶段消息标记为可投递,订阅方将能够收到该消息,收到 Rollback 状态则删除第一阶段的消息,订阅方接收不到该消息。
  5. 如果出现异常情况,步骤 4 提交的二次确认最终未到达 RocketMQ。服务器在经过固定时间段后将对“待确认”消息、发起回查请求。
  6. 发送方收到消息回查请求后(如果发送一阶段消息的 Producer 不能工作,回查请求将被发送到和 Producer 在同一个 Group 里的其他 Producer),通过检查对应消息的本地事件执行结果返回 Commit 或 Rollback 状态。
  7. RocketMQ 收到回查请求后,按照步骤 4 的逻辑处理。

上面的逻辑似乎很好地实现了事务消息功能,它也是 RocketMQ 之前的版本实现事务消息的逻辑。但是因为 RocketMQ 依赖将数据顺序写到磁盘这个特征来提高性能,步骤 4 却需要更改第一阶段消息的状态,这样会造成磁盘 Catch 的脏页过多,降低系统的性能。所以 RocketMQ 在 4.x 的版本中将这部分功能去除。系统中的一些上层 Class 都还在,用户可以根据实际需求实现自己的事务功能。

客户端有三个类来支持用户实现事务消息,第一个类是 LocalTransactionExecuter,用来实例化步骤 3 的逻辑,根据情况返回 ROLLBACK 或者 COMMIT 状态。第二个类是 TransactionMQProducer,它的用法和 DefaultMQProducer 类似,要通过它启动一个 Producer 并发消息,但是比 DefaultMQProducer 多设置本地事务处理函数和回查状态函数。第三个类是 TransactionCheckListener,实现步骤 5 中 MQ 服务器的回查请求,返回 ROLLBACK 或者 COMMIT 状态。

存储队列位置信息

RocketMQ 中,一种类型的消息会放到一个 Topic 里,为了能够并行,一般一个 Topic 会有多个 Message Queue (也可以设置成一个), Offset 是指某个 Topic 下的一条消息在某个 Message Queue 里的位置,通过 Offset 的值可以定位到这条消息,或者指示 Consumer 从这条消息开始向后继续处理。

下图所示是 Offset 的类结构,主要分为本地文件类型和 Broker 代存的类型两种。

对于 DefaultMQPushConsurner 来说,默认是 CLUSTERING 模式,也就是同一个 ConsumerGroup 里的多个消费者每人消费一部分,各自收到的消息内容不一样。这种情况下,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore 结构。在 DefaultMQPushConsumer 里的 BROADCASTING 模式下,每个 Consumer 都收到这个 Topic 的全部消息,各个 Consumer 间相互没有干扰,RocketMQ 使用 LocalfileOffsetStore,把 Offset 存到本地。

在使用 DefaultMQPushConsumer 的时候,Broker 帮助管理了 Offset。但是如果使用 PullConsumer,需要自己处理 OffsetStore 了。代码实现里把 Offset 存到了内存,没有持久化存储,这样就可能因为程序的异常或重启而丢失 Offset,在实际应用中不推荐这样做。接下来给出在磁盘存储 Offset 的示例程序,参照 LocalFileOffsetStore 的源码编写。

public class LocalFileOffsetStoreExt  {
    private final String groupName;
    private final String storePath;
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
            new ConcurrentHashMap<MessageQueue, AtomicLong>();

    public LocalFileOffsetStoreExt(String storePath, String groupName) {
        this.storePath = storePath;
        this.groupName = groupName;
    }

    public void load() {
        OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
        if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
            offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());

            for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
                AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
                log.info("load consumer's offset, {} {} {}",
                        this.groupName,
                        mq,
                        offset.get());
            }
        }
    }

    public void updateOffset(MessageQueue mq, long offset) {
        if (mq != null) {
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            } else {
                offsetOld.set(offset);
            }
        }
    }

    public long readOffset(final MessageQueue mq) {
        if (mq != null) {
            AtomicLong offset = this.offsetTable.get(mq);
            if (offset != null){
                return offset.get();
            }
        }
        return 0;
    }

    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;

        OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            if (mqs.contains(entry.getKey())) {
                AtomicLong offset = entry.getValue();
                offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
            }
        }

        String jsonString = offsetSerializeWrapper.toJson(true);
        if (jsonString != null) {
            try {
                MixAll.string2File(jsonString, this.storePath);
            } catch (IOException e) {
                log.error("persistAll consumer offset Exception, " + this.storePath, e);
            }
        }
    }

    private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
        String content = null;
        try {
            content = MixAll.file2String(this.storePath);
        } catch (IOException e) {
            log.warn("Load local offset store file exception", e);
        }
        if (null == content || content.length() == 0) {
            return null;
        } else {
            OffsetSerializeWrapper offsetSerializeWrapper = null;
            try {
                offsetSerializeWrapper =
                        OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
            } catch (Exception e) {
                log.warn("readLocalOffset Exception, and try to correct", e);
            }
            return offsetSerializeWrapper;
        }
    }
}

Broker

Broker 是 RocketMQ 的核心,大部分‘重量级”工作都是由 Broker 完成的,包括接收 Producer 发过来的消息、处理 Consumer 的消费消息请求、消息的持久化存储、消息的 HA 机制以及服务端过滤功能等。

消息储存与发送

分布式队列因为有高可靠性的要求,所以数据要通过磁盘进行持久化存储。

用磁盘存储消息,速度会不会很慢呢?能满足实时性和高吞吐量的要求吗?实际上,磁盘有时候会比你想象的快很多,有时候也会比你想象的慢很多,关键在如何使用,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。

目前的高性能磁盘,顺序写速度可以达到 600MB/s,超过了一般网卡的传输速度,这是磁盘比想象的快的地方。但是磁盘随机写的速度只有大概 100KB/s,和顺序写的性能相差 6000 倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。

消息存储结构

RocketMQ 消息的存储是由 ConsumeQueue 和 CommitLog 配合完成的,消息真正的物理存储文件是 Commitlog,ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个 Topic 下的每个 MessageQueue 都有一个对应的 ConsumeQueue 文件。文件地址在 ${$storeRoot}\consumequeue\${topicName}\${queueId}\${fileName}

CommitLog 以物理文件的方式存放,每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享,文件地址:${user.home}\store\${commitlog}\${filename}。在 CommitLog 中,一个消息的存储长度是不固定的,RocketMQ 采取一些机制,尽量向 Commit 中顺序写,但是随机读。ConsumeQueue 的内容也会被写到磁盘里作持久存储。

存储机制这样设计有以下几个好处:

  1. CommitLog 顺序写,可以大大提高写入效率。
  2. 虽然是随机读,但是利用操作系统的 pagecache 机制,可以批量地从磁盘读取,作为 cache 存到内存中,加速后续的读取速度。
  3. 为了保证完全的顺序写,需要 ConsumeQueue 这个中间结构,因为 ConsumeQueue 里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的 ConsumeQueue 能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证 CommitLog 和 ConsumeQueue 的一致性,CommitLog 里存储了 Consume Queues、Message Key、Tag 等所有信息,即使 ConsumeQueue 丢失,也可以通过 commitLog 完全恢复出来。

高可用机制

RocketMQ 分布式集群是通过 Master 和 Slave 的配合达到高可用性的。Master 和 Slave 的区别:在 Broker 的配置文件中,参数 brokerId 的值为 0 表明这个 Broker 是 Master,大于0表明这个 Broker 是 Slave,同时 brokerRole 参数也会说明这个 Broker 是 Master 还是 Slave。Master 角色的 Broker 支持读和写,Slave 角色的 Broker 仅支持读,也就是 Producer 只能和 Master 角色的 Broker 连接写入消息;Consumer 可以连接 Master 角色的 Broker,也可以连接 Slave 角色的 Broker 来读取消息。

在 Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者繁忙的时候,Consumer 会被自动切换到从 Slave 读。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 程序。这就达到了消费端的高可用性。

如何达到发送端的高可用性呢?在创建 Topic 的时候,把 Topic 的多个 Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId 的机器组成一个 Broker 组),这样当一个 Broker 组的 Master 不可用后,其他组的 Master 仍然可用,Producer 仍然可以发送消息。RocketMQ 目前还不支持把 Slave 自动转成 Master,如果机器资源不足,需要把 Slave 转成 Master,则要手动停止 Slave 角色的 Broker,更改配置文件,用新的配置文件启动 Broker。

同步刷盘和异步刷盘

RocketMQ 的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。RocketMQ 为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过 Producer写入 RocketMQ 的时候,有两种写磁盘方式,下面逐一介绍。

  • 异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的 PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到定程度时,统一触发写磁盘动作,快速写入。
  • 同步刷盘方式:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的 PAGECACHE 后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

同步刷盘还是异步刷盘,是通过 Broker 配置文件里的 flushDiskType 参数设置的,这个参数被配置成 SYNC_FLUSH、ASYNC_FLUSH 中的一个。

同步复制和异步复制

如果一个 Broker 组有 Master 和 Slave,消息需要从 Master 复制到 Slave 上,有同步和异步两种复制方式。

同步复制方式是等 Master 和 Slave 均写成功后才反馈给客户端写成功状态;异步复制方式是只要 Master 写成功即可反馈给客户端写成功状态。这两种复制方式各有优劣,在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果 Master 出了故障,有些数据因为没有被写入 Slave,有可能会丢失;在同步复制方式下,如果 Master 出故障,Slave 上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统昋吐量。

同步复制和异步复制是通过 Broker 配置文件里的 brokerRole 参数进行设置的,这个参数可以被设置成 ASYNC_MASTER、SYNC_MASTER、SLAVE 三个值中的一个。

实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式,尤其是 SYNC_FLUSH 方式,由于频繁地触发磁盘写动作,会明显降低性能。通常情况下,应该把 Master 和 Slave 配置成 ASYNC_FLUSH 的刷盘方式,主从之间配置成 SYNC_MASTER 的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。

可靠性优先的使用场景

可靠性优先,指的是消息不受单点依赖,即使某台机器出现极端故障也不会丢消息。

推荐的配置如下:

  1. 多 Master,每个 Master 带有 Slave
  2. 主从之间设置成 SYNC_MASTER
  3. Producer 用同步方式写
  4. 刷盘策略设置成 SYNC_FLUSH

消息顺序消费

在某些业务场景下,必须保证消息的产生和消费顺序一致,如订单的生成、付款、发货。

,在有些业务逻辑下,必须保证顺序。比如,这3个消息必须按顺序处理才行顺序消息分为全局顺序消息和部分顺序消息,全局顺序消息指;

全局消息顺序消费

全局消息顺序消费是指某个 Topic 下的所有消息都要保证顺序消费。

RocketMQ 在默认情况下不保证顺序,比如创建一个 Topic,默认八个写队列,八个读队列。这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个 Consumer,每个 Consumer 也可能启动多个线程并行处理,所以消息被哪个 Consumer 消费,被消费的顺序和写入的顺序是否一致是不确定的。

要保证全局顺序消息,需要先把 Topic 的读写队列数设置为一,然后 Producer 和 Consumer 的并发设置也要是一。简单来说,为了保证整个 Topic 的全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理。这时高并发、高吞吐量的功能完全用不上了。

在实际应用中,更多的是像订单类消息那样,只需要部分有序即可。在这种情况下,我们经过合适的配置,依然可以利用 RocketMQ 高并发、高吞吐量的能力。

部分消息顺序消费

部分消息顺序消费是指保证每一组消息被顺序消费即可。

要保证部分消息有序,需要发送端和消费端配合处理。在发送端,要做到把同一业务 ID 的消息发送到同一个 Message Queue;在消费过程中,要做到从同一个 Message Queue 读取的消息不被并发处理,这样才能达到部分有序。

在实现方面,需要服务端的 MessageQueueSelector 控制将相同业务 ID 的消息发送给同一个 Message Queue,并且消费端的 MessageListenerOrderly 配合解决单 MessageQueue 被并发处理的问题。

消息重复

对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,也就是所谓的“有且仅有一次”。RocketMQ 选择了确保一定投递,保证消息不丢失,但有可能造成消息重复。

消息重复一般情况下不会发生,但是如果消息量大,网络有波动,消息重复就是个大概率事件。比如 Producer 有个函数 setRetryTimesWhenSendFailed,设置在同步方式下自动重试的次数,默认值是 2,这样当第一次发送消息时,Broker 端接收到了消息但是没有正确返回发送成功的状态,就造成了消息重复。

解决消息重复有两种方法:第一种方法是保证消费逻辑的幂等性(多次调用和一次调用效果相同);另一种方法是维护一个已消费消息的记录,消费前查询这个消息是否被消费过。这两种方法都需要使用者自己实现。

动态增减机器

一个消息队列集群由多台机器组成,持续稳定地提供服务,因为业务需求或硬件故障,经常需要增加或减少各个角色的机器,下面介绍如何在不影响服务稳定性的情况下动态地增减机器。

动态增减NameServer

NameServer 是 RocketMQ 集群的协调者,集群的各个组件是通过 NameServer 获取各种属性和地址信息的。主要功能包括两部分:

  1. 各个 Broker 定期上报自己的状态信息到 NameServer。
  2. 各个客户端,包括 Producer、Consumer,以及命令行工具,通过 NameServer 获取最新的状态信息。

所以,在启动 Broker、生产者和消费者之前,必须告诉它们 NameServer 的地址,为了提高可靠性,建议启动多个 NameServer。NameServer 占用资源不多,可以和 Broker 部署在同一台机器。有多个 NameServer 后,减少某个 NameServer 不会对其他组件产生影响。

有四种方式可设置 NameServer 的地址,下面按优先级由高到低依次介绍:

  1. 通过代码设置,比如在 Producer 中,通过 Producer.setNamesrvAddr("name-server-ip1:port;name-server-ip2:port"); 来设置。在 mqadmin 命令行工具中,是通过 -n name-server-ip1:port;name-server-ip2:port 参数来设置的,如果自定义了命令行工具,也可以通过 defaultMQAdminExt.setNamesrvAddr("name-server-ip1:port;name-server-ip2:port"); 来设置
  2. 使用 Java 启动参数设置,对应的 option 是 rocketmq.namesrv.addr
  3. 通过 Linux 环境变量设置,在启动前设置变量:NAMESRV_ADDR
  4. 通过 HTTP 服务来设置,当上述方法都没有使用,程序会向一个 HTTP 地址发送请求来获取 NameServer 地址,默认的URL是 http://jmenv.tbsite.net:8080/rocketmq/nsaddr(淘宝的测试地址),通过设置 rocketmq.namesrv.domain 参数来覆盖 jmenv.tbsite.net:8080;通过 rocketmq.namesrv.domain.subgroup 参数来覆盖 nsaddr

第 4 种方式是唯一支持动态增加 NameServer,无须重启其他组件的方式。使用这种方式后其他组件会每隔 2 分钟请求一次该 URL,获取最新的 NameServer 地址。

动态增减Broker

只增加 Broker 不会对原有的 Topic 产生影响,原来创建好的 Topic 中数据的读写依然在原来的那些 Broker 上进行。集群扩容后,一是可以把新建的 Topic 指定到新的 Broker 机器上,均衡利用资源;另一种方式是通过 updateTopic 命令更改现有的 Topic 配置,在新加的 Broker 上创建新的队列。

如果因为业务变动或者置换机器需要减少 Broker,此时该如何操作呢?减少 Broker 要看是否有持续运行的 Producer,当一个 Topic 只有一个 Master Broker,停掉这个 Broker 后,消息的发送肯定会受到影响,需要在停止这个 Broker 前,停止发送消息。

当某个 Topic 有多个 Master Broker,停了其中一个,这时候是否会丢失消息呢?答案和 Producer 使用的发送消息的方式有关,如果使用同步方式 send(msg) 发送,在 DefaultMQProducer 内部有个自动重试逻辑,其中一个 Broker 停了,会自动向另一个 Broker 发消息,不会发生丢消息现象。如果使用异步方式发送 send(msg, callback),或者用 sendOneWay(msg) 方式,会丢失切换过程中的消息。因为在异步和 sendOneWay(msg) 这两种发送方式下,Producer.setRetryTimesWhenSendFailed() 设置不起作用,发送失败不会重试。DefaultMQProducer 默认每 30 秒到 NameServer 请求最新的路由消息,Producer 如果获取不到已停止的 Broker 下的队列信息,后续就自动不再向这些队列发送消息。

故障对消息的影响

假设所有情况都是在 Topic 配有多个带有 Master-Slave Broker 的 RocketMQ 集群上进行的。

  1. Broker正常关闭,启动;
  2. Broker 异常 Crash,然后启动;
  3. OS Crash,重启;
  4. 机器断电,但能马上恢复供电;
  5. 磁盘损坏;
  6. CPU、主板、内存等关键设备损坏。

Broker 正常关闭、启动

该问题属于可控软件问题,内存数据并不会丢失。

如果重启过程中有持续运行的 Consumer,Master 机器出故障后,Consumer 会自动重连到对应的 Slave 机器,不会有消息丢失和偏差。当 Master 角色的机器重启以后,Consumer 又会重新连接到 Master 机器。

注意:在启动 Master 机器的时候,如果 Consumer 正在从 Slave 消费消息,不要停止 Consumer。假如此时先停止 Consumer 后再启动 Master 机器,然后再启动 Consumer,这个时候 Consumer 就会去读 Master 机器上已经滞后的 offset 值,造成消息大量重复。

如果有持续运行的 Producer,一台 Broker Master 出故障后,Producer 只能向 Topic 下其他的 Broker Master 机器发送消息,如果 Producer 采用同步发送方式,不会有消息丢失。

软件故障

  1. Broker 异常 Crash 后启动
  2. OS Crash 后重启
  3. 机器断电,但能马上恢复供电

以上三种情况属于软件故障,内存的数据可能丢失,所以刷盘策略不同,造成的影响也不同,如果 Master、Slave 都配置成 SYNC_FLUSH,可以达到和第 1 种情况相同的效果。

硬件故障

  1. 磁盘损坏
  2. CPU、主板、内存等关键设备损坏

以上两种情况属于硬件故障,原有机器的磁盘数据可能会丢失。如果 Master 和 Slave 机器间配置成同步复制方式,某一台机器发生硬件故障,也可以达到消息不丢失的效果。如果 Master 和 Slave 机器间是异步复制,两次 Sync 间的消息会丢失。

消息优先级

有些场景,需要应用程序处理几种类型的消息,不同消息的优先级不同。RocketMQ 是个先入先出的队列,不支持消息级别或者 Topic 级别的优先级。业务中简单的优先级需求,可以通过以下三个思路间接地解决。

  1. 根据优先级区分成不同的 Topic

  2. 根据工作量区分成不同的 Topic。

    举个实际应用场景:一个订单处理系统,接收从 100 家快递门店过来的请求,把这些请求通过 Producer 写入 RocketMQ;订单处理程序通过 Consumer 从队列里读取消息并处理,每天最多处理 1 万单。如果这 100 个快递门店中某几个门店订单量大增,比如门店一接了个大客户,一个上午就发出 2 万单消息请求,这样其他的 99 家门店可能被迫等待门店一的 2 万单处理完,也就是两天后订单才能被处理,显然很不公平。

    这时可以创建一个 Topic,设置 Topic 的 MessageQueue 数量超过 100 个,Producer 根据订单的门店号,把每个门店的订单写入一个 MessageQueue。DefaultMQPushConsumer 默认是采用循环的方式逐个读取一个 Topic 的所有 MessageQueue,这样如果某家门店订单量大增,这家门店对应的 MessageQueue 消息数增多,等待时间增长,但不会造成其他家门店等待时间增长。(可以通过调整 pullBatchSize 来改变 Consumer 一次从 MessageQueue 读取消息的个数)

  3. 不区分 Topic,改变 Producer.pullMessage() 的遍历逻辑和 Consumer.consumeMessage() 的读取逻辑。

吞吐量优先的使用场景

吞吐量优先的场景主要从四个方面考虑处理大流量:

  • 提升 Producer 的发送强度
  • 通过 Broker 进行信息过滤
  • 提升 Consumer 的消费能力
  • Consumer 的负载均衡

提高Producer发送强度

发送一条消息出去要经过三步,一是客户端发送请求到服务器,二是服务器处理该请求,三是服务器向客户端返回应答,一次消息的发送耗时是上述三个步骤的总和。

在一些对速度要求高,但是可靠性要求不高的场景下,比如日志收集类应用,可以采用 OneWay 方式发送,OneWay 方式只发送请求不等待应答,即将数据写入客户端的 Socket 缓冲区就返回,不等待对方返回结果,用这种方式发送消息的耗时可以缩短到微秒级。

另一种提高发送速度的方法是增加 Producer 的并发量,使用多个 Producer 同时发送,我们不用担心多 Producer 同时写会降低消息写磁盘的效率, RocketMQ 引入了一个并发窗口,在窗口内消息可以并发地写入 DirectMem 中,然后异步地将连续一段无空洞的数据刷入文件系统当中。顺序写 CommitLog 可让 RocketMQ 无论在 HDD 还是 SSD 磁盘情况下都能保持较高的写入性能。目前在阿里内部经过调优的服务器上,写入性能达到 90万+ 的 TPS,我们可以参考这个数据进行系统优化在 Linux 操作系统层级进行调优,推荐使用 EXT4 文件系统,IO 调度算法使用 deadline 算法。如下图所示,EXT4 创建/删除文件的性能比 EXT3 及其他文件系统要好,因为 RocketMQ 会有频繁的创建/删除动作。

在Broker端进行消息过滤

在 Broker 端进行消息过滤,可以减少无效消息发送到 Consumer,少占用网络带宽从而提高吞吐量。

通过消息的Tag和Key进行过滤

对于应用来说,尽可能只用一个 Topic 来传递消息。不同的消息子类型用 Tag 来标识(每条消息只能有一个 Tag)。服务器端基于 Tag 进行过滤,并不需要读取消息体的内容,所以效率很高。发送消息设置了 Tag 以后,消费方在订阅消息时,才可以利用 Tag 在 Broker 端做消息过滤。

其次是消息的 Key。对发送的消息设置好 Key,以后可以根据这个 Key 来查找消息。所以这个 Key 一般用消息在业务层面的唯一标识码来表示,这样后续查询消息异常,消息丢失等都很方便。Broker 会创建专门的索引文件,来存储 Key 到消息的映射,由于是哈希索引,应尽量使 Key 唯一,避免潜在的哈希冲突。

Tag 和 Key 的主要差别是使用场景不同,Tag 用在 Consumer 的代码中,用来进行服务端消息过滤,Key 主要用于通过命令行查询消息。

MessageSelector.byTag("CUSTOM_TAG");

通过SQL表达式方式进行过滤

使用 Tag 方式过滤虽然高效,但是支持的逻辑比较简单,在构造 Message 的时候,还可以通过 putUserProperty 函数来增加多个自定义的属性,基于这些属性可以做复杂的过滤逻辑。

需要知道的是,Tag、Key 和设置的自定义属性值一样,均储存在 Message 的 Map<String, String> properties 字段中。

目前只支持在 PushConsumer 中实现这种过滤)

MessageSelector.bySql("a between 0 and 3");

SQL 表达式方式的过滤需要 Broker 先读出消息里的属性内容,然后做SQL计算,增大磁盘压力,没有 Tag 方式高效。

FilterServer方式过滤

Filter Server 是一种比 SQL 表达式更灵活的过滤方式,允许用户自定义 Java 函数,根据 Java 函数的逻辑对消息进行过滤。

要使用 Filter Server,首先要在启动 Broker 前在配置文件里加上 FilterServerNums=3 这样的配置,Broker 在启动的时候,就会在本机启动 3 个 FilterServer 进程。FilterServer 类似一个 RocketMQ 的 Consumer 进程,它从本机 Broker 获取消息,然后根据用户上传过来的 Java 函数进行过滤,过滤后的消息再传给远端的 Consumer。这种方式会占用很多 Broker 机器的 CPU 资源,要根据实际情况谨慎使用。上传的 Java 代码也要经过检査,不能有申请大内存、创建线程等这样的操作,否则容易造成 Broker 服务器宕机。

提高Consumer处理能力

当 Consumer 的处理速度跟不上消息的产生速度,会造成越来越多的消息积压,这个时候首先查看消费逻辑本身有没有优化空间,除此之外还有三种方法可以提高 Consumer 的处理能力。

  1. 提高消费并行度在同一个 Consumer Group 下(Clustering方式),可以通过增加 Consumer 实例的数量来提高并行度,通过加机器,或者在已有机器中启动多个 Consumer 进程都可以增加 Consumer 实例数。还可以修改单个 Consumer 实例中的并行处理的线程数来提高吞吐量。

注意:总的 Consumer 数量不要超过 Topic 下 Read Queue 数量,超过的 Consumer 实例接收不到消息。

  1. 以批量方式进行消费某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及 update 某个数据库,一次 update 10 条的时间会大大小于十次 update 1 条数据的时间。这时可以通过批量方式消费来提高消费的吞吐量。
  1. 检测延时情况,跳过非重要消息。Consumer 在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆积,这个时候可以选择丢弃不重要的消息,使 Consumer 尽快追上 Producer 的进度。

    //当某个队列的消息数堆积到90000条以上,就直接丢弃,以便快速追上发送消息的进度。
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        long offset = msgs.get(0).getQueueOffset();
        String maxOffset = msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);
        long diff = Long.parseLong(maxOffset) - offset;
        if (diff > 90000) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        //handle message
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

需要了解的是,上述提高 Consumer 处理能力的三种方法相关配置:

  • 设置 Consumer 并行线程:consumeThreadMin 和 consumeThreadMax
  • 设置 Consumer 批量消费:consumeMessageBatchMaxSize (最多可收到 N 条消息)

Consumer负载均衡

想要提高 Consumer 的处理速度,可以启动多个 Consumer 并发处理,这个时候就涉及如何在多个 Consumer 之间负载均衡的问题。

要做负载均衡,必须知道一些全局信息,也就是一个 Consumer Group 里到底有多少个 Consumer,知道了全局信息,才可以根据某种算法来分配,比如简单地平均分到各个 Consumer。

在 RocketMQ 中,负载均衡或者消息分配是在 Consumer 端代码中完成的,Consumer 从 Broker 处获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息。

负载均衡的结果与 Topic 的 Message Queue 数量,以及 Consumer Group 里的 Consumer 的数量有关。负载均衡的分配粒度只到 Message Queue,把 Topic下的所有 Message Queue 分配到不同的 Consumer 中,所以 Message Queue和 Consumer的数量关系,或者整除关系影响负载均衡结果。

DefaultMQPushConsumer的负载均衡

DefaultMQPushConsumer 的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个 DefaultMQPushConsumer 启动后,会马上会触发一个 doRebalance 动作;而且在同一个 ConsumerGroup 里加入新的 DefaultMQPushConsumer 时,各个 Consumer 都会被触发 doRebalance 动作。

org.apache.rocketmq.client.consumer.rebalance 包下有六种负载均衡策略实现。

1. AllocateMessageQueueAveragely 平均负载策略

RocketMQ 默认使用的就是这种方式,如果某个 Consumer 集群,订阅了某个 Topic,Topic 下的 MessageQueue 会被平均分配给集群中的 Consumer。

以下是具体实现:

//consumer的排序后的
int index = cidAll.indexOf(currentCID);
//取模
int mod = mqAll.size() % cidAll.size();
//如果队列数小于消费者数量,则将分到队列数设置为1,如果余数大于当前消费者的index,则
//能分到的队列数+1,否则就是平均值
int averageSize =
  mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                                       + 1 : mqAll.size() / cidAll.size());
//consumer获取第一个MessageQueue的索引
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// 如果消费者大于队列数,rang会是负数,循环也就不会执行 
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
  result.add(mqAll.get((startIndex + i) % mqAll.size()));
}

2. AllocateMessageQueueAveragelyByCircle 环形平均负载策略

和平均负载策略唯一的区别就是,在分队列的时候的选组策略不同。但环形平均分配和平均分配,每个 Consumer 拿到的 MessageQueue 数量是不变的。

以下是具体实现:

//当前consumer排序后的索引 
int index = cidAll.indexOf(currentCID);  
//index会是consumer第一个拿到的消息队列索引
for (int i = index; i < mqAll.size(); i++) {
  //这里采用了取模的方式
  if (i % cidAll.size() == index) { 
    result.add(mqAll.get(i));  
  }
}

3. AllocateMessageQueueByMachineRoom 机房负载策略

这个策略就是当前 Consumer 只负载处在指定的机房内的 MessageQueue,还有 brokerName 的命名必须要按要求的格式来设置:机房名@brokerName。

以下是用法参考:

AllocateMessageQueueByMachineRoom allocateMachineRoom = new AllocateMessageQueueByMachineRoom();
//指定机房名称  machine_room1、machine_room2
allocateMachineRoom.setConsumeridcs(new HashSet<>(Arrays.asList("machine_room1","machine_room2")));

以下是具体实现:

 //当前consumer的下标
int currentIndex = cidAll.indexOf(currentCID);
if (currentIndex < 0) {
  return result;
}
//符合机房条件的队列
List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
  //brokerName命名规则   machine_room1@broker-a
  String[] temp = mq.getBrokerName().split("@");
  //判断是否符合指定的机房条件
  if (temp.length == 2 && consumeridcs.contains(temp[0])) {
    premqAll.add(mq);
  }
}
//分配到的队列数
int mod = premqAll.size() / cidAll.size();
//取模
int rem = premqAll.size() % cidAll.size();
//当前分配到的第一个队列索引
int startIndex = mod * currentIndex;
//分配到的最后一个队列索引
int endIndex = startIndex + mod;
//取startIndex到endIndex的队列
for (int i = startIndex; i < endIndex; i++) {
  result.add(mqAll.get(i));
}
//MessageQueue数量和Consumer不是整数倍时  有点像平均分配因为队列下标取到的也是连续的
if (rem > currentIndex) {
  result.add(premqAll.get(currentIndex + mod * cidAll.size()));
}

4. AllocateMachineRoomNearby 机房邻近负载策略

该策略的处理方式要比 AllocateMessageQueueByMachineRoom 更加灵活,还考虑到了那些同机房只有 MessageQueue 却没有 Consumer 的情况。使用该策略需要实现 AllocateMachineRoomNearby.MachineRoomResolver,来区分每个 Broker 处于哪个机房。

具体用法参考 org.apache.rocketmq.client.consumer.rebalance.AllocateMachineRoomNearByTest

先同机房的 Consumer 和 MessageQueue 进行负载,这里按照平均负载来分(根据创建机房就近策略使用的负载策略),然后将游离态的 Message Queue 通过设置的负载策略来分。

以下是具体实现:

//消息队列按机房分组
Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
for (MessageQueue mq : mqAll) {
  //这里调用我们自己定义的类方法,得到broker的机房的名称
  String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
  //机房不为空,将broker放到分组中
  if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
    if (mr2Mq.get(brokerMachineRoom) == null) {
      mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
    }
    mr2Mq.get(brokerMachineRoom).add(mq);
  } else {
    throw new IllegalArgumentException("Machine room is null for mq " + mq);
  }
}

//consumer按机房分组
Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
for (String cid : cidAll) {
  //这里调用我们自己定义的类方法,得到broker的机房的名称
  String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
  if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
    if (mr2c.get(consumerMachineRoom) == null) {
      mr2c.put(consumerMachineRoom, new ArrayList<String>());
    }
    mr2c.get(consumerMachineRoom).add(cid);
  } else {
    throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
  }
}

//当前consumer分到的所有MessageQueue
List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();

//1.给当前consumer分当前机房的那些MessageQeueue
String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
//得到当前机房的MessageQueue
List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
//得到当前机房的Consumer
List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
  //得到当前机房所有MessageQueue和Consumers后根据指定的策略再负载
  allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
}

//2.如果该MessageQueue的机房 没有同机房的consumer,将这些MessageQueue按配置好的备用策略分配给所有的consumer
for (String machineRoom : mr2Mq.keySet()) {
  if (!mr2c.containsKey(machineRoom)) { 
    //添加分配到的游离态MessageQueue
    allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
  }
}

5. AllocateMessageQueueConsistentHash 一致性哈希负载策略

一致性哈希有一个哈希环的概念,哈希环由数值 0 到 2^32-1 组成,不管内容多长的字符,经过哈希计算都能得到一个等长的数字,最后都会落在哈希环上的某个点,哈希环上的点都是虚拟的,比如我们这里使用 Consumer 的 ID 来进行哈希计算,得到的这几个是物理的点,然后把得到的点存到 TreeMap 里面,然后将所有的 MessageQueue 依次进行同样的哈希计算,得到距离 MessageQueue 顺时针方向最近的那个 Consumer 点,这个就是 MessageQueue 最终归属的那个 Consumer。

以下是具体实现:

//将所有consumer变成节点 到时候经过hash计算 分布在hash环上
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {
  cidNodes.add(new ClientNode(cid));
}

final ConsistentHashRouter<ClientNode> router; 
//构建哈希环
if (customHashFunction != null) {
  router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {
  //默认使用MD5进行Hash计算
  router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}

List<MessageQueue> results = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
  //对messageQueue进行hash计算,找到顺时针最近的consumer节点
  ClientNode clientNode = router.routeNode(mq.toString());
  //判断是否是当前consumer
  if (clientNode != null && currentCID.equals(clientNode.getKey())) {
    results.add(mq);
  }
}

6. AllocateMessageQueueByConfig 自定义负载策略
用户自定义配置,用户在创建 Consumer 的时候,可以设置要使用的负载策略,如果我们设置为 AllocateMessageQueueByConfig 方式时,我们可以自己指定需要监听的 MessageQueues。

DefaultMQPullConsumer的负载均衡

Pull Consumer 可以看到所有的 Message Queue,而且从哪个 Message Queue 读取消息,读消息时的 Offset 都由使用者控制,使用者可以实现任何特殊方式的负载均衡。DefaultMQPullConsumer 可以通过 registerMessageQueueListener 函数监听 Consumer 加入/退出事件以及 MQPullConsumerScheduleService 类更改 Pull 逻辑实现自定义的负载均衡。

源码阅读环境设置

编译

mvn -Prelease-all -DskipTests clean install -U

NameServer 启动流程

  • 配置 VM 参数

    • 方式一:修改createNamesrvController初始化代码

    • 方式二:设置 VM 参数

      # RocketMQ HOME
      -Drocketmq.home.dir=/Server/rocketmq
      # RocketMQ Log Dir
      -Duser.home=/Server/rocketmq/user.home
  • 配置 Program arguments

    # host:ip
    -n localhost:9876
  • 启动 namesrv

    org.apache.rocketmq.namesrv.NamesrvStartup#main()

Broker 启动流程

  • 配置 VM 参数

    • 方式一:修改createNamesrvController初始化代码

    • 方式二:设置 VM 参数

      # RocketMQ HOME
      -Drocketmq.home.dir=/Server/rocketmq
      # RocketMQ Log Dir
      -Duser.home=/Server/rocketmq/user.home
  • 配置 Program arguments

    # host:ip
    ‐n 127.0.0.0:9876 -c /Server/rocketmq/conf/broker.conf
  • 启动 namesrv

    org.apache.rocketmq.broker.BrokerStartup#main()

如果启动时打印 The broker[broker-a, 169.254.235.113:10911] boot success.

IP 与设置的 127.0.0.1 是不同的,原因是多网卡的问题,需要在 broker.conf 配置 brokerIP 配置项。

  1. brokerIP1 为当前 Broker 监听的 IP;
  2. brokerIP2 为存在 Broker 主从时,在 Broker 主节点上配置了 brokerIP2 的话,Broker 从节点会连接主节点配置的 brokerIP2 来同步。

默认不配置 brokerIP1和 brokerIP2 时,都会根据当前网卡选择一个 IP 使用,当你的机器有多块网卡时,很有可能会有问题。比如,机器上有两个 IP,一个公网 IP,一个私网 IP,结果默认选择的走公网 IP,这是不正确的,我期望的是所有业务内部通信都走内网。

Producer 和 Consumer 执行流程

  • 执行 org.apache.rocketmq.example.quickstart.Producer
  • 执行 org.apache.rocketmq.example.quickstart.Consumer

如果出现 No Topic Route Info 错误,即 Broker 不能根据 Message 自动创建 Topic。

解决方案:增加 broker.conf 中增加配置 autoCreateTopicEnable = true