RocketMQ-20丨RocketMQ DLedger主从切换

Posted by jiefang on July 22, 2021

RocketMQ DLedger主从切换

流程图

BrokerController#initialize()

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Initializes the broker.
 *
 * <p>The portion shown here covers only the DLedger wiring: when the DLedger commit log is
 * enabled, a {@code DLedgerRoleChangeHandler} is registered with the DLedger leader elector.
 * NOTE(review): this looks like an abridged excerpt (no return statement is visible) —
 * confirm against the full method.
 *
 * @return boolean
 * @throws CloneNotSupportedException if cloning is not supported
 */
public boolean initialize() throws CloneNotSupportedException {
    // Only wire up role-change handling when the DLedger commit log is enabled.
    if (messageStoreConfig.isEnableDLegerCommitLog()) {
        DefaultMessageStore defaultStore = (DefaultMessageStore) messageStore;
        // Listener that reacts to node role changes.
        DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, defaultStore);
        // The commit log is cast to its DLedger type to reach the leader elector.
        DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) defaultStore.getCommitLog();
        dLedgerCommitLog.getdLedgerServer()
            .getdLedgerLeaderElector()
            .addRoleChangeHandler(roleChangeHandler);
    }
}

DLedgerRoleChangeHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
 * Dledger角色变化处理器
 */
public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
    @Override public void handle(long term, MemberState.Role role) {
        Runnable runnable = new Runnable() {
            @Override public void run() {
                long start = System.currentTimeMillis();
                try {
                    boolean succ = true;
                    log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
                    switch (role) {
                        //当前节点状态CANDIDATE,正在发起选举Leader节点
                        case CANDIDATE:
                            //当前节点如果不是从节点,切换为从节点
                            if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
                                brokerController.changeToSlave(dLedgerCommitLog.getId());
                            }
                            break;
                        case FOLLOWER:
                            brokerController.changeToSlave(dLedgerCommitLog.getId());
                            break;
                        //选举成为Leader
                        case LEADER:
                            while (true) {
                                if (!dLegerServer.getMemberState().isLeader()) {
                                    succ = false;
                                    break;
                                }
                                //表示当前节点还未又数据转发,直接跳出循环,无需等待
                                if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) {
                                    break;
                                }
                                //则必须等待数据都已提交,即LedgerEndIndex与CommittedIndex 相等。
                                if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
                                        //且需要等待CommitLog日志全部已转发到ConsumeQueue中
                                    && messageStore.dispatchBehindBytes() == 0) {
                                    break;
                                }
                                Thread.sleep(100);
                            }
                            if (succ) {
                                messageStore.recoverTopicQueueTable();
                                brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
                            }
                            break;
                        default:
                            break;
                    }
                    log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start));
                } catch (Throwable t) {
                    log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t);
                }
            }
        };
        executorService.submit(runnable);
    }
}

BrokerController#changeToSlave()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * Switches this broker to the slave role.
 *
 * <p>Each step is best-effort: a failure in one step is logged and the remaining steps
 * still run.
 *
 * @param brokerId the broker id to adopt; 0 is replaced by 1
 */
public void changeToSlave(int brokerId) {
    log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);

    //change the role: a brokerId of 0 is replaced by 1
    brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check
    //set the store role
    messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);

    //handle the scheduled service
    try {
        //let the message store adjust its scheduled (delayed) message service for the slave role
        this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
    } catch (Throwable t) {
        log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
    }

    //handle the transactional service
    try {
        //shut down the transactional message check service
        this.shutdownProcessorByHa();
    } catch (Throwable t) {
        log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
    }

    //handle the slave synchronise: start the periodic metadata sync
    handleSlaveSynchronize(BrokerRole.SLAVE);

    try {
        //re-register the broker so the changed state is published
        this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
    } catch (Throwable t) {
        //still best-effort, but no longer silent: a failed registration hides the role change
        log.error("[MONITOR] registerBrokerAll failed when changing to slave", t);
    }
    log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
}

BrokerController#changeToMaster()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Switches this broker to a master role.
 *
 * <p>Does nothing when {@code role} is {@code BrokerRole.SLAVE}. Each step is best-effort:
 * a failure in one step is logged and the remaining steps still run.
 *
 * @param role the master role to adopt
 */
public void changeToMaster(BrokerRole role) {
    if (role == BrokerRole.SLAVE) {
        return;
    }
    log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());

    //handle the slave synchronise: cancel the periodic metadata sync
    handleSlaveSynchronize(role);

    //handle the scheduled service
    try {
        //let the message store adjust its scheduled (delayed) message service for the new role
        this.messageStore.handleScheduleMessageService(role);
    } catch (Throwable t) {
        log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
    }

    //handle the transactional service
    try {
        //start the transactional message check service; role is never SLAVE here,
        //so passing it is equivalent to the previously hard-coded SYNC_MASTER
        this.startProcessorByHa(role);
    } catch (Throwable t) {
        log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
    }

    //the steps above have been attempted (failures are only logged); now record the new id and role
    brokerConfig.setBrokerId(0); //TO DO check
    messageStoreConfig.setBrokerRole(role);

    try {
        //re-register the broker so the changed state is published
        this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
    } catch (Throwable t) {
        //still best-effort, but no longer silent: a failed registration hides the role change
        log.error("[MONITOR] registerBrokerAll failed when changing to master", t);
    }
    log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
}

BrokerController#startProcessorByHa()

1
2
3
4
5
6
7
8
9
10
11
/**
 * Starts the transactional message check service unless the given role is SLAVE.
 *
 * @param role the broker role
 */
private void startProcessorByHa(BrokerRole role) {
    boolean notSlave = BrokerRole.SLAVE != role;
    // The service may be absent, in which case there is nothing to start.
    if (notSlave && this.transactionalMessageCheckService != null) {
        this.transactionalMessageCheckService.start();
    }
}

BrokerController#shutdownProcessorByHa()

1
2
3
4
5
6
7
8
/**
 * Shuts down the transactional message check service, if one exists.
 * Called from {@code changeToSlave} when the broker becomes a slave.
 */
private void shutdownProcessorByHa() {
    if (this.transactionalMessageCheckService == null) {
        // No check service configured: nothing to stop.
        return;
    }
    this.transactionalMessageCheckService.shutdown(true);
}

BrokerController#handleSlaveSynchronize()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * Starts or stops the periodic slave synchronization according to the given role.
 *
 * <p>For every role, any previously scheduled sync task is cancelled and the master address
 * is cleared. For SLAVE, a new task is then scheduled that calls
 * {@code slaveSynchronize.syncAll()} every 10 seconds after an initial 3 second delay.
 *
 * @param role the broker role
 */
private void handleSlaveSynchronize(BrokerRole role) {
    // Both roles start the same way: drop the old sync task and forget the master address.
    if (slaveSyncFuture != null) {
        slaveSyncFuture.cancel(false);
    }
    this.slaveSynchronize.setMasterAddr(null);

    // Any non-slave role needs no sync task.
    if (role != BrokerRole.SLAVE) {
        return;
    }

    long initialDelayMillis = 1000 * 3;
    long periodMillis = 1000 * 10;
    slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(() -> {
        try {
            this.slaveSynchronize.syncAll();
        } catch (Throwable e) {
            // Caught and logged so the exception does not propagate out of the scheduled task.
            log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
        }
    }, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
}

问题

消息消费进度是否存在丢失风险

  • 首先,RocketMQ 的元数据(包括消息消费进度)是由从服务器定时向主服务器拉取来同步更新的,存在时延;引入 DLedger 机制后也并不保证其一致性,DLedger 只保证 CommitLog 文件的一致性。
  • 当主节点宕机后,各个从节点并不一定已经完全同步了消息消费进度;与此同时,消息消费仍在继续,消费者会改为从从节点拉取消息进行消费,但接收进度汇报的从节点并不一定会成为新的主节点,故消费进度在 broker 端存在丢失的可能性。当然并不是一定会丢失,因为消息消费端只要不重启,消息消费进度就会保存在内存中。
  • 综上所述,消息消费进度在 broker 端有丢失的可能性,因而存在重复消费的可能性,不过问题不大,因为 RocketMQ 本身也不承诺不会重复消费。

消息是否存在丢失风险

消息会不会丢失的关键在于,日志复制进度落后的从节点是否可以被选举为主节点。如果在一个集群中,某个从节点的复制进度落后于主节点,而主节点宕机后该从节点被选举成为新的主节点,那将是一个灾难,会丢失数据。但在选举过程中,节点收到投票请求时,如果发起投票节点的复制进度比自己小,就会投拒绝票;而当选必须得到集群内超过半数节点的认可,即最终选举出来的主节点的当前复制进度一定不小于大多数节点,并且至少等于承诺给客户端的已提交偏移量。故得出的结论是不会丢消息。