RocketMQ-17丨DLedger选主机制

Posted by jiefang on July 9, 2021

DLedger选主机制

核心类图

  • DLedgerConfig:多副本模块相关的配置信息,例如集群节点信息;
  • MemberState:节点状态机,即raft 协议中的follower、candidate、leader 三种状态的状态机实现;
  • Raft 协议相关:
    • DLedgerClientProtocol:DLedger 客户端协议
      • CompletableFuture get(GetEntriesRequest reques t):客户端从服务器获取日志条目(获取数据);
      • CompletableFuture append(AppendEntryReques t request):客户端向服务器追加日志(存储数据);
      • CompletableFuture metadata(MetadataRequest req uest):获取元数据;
    • DLedgerProtocol:DLedger 服务端协议;
      • CompletableFuture vote(VoteRequest request):发起投票请求;
      • CompletableFuture heartBeat(HeartBeatRequest re quest):Leader 向从节点发送心跳包;
      • CompletableFuture pull(PullEntriesRequest reques t):拉取日志条目;
      • CompletableFuture push(PushEntryRequest reque st):推送日志条件;
    • 协议处理Handler:
      • DLedgerClientProtocolHandler
      • DLedgerProtocolHander
  • DLedgerRpcService:DLedger Server(节点)之间的网络通信,默认基于Netty 实现,其实现类为:DLed gerRpcNettyService;
  • DLedgerLeaderElector:Leader选举实现器;
  • DLedgerServer:Dledger Server,Dledger 节点的封装类;

流程

DLedgerLeaderElector

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
 * Leader选举实现器
 */
public class DLedgerLeaderElector {
    private static Logger logger = LoggerFactory.getLogger(DLedgerLeaderElector.class);
    //随机数生成器
    private Random random = new Random();
    //配置参数
    private DLedgerConfig dLedgerConfig;
    //节点状态机
    private final MemberState memberState;
    //RPC服务,实现向集群内的节点发送心跳包、投票的RPC 实现
    private DLedgerRpcService dLedgerRpcService;

    //as a server handler
    //record the last leader state 上次收到心跳包的时间戳
    private volatile long lastLeaderHeartBeatTime = -1;
    //上次发送心跳包的时间戳
    private volatile long lastSendHeartBeatTime = -1;
    //上次成功收到心跳包的时间戳
    private volatile long lastSuccHeartBeatTime = -1;
    //一个心跳包的周期,默认为2s
    private int heartBeatTimeIntervalMs = 2000;
    //允许最大的N个心跳周期内未收到心跳包,状态为Follower的节点只有超过maxHeartBeatLeak * heartBeatTimeIntervalMs
    //的时间内未收到主节点的心跳包,才会重新进入Candidate状态,重新下一轮的选举。
    private int maxHeartBeatLeak = 3;
    //as a client 发送下一个心跳包的时间戳
    private long nextTimeToRequestVote = -1;
    //是否应该立即发起投票
    private volatile boolean needIncreaseTermImmediately = false;
    //最小的发送投票间隔时间,默认为300ms
    private int minVoteIntervalMs = 300;
    //最大的发送投票的间隔,默认为1000ms
    private int maxVoteIntervalMs = 1000;
    //注册的节点状态处理器,通过addRoleChangeHandler方法添加
    private List<RoleChangeHandler> roleChangeHandlers = new ArrayList<>();
    //最后一次投票结果
    private VoteResponse.ParseResult lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
    //上一次投票的开销
    private long lastVoteCost = 0L;
    //状态机管理器
    private StateMaintainer stateMaintainer = new StateMaintainer("StateMaintainer", logger);
}

DLedgerLeaderElector#startup()

1
2
3
4
5
6
7
//启动选举状态管理器
public void startup() {
    stateMaintainer.start();
    for (RoleChangeHandler roleChangeHandler : roleChangeHandlers) {
        roleChangeHandler.startup();
    }
}

StateMaintainer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * 状态维护管理器
 */
public class StateMaintainer extends ShutdownAbleThread {

    public StateMaintainer(String name, Logger logger) {
        super(name, logger);
    }
    //run方法调用
    @Override public void doWork() {
        try {
            if (DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()) {
                DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig);
                DLedgerLeaderElector.this.maintainState();
            }
            sleep(10);
        } catch (Throwable t) {
            DLedgerLeaderElector.logger.error("Error in heartbeat", t);
        }
    }
}
DLedgerLeaderElector#refreshIntervals()
1
2
3
4
5
6
7
8
9
10
/**
 * 刷新配置
 * @param dLedgerConfig 配置
 */
private void refreshIntervals(DLedgerConfig dLedgerConfig) {
    this.heartBeatTimeIntervalMs = dLedgerConfig.getHeartBeatTimeIntervalMs();
    this.maxHeartBeatLeak = dLedgerConfig.getMaxHeartBeatLeak();
    this.minVoteIntervalMs = dLedgerConfig.getMinVoteIntervalMs();
    this.maxVoteIntervalMs = dLedgerConfig.getMaxVoteIntervalMs();
}
DLedgerLeaderElector#maintainState()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * The core method of maintainer. Run the specified logic according to the current role: candidate => propose a
 * vote. leader => send heartbeats to followers, and step down to candidate when quorum followers do not respond.
 * follower => accept heartbeats, and change to candidate when no heartbeat from leader.
 * 根据当前角色运行指定逻辑:
 * 候选人 => 提议投票。
 * 领导者 => 向追随者发送心跳,并在法定追随者没有响应时下台到候选人。
 * follower => 接受心跳,当leader没有心跳时切换到候选。
 * @throws Exception 异常
 */
private void maintainState() throws Exception {
    if (memberState.isLeader()) {
        //保持作为领导者
        maintainAsLeader();
    } else if (memberState.isFollower()) {
        //保持作为跟随者
        maintainAsFollower();
    } else {
        //保持为候选人
        maintainAsCandidate();
    }
}

状态流转

DLedgerLeaderElector#maintainAsLeader()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * 保持为领导者
 * @throws Exception 异常
 */
private void maintainAsLeader() throws Exception {
    //每隔2S发送一次心跳
    if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) {
        long term;
        String leaderId;
        synchronized (memberState) {
            if (!memberState.isLeader()) {
                //stop sending
                return;
            }
            term = memberState.currTerm();
            leaderId = memberState.getLeaderId();
            lastSendHeartBeatTime = System.currentTimeMillis();
        }
        //向Follower发送心跳包
        sendHeartbeats(term, leaderId);
    }
}

DLedgerLeaderElector#maintainAsFollower()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * 保持作为跟随者
 */
private void maintainAsFollower() {
    //最后一次收到心跳包到现在时间超过4S
    if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2 * heartBeatTimeIntervalMs) {
        synchronized (memberState) {
            //最后一次收到心跳包到现在时间超过6S
            if (memberState.isFollower() && (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs)) {
                logger.info("[{}][HeartBeatTimeOut] lastLeaderHeartBeatTime: {} heartBeatTimeIntervalMs: {} lastLeader={}", memberState.getSelfId(), new Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId());
                //将角色变更为候选人
                changeRoleToCandidate(memberState.currTerm());
            }
        }
    }
}

DLedgerLeaderElector#maintainAsCandidate()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
/**
 * 保持为候选人
 * @throws Exception 异常
 */
private void maintainAsCandidate() throws Exception {
    //for candidate
    if (System.currentTimeMillis() < nextTimeToRequestVote && !needIncreaseTermImmediately) {
        return;
    }
    //投票轮次
    long term;
    //Leader节点当前的投票轮次
    long ledgerEndTerm;
    //当前日志的最大序列,即下一条日志的开始index
    long ledgerEndIndex;
    synchronized (memberState) {
        if (!memberState.isCandidate()) {
            return;
        }
        //上一次的投票结果为待下一次投票或应该立即开启投票,且根据当前状态机获取下一轮的投票轮次
        if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) {
            long prevTerm = memberState.currTerm();
            term = memberState.nextTerm();
            logger.info("{}_[INCREASE_TERM] from {} to {}", memberState.getSelfId(), prevTerm, term);
            lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
        } else {
            term = memberState.currTerm();
        }
        ledgerEndIndex = memberState.getLedgerEndIndex();
        ledgerEndTerm = memberState.getLedgerEndTerm();
    }
    if (needIncreaseTermImmediately) {
        //设置下一次投票超时时间
        nextTimeToRequestVote = getNextTimeToRequestVote();
        needIncreaseTermImmediately = false;
        return;
    }

    long startVoteTimeMs = System.currentTimeMillis();
    //投票获取响应
    final List<CompletableFuture<VoteResponse>> quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);
    //已知的最大投票轮次
    final AtomicLong knownMaxTermInGroup = new AtomicLong(term);
    //所有投票票数
    final AtomicInteger allNum = new AtomicInteger(0);
    //有效投票数
    final AtomicInteger validNum = new AtomicInteger(0);
    //获得的投票数
    final AtomicInteger acceptedNum = new AtomicInteger(0);
    //未准备投票的节点数量,如果对端节点的投票轮次小于发起投票的轮次,则认为对端未准备好,对端节点使用本次的轮次进入Candidate状态
    final AtomicInteger notReadyTermNum = new AtomicInteger(0);
    //发起投票的节点,ledgerEndTerm小于对端节点的个数
    final AtomicInteger biggerLedgerNum = new AtomicInteger(0);
    //是否已经存在Leader
    final AtomicBoolean alreadyHasLeader = new AtomicBoolean(false);

    CountDownLatch voteLatch = new CountDownLatch(1);
    for (CompletableFuture<VoteResponse> future : quorumVoteResponses) {
        future.whenComplete((VoteResponse x, Throwable ex) -> {
            try {
                if (ex != null) {
                    throw ex;
                }
                logger.info("[{}][GetVoteResponse] {}", memberState.getSelfId(), JSON.toJSONString(x));
                //如果投票结果不是UNKNOW,则有效投票数量增1
                if (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) {
                    validNum.incrementAndGet();
                }
                synchronized (knownMaxTermInGroup) {
                    switch (x.getVoteResult()) {
                        case ACCEPT:
                            acceptedNum.incrementAndGet();
                            break;
                        case REJECT_ALREADY_VOTED:
                        case REJECT_TAKING_LEADERSHIP:
                            break;
                        case REJECT_ALREADY_HAS_LEADER:
                            alreadyHasLeader.compareAndSet(false, true);
                            break;
                        case REJECT_TERM_SMALL_THAN_LEDGER:
                        case REJECT_EXPIRED_VOTE_TERM:
                            //如果自己维护的term小于远端维护的term,更新自己维护的投票轮次
                            if (x.getTerm() > knownMaxTermInGroup.get()) {
                                knownMaxTermInGroup.set(x.getTerm());
                            }
                            break;
                        case REJECT_EXPIRED_LEDGER_TERM:
                        case REJECT_SMALL_LEDGER_END_INDEX:
                            biggerLedgerNum.incrementAndGet();
                            break;
                        case REJECT_TERM_NOT_READY:
                            notReadyTermNum.incrementAndGet();
                            break;
                        default:
                            break;

                    }
                }
                //如果已经投票选出Leader或达到大多数选票
                if (alreadyHasLeader.get()
                    || memberState.isQuorum(acceptedNum.get())
                    || memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
                    voteLatch.countDown();
                }
            } catch (Throwable t) {
                logger.error("vote response failed", t);
            } finally {
                allNum.incrementAndGet();
                //所有选票数等于节点数
                if (allNum.get() == memberState.peerSize()) {
                    voteLatch.countDown();
                }
            }
        });

    }

    try {
        //等待收集投票结果,并设置超时时间
        voteLatch.await(2000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);
    } catch (Throwable ignore) {

    }

    lastVoteCost = DLedgerUtils.elapsed(startVoteTimeMs);
    VoteResponse.ParseResult parseResult;
    //如果远端的投票轮次大于发起投票的节点,则该节点使用远端的轮次,重新进入到Candidate状态,并且重置投票计时器
    if (knownMaxTermInGroup.get() > term) {
        parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
        nextTimeToRequestVote = getNextTimeToRequestVote();
        changeRoleToCandidate(knownMaxTermInGroup.get());
    //如果已经存在Leader,该节点重新进入到Candidate,并重置定时器,状态为WAIT_TO_REVOTE,该状态下的特征是下次投票时不增加投票轮次
    } else if (alreadyHasLeader.get()) {
        parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
        nextTimeToRequestVote = getNextTimeToRequestVote() + heartBeatTimeIntervalMs * maxHeartBeatLeak;
    //如果收到的有效票数未超过半数,则重置计时器
    } else if (!memberState.isQuorum(validNum.get())) {
        parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
        nextTimeToRequestVote = getNextTimeToRequestVote();
    //有效票数-当前日志小于远端日志个数不是大多数,则重置计时器
    } else if (!memberState.isQuorum(validNum.get() - biggerLedgerNum.get())) {
        parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
        nextTimeToRequestVote = getNextTimeToRequestVote() + maxVoteIntervalMs;
    //如果得到的赞同票超过半数,则成为Leader
    } else if (memberState.isQuorum(acceptedNum.get())) {
        parseResult = VoteResponse.ParseResult.PASSED;
    //如果得到的赞成票加上未准备投票的节点数超过半数,则应该立即发起投票,故其结果为REVOTE_IMMEDIATELY
    } else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
        parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;
    } else {
        //开启下一轮投票
        parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
        nextTimeToRequestVote = getNextTimeToRequestVote();
    }
    lastParseResult = parseResult;
    logger.info("[{}] [PARSE_VOTE_RESULT] cost={} term={} memberNum={} allNum={} acceptedNum={} notReadyTermNum={} biggerLedgerNum={} alreadyHasLeader={} maxTerm={} result={}",
        memberState.getSelfId(), lastVoteCost, term, memberState.peerSize(), allNum, acceptedNum, notReadyTermNum, biggerLedgerNum, alreadyHasLeader, knownMaxTermInGroup.get(), parseResult);
    //如果投票成功,则状态机状态设置为Leader
    if (parseResult == VoteResponse.ParseResult.PASSED) {
        logger.info("[{}] [VOTE_RESULT] has been elected to be the leader in term {}", memberState.getSelfId(), term);
        changeRoleToLeader(term);
    }
}

投票

节点的状态为Candidate 时会向集群内的其他节点发起投票请求( 可以理解为拉票),向对方询问是否愿意选举我为Leader,对端节点会根据自己的情况对其投赞成票、拒绝票,如果是拒绝票,还会给出拒绝原因,具体由voteForQuorumResponses、handleVote 这两个方法来实现

DLedgerLeaderElector#voteForQuorumResponses()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * 投票获取响应
 * @param term 选举周期
 * @param ledgerEndTerm 发起投票节点维护的已知的最大投票轮次
 * @param ledgerEndIndex 发起投票节点维护的已知的最大日志条目索引
 * @return 所有投票响应
 * @throws Exception 异常
 */
private List<CompletableFuture<VoteResponse>> voteForQuorumResponses(long term, long ledgerEndTerm,
    long ledgerEndIndex) throws Exception {
    List<CompletableFuture<VoteResponse>> responses = new ArrayList<>();
    for (String id : memberState.getPeerMap().keySet()) {
        VoteRequest voteRequest = new VoteRequest();
        voteRequest.setGroup(memberState.getGroup());
        voteRequest.setLedgerEndIndex(ledgerEndIndex);
        voteRequest.setLedgerEndTerm(ledgerEndTerm);
        voteRequest.setLeaderId(memberState.getSelfId());
        voteRequest.setTerm(term);
        voteRequest.setRemoteId(id);
        CompletableFuture<VoteResponse> voteResponse;
        //给自己投票
        if (memberState.getSelfId().equals(id)) {
            voteResponse = handleVote(voteRequest, true);
        } else {
            //async 异步发起投票
            voteResponse = dLedgerRpcService.vote(voteRequest);
        }
        responses.add(voteResponse);

    }
    return responses;
}

DLedgerLeaderElector#handleVote()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
 * 处理投票请求
 * @param request 投票请求
 * @param self 是否投票给自己
 * @return CompletableFuture
 */
public CompletableFuture<VoteResponse> handleVote(VoteRequest request, boolean self) {
    //hold the lock to get the latest term, leaderId, ledgerEndIndex
    synchronized (memberState) {
        if (!memberState.isPeerMember(request.getLeaderId())) {
            logger.warn("[BUG] [HandleVote] remoteId={} is an unknown member", request.getLeaderId());
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNKNOWN_LEADER));
        }
        if (!self && memberState.getSelfId().equals(request.getLeaderId())) {
            logger.warn("[BUG] [HandleVote] selfId={} but remoteId={}", memberState.getSelfId(), request.getLeaderId());
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNEXPECTED_LEADER));
        }
        //判断请求节点的ledgerEndTerm 与当前节点的ledgerEndTerm,判断日志的复制进度
        if (request.getLedgerEndTerm() < memberState.getLedgerEndTerm()) {
            //请求节点的日志复制进度比当前节点低,这种情况是不能成为主节点的
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM));
        } else if (request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && request.getLedgerEndIndex() < memberState.getLedgerEndIndex()) {
            //轮次相同,日志复制进度比当前节点低
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_SMALL_LEDGER_END_INDEX));
        }
        //如果发起投票节点的term 小于当前节点的term
        if (request.getTerm() < memberState.currTerm()) {
            //此种情况下投拒绝票,也就是说在raft 协议的世界中,谁的term 越大,越有话语权
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM));
        //如果发起投票节点的term 等于当前节点的term
        } else if (request.getTerm() == memberState.currTerm()) {
            //该节点是否已经投过票
            if (memberState.currVoteFor() == null) {
                //let it go
            //如果当前节点投票Leader等于请求的Leader
            } else if (memberState.currVoteFor().equals(request.getLeaderId())) {
                //repeat just let it go
            } else {
                //如果该节点已存在的Leader节点,则拒绝并告知已存在Leader节点
                if (memberState.getLeaderId() != null) {
                    return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_HAS_LEADER));
                } else {
                    //如果该节点还未有Leader 节点,但已经投了其他节点的票,则拒绝请求节点,并告知已投票
                    return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED));
                }
            }
        //如果发起投票节点的term 大于当前节点的term
        } else {
            //stepped down by larger term
            //拒绝请求节点的投票请求,并告知自身还未准备投票,自身会使用请求节点的投票轮次立即进入到Candidate 状态
            changeRoleToCandidate(request.getTerm());
            needIncreaseTermImmediately = true;
            //only can handleVote when the term is consistent
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_NOT_READY));
        }
        //请求轮次比当前轮次小
        if (request.getTerm() < memberState.getLedgerEndTerm()) {
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.getLedgerEndTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_SMALL_THAN_LEDGER));
        }
        //请求轮次等于当前节点轮次,当前节点日志复制进度比请求复制进度大
        if (!self && isTakingLeadership() && request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && memberState.getLedgerEndIndex() >= request.getLedgerEndIndex()) {
            return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TAKING_LEADERSHIP));
        }
        //投票给请求节点
        memberState.setCurrVoteFor(request.getLeaderId());
        return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.ACCEPT));
    }
}

DLedgerRpcNettyService#vote()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * 调用远程投票操作
 * @param request 请求
 * @return CompletableFuture
 * @throws Exception 异常
 */
@Override public CompletableFuture<VoteResponse> vote(VoteRequest request) throws Exception {
    CompletableFuture<VoteResponse> future = new CompletableFuture<>();
    voteInvokeExecutor.execute(() -> {
        try {
            //投票命令
            RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.VOTE.getCode(), null);
            wrapperRequest.setBody(JSON.toJSONBytes(request));
            remotingClient.invokeAsync(getPeerAddr(request), wrapperRequest, 3000, responseFuture -> {
                RemotingCommand responseCommand = responseFuture.getResponseCommand();
                if (responseCommand != null) {
                    VoteResponse response = JSON.parseObject(responseCommand.getBody(), VoteResponse.class);
                    future.complete(response);
                } else {
                    logger.error("Vote request time out, {}", request.baseInfo());
                    future.complete(new VoteResponse());
                }
            });
        } catch (Throwable t) {
            logger.error("Send vote request failed, {}", request.baseInfo(), t);
            future.complete(new VoteResponse());
        }
    });
    return future;
}

心跳

DLedgerLeaderElector#sendHeartbeats()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
 * 发送心跳
 * @param term 选举周期
 * @param leaderId 领导者编码
 * @throws Exception 异常
 */
private void sendHeartbeats(long term, String leaderId) throws Exception {
    final AtomicInteger allNum = new AtomicInteger(1);
    final AtomicInteger succNum = new AtomicInteger(1);
    final AtomicInteger notReadyNum = new AtomicInteger(0);
    final AtomicLong maxTerm = new AtomicLong(-1);
    final AtomicBoolean inconsistLeader = new AtomicBoolean(false);
    final CountDownLatch beatLatch = new CountDownLatch(1);
    long startHeartbeatTimeMs = System.currentTimeMillis();
    for (String id : memberState.getPeerMap().keySet()) {
        if (memberState.getSelfId().equals(id)) {
            continue;
        }
        HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
        heartBeatRequest.setGroup(memberState.getGroup());
        heartBeatRequest.setLocalId(memberState.getSelfId());
        heartBeatRequest.setRemoteId(id);
        heartBeatRequest.setLeaderId(leaderId);
        heartBeatRequest.setTerm(term);
        CompletableFuture<HeartBeatResponse> future = dLedgerRpcService.heartBeat(heartBeatRequest);
        future.whenComplete((HeartBeatResponse x, Throwable ex) -> {
            try {
                if (ex != null) {
                    memberState.getPeersLiveTable().put(id, Boolean.FALSE);
                    throw ex;
                }
                switch (DLedgerResponseCode.valueOf(x.getCode())) {
                    //心跳包成功响应    
                    case SUCCESS:
                        succNum.incrementAndGet();
                        break;
					//主节点的投票term 小于从节点的投票轮次                        
                    case EXPIRED_TERM:
                        maxTerm.set(x.getTerm());
                        break;
                    //从节点已经有了新的主节点    
                    case INCONSISTENT_LEADER:
                        inconsistLeader.compareAndSet(false, true);
                        break;
                    //从节点未准备好
                    case TERM_NOT_READY:
                        notReadyNum.incrementAndGet();
                        break;
                    default:
                        break;
                }

                if (x.getCode() == DLedgerResponseCode.NETWORK_ERROR.getCode())
                    memberState.getPeersLiveTable().put(id, Boolean.FALSE);
                else
                    memberState.getPeersLiveTable().put(id, Boolean.TRUE);

                if (memberState.isQuorum(succNum.get())
                    || memberState.isQuorum(succNum.get() + notReadyNum.get())) {
                    beatLatch.countDown();
                }
            } catch (Throwable t) {
                logger.error("heartbeat response failed", t);
            } finally {
                allNum.incrementAndGet();
                if (allNum.get() == memberState.peerSize()) {
                    beatLatch.countDown();
                }
            }
        });
    }
    //等待2000毫秒
    beatLatch.await(heartBeatTimeIntervalMs, TimeUnit.MILLISECONDS);
    //如果成功的票数大于进群内的半数,则表示集群状态正常,正常按照心跳包间隔发送心跳包
    if (memberState.isQuorum(succNum.get())) {
        lastSuccHeartBeatTime = System.currentTimeMillis();
    } else {
        logger.info("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}",
            memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), new Timestamp(lastSuccHeartBeatTime));
        //如果成功的票数加上未准备的投票的节点数量超过集群内的半数,则立即发送心跳包
        if (memberState.isQuorum(succNum.get() + notReadyNum.get())) {
            lastSendHeartBeatTime = -1;
        //如果从节点的投票轮次比主节点的大,则使用从节点的投票轮次
        } else if (maxTerm.get() > term) {
            //节点状态从Leader转换为Candidate
            changeRoleToCandidate(maxTerm.get());
        //从节点已经有了另外的主节点,节点状态从Leader转换为Candidate
        } else if (inconsistLeader.get()) {
            changeRoleToCandidate(term);
        //6秒没有发送心跳
        } else if (DLedgerUtils.elapsed(lastSuccHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs) {
            changeRoleToCandidate(term);
        }
    }
}

DLedgerServer#handleHeartBeat()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * 接受心跳业务处理
 * @param request 心跳请求
 * @return CompletableFuture
 * @throws Exception 异常
 */
@Override public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest request) throws Exception {
    try {

        PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
        PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
        return dLedgerLeaderElector.handleHeartBeat(request);
    } catch (DLedgerException e) {
        logger.error("[{}][HandleHeartBeat] failed", memberState.getSelfId(), e);
        HeartBeatResponse response = new HeartBeatResponse();
        response.copyBaseInfo(request);
        response.setCode(e.getCode().getCode());
        response.setLeaderId(memberState.getLeaderId());
        return CompletableFuture.completedFuture(response);
    }
}

DLedgerLeaderElector#handleHeartBeat()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
 * 接受心跳业务处理
 * @param request 心跳请求
 * @return CompletableFuture
 * @throws Exception 异常
 */
public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest request) throws Exception {

    if (!memberState.isPeerMember(request.getLeaderId())) {
        logger.warn("[BUG] [HandleHeartBeat] remoteId={} is an unknown member", request.getLeaderId());
        return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.UNKNOWN_MEMBER.getCode()));
    }

    if (memberState.getSelfId().equals(request.getLeaderId())) {
        logger.warn("[BUG] [HandleHeartBeat] selfId={} but remoteId={}", memberState.getSelfId(), request.getLeaderId());
        return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.UNEXPECTED_MEMBER.getCode()));
    }

    //如果主节点的term 小于从节点的term,发送反馈给主节点,告知主节点的term 已过时;
    if (request.getTerm() < memberState.currTerm()) {
        return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
    //如果投票轮次相同,并且发送心跳包的节点是该节点的主节点,则返回成功
    } else if (request.getTerm() == memberState.currTerm()) {
        if (request.getLeaderId().equals(memberState.getLeaderId())) {
            lastLeaderHeartBeatTime = System.currentTimeMillis();
            return CompletableFuture.completedFuture(new HeartBeatResponse());
        }
    }

    //abnormal case
    //hold the lock to get the latest term and leaderId
    synchronized (memberState) {
        //主节点投票轮次小于当前节点投票轮次,则返回主节点投票轮次过期
        if (request.getTerm() < memberState.currTerm()) {
            return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
        //如果投票轮次相同
        } else if (request.getTerm() == memberState.currTerm()) {
            //当前节点还没有Leader,设置请求里的节点为Leader
            if (memberState.getLeaderId() == null) {
                changeRoleToFollower(request.getTerm(), request.getLeaderId());
                return CompletableFuture.completedFuture(new HeartBeatResponse());
            //当前节点有Leader且等于请求里的Leader
            } else if (request.getLeaderId().equals(memberState.getLeaderId())) {
                lastLeaderHeartBeatTime = System.currentTimeMillis();
                return CompletableFuture.completedFuture(new HeartBeatResponse());
            } else {
                //this should not happen, but if happened
                logger.error("[{}][BUG] currTerm {} has leader {}, but received leader {}", memberState.getSelfId(), memberState.currTerm(), memberState.getLeaderId(), request.getLeaderId());
                return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.INCONSISTENT_LEADER.getCode()));
            }
        } else {
            //To make it simple, for larger term, do not change to follower immediately
            //first change to candidate, and notify the state-maintainer thread
            //如果主节点的投票轮次大于从节点的投票轮次,则认为从节点并为准备好,则从节点进入Candidate 状态,并立即发起一次投票
            changeRoleToCandidate(request.getTerm());
            needIncreaseTermImmediately = true;
            //TOOD notify
            return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.TERM_NOT_READY.getCode()));
        }
    }
}