RocketMQ-18丨DLedger日志复制机制

Posted by jiefang on July 13, 2021

DLedger日志复制机制

RequestOrResponse

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Common base of DLedger requests and responses: routing information (group, remote/local node ids)
 * plus the result code, leader id and term carried with every message.
 */
public class RequestOrResponse {
    // Name of the DLedger group (cluster) this message belongs to
    protected String group;
    // Id of the node the request is addressed to
    protected String remoteId;
    // Id of the local node
    protected String localId;
    // Response code; defaults to SUCCESS
    protected int code = DLedgerResponseCode.SUCCESS.getCode();
    // Id of the group's leader, if known
    protected String leaderId = null;
    // Election term this message refers to (-1 until set)
    protected long term = -1;
}

AppendEntryRequest

1
2
3
4
5
6
7
8
/**
 * Request to append a log entry; the entry payload travels in {@code body}.
 */
public class AppendEntryRequest extends RequestOrResponse {

    // Raw payload to append as a new log entry
    private byte[] body;
}

AppendEntryResponse

1
2
3
4
5
6
7
8
/**
 * Response to an append request.
 */
public class AppendEntryResponse extends RequestOrResponse {

    // Log index assigned to the appended entry (-1 until set)
    private long index = -1;
    // Storage position of the appended entry (-1 until set)
    private long pos = -1;
}

Leader端

DLedgerServer#startup()

1
2
3
4
5
6
7
8
9
10
/**
 * Starts the server components in order: the store, the RPC service, the entry pusher and the
 * leader elector. It then schedules {@code checkPreferredLeader} every 1000 ms, after an initial
 * 1000 ms delay.
 */
public void startup() {
    this.dLedgerStore.startup();
    this.dLedgerRpcService.startup();
    this.dLedgerEntryPusher.startup();
    this.dLedgerLeaderElector.startup();
    // Presumably moves leadership toward a configured preferred leader; see checkPreferredLeader (not shown)
    executorService.scheduleAtFixedRate(this::checkPreferredLeader, 1000, 1000, TimeUnit.MILLISECONDS);
}

DLedgerEntryPusher#startup()

1
2
3
4
5
6
7
8
9
10
11
12
/**
 * Starts the pusher's worker threads: first the entry handler, then the quorum ACK checker, and
 * finally one entry dispatcher per peer registered in {@code dispatcherMap}.
 */
public void startup() {
    entryHandler.start();
    // Arbitrates majority acknowledgements and completes pending append futures
    quorumAckChecker.start();
    // Each dispatcher pushes entries to exactly one follower
    dispatcherMap.values().forEach(EntryDispatcher::start);
}

DLedgerServer#handleAppend()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
 * Handle the append requests:
 * 1.append the entry to local store
 * 2.submit the future to entry pusher and wait the quorum ack
 * 3.if the pending requests are full, then reject it immediately
 *
 * Any DLedgerException raised while validating or appending (including an empty batch) is turned
 * into an already-completed response carrying the error code, instead of being thrown.
 * @param request the append request; a BatchAppendEntryRequest appends every message of its batch
 * @return a future completed once the entry (for a batch: its last entry) is acknowledged by a quorum,
 *         or an already-completed future carrying an error code
 * @throws IOException declared by the signature
 */
@Override
public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest request) throws IOException {
    try {
        // The request must be addressed to this node
        PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
        // The request must belong to this node's group
        PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
        // Only the leader accepts appends
        PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
        // Reject while a leadership transfer is in progress
        PreConditions.check(memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING);
        long currTerm = memberState.currTerm();
        // Back-pressure: reject immediately when the pending (awaiting-ack) requests of this term are full
        if (dLedgerEntryPusher.isPendingFull(currTerm)) {
            AppendEntryResponse appendEntryResponse = new AppendEntryResponse();
            appendEntryResponse.setGroup(memberState.getGroup());
            appendEntryResponse.setCode(DLedgerResponseCode.LEADER_PENDING_FULL.getCode());
            appendEntryResponse.setTerm(currTerm);
            appendEntryResponse.setLeaderId(memberState.getSelfId());
            return AppendFuture.newCompletedFuture(-1, appendEntryResponse);
        } else {
            // Batch append: every message becomes its own entry
            if (request instanceof BatchAppendEntryRequest) {
                BatchAppendEntryRequest batchRequest = (BatchAppendEntryRequest) request;
                if (batchRequest.getBatchMsgs() != null && batchRequest.getBatchMsgs().size() != 0) {
                    // record positions to return;
                    long[] positions = new long[batchRequest.getBatchMsgs().size()];
                    DLedgerEntry resEntry = null;
                    // split bodys to append
                    int index = 0;
                    Iterator<byte[]> iterator = batchRequest.getBatchMsgs().iterator();
                    while (iterator.hasNext()) {
                        DLedgerEntry dLedgerEntry = new DLedgerEntry();
                        dLedgerEntry.setBody(iterator.next());
                        // Append the entry to the local store (memory- or file-based implementations exist)
                        resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
                        positions[index++] = resEntry.getPos();
                    }
                    // only wait last entry ack is ok
                    BatchAppendFuture<AppendEntryResponse> batchAppendFuture =
                            (BatchAppendFuture<AppendEntryResponse>) dLedgerEntryPusher.waitAck(resEntry, true);
                    batchAppendFuture.setPositions(positions);
                    return batchAppendFuture;
                }
                // Empty batch: thrown here and converted to an error response by the catch below
                throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
                        " with empty bodys");
            } else {
                DLedgerEntry dLedgerEntry = new DLedgerEntry();
                dLedgerEntry.setBody(request.getBody());
                // Wrap the body in a DLedgerEntry and append it to the local store
                DLedgerEntry resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
                // Return a future that completes when the replicas acknowledge the entry (does not block here)
                return dLedgerEntryPusher.waitAck(resEntry, false);
            }
        }
    } catch (DLedgerException e) {
        // Report the failure to the caller as an error response rather than an exception
        logger.error("[{}][HandleAppend] failed", memberState.getSelfId(), e);
        AppendEntryResponse response = new AppendEntryResponse();
        response.copyBaseInfo(request);
        response.setCode(e.getCode().getCode());
        response.setLeaderId(memberState.getLeaderId());
        return AppendFuture.newCompletedFuture(-1, response);
    }
}

DLedgerMmapFileStore#appendAsLeader()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
 * Appends an entry to the local log as leader.
 * <p>
 * The entry is encoded outside the lock. Under {@code memberState}'s monitor it is then assigned the
 * next index and the current term, its space is reserved in the data file, and it is written to the
 * data file and then to the index file. Finally the ledger end index/term are advanced.
 *
 * @param entry the entry to append; its index, term, magic and pos are filled in by this method
 * @return the same {@code entry}, now carrying its assigned index, term and position
 */
@Override
public DLedgerEntry appendAsLeader(DLedgerEntry entry) {
    // Fail fast before encoding; the leader check is repeated under the lock below
    PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
    // Reject when the store has flagged the disk as full
    PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);
    // Presumably per-thread scratch buffers (localEntryBuffer / localIndexBuffer) — confirm
    ByteBuffer dataBuffer = localEntryBuffer.get();
    ByteBuffer indexBuffer = localIndexBuffer.get();
    DLedgerEntryCoder.encode(entry, dataBuffer);
    int entrySize = dataBuffer.remaining();
    // Index assignment and the writes are serialized on memberState, so term/role cannot change mid-append
    synchronized (memberState) {
        PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null);
        PreConditions.check(memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING, null);
        long nextIndex = ledgerEndIndex + 1;
        entry.setIndex(nextIndex);
        entry.setTerm(memberState.currTerm());
        entry.setMagic(CURRENT_MAGIC);
        // Patch index/term/magic into the already-encoded buffer
        DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex, memberState.currTerm(), CURRENT_MAGIC);
        // Reserve space first so the entry's position is known before it is written
        long prePos = dataFileList.preAppend(dataBuffer.remaining());
        entry.setPos(prePos);
        PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null);
        DLedgerEntryCoder.setPos(dataBuffer, prePos);
        // Each hook receives the entry, a slice of the encoded buffer and the body offset
        for (AppendHook writeHook : appendHooks) {
            writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);
        }
        // Write the encoded entry to the data file; it must land exactly at the reserved position
        long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());
        PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null);
        PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);
        DLedgerEntryCoder.encodeIndex(dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm(), indexBuffer);
        // Index file is dense: the index record of entry i is expected at i * INDEX_UNIT_SIZE
        long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
        PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);
        // Per-entry trace: log at DEBUG to match the isDebugEnabled() guard (previously logged at INFO)
        if (logger.isDebugEnabled()) {
            logger.debug("[{}] Append as Leader {} {}", memberState.getSelfId(), entry.getIndex(), entry.getBody().length);
        }
        ledgerEndIndex++;
        ledgerEndTerm = memberState.currTerm();
        // First entry ever written: the ledger now begins here
        if (ledgerBeginIndex == -1) {
            ledgerBeginIndex = ledgerEndIndex;
        }
        updateLedgerEndIndexAndTerm();
        return entry;
    }
}
MmapFileList#append()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * Appends {@code len} bytes of {@code data}, starting at {@code pos}, to the last mapped file.
 * Space is first reserved via {@code preAppend}, which may pad or roll the file when
 * {@code useBlank} is set.
 *
 * @param data     source bytes
 * @param pos      offset in {@code data} of the first byte to write
 * @param len      number of bytes to write
 * @param useBlank passed through to {@code preAppend}
 * @return the global offset at which the bytes were written, or -1 on failure
 */
public long append(byte[] data, int pos, int len, boolean useBlank) {
    boolean reserved = preAppend(len, useBlank) != -1;
    if (!reserved) {
        return -1;
    }
    MmapFile tail = getLastMappedFile();
    // Global write offset = start offset of the tail file + bytes already written into it
    long startOffset = tail.getFileFromOffset() + tail.getWrotePosition();
    boolean written = tail.appendMessage(data, pos, len);
    if (written) {
        return startOffset;
    }
    logger.error("Append error for {}", storePath);
    return -1;
}
DefaultMmapFile#appendMessage()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Copies {@code data[offset, offset + length)} into the mapped buffer at the current write position
 * and advances that position.
 *
 * @param data   source bytes
 * @param offset The offset of the subarray to be used.
 * @param length The length of the subarray to be used.
 * @return {@code true} if the bytes fit in the file and were written, {@code false} otherwise
 */
@Override
public boolean appendMessage(final byte[] data, final int offset, final int length) {
    int writeStart = this.wrotePosition.get();
    if (writeStart + length > this.fileSize) {
        // Not enough room left in this file
        return false;
    }
    // Write through an independent view so the shared buffer's position is untouched
    ByteBuffer view = this.mappedByteBuffer.slice();
    view.position(writeStart);
    view.put(data, offset, length);
    this.wrotePosition.addAndGet(length);
    return true;
}

DLedgerEntryPusher#waitAck()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * Returns a future for the replication of {@code entry}. Before that, it records the leader's own
 * progress for the entry's index.
 * <p>
 * In a single-node group the future is already completed. Otherwise a pending future (timing out
 * after {@code maxWaitAckTimeMs}) is registered under the entry's term and index, for the quorum
 * ACK checker to complete.
 *
 * @param entry       the entry just appended on the leader
 * @param isBatchWait whether to produce a {@code BatchAppendFuture} instead of an {@code AppendFuture}
 * @return the future that will carry the append response
 */
public CompletableFuture<AppendEntryResponse> waitAck(DLedgerEntry entry, boolean isBatchWait) {
    // The leader already holds the entry, so its own watermark advances immediately
    updatePeerWaterMark(entry.getTerm(), memberState.getSelfId(), entry.getIndex());
    if (memberState.getPeerMap().size() != 1) {
        checkTermForPendingMap(entry.getTerm(), "waitAck");
        long ackTimeoutMs = dLedgerConfig.getMaxWaitAckTimeMs();
        AppendFuture<AppendEntryResponse> pending = isBatchWait
                ? new BatchAppendFuture<AppendEntryResponse>(ackTimeoutMs)
                : new AppendFuture<AppendEntryResponse>(ackTimeoutMs);
        pending.setPos(entry.getPos());
        CompletableFuture<AppendEntryResponse> previous =
                pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), pending);
        if (previous != null) {
            logger.warn("[MONITOR] get old wait at index={}", entry.getIndex());
        }
        return pending;
    }
    // Single-node group: nothing to replicate, the entry is acknowledged right away
    AppendEntryResponse ack = new AppendEntryResponse();
    ack.setGroup(memberState.getGroup());
    ack.setLeaderId(memberState.getSelfId());
    ack.setIndex(entry.getIndex());
    ack.setTerm(entry.getTerm());
    ack.setPos(entry.getPos());
    if (!isBatchWait) {
        return AppendFuture.newCompletedFuture(entry.getPos(), ack);
    }
    return BatchAppendFuture.newCompletedFuture(entry.getPos(), ack);
}

EntryDispatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * This thread will be activated by the leader.
 * This thread will push the entry to follower(identified by peerId) and update the completed pushed index to index map.
 * Should generate a single thread for each peer.
 * The push has 4 types:
 *   APPEND : append the entries to the follower
 *   COMPARE : if the leader changes, the new leader should compare its entries to follower's
 *   TRUNCATE : if the leader finished comparing by an index, the leader will send a request to truncate the follower's ledger
 *   COMMIT: usually, the leader will attach the committed index with the APPEND request, but if the append requests are few and scattered,
 *           the leader will send a pure request to inform the follower of committed index.
 *
 *   The common transferring between these types are as following:
 *
 *   COMPARE ---- TRUNCATE ---- APPEND ---- COMMIT
 *   ^                             |
 *   |---<-----<------<-------<----|
 * Runs only while this node is leader; one instance per follower, identified by peerId.
 */
private class EntryDispatcher extends ShutdownAbleThread {

    // Kind of request currently being sent to the follower: COMPARE, TRUNCATE, APPEND or COMMIT (starts at COMPARE)
    private AtomicReference<PushEntryRequest.Type> type = new AtomicReference<>(PushEntryRequest.Type.COMPARE);
    // Last time an APPEND/COMMIT/TRUNCATE was pushed; drives the periodic standalone COMMIT
    private long lastPushCommitTimeMs = -1;
    // Id of the follower this dispatcher pushes to
    private String peerId;
    // Index currently being compared with the follower's log during COMPARE
    private long compareIndex = -1;
    // Next index to push to the follower during APPEND
    private long writeIndex = -1;
    // Max number of in-flight (unacknowledged) APPEND requests to this follower
    private int maxPendingSize = 1000;
    // Term in which this dispatcher last synced its state with memberState
    private long term = -1;
    // Leader id this dispatcher last synced with memberState
    private String leaderId = null;
    // Last time pendingMap was swept for leaked entries (indexes already below the follower's watermark)
    private long lastCheckLeakTimeMs = System.currentTimeMillis();
    // In-flight APPEND requests: entry index -> send timestamp (ms), used for timeout/retry
    private ConcurrentMap<Long, Long> pendingMap = new ConcurrentHashMap<>();
    // Presumably the batch-push counterpart of pendingMap (used by doBatchAppend, not shown) — confirm
    private ConcurrentMap<Long, Pair<Long, Integer>> batchPendingMap = new ConcurrentHashMap<>();
    // Presumably the request being accumulated for batch push — confirm against doBatchAppend
    private PushEntryRequest batchAppendEntryRequest = new PushEntryRequest();
    // Push quota built from dLedgerConfig.getPeerPushQuota(); used when throttling pushes
    private Quota quota = new Quota(dLedgerConfig.getPeerPushQuota());
}

PushEntryRequest.Type

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Kinds of push requests sent from the leader to a follower.
 * NOTE(review): keep the constant order unchanged in case ordinals are serialized — confirm.
 */
public enum Type {
    /**
     * Append log entries to the follower.
     */
    APPEND,
    /**
     * Usually the leader attaches its committed index to APPEND requests. When appends are few
     * and scattered, it sends this standalone request to tell the follower the committed index.
     */
    COMMIT,
    /**
     * After a leader change, the new leader compares its entries with the follower's entries to
     * find where they diverge, so the follower's surplus data can be truncated.
     */
    COMPARE,
    /**
     * Sent once the leader has finished comparing and settled on an index: the follower truncates
     * its log from that point.
     */
    TRUNCATE
}

EntryDispatcher#doWork()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * One iteration of the dispatcher loop.
 * <p>
 * It refreshes the leader state, then either pushes entries (APPEND, batched or not) or reconciles
 * the follower's log (any other state), and finally yields briefly. Failures are logged and followed
 * by a 500 ms pause.
 */
@Override
public void doWork() {
    try {
        if (checkAndFreshState()) {
            boolean appending = type.get() == PushEntryRequest.Type.APPEND;
            if (!appending) {
                // Typically the first step after a new leader is elected
                doCompare();
            } else if (dLedgerConfig.isEnableBatchPush()) {
                doBatchAppend();
            } else {
                doAppend();
            }
        }
        waitForRunning(1);
    } catch (Throwable t) {
        DLedgerEntryPusher.logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", peerId, getName(), writeIndex, compareIndex, t);
        DLedgerUtils.sleep(500);
    }
}
EntryDispatcher#checkAndFreshState()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * Checks whether this dispatcher may keep pushing.
 * <p>
 * When the cached term/leader no longer match {@code memberState} (typically after a re-election),
 * the cache is re-synced under the {@code memberState} lock and the dispatcher restarts from COMPARE.
 *
 * @return {@code false} if this node is not the leader, {@code true} otherwise
 */
private boolean checkAndFreshState() {
    if (!memberState.isLeader()) {
        return false;
    }
    boolean inSync = term == memberState.currTerm()
            && leaderId != null
            && leaderId.equals(memberState.getLeaderId());
    if (inSync) {
        return true;
    }
    synchronized (memberState) {
        // Role may have changed while waiting for the lock
        if (!memberState.isLeader()) {
            return false;
        }
        PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);
        term = memberState.currTerm();
        leaderId = memberState.getSelfId();
        changeState(-1, PushEntryRequest.Type.COMPARE);
    }
    return true;
}

EntryDispatcher#doAppend()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * Pushes entries to the follower one index at a time while this node is leader and in APPEND state.
 * <p>
 * When the follower has caught up, it sends a periodic COMMIT and retries a timed-out push. When
 * too many pushes are in flight, it only retries a timed-out push and stops.
 *
 * @throws Exception propagated from the push helpers
 */
private void doAppend() throws Exception {
    while (checkAndFreshState() && type.get() == PushEntryRequest.Type.APPEND) {
        // Nothing new to push: keep the follower's commit index fresh and retry a stalled push
        if (writeIndex > dLedgerStore.getLedgerEndIndex()) {
            doCommit();
            doCheckAppendResponse();
            return;
        }
        // Sweep acknowledged-but-unremoved entries when the window is full or once per second
        if (pendingMap.size() >= maxPendingSize || DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000) {
            long peerWaterMark = getPeerWaterMark(term, peerId);
            pendingMap.keySet().removeIf(index -> index < peerWaterMark);
            lastCheckLeakTimeMs = System.currentTimeMillis();
        }
        // Window still full: only retry a timed-out push, then stop
        if (pendingMap.size() >= maxPendingSize) {
            doCheckAppendResponse();
            return;
        }
        doAppendInner(writeIndex);
        writeIndex++;
    }
}
EntryDispatcher#doAppendInner()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * Pushes the entry at {@code index} to the follower as an APPEND request and records it as in flight.
 * <p>
 * The response is handled asynchronously. SUCCESS removes the pending record, advances the
 * follower's watermark and wakes the quorum ACK checker. INCONSISTENT_STATE switches back to COMPARE.
 * Anything else is logged.
 *
 * @param index log index of the entry to push
 * @throws Exception propagated from entry lookup, quota wait or the RPC call
 */
private void doAppendInner(long index) throws Exception {
    DLedgerEntry toPush = getDLedgerEntryForAppend(index);
    if (toPush == null) {
        return;
    }
    checkQuotaAndWait(toPush);
    CompletableFuture<PushEntryResponse> pushFuture =
            dLedgerRpcService.push(buildPushRequest(toPush, PushEntryRequest.Type.APPEND));
    // Send timestamp lets doCheckAppendResponse detect and retry timed-out pushes
    pendingMap.put(index, System.currentTimeMillis());
    pushFuture.whenComplete((reply, failure) -> {
        try {
            PreConditions.check(failure == null, DLedgerResponseCode.UNKNOWN);
            DLedgerResponseCode replyCode = DLedgerResponseCode.valueOf(reply.getCode());
            switch (replyCode) {
                case SUCCESS:
                    pendingMap.remove(reply.getIndex());
                    updatePeerWaterMark(reply.getTerm(), peerId, reply.getIndex());
                    quorumAckChecker.wakeup();
                    break;
                case INCONSISTENT_STATE:
                    // Logs diverge: go back to comparing with the follower
                    logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, reply.getIndex(), reply.getTerm());
                    changeState(-1, PushEntryRequest.Type.COMPARE);
                    break;
                default:
                    logger.warn("[Push-{}]Get error response code {} {}", peerId, replyCode, reply.baseInfo());
                    break;
            }
        } catch (Throwable t) {
            logger.error("", t);
        }
    });
    lastPushCommitTimeMs = System.currentTimeMillis();
}
EntryDispatcher#doCheckAppendResponse()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Re-pushes the first entry the follower has not yet acknowledged (watermark + 1) if it was sent
 * more than {@code maxPushTimeOutMs} ago.
 *
 * @throws Exception propagated from the re-push
 */
private void doCheckAppendResponse() throws Exception {
    long nextIndex = getPeerWaterMark(term, peerId) + 1;
    Long sentAtMs = pendingMap.get(nextIndex);
    boolean timedOut = sentAtMs != null
            && System.currentTimeMillis() - sentAtMs > dLedgerConfig.getMaxPushTimeOutMs();
    if (!timedOut) {
        return;
    }
    logger.warn("[Push-{}]Retry to push entry at {}", peerId, nextIndex);
    doAppendInner(nextIndex);
}

EntryDispatcher#doCommit()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Sends a standalone COMMIT request to the follower if nothing has been pushed for over a second,
 * so the follower still learns the leader's committed index.
 *
 * @throws Exception propagated from building or sending the request
 */
private void doCommit() throws Exception {
    if (DLedgerUtils.elapsed(lastPushCommitTimeMs) <= 1000) {
        return;
    }
    // Fire-and-forget: the response is intentionally ignored
    dLedgerRpcService.push(buildPushRequest(null, PushEntryRequest.Type.COMMIT));
    lastPushCommitTimeMs = System.currentTimeMillis();
}

EntryDispatcher#doCompare()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/**
 * Finds the point where the follower's log agrees with the leader's.
 * <p>
 * It sends COMPARE requests, starting from the leader's end index and moving backwards. On a match
 * it switches to APPEND; when the follower must drop entries it switches to TRUNCATE and truncates.
 * Each COMPARE waits up to 3 seconds for a response.
 * @throws Exception on RPC timeout/failure or an unexpected response code
 */
private void doCompare() throws Exception {
    while (true) {
        // Stop if this node is no longer the leader
        if (!checkAndFreshState()) {
            break;
        }
        // Stop once the state has moved past COMPARE/TRUNCATE
        if (type.get() != PushEntryRequest.Type.COMPARE
            && type.get() != PushEntryRequest.Type.TRUNCATE) {
            break;
        }
        // Both compareIndex and ledgerEndIndex are -1: the leader's log is empty (e.g. a new cluster), nothing to compare
        if (compareIndex == -1 && dLedgerStore.getLedgerEndIndex() == -1) {
            break;
        }
        //revise the compareIndex: start from (or reset to) the leader's ledgerEndIndex when unset or out of the leader's range
        if (compareIndex == -1) {
            compareIndex = dLedgerStore.getLedgerEndIndex();
            logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", peerId);
        } else if (compareIndex > dLedgerStore.getLedgerEndIndex() || compareIndex < dLedgerStore.getLedgerBeginIndex()) {
            logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", peerId, compareIndex, dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex());
            compareIndex = dLedgerStore.getLedgerEndIndex();
        }

        DLedgerEntry entry = dLedgerStore.get(compareIndex);
        PreConditions.check(entry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);
        PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.COMPARE);
        // Send the COMPARE request for the entry at compareIndex and block up to 3s for the answer
        CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);
        PushEntryResponse response = responseFuture.get(3, TimeUnit.SECONDS);
        PreConditions.check(response != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);
        PreConditions.check(response.getCode() == DLedgerResponseCode.INCONSISTENT_STATE.getCode() || response.getCode() == DLedgerResponseCode.SUCCESS.getCode()
            , DLedgerResponseCode.valueOf(response.getCode()), "compareIndex=%d", compareIndex);
        long truncateIndex = -1;
        // Decide, from the response, whether to append, truncate, or keep comparing at a new index
        if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {
            /*
             * The comparison is successful:
             * 1.Just change to append state, if the follower's end index is equal the compared index.
             * 2.Truncate the follower, if the follower has some dirty entries.
             */
            // Follower ends exactly at compareIndex: no truncation, switch to APPEND; otherwise truncate at compareIndex
            if (compareIndex == response.getEndIndex()) {
                changeState(compareIndex, PushEntryRequest.Type.APPEND);
                break;
            } else {
                truncateIndex = compareIndex;
            }
        } else if (response.getEndIndex() < dLedgerStore.getLedgerBeginIndex()
            || response.getBeginIndex() > dLedgerStore.getLedgerEndIndex()) {
            /*
             The follower's entries does not intersect with the leader.
             This usually happened when the follower has crashed for a long time while the leader has deleted the expired entries.
             Just truncate the follower.
             */
            // Ranges do not overlap: truncate from the leader's begin index (its smallest available entry)
            truncateIndex = dLedgerStore.getLedgerBeginIndex();
        } else if (compareIndex < response.getBeginIndex()) {
            /*
             The compared index is smaller than the follower's begin index.
             This happened rarely, usually means some disk damage.
             Just truncate the follower.
             */
            // Resync from the leader's begin index
            truncateIndex = dLedgerStore.getLedgerBeginIndex();
        } else if (compareIndex > response.getEndIndex()) {
            /*
             The compared index is bigger than the follower's end index.
             This happened frequently. For the compared index is usually starting from the end index of the leader.
             */
            // Jump directly to the follower's end index and compare again
            compareIndex = response.getEndIndex();
        } else {
            /*
              Compare failed and the compared index is in the range of follower's entries.
             */
            // Mismatch inside the follower's range: step back one entry
            compareIndex--;
        }
        /*
         The compared index is smaller than the leader's begin index, truncate the follower.
         */
        // The revised compareIndex fell below the leader's log: truncate from the leader's begin index
        if (compareIndex < dLedgerStore.getLedgerBeginIndex()) {
            truncateIndex = dLedgerStore.getLedgerBeginIndex();
        }
        /*
         If get value for truncateIndex, do it right now.
         */
        if (truncateIndex != -1) {
            changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);
            doTruncate(truncateIndex);
            break;
        }
    }
}

EntryDispatcher#doTruncate()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Asks the follower to truncate its log at {@code truncateIndex}, using the leader's entry there as
 * the anchor, then switches this dispatcher to APPEND. Blocks up to 3 seconds for the response.
 *
 * @param truncateIndex log index at which the follower truncates
 * @throws Exception if not in TRUNCATE state, the entry is missing, the RPC fails/times out,
 *                   or the follower does not answer SUCCESS
 */
private void doTruncate(long truncateIndex) throws Exception {
    PreConditions.check(type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);
    DLedgerEntry anchor = dLedgerStore.get(truncateIndex);
    PreConditions.check(anchor != null, DLedgerResponseCode.UNKNOWN);
    logger.info("[Push-{}]Will push data to truncate truncateIndex={} pos={}", peerId, truncateIndex, anchor.getPos());
    PushEntryResponse reply = dLedgerRpcService
            .push(buildPushRequest(anchor, PushEntryRequest.Type.TRUNCATE))
            .get(3, TimeUnit.SECONDS);
    PreConditions.check(reply != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", truncateIndex);
    PreConditions.check(reply.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(reply.getCode()), "truncateIndex=%d", truncateIndex);
    lastPushCommitTimeMs = System.currentTimeMillis();
    changeState(truncateIndex, PushEntryRequest.Type.APPEND);
}

QuorumAckChecker

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * This thread will check the quorum index and complete the pending requests.
 */
private class QuorumAckChecker extends ShutdownAbleThread {

    // Last time the watermarks were logged, in ms
    private long lastPrintWatermarkTimeMs = System.currentTimeMillis();
    // Last time pending futures were swept for leaks, in ms
    private long lastCheckLeakTimeMs = System.currentTimeMillis();
    // Quorum index reached in the previous arbitration round (-1 before the first round)
    private long lastQuorumIndex = -1;
}

QuorumAckChecker#doWork()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
    /**
     * One round of quorum arbitration, run repeatedly by the checker thread.
     * <p>
     * On the leader, this method:
     * <ul>
     *   <li>fails pending append futures of older terms with TERM_CHANGED and drops older-term watermarks;</li>
     *   <li>derives the quorum index from the per-node watermarks and advances the committed index;</li>
     *   <li>completes pending futures up to that index;</li>
     *   <li>fails timed-out futures beyond it;</li>
     *   <li>periodically sweeps leaked futures below it.</li>
     * </ul>
     * A non-leader just waits. Any Throwable is logged and followed by a 100 ms pause.
     */
    @Override
    public void doWork() {
        try {
            // Log role, ledger range, committed index and watermarks at most once every 3 seconds
            if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > 3000) {
                logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}",
                    memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));
                lastPrintWatermarkTimeMs = System.currentTimeMillis();
            }
            // Only the leader arbitrates acknowledgements
            if (!memberState.isLeader()) {
                waitForRunning(1);
                return;
            }
            long currTerm = memberState.currTerm();
            checkTermForPendingMap(currTerm, "QuorumAckChecker");
            checkTermForWaterMark(currTerm, "QuorumAckChecker");
            // Requests pending from older terms can no longer be acknowledged: fail them with TERM_CHANGED
            if (pendingAppendResponsesByTerm.size() > 1) {
                for (Long term : pendingAppendResponsesByTerm.keySet()) {
                    if (term == currTerm) {
                        continue;
                    }
                    for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : pendingAppendResponsesByTerm.get(term).entrySet()) {
                        AppendEntryResponse response = new AppendEntryResponse();
                        response.setGroup(memberState.getGroup());
                        response.setIndex(futureEntry.getKey());
                        response.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());
                        response.setLeaderId(memberState.getLeaderId());
                        logger.info("[TermChange] Will clear the pending response index={} for term changed from {} to {}", futureEntry.getKey(), term, currTerm);
                        futureEntry.getValue().complete(response);
                    }
                    pendingAppendResponsesByTerm.remove(term);
                }
            }
            // Watermarks recorded in older terms are stale: drop them
            if (peerWaterMarksByTerm.size() > 1) {
                for (Long term : peerWaterMarksByTerm.keySet()) {
                    if (term == currTerm) {
                        continue;
                    }
                    logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", term, currTerm);
                    peerWaterMarksByTerm.remove(term);
                }
            }
            Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);

            // Sorted descending, the watermark at position size/2 has been reached by a majority of nodes
            List<Long> sortedWaterMarks = peerWaterMarks.values()
                    .stream()
                    .sorted(Comparator.reverseOrder())
                    .collect(Collectors.toList());
            long quorumIndex = sortedWaterMarks.get(sortedWaterMarks.size() / 2);
            // Everything up to quorumIndex is replicated on a majority: commit it
            dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
            // Complete the client futures pending on indexes up to quorumIndex
            ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);
            boolean needCheck = false;
            int ackNum = 0;
            // Walk backwards from quorumIndex down to the previous round's quorum index
            for (Long i = quorumIndex; i > lastQuorumIndex; i--) {
                try {
                    CompletableFuture<AppendEntryResponse> future = responses.remove(i);
                    // Gap: no pending future here. Stop walking and request a leak sweep below.
                    if (future == null) {
                        needCheck = true;
                        break;
                    } else if (!future.isDone()) {
                        AppendEntryResponse response = new AppendEntryResponse();
                        response.setGroup(memberState.getGroup());
                        response.setTerm(currTerm);
                        response.setIndex(i);
                        response.setLeaderId(memberState.getSelfId());
                        response.setPos(((AppendFuture) future).getPos());
                        future.complete(response);
                    }
                    // Number of indexes acknowledged in this round
                    ackNum++;
                } catch (Throwable t) {
                    logger.error("Error in ack to index={} term={}", i, currTerm, t);
                }
            }
            // Nothing acknowledged this round: fail consecutive pending futures after quorumIndex that have timed out
            if (ackNum == 0) {
                // Bounded by Long.MAX_VALUE because log indexes are longs and can exceed Integer.MAX_VALUE;
                // with an int bound this sweep silently stopped running for large indexes. Exits via break.
                for (long i = quorumIndex + 1; i < Long.MAX_VALUE; i++) {
                    TimeoutFuture<AppendEntryResponse> future = responses.get(i);
                    if (future == null) {
                        break;
                    } else if (future.isTimeOut()) {
                        AppendEntryResponse response = new AppendEntryResponse();
                        response.setGroup(memberState.getGroup());
                        response.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode());
                        response.setTerm(currTerm);
                        response.setIndex(i);
                        response.setLeaderId(memberState.getSelfId());
                        future.complete(response);
                    } else {
                        break;
                    }
                }
                waitForRunning(1);
            }

            // Leak sweep (once a second, or when the walk above hit a gap): futures below quorumIndex are
            // already committed, so complete them successfully and drop them
            if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000 || needCheck) {
                // Refresh the leader's own watermark from the store's end index
                updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
                for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : responses.entrySet()) {
                    if (futureEntry.getKey() < quorumIndex) {
                        AppendEntryResponse response = new AppendEntryResponse();
                        response.setGroup(memberState.getGroup());
                        response.setTerm(currTerm);
                        response.setIndex(futureEntry.getKey());
                        response.setLeaderId(memberState.getSelfId());
                        response.setPos(((AppendFuture) futureEntry.getValue()).getPos());
                        futureEntry.getValue().complete(response);
                        responses.remove(futureEntry.getKey());
                    }
                }
                lastCheckLeakTimeMs = System.currentTimeMillis();
            }
            // Remember how far this round arbitrated
            lastQuorumIndex = quorumIndex;
        } catch (Throwable t) {
            DLedgerEntryPusher.logger.error("Error in {}", getName(), t);
            DLedgerUtils.sleep(100);
        }
    }
}

Follower端

EntryHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * This thread will be activated by the follower.
 * Accept the push request and order it by the index, then append to ledger store one by one.
 */
private class EntryHandler extends ShutdownAbleThread {
    // Timestamp of the last fast-forward check (presumably whether the leader has pushed anything — confirm in doWork, not shown)
    private long lastCheckFastForwardTimeMs = System.currentTimeMillis();
    // Pending APPEND requests with their response futures, keyed by entry index (first entry index in batch mode)
    ConcurrentMap<Long, Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> writeRequestMap = new ConcurrentHashMap<>();
    // Bounded queue (capacity 100) for non-APPEND requests — COMMIT/COMPARE/TRUNCATE per the original notes; confirm in handlePush
    BlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> compareOrTruncateRequests = new ArrayBlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>>(100);
}

EntryHandler#handlePush()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * Follower entry point for a push request sent by the leader.
 *
 * <p>APPEND requests are parked in {@code writeRequestMap} and appended in index order by
 * {@code doWork}. They are keyed by the entry index, or by the first entry index when batch push
 * is enabled.
 *
 * <p>COMMIT, COMPARE and TRUNCATE requests are put on the bounded
 * {@code compareOrTruncateRequests} queue and are also handled asynchronously by
 * {@code doWork}. Duplicate and unknown requests are answered immediately.
 *
 * @param request the push request received from the leader
 * @return the response future; it is created with a 1000 ms timeout value
 * @throws Exception propagated from a failing {@code PreConditions.check}, or if interrupted
 *     while {@code BlockingQueue.put} waits for space in the request queue
 */
public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {
    // The timeout should be smaller than the remoting layer's request timeout.
    CompletableFuture<PushEntryResponse> future = new TimeoutFuture<>(1000);
    switch (request.getType()) {
        case APPEND:
            if (dLedgerConfig.isEnableBatchPush()) {
                PreConditions.check(request.getBatchEntry() != null && request.getCount() > 0, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                long firstIndex = request.getFirstEntryIndex();
                // The newest batch for an index replaces any pending one. doWork only picks up
                // futures that are still in writeRequestMap, so a replaced future would never be
                // completed here; answer it explicitly instead of leaving the caller hanging.
                Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> replaced =
                    writeRequestMap.put(firstIndex, new Pair<>(request, future));
                if (replaced != null) {
                    logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", firstIndex, replaced.getKey().baseInfo(), request.baseInfo());
                    replaced.getValue().complete(buildResponse(replaced.getKey(), DLedgerResponseCode.REPEATED_PUSH.getCode()));
                }
            } else {
                PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                long index = request.getEntry().getIndex();
                // An entry for this index is already pending: keep the first one, reject this one.
                Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> old = writeRequestMap.putIfAbsent(index, new Pair<>(request, future));
                if (old != null) {
                    logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo());
                    future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));
                }
            }
            break;
        case COMMIT:
            // Handled asynchronously by doWork.
            compareOrTruncateRequests.put(new Pair<>(request, future));
            break;
        case COMPARE:
        case TRUNCATE:
            PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
            // Drop all pending appends: after a COMPARE/TRUNCATE the leader restarts replication.
            // NOTE(review): the dropped futures are not completed here; the remote caller presumably
            // relies on its own request timeout. Confirm this is intended.
            writeRequestMap.clear();
            compareOrTruncateRequests.put(new Pair<>(request, future));
            break;
        default:
            logger.error("[BUG]Unknown type {} from {}", request.getType(), request.baseInfo());
            future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
            break;
    }
    return future;
}

EntryHandler#doWork()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
 * One iteration of the follower's processing loop, invoked repeatedly by the thread's run loop.
 *
 * <p>Nothing is done unless this node is a follower. Queued COMMIT/COMPARE/TRUNCATE requests
 * take priority over appends.
 *
 * <p>Otherwise the pending APPEND request for index {@code ledgerEndIndex + 1} is taken out of
 * {@code writeRequestMap}, so entries are appended strictly in order. When no such request exists,
 * {@code checkAbnormalFuture} is run and the thread waits briefly.
 *
 * <p>Any Throwable is logged, followed by a {@code DLedgerUtils.sleep(100)} back-off.
 */
@Override
public void doWork() {
    try {
        if (!memberState.isFollower()) {
            // Only followers consume pushed entries.
            waitForRunning(1);
            return;
        }

        if (compareOrTruncateRequests.peek() == null) {
            // No control request pending: try to append the next entry in sequence.
            long expectedIndex = dLedgerStore.getLedgerEndIndex() + 1;
            Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pending = writeRequestMap.remove(expectedIndex);
            if (pending == null) {
                checkAbnormalFuture(dLedgerStore.getLedgerEndIndex());
                waitForRunning(1);
                return;
            }
            PushEntryRequest appendRequest = pending.getKey();
            if (dLedgerConfig.isEnableBatchPush()) {
                handleDoBatchAppend(expectedIndex, appendRequest, pending.getValue());
            } else {
                handleDoAppend(expectedIndex, appendRequest, pending.getValue());
            }
            return;
        }

        Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> control = compareOrTruncateRequests.poll();
        PreConditions.check(control != null, DLedgerResponseCode.UNKNOWN);
        switch (control.getKey().getType()) {
            case TRUNCATE:
                handleDoTruncate(control.getKey().getEntry().getIndex(), control.getKey(), control.getValue());
                break;
            case COMPARE:
                handleDoCompare(control.getKey().getEntry().getIndex(), control.getKey(), control.getValue());
                break;
            case COMMIT:
                handleDoCommit(control.getKey().getCommitIndex(), control.getKey(), control.getValue());
                break;
            default:
                break;
        }
    } catch (Throwable t) {
        DLedgerEntryPusher.logger.error("Error in {}", getName(), t);
        DLedgerUtils.sleep(100);
    }
}

EntryHandler#handleDoAppend()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Appends a single pushed entry to the follower's ledger store.
 *
 * <p>The entry's index must equal {@code writeIndex}. It is stored via
 * {@code appendAsFollower} using the leader's term and id, and the resulting index is checked
 * again. The future is then completed with SUCCESS, and the committed index is updated from the
 * request's term and commit index.
 *
 * <p>Any failure is logged and answered with INCONSISTENT_STATE. If the future was already
 * completed with SUCCESS, that second completion presumably has no effect (CompletableFuture
 * complete-once semantics; TODO confirm for TimeoutFuture).
 *
 * @param writeIndex the index the next appended entry is expected to have
 * @param request the APPEND request carrying the entry
 * @param future the response future to complete
 */
private void handleDoAppend(long writeIndex, PushEntryRequest request,
    CompletableFuture<PushEntryResponse> future) {
    try {
        DLedgerEntry incoming = request.getEntry();
        PreConditions.check(incoming.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE);
        DLedgerEntry appended = dLedgerStore.appendAsFollower(incoming, request.getTerm(), request.getLeaderId());
        PreConditions.check(appended.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE);
        future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
        // Advance the commit point carried by the leader's request.
        dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
    } catch (Throwable t) {
        logger.error("[HandleDoWrite] writeIndex={}", writeIndex, t);
        future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
    }
}

EntryHandler#handleDoCompare()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Handles a COMPARE request on the follower.
 *
 * <p>It checks whether the locally stored entry at {@code compareIndex} equals the entry sent by
 * the leader.
 *
 * @param compareIndex index of the entry to compare; must match the request entry's index
 * @param request the COMPARE request carrying the leader's entry
 * @param future completed with SUCCESS when the entries are equal. On any failure (index or type
 *     mismatch, differing entries, or an exception) it is completed with INCONSISTENT_STATE
 * @return the same {@code future} that was passed in
 */
private CompletableFuture<PushEntryResponse> handleDoCompare(long compareIndex, PushEntryRequest request,
    CompletableFuture<PushEntryResponse> future) {
    try {
        DLedgerEntry leaderEntry = request.getEntry();
        PreConditions.check(leaderEntry.getIndex() == compareIndex, DLedgerResponseCode.UNKNOWN);
        PreConditions.check(PushEntryRequest.Type.COMPARE == request.getType(), DLedgerResponseCode.UNKNOWN);
        DLedgerEntry localEntry = dLedgerStore.get(compareIndex);
        // The follower agrees with the leader only if the stored entry is identical.
        PreConditions.check(leaderEntry.equals(localEntry), DLedgerResponseCode.INCONSISTENT_STATE);
        future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
    } catch (Throwable t) {
        logger.error("[HandleDoCompare] compareIndex={}", compareIndex, t);
        future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
    }
    return future;
}

EntryHandler#handleDoCommit()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Handles a COMMIT request on the follower by advancing the store's committed index.
 *
 * @param committedIndex the commit index to apply; must match {@code request.getCommitIndex()}
 * @param request the COMMIT request from the leader
 * @param future completed with SUCCESS after the update. On any failure it is completed with
 *     UNKNOWN
 * @return the same {@code future} that was passed in
 */
private CompletableFuture<PushEntryResponse> handleDoCommit(long committedIndex, PushEntryRequest request,
    CompletableFuture<PushEntryResponse> future) {
    try {
        PreConditions.check(request.getCommitIndex() == committedIndex, DLedgerResponseCode.UNKNOWN);
        PreConditions.check(PushEntryRequest.Type.COMMIT == request.getType(), DLedgerResponseCode.UNKNOWN);
        long leaderTerm = request.getTerm();
        dLedgerStore.updateCommittedIndex(leaderTerm, committedIndex);
        future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
    } catch (Throwable t) {
        logger.error("[HandleDoCommit] committedIndex={}", request.getCommitIndex(), t);
        future.complete(buildResponse(request, DLedgerResponseCode.UNKNOWN.getCode()));
    }
    return future;
}

EntryHandler#handleDoTruncate()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Handles a TRUNCATE request on the follower.
 *
 * <p>The follower's log is truncated at the entry supplied by the leader. The store's return
 * value is expected to equal {@code truncateIndex}. After answering SUCCESS, the committed index
 * is updated from the request.
 *
 * @param truncateIndex index of the truncation entry; must match the request entry's index
 * @param request the TRUNCATE request carrying the entry, term, leader id and commit index
 * @param future completed with SUCCESS on success. On any failure it is completed with
 *     INCONSISTENT_STATE
 * @return the same {@code future} that was passed in
 */
private CompletableFuture<PushEntryResponse> handleDoTruncate(long truncateIndex, PushEntryRequest request,
    CompletableFuture<PushEntryResponse> future) {
    try {
        DLedgerEntry truncateEntry = request.getEntry();
        logger.info("[HandleDoTruncate] truncateIndex={} pos={}", truncateIndex, truncateEntry.getPos());
        PreConditions.check(truncateEntry.getIndex() == truncateIndex, DLedgerResponseCode.UNKNOWN);
        PreConditions.check(PushEntryRequest.Type.TRUNCATE == request.getType(), DLedgerResponseCode.UNKNOWN);
        long resultIndex = dLedgerStore.truncate(truncateEntry, request.getTerm(), request.getLeaderId());
        PreConditions.check(resultIndex == truncateIndex, DLedgerResponseCode.INCONSISTENT_STATE);
        future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
        dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
    } catch (Throwable t) {
        logger.error("[HandleDoTruncate] truncateIndex={}", truncateIndex, t);
        future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
    }
    return future;
}

原理

日志复制流程图

日志追加仲裁流程图