
RocketMQ Source Code Analysis

RocketMQ can be divided into four major components: NameServer, Consumer, Broker, and Producer. Calling one another, they form the complete produce-consume pattern.

NameServer

NameServer's main job in RocketMQ is distributed coordination and notification: it receives the status information that the other roles report periodically and coordinates the calling relationships among them, including removing a role from its availability lists once that role goes offline.

Coordinating the Relationships Between Roles

Take creating a Topic as an example. The create-Topic command is first sent to the target Broker; after receiving the request, the Broker executes the actual creation logic and then sends a registration message to NameServer. Only after NameServer has registered the new Topic can other clients discover it.
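
For a concrete picture of this flow, here is a minimal sketch that triggers it from the admin side. It assumes the rocketmq-tools module is on the classpath; the broker address and topic name are placeholders, not values taken from this article.

import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
        adminExt.start();
        try {
            TopicConfig topicConfig = new TopicConfig("TopicTest");
            topicConfig.setReadQueueNums(4);
            topicConfig.setWriteQueueNums(4);
            // The command goes to the Broker first; the Broker then registers
            // the new Topic with NameServer, making it visible to other clients.
            adminExt.createAndUpdateTopicConfig("127.0.0.1:10911", topicConfig);
        } finally {
            adminExt.shutdown();
        }
    }
}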

In the NameServer module, org.apache.rocketmq.namesrv.NamesrvStartup is the startup entry point. Its main job is to create and run org.apache.rocketmq.namesrv.NamesrvController, which coordinates the relationships among the functional modules.

Initialization Logic

// Worker threads; serverWorkerThreads defaults to 8
this.remotingExecutor =
    Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// Bind the Netty-based remoting server to the worker thread pool
this.registerProcessor();
// Scan for inactive Brokers every 10 seconds; a Broker that has not reported for two minutes is considered dead
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);
// Print all KV configuration periodically, every 10 minutes
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        NamesrvController.this.kvConfigManager.printAllPeriodically();
    }
}, 1, 10, TimeUnit.MINUTES);

Core Business Logic

When the network layer receives a request, it invokes org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor to handle it. The RequestCode values give a good overview of NameServer's main functions.

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
  
    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINFO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}

Cluster State Storage

As the coordinator of the cluster, NameServer has to store and maintain various kinds of cluster metadata. This is implemented in the org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager class.

// TOPIC QUEUE: stores the queue attributes of every Topic
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// BROKER: stores the Broker name, its Cluster name, and the addresses of one master and its slaves
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// CLUSTER: stores the reverse index from a Cluster to its Brokers
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// BROKER LIVE: unlike brokerAddrTable, this stores the Brokers' real-time liveness state
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// FILTER: one Broker can have multiple FilterServers
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

How State Is Maintained

Because every role actively reports its status to NameServer, NameServer only has to respond according to the request code carried in the report (the logic lives in org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor).

In addition, connection-close events also trigger state updates (the logic lives in org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService).

@Override
public void onChannelClose(String remoteAddr, Channel channel) {
    this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

@Override
public void onChannelException(String remoteAddr, Channel channel) {
    this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
    this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

The Remoting Module

RocketMQ's communication code lives in the Remoting module, where org.apache.rocketmq.remoting.netty.NettyRemotingClient and org.apache.rocketmq.remoting.netty.NettyRemotingServer wrap the Netty network library.
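
All request/response traffic in RocketMQ rides on RemotingCommand objects carried by these two classes. Below is a minimal sketch of a synchronous round trip against a NameServer; the address is a placeholder, and GET_BROKER_CLUSTER_INFO is simply one request code that needs no custom header.

import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

public class RemotingRoundTrip {
    public static void main(String[] args) throws Exception {
        NettyRemotingClient client = new NettyRemotingClient(new NettyClientConfig());
        client.start();
        try {
            RemotingCommand request =
                RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
            // Blocks until the NameServer answers or the 3-second timeout expires.
            RemotingCommand response = client.invokeSync("127.0.0.1:9876", request, 3000);
            System.out.println("response code: " + response.getCode());
        } finally {
            client.shutdown();
        }
    }
}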

Consumer

Initialization Logic

The first step is to initialize MQClientInstance and set up the rebalance strategy and the PullAPIWrapper; only with these structures in place can pull requests be sent to fetch messages. The next step is to determine the OffsetStore, which records how far into each queue the current consumer has consumed.

The OffsetStore type depends on the message model: BROADCASTING mode uses a LocalFileOffsetStore, storing offsets locally, while CLUSTERING mode uses a RemoteBrokerOffsetStore, storing offsets on the Broker.

if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        case CLUSTERING:
            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        default:
            break;
    }
    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();

Then consumeMessageService is initialized; a different Service type is used depending on whether ordered consumption is required.

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

this.consumeMessageService.start();

Message Fetching Logic

The message-fetching logic is implemented in the org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage method.

Flow Control

long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
  if ((queueFlowControlTimes++ % 1000) == 0) {
    log.warn(
      "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
      this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
  }
  return;
}

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
  if ((queueFlowControlTimes++ % 1000) == 0) {
    log.warn(
      "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
      this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
  }
  return;
}

Handling the Pull Result

PullCallback pullCallback = new PullCallback() {
  @Override
  public void onSuccess(PullResult pullResult) {
    if (pullResult != null) {
      pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                                                                                   subscriptionData);

      switch (pullResult.getPullStatus()) {
        case FOUND:
          long prevRequestOffset = pullRequest.getNextOffset();
          pullRequest.setNextOffset(pullResult.getNextBeginOffset());
          long pullRT = System.currentTimeMillis() - beginTimestamp;
          DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                                                             pullRequest.getMessageQueue().getTopic(), pullRT);

          long firstMsgOffset = Long.MAX_VALUE;
          if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
          } else {
            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                                                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
              pullResult.getMsgFoundList(),
              processQueue,
              pullRequest.getMessageQueue(),
              dispatchToConsume);

            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
              DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                                                     DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
            } else {
              DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
            }
          }

          if (pullResult.getNextBeginOffset() < prevRequestOffset
              || firstMsgOffset < prevRequestOffset) {
            log.warn(
              "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
              pullResult.getNextBeginOffset(),
              firstMsgOffset,
              prevRequestOffset);
          }

          break;
        case NO_NEW_MSG:
        case NO_MATCHED_MSG:
          pullRequest.setNextOffset(pullResult.getNextBeginOffset());

          DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

          DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
          break;
        case OFFSET_ILLEGAL:
          log.warn("the pull request offset illegal, {} {}",
                   pullRequest.toString(), pullResult.toString());
          pullRequest.setNextOffset(pullResult.getNextBeginOffset());

          pullRequest.getProcessQueue().setDropped(true);
          DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

            @Override
            public void run() {
              try {
                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                                                        pullRequest.getNextOffset(), false);

                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                log.warn("fix the pull request offset, {}", pullRequest);
              } catch (Throwable e) {
                log.error("executeTaskLater Exception", e);
              }
            }
          }, 10000);
          break;
        default:
          break;
      }
    }
  }

  @Override
  public void onException(Throwable e) {
    if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
      log.warn("execute the pull request exception", e);
    }

    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
  }
};

Sending the Pull Request

try {
    this.pullAPIWrapper.pullKernelImpl(
        pullRequest.getMessageQueue(),
        subExpression,
        subscriptionData.getExpressionType(),
        subscriptionData.getSubVersion(),
        pullRequest.getNextOffset(),
        this.defaultMQPushConsumer.getPullBatchSize(),
        sysFlag,
        commitOffsetValue,
        BROKER_SUSPEND_MAX_TIME_MILLIS,
        CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
        CommunicationMode.ASYNC,
        pullCallback
    );
} catch (Exception e) {
    log.error("pullKernelImpl exception", e);
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}

Concurrent Consumption Logic

Take org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService as an example.

Defining the Thread Pools

this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(),
    this.defaultMQPushConsumer.getConsumeThreadMax(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue,
    new ThreadFactoryImpl("ConsumeMessageThread_"));

this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));

Batching Received Messages for Consumption

After a batch of messages has been fetched from the Broker, it is packed into ConsumeRequest objects according to the configured batch size, and each ConsumeRequest is submitted to the consumeExecutor thread pool for execution.

@Override
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

Handling the Consumption Status

Processing a message can produce several result values, the two main ones being CONSUME_SUCCESS and RECONSUME_LATER. If consumption fails, the messages are submitted to the scheduledExecutorService thread pool mentioned above and retried 5 seconds later. In CLUSTERING mode, messages that were not consumed successfully are first sent back to the Broker so that other Consumers in the same ConsumerGroup can consume them; only if sending them back to the Broker fails are they resubmitted locally for a later retry. The status-handling logic is as follows.

public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;

    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }

            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);

                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}

The processing logic is user-defined, so when message volume is high, how efficiently it executes directly affects system throughput. Throughput can be raised by processing several messages together or by increasing the number of consumer threads.
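
Both knobs are exposed on DefaultMQPushConsumer. Here is a minimal tuning sketch; the group name, topic, address, and numbers are illustrative, not recommendations.

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

public class ConsumerTuningExample {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("TopicTest", "*");
        // Hand up to 16 messages to one listener invocation so they can be processed together.
        consumer.setConsumeMessageBatchMaxSize(16);
        // Widen the consumeExecutor thread pool shown above.
        consumer.setConsumeThreadMin(30);
        consumer.setConsumeThreadMax(60);
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) ->
            ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
        consumer.start();
    }
}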

MQClientInstance

MQClientInstance is the class underlying every type of client-side Consumer and Producer. It fetches and caches configuration from NameServer, such as Topic route information, and it also sends and receives messages through the MQClientAPIImpl class, i.e. fetching messages from a Broker or sending messages to one.

Since MQClientInstance implements the low-level communication and the fetching and caching of metadata, there is no need to create one per Consumer or Producer; a single MQClientInstance object can be shared by multiple Consumers and Producers.

RocketMQ achieves this sharing of MQClientInstance through a factory class.

MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

MQClientInstance is created through this factory; it is not a singleton, because some scenarios require multiple instances.

public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }

    return instance;
}

The system maintains a ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable; every newly created MQClientInstance is put into this map with its clientId as the key. The clientId has the format "clientIp" + "@" + "InstanceName", where clientIp is the IP address of the client machine, which normally does not change, and InstanceName has a default value but can also be set manually.

Normally, a Java program that uses the RocketMQ client, i.e. a single JVM process, needs only one MQClientInstance; all the Consumers and Producers it creates share that one instance underneath. But sometimes one MQClientInstance object is not enough, for example when a Java program has to connect to two RocketMQ clusters, reading messages from one and sending them to the other; a single MQClientInstance cannot support that scenario. In such cases, different InstanceNames must be set manually, so that two MQClientInstance objects are created underneath.
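
A minimal sketch of the two-cluster case follows; the group names and NameServer addresses are placeholders.

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class TwoClusterBridge {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("bridge_consumer_group");
        consumer.setNamesrvAddr("cluster-a-nameserver:9876");
        consumer.setInstanceName("clusterA"); // clientId becomes "<clientIp>@clusterA"

        DefaultMQProducer producer = new DefaultMQProducer("bridge_producer_group");
        producer.setNamesrvAddr("cluster-b-nameserver:9876");
        producer.setInstanceName("clusterB"); // different clientId, so a second MQClientInstance
    }
}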

Through scheduled tasks, MQClientInstance periodically performs the following operations: fetching the NameServer address, updating TopicRoute information, cleaning up offline Brokers, and persisting the consumer Offsets.
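
A self-contained sketch of the scheduling pattern used by MQClientInstance#startScheduledTask; the intervals are approximately the 4.x defaults, and the println bodies stand in for the real private methods.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ClientScheduledTasksSketch {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
            () -> System.out.println("fetch NameServer address"), 10, 120, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(
            () -> System.out.println("update TopicRoute info from NameServer"), 10, 30, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(
            () -> System.out.println("clean offline Brokers, send heartbeats"), 1, 30, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(
            () -> System.out.println("persist all consumer Offsets"), 10, 5, TimeUnit.SECONDS);
    }
}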

Broker

Master-Slave Synchronization

A RocketMQ Broker runs in one of two roles, Master or Slave. For high availability, once a Master receives a message it must replicate the content to its Slave machines, so that if the Master goes down, a Slave can still provide service. Two kinds of information need to be synchronized between Master and Slave Brokers, and they are implemented differently: metadata is synchronized through Netty-based commands, while the CommitLog is synchronized directly on top of Java NIO.

Synchronizing Metadata

What a Slave synchronizes from the Master is not just the messages themselves; some metadata must be synchronized too, such as the TopicConfig, ConsumerOffset, DelayOffset, and SubscriptionGroupConfig information. At startup, a Broker checks whether its role is Slave and, if so, starts the scheduled synchronization task.

private void handleSlaveSynchronize(BrokerRole role) {
    if (role == BrokerRole.SLAVE) {
        if (null != slaveSyncFuture) {
            slaveSyncFuture.cancel(false);
        }
        this.slaveSynchronize.setMasterAddr(null);
        slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.slaveSynchronize.syncAll();
                }
                catch (Throwable e) {
                    log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                }
            }
        }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
    } else {
        //handle the slave synchronise
        if (null != slaveSyncFuture) {
            slaveSyncFuture.cancel(false);
        }
        this.slaveSynchronize.setMasterAddr(null);
    }
}

Inside the syncAll function, syncTopicConfig(), syncConsumerOffset(), syncDelayOffset(), and syncSubscriptionGroupConfig() are called to synchronize the metadata. Let's take syncConsumerOffset() as an example and look at the underlying implementation.

private void syncConsumerOffset() {
    String masterAddrBak = this.masterAddr;
    if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
        try {
            ConsumerOffsetSerializeWrapper offsetWrapper =
                this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
            this.brokerController.getConsumerOffsetManager().getOffsetTable()
                .putAll(offsetWrapper.getOffsetTable());
            this.brokerController.getConsumerOffsetManager().persist();
            log.info("Update slave consumer offset from master, {}", masterAddrBak);
        } catch (Exception e) {
            log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
        }
    }
}

public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
    final String addr) throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBrokerException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

The basic logic of syncConsumerOffset() is to assemble a RemotingCommand, send it over Netty to the Master Broker, and retrieve the Offset information.

Synchronizing the Message Bodies

Now for how Master and Slave synchronize the message bodies, that is, the CommitLog. The CommitLog differs from metadata in two ways: it is far larger, and the requirements on timeliness and reliability are different. Metadata is synchronized on a timer, and if something goes wrong in the window between two syncs and the metadata on the Master diverges from that on the Slave, the situation can still be remedied (by manually adjusting Offsets, restarting Consumers, and so on). For the CommitLog in high-reliability scenarios, however, if it has not been synchronized in time when the Master fails, the messages are lost for good. That is why dedicated code implements the synchronization of message bodies between Master and Slave.

The main implementation code lives in the org.apache.rocketmq.store.ha package of the Broker module and consists of three classes: HAService, HAConnection, and WaitNotifyObject.

HAService is the main body of the CommitLog synchronization. The logic it executes differs between Master and Slave machines; by default it runs the Master-side logic.

//BrokerController#initialize()
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }
}

When the Broker's role is Slave, the MasterAddr value is set correctly, so that when HAService starts, connectMaster in the inner class HAClient executes properly, as shown in the listing below.

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {

            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }

        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

        this.lastWriteTimestamp = System.currentTimeMillis();
    }

    return this.socketChannel != null;
}

HAClient attempts to connect to the Master Broker through Java NIO. On the Master side there is corresponding listening code, shown below.

public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

The CommitLog is synchronized not through Netty commands but over a direct TCP connection, which is more efficient. Once the connection is established, Master and Slave compare their Offsets and keep synchronizing continuously.
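
The wire format this exchange uses is tiny. The following is a self-contained illustration of the 4.x HAService/HAConnection framing, not code from the module itself: the Slave reports its current max physical offset as one 8-byte long, and the Master answers with frames of [physical offset (8 bytes)][body size (4 bytes)][body].

import java.nio.ByteBuffer;

public class HaFrameSketch {
    public static void main(String[] args) {
        // Slave -> Master: the periodic offset report.
        ByteBuffer report = ByteBuffer.allocate(8);
        report.putLong(0L /* slave's current max phy offset */);
        report.flip();

        // Master -> Slave: a data frame carrying CommitLog bytes starting at that offset.
        byte[] body = new byte[]{1, 2, 3};
        ByteBuffer frame = ByteBuffer.allocate(8 + 4 + body.length);
        frame.putLong(0L).putInt(body.length).put(body);
        frame.flip();
        System.out.println("report=" + report.remaining() + " bytes, frame=" + frame.remaining() + " bytes");
    }
}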

Master-Slave Replication Modes

SYNC_MASTER and ASYNC_MASTER are parameters written in the Broker configuration file, and they determine how master-slave replication works. SYNC_MASTER is the synchronous mode: a message received by the Master Broker must be replicated to the Slave right away. ASYNC_MASTER is the asynchronous mode: messages on the Master Broker are replicated to the Slave machines asynchronously.
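
As a configuration sketch, the mode is set via brokerRole in broker.properties; the other keys shown are the usual companion settings, and their values here are only illustrative.

# broker.properties (excerpt)
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
# SYNC_MASTER: return to the producer only after the slave has the message;
# the alternatives are ASYNC_MASTER and SLAVE.
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH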

public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (messageExt.isWaitStoreMsgOK()) {
            // Determine whether to wait
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                service.getWaitNotifyObject().wakeupAll();
                PutMessageStatus replicaStatus = null;
                try {
                    replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                            TimeUnit.MILLISECONDS);
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                }
                if (replicaStatus != PutMessageStatus.PUT_OK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                        + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }

}

At the end of the CommitLog class's putMessage function, handleHA is invoked. The key points in the code are wakeupAll and the blocking wait on request.future(): in synchronous mode, every time the Master writes a message it waits for the replication to the Slave to finish before returning, as shown in the listing below (putMessage itself is long, so only the key lines are listed).

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
 
    //...

    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg);

    return putMessageResult;
}