RocketMQ DLedger Master-Slave Switchover
Flow diagram
BrokerController#initialize()
```java
/**
 * Broker initialization (excerpt: only the DLedger-related part is shown)
 * @return boolean
 * @throws CloneNotSupportedException clone not supported
 */
public boolean initialize() throws CloneNotSupportedException {
    // ... other initialization omitted ...
    // If the DLedger commit log is enabled
    if (messageStoreConfig.isEnableDLegerCommitLog()) {
        // Register a listener for node role (raft state) changes
        DLedgerRoleChangeHandler roleChangeHandler =
            new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
        ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog())
            .getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
    }
    // ... remaining initialization omitted ...
}
```
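For context, the `isEnableDLegerCommitLog()` switch above is driven by the DLedger entries in broker.conf. Below is a minimal sketch of those entries expressed as a Java `Properties` object; the key names follow the RocketMQ DLedger quick-start, while the group name, node ids, addresses, and ports are made-up example values.

```java
import java.util.Properties;

// Hypothetical illustration (values made up): the DLedger-related broker.conf entries
// that make messageStoreConfig.isEnableDLegerCommitLog() return true, for a
// three-replica raft group.
public class DLedgerBrokerConfExample {
    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("enableDLegerCommitLog", "true");   // switch the CommitLog to the DLedger-based implementation
        conf.setProperty("dLegerGroup", "RaftNode00");       // raft group name, usually kept identical to brokerName
        conf.setProperty("dLegerPeers",                      // all peers in the group: id-host:port, separated by ';'
            "n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913");
        conf.setProperty("dLegerSelfId", "n0");              // id of this node within dLegerPeers
        conf.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```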
DLedgerRoleChangeHandler
```java
/**
 * Handles DLedger role (raft state) changes for the broker (excerpt).
 */
public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
    // fields and constructor omitted in this excerpt
    @Override public void handle(long term, MemberState.Role role) {
        Runnable runnable = new Runnable() {
            @Override public void run() {
                long start = System.currentTimeMillis();
                try {
                    boolean succ = true;
                    log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
                    switch (role) {
                        // CANDIDATE: the node is taking part in a leader election
                        case CANDIDATE:
                            // If the node is not already a slave, switch it to slave
                            if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
                                brokerController.changeToSlave(dLedgerCommitLog.getId());
                            }
                            break;
                        case FOLLOWER:
                            brokerController.changeToSlave(dLedgerCommitLog.getId());
                            break;
                        // The node has been elected leader
                        case LEADER:
                            while (true) {
                                if (!dLegerServer.getMemberState().isLeader()) {
                                    succ = false;
                                    break;
                                }
                                // The node has not replicated any data yet, so there is nothing to wait for
                                if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) {
                                    break;
                                }
                                // Otherwise wait until all data has been committed, i.e. ledgerEndIndex == committedIndex,
                                if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
                                    // and until the CommitLog has been fully dispatched to the ConsumeQueue
                                    && messageStore.dispatchBehindBytes() == 0) {
                                    break;
                                }
                                Thread.sleep(100);
                            }
                            if (succ) {
                                messageStore.recoverTopicQueueTable();
                                brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
                            }
                            break;
                        default:
                            break;
                    }
                    log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start));
                } catch (Throwable t) {
                    log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t);
                }
            }
        };
        executorService.submit(runnable);
    }
}
```
BrokerController#changeToSlave()
```java
/**
 * Switch this broker to the slave role
 * @param brokerId brokerId
 */
public void changeToSlave(int brokerId) {
    log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
    //change the role: brokerId 0 is reserved for the master, so fall back to 1
    brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check
    // Set the broker role to SLAVE
    messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);
    //handle the scheduled service
    try {
        // Stop the scheduled (delayed-message) delivery service
        this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
    } catch (Throwable t) {
        log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
    }
    //handle the transactional service
    try {
        // Shut down the transactional-message check service
        this.shutdownProcessorByHa();
    } catch (Throwable t) {
        log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
    }
    //handle the slave synchronise: start the slave-side metadata synchronization task
    handleSlaveSynchronize(BrokerRole.SLAVE);
    try {
        // Notify every NameServer in the cluster of the broker's new state
        this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
    } catch (Throwable ignored) {
    }
    log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
}
```
BrokerController#changeToMaster()
```java
/**
 * Switch this broker to the master role
 * @param role broker role
 */
public void changeToMaster(BrokerRole role) {
    if (role == BrokerRole.SLAVE) {
        return;
    }
    log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());
    //handle the slave synchronise: cancel the metadata synchronization task, the master does not need it
    handleSlaveSynchronize(role);
    //handle the scheduled service
    try {
        // Start the scheduled (delayed-message) delivery service
        this.messageStore.handleScheduleMessageService(role);
    } catch (Throwable t) {
        log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
    }
    //handle the transactional service
    try {
        // Start the transactional-message check service
        this.startProcessorByHa(BrokerRole.SYNC_MASTER);
    } catch (Throwable t) {
        log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
    }
    //if the operations above are totally successful, we change to master
    // Set brokerId and brokerRole
    brokerConfig.setBrokerId(0); //TO DO check
    messageStoreConfig.setBrokerRole(role);
    try {
        // Send a heartbeat to the NameServers immediately so they see the broker's latest state
        this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
    } catch (Throwable ignored) {
    }
    log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
}
```
BrokerController#startProcessorByHa()
```java
/**
 * Start the transactional-message check service when the node becomes a master
 * @param role broker role
 */
private void startProcessorByHa(BrokerRole role) {
    if (BrokerRole.SLAVE != role) {
        if (this.transactionalMessageCheckService != null) {
            this.transactionalMessageCheckService.start();
        }
    }
}
```
BrokerController#shutdownProcessorByHa()
```java
/**
 * Shut down the transactional-message check service; called when the node changes from master to slave
 */
private void shutdownProcessorByHa() {
    if (this.transactionalMessageCheckService != null) {
        this.transactionalMessageCheckService.shutdown(true);
    }
}
```
BrokerController#handleSlaveSynchronize()
```java
/**
 * Manage the slave-to-master metadata synchronization task
 * @param role broker role
 */
private void handleSlaveSynchronize(BrokerRole role) {
    // The node is becoming a slave
    if (role == BrokerRole.SLAVE) {
        // Cancel any previously scheduled synchronization task
        if (null != slaveSyncFuture) {
            slaveSyncFuture.cancel(false);
        }
        // Clear the master address; it will be refreshed later
        this.slaveSynchronize.setMasterAddr(null);
        // Every 10 seconds the slave pulls the metadata from the master
        slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.slaveSynchronize.syncAll();
                } catch (Throwable e) {
                    log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                }
            }
        }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
    // The node is becoming a master
    } else {
        //handle the slave synchronise: cancel the synchronization task
        if (null != slaveSyncFuture) {
            slaveSyncFuture.cancel(false);
        }
        this.slaveSynchronize.setMasterAddr(null);
    }
}
```
Questions
Is there a risk of losing consumer offsets (consumption progress)?
- First, RocketMQ metadata, which includes consumer offsets, is synchronized by the slave periodically pulling it from the master, so there is always some lag. Introducing DLedger does not make metadata consistent either: DLedger only guarantees consistency of the CommitLog files.
- When the master goes down, the slaves have not necessarily finished synchronizing the latest consumer offsets. Meanwhile consumption carries on: consumers keep pulling messages from a slave and report their offsets to it, but that slave will not necessarily be elected the new master, so offsets can be lost on the broker side. They are not guaranteed to be lost, though, because as long as the consumer process does not restart, its offsets are also kept in memory on the client.
- In summary, consumer offsets may be lost on the broker side, which can lead to duplicate consumption. This is acceptable, since RocketMQ itself never promises that a message will not be consumed more than once. A timing sketch follows this list.
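To make the timing window concrete, here is a small self-contained sketch (my own simplified model, not RocketMQ code) that mimics the 10-second pull cycle of `SlaveSynchronize#syncAll` shown earlier: offsets committed to the master after the last pull exist only on the master, so a slave promoted after a crash restarts consumers from older offsets.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: the "master" offset table keeps advancing while the "slave"
// copies it only every 10 seconds, leaving a window of un-synchronized progress.
public class OffsetSyncWindowDemo {
    static final ConcurrentHashMap<String, Long> masterOffsets = new ConcurrentHashMap<>();
    static final ConcurrentHashMap<String, Long> slaveOffsets = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        // Consumers keep committing new offsets to the master.
        pool.scheduleAtFixedRate(
            () -> masterOffsets.merge("topicA@group1", 1L, Long::sum), 0, 200, TimeUnit.MILLISECONDS);
        // The slave pulls a full snapshot every 10 seconds (mirrors the scheduleAtFixedRate above).
        pool.scheduleAtFixedRate(
            () -> slaveOffsets.putAll(masterOffsets), 3, 10, TimeUnit.SECONDS);

        Thread.sleep(15_000);
        pool.shutdownNow();
        // Whatever was committed after the last pull lives only on the (now dead) master,
        // so the promoted slave would restart consumers from an older offset -> duplicates.
        System.out.println("master=" + masterOffsets + " slave=" + slaveOffsets);
    }
}
```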
Is there a risk of losing messages?
The key question for message loss is whether a slave whose log replication lags behind can be elected master. If, after the master crashes, a slave whose replication progress is behind the old master's were elected as the new master, that would be a disaster: data would be lost. However, a node casts a rejecting vote whenever the requesting candidate's replication progress is smaller than its own, and a candidate must win the approval of more than half of the cluster. The elected master's replication progress is therefore at least as large as that of the majority of nodes, and in particular it is not smaller than the committed offset already acknowledged to clients. The conclusion is that messages will not be lost.
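A minimal sketch of that voting rule (a simplification in the spirit of Raft, not the actual `DLedgerLeaderElector` code; all names below are made up): a voter grants its vote only if the candidate's last log entry is at least as up-to-date as its own, which is what guarantees the new leader holds every committed entry.

```java
// Simplified vote-granting rule: compare the term and index of the last appended entry.
final class VoteRule {
    static final class LogPosition {
        final long term;   // term of the last appended entry
        final long index;  // index of the last appended entry (akin to ledgerEndIndex)
        LogPosition(long term, long index) { this.term = term; this.index = index; }
    }

    /** Returns true if this node may grant its vote to the candidate. */
    static boolean grantVote(LogPosition self, LogPosition candidate) {
        if (candidate.term != self.term) {
            return candidate.term > self.term;   // a higher last-entry term wins
        }
        return candidate.index >= self.index;    // same term: the longer (or equal) log wins
    }

    public static void main(String[] args) {
        LogPosition voter = new LogPosition(5, 120);
        System.out.println(grantVote(voter, new LogPosition(5, 118))); // false: candidate is behind
        System.out.println(grantVote(voter, new LogPosition(5, 120))); // true: candidate is caught up
    }
}
```

Because a candidate needs such grants from a majority, and every committed entry is by definition stored on a majority, at least one granting node holds the full committed log, forcing the winner's log to cover it as well.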