RocketMQ-19丨RocketMQ整合DLedger

Posted by jiefang on July 16, 2021

RocketMQ整合DLedger

RocketMQ 的消息存储文件主要包括CommitLog 文件、ConsumeQueue 文件与Index 文件。CommitLog 文件存储全量的消息,ConsumeQueue、Index 文件都是基于CommitLog 文件构建的。因此,要使用DLedger 来实现消息存储的一致性,关键是要实现CommitLog 文件的一致性:DLedger 要整合的对象是CommitLog 文件,只需保证Raft 协议复制组内各个节点的CommitLog 文件一致即可。整合时,DLedger 把RocketMQ 消息(即一个个CommitLog 条目)整体当成DLedger 条目的body 字段来存储。

Broker启动流程

DefaultMessageStore构造方法

1
2
3
4
5
6
7
8
9
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
    final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
    //是否开启Dledger
    if (messageStoreConfig.isEnableDLegerCommitLog()) {
        this.commitLog = new DLedgerCommitLog(this);
    } else {
        this.commitLog = new CommitLog(this);
    }
}

DLedgerCommitLog构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * Builds the DLedger-backed commit log: copies the relevant store settings into a
 * {@code DLedgerConfig}, creates the {@code DLedgerServer}, registers an append hook on its
 * file store and creates the message serializer.
 *
 * @param defaultMessageStore the owning message store; its message store config supplies every setting read here
 */
public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
    super(defaultMessageStore);
    dLedgerConfig = new DLedgerConfig();
    // Whether forced file cleanup is enabled; taken from the store config's cleanFileForciblyEnable setting.
    dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable());
    // DLedger store type; always the file-based store.
    dLedgerConfig.setStoreType(DLedgerConfig.FILE);
    // Id of THIS node (not necessarily the leader), e.g. n0. The id computation below parses
    // everything after the first character as an integer, so that part must be numeric.
    dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
    // DLedger group name; the original author recommends keeping it equal to the broker's brokerName.
    dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
    // All nodes of the DLedger group, e.g. n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
    // (multiple nodes separated by semicolons).
    dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
    // Root directory of the DLedger log files; taken from the store config's storePathRootDir.
    dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    // Size of a single DLedger data file; same value as the commit log's mapped file size.
    dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
    // When DLedger log files are deleted; taken from the store config's deleteWhen setting.
    dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen());
    // Retention of DLedger log files in hours: the store config's file reserved time plus one hour.
    dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1);
    // Preferred leader id and batch-push switch are passed through unchanged.
    dLedgerConfig.setPreferredLeaderId(defaultMessageStore.getMessageStoreConfig().getPreferredLeaderId());
    dLedgerConfig.setEnableBatchPush(defaultMessageStore.getMessageStoreConfig().isEnableBatchPush());

    // Numeric id: the digits after the first character of the self id, plus one (n0 -> 1, n1 -> 2, ...).
    id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1;
    // Create the DLedgerServer (one node of the DLedger group) from the configuration above.
    // NOTE(review): the original author states leader election starts once the group's nodes are up — not visible here.
    dLedgerServer = new DLedgerServer(dLedgerConfig);
    dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
    // Append hook: seeks to the physical-offset field inside the entry body and overwrites it
    // with the absolute position of the body (entry position + body offset).
    DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
        assert bodyOffset == DLedgerEntry.BODY_OFFSET;
        buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION);
        buffer.putLong(entry.getPos() + bodyOffset);
    };
    dLedgerFileStore.addAppendHook(appendHook);
    dLedgerFileList = dLedgerFileStore.getDataFileList();
    // Message serializer, bounded by the configured maximum message size.
    this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}

BrokerController#initialize()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Initializes the broker (excerpt): loads topic config, consumer offsets, subscription groups
 * and consumer filters; if all of them load, creates the message store, wires the DLedger
 * role-change handler when DLedger is enabled, and applies the message store plugins.
 * The rest of the method is elided in this excerpt.
 *
 * @return presumably whether initialization succeeded — the return statement is elided here
 * @throws CloneNotSupportedException declared by the signature; the throwing code is not part of this excerpt
 */
public boolean initialize() throws CloneNotSupportedException {
    boolean result = this.topicConfigManager.load();

    result = result && this.consumerOffsetManager.load();
    result = result && this.subscriptionGroupManager.load();
    result = result && this.consumerFilterManager.load();

    if (result) {
        try {
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                                        this.brokerConfig);
            // Only when the DLedger commit log is enabled
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                // Register a role-change handler on the DLedger leader elector
                DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
            }
            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
            //load plugin
            MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
            this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        } catch (IOException e) {
            // A store construction failure marks initialization as failed instead of propagating.
            result = false;
            log.error("Failed to initialize", e);
        }
    }
    ...
}

DefaultMessageStore#load()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Loads the persisted store state: the schedule message service (when present), the commit
 * log, the consume queues and, if all of those succeed, the store checkpoint and the index
 * files, followed by recovery.
 * No checked exception is declared; any exception is caught, logged and reported as a failed load.
 *
 * @return {@code true} if every step succeeded; {@code false} if a load step returned false or
 *         an exception was thrown (the allocate-mapped-file service is then shut down)
 */
public boolean load() {
    boolean result = true;

    try {
        // The previous shutdown counts as normal when the temp file no longer exists.
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
        // Load the schedule message service, if one is configured
        if (null != scheduleMessageService) {
            result = result && this.scheduleMessageService.load();
        }

        // load Commit Log; commitLog is either a CommitLog or a DLedgerCommitLog, depending on configuration
        result = result && this.commitLog.load();

        // load Consume Queue
        result = result && this.loadConsumeQueue();

        if (result) {
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));

            this.indexService.load(lastExitOK);

            this.recover(lastExitOK);

            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        result = false;
    }
    // On failure, stop the mapped-file allocation service.
    if (!result) {
        this.allocateMappedFileService.shutdown();
    }
    return result;
}

DLedgerCommitLog

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Store all metadata downtime for recovery, data protection reliability
 * CommitLog implementation backed by DLedger.
 */
public class DLedgerCommitLog extends CommitLog {
    // One node of the Raft-based DLedger group, represented by a DLedgerServer instance
    private final DLedgerServer dLedgerServer;
    // DLedger configuration
    private final DLedgerConfig dLedgerConfig;
    // DLedger's memory-mapped-file based store implementation
    private final DLedgerMmapFileStore dLedgerFileStore;
    // The set of storage files managed by DLedger; the counterpart of RocketMQ's MappedFileQueue
    private final MmapFileList dLedgerFileList;
    //The id identifies the broker role, 0 means master, others means slave
    private final int id;
    // Message serializer
    private final MessageSerializer messageSerializer;
    // Start time of the append currently holding the put-message lock; used to measure time spent in the lock
    private volatile long beginTimeInDledgerLock = 0;
    //This offset separate the old commitlog from dledger commitlog
    // Offsets below this value are read from the old commitlog files; offsets at or above it
    // are read from the files managed by DLedger
    private long dividedCommitlogOffset = -1;
    // Whether the old commitlog files are currently being recovered
    private boolean isInrecoveringOldCommitlog = false;
}

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * Builds the DLedger-backed commit log: copies the relevant store settings into a
 * {@code DLedgerConfig}, creates the {@code DLedgerServer}, registers an append hook on its
 * file store and creates the message serializer.
 *
 * @param defaultMessageStore the owning message store; its message store config supplies every setting read here
 */
public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
    super(defaultMessageStore);
    dLedgerConfig = new DLedgerConfig();
    // Whether forced file cleanup is enabled; taken from the store config's cleanFileForciblyEnable setting.
    dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable());
    // DLedger store type; always the file-based store.
    dLedgerConfig.setStoreType(DLedgerConfig.FILE);
    // Id of THIS node (not necessarily the leader), e.g. n0. The id computation below parses
    // everything after the first character as an integer, so that part must be numeric.
    dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
    // DLedger group name; the original author recommends keeping it equal to the broker's brokerName.
    dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
    // All nodes of the DLedger group, e.g. n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
    // (multiple nodes separated by semicolons).
    dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
    // Root directory of the DLedger log files; taken from the store config's storePathRootDir.
    dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    // Size of a single DLedger data file; same value as the commit log's mapped file size.
    dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
    // When DLedger log files are deleted; taken from the store config's deleteWhen setting.
    dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen());
    // Retention of DLedger log files in hours: the store config's file reserved time plus one hour.
    dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1);
    // Preferred leader id and batch-push switch are passed through unchanged.
    dLedgerConfig.setPreferredLeaderId(defaultMessageStore.getMessageStoreConfig().getPreferredLeaderId());
    dLedgerConfig.setEnableBatchPush(defaultMessageStore.getMessageStoreConfig().isEnableBatchPush());

    // Numeric id: the digits after the first character of the self id, plus one (n0 -> 1, n1 -> 2, ...).
    id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1;
    // Create the DLedgerServer (one node of the DLedger group) from the configuration above.
    // NOTE(review): the original author states leader election starts once the group's nodes are up — not visible here.
    dLedgerServer = new DLedgerServer(dLedgerConfig);
    dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
    // Append hook: seeks to the physical-offset field inside the entry body and overwrites it
    // with the absolute position of the body (entry position + body offset).
    DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
        assert bodyOffset == DLedgerEntry.BODY_OFFSET;
        buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION);
        buffer.putLong(entry.getPos() + bodyOffset);
    };
    dLedgerFileStore.addAppendHook(appendHook);
    dLedgerFileList = dLedgerFileStore.getDataFileList();
    // Message serializer, bounded by the configured maximum message size.
    this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}

DLedgerCommitLog#recover()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
 * Recovers the commit log state on startup.
 * If DLedger data files already exist, recovers the DLedger store and truncates consume queue
 * data that points at or beyond DLedger's max written position. Otherwise (first start with
 * DLedger on top of an old commitlog) recovers the old commitlog, seals its last file and
 * records where the DLedger-managed offsets begin.
 *
 * @param maxPhyOffsetOfConsumeQueue the largest physical offset referenced by the consume queues
 */
private void recover(long maxPhyOffsetOfConsumeQueue) {
    dLedgerFileStore.load();
    if (dLedgerFileList.getMappedFiles().size() > 0) {
        dLedgerFileStore.recover();
        // Start offset of the first DLedger file. Physical offsets below this value are looked up
        // in the old commitlog files; offsets at or above it are looked up in the DLedger files.
        dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset();
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        // If old commitlog files still exist, DLedger file deletion is disabled.
        // NOTE(review): per the original author this turns off forced cleanup and sets the retention
        // to 10 years; disableDeleteDledger() itself is not shown here.
        if (mappedFile != null) {
            // Prevents a gap between the old commitlog offsets and the DLedger file offsets
            disableDeleteDledger();
        }
        // Max written physical offset of the DLedger files
        long maxPhyOffset = dLedgerFileList.getMaxWrotePosition();
        // Clear ConsumeQueue redundant data
        if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) {
            // The consume queues reference an offset at or beyond DLedger's max written offset:
            // truncate the dirty consume queue data
            log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset);
        }
        return;
    }
    //Indicate that, it is the first time to load mixed commitlog, need to recover the old commitlog
    isInrecoveringOldCommitlog = true;
    //No need the abnormal recover; the normal recovery path is used
    super.recoverNormally(maxPhyOffsetOfConsumeQueue);
    isInrecoveringOldCommitlog = false;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    // No old commitlog file at all: nothing to seal.
    if (mappedFile == null) {
        return;
    }
    // Inspect the two ints at the current write position of the last old commitlog file.
    ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
    byteBuffer.position(mappedFile.getWrotePosition());
    boolean needWriteMagicCode = true;
    // 1 TOTAL SIZE
    byteBuffer.getInt(); //size
    int magicCode = byteBuffer.getInt();
    // A blank magic code means the tail of the file is already marked as blank.
    if (magicCode == CommitLog.BLANK_MAGIC_CODE) {
        needWriteMagicCode = false;
    } else {
        log.info("Recover old commitlog found a illegal magic code={}", magicCode);
    }
    dLedgerConfig.setEnableDiskForceClean(false);
    // DLedger offsets start right after the end of the last old commitlog file.
    dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize();
    log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset);
    if (needWriteMagicCode) {
        // Write a blank marker at the write position: remaining length followed by the blank magic code.
        byteBuffer.position(mappedFile.getWrotePosition());
        byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());
        byteBuffer.putInt(BLANK_MAGIC_CODE);
        mappedFile.flush(0);
    }
    // Mark the old file as completely written, committed and flushed.
    mappedFile.setWrotePosition(mappedFile.getFileSize());
    mappedFile.setCommittedPosition(mappedFile.getFileSize());
    mappedFile.setFlushedPosition(mappedFile.getFileSize());
    // NOTE(review): presumably creates the first DLedger data file starting at dividedCommitlogOffset — confirm against MmapFileList.
    dLedgerFileList.getLastMappedFile(dividedCommitlogOffset);
    log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset);
}

DLedgerCommitLog#putMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/**
 * Appends a message through DLedger.
 * Under the put-message lock the message is serialized and handed to the DLedger server; after
 * the lock is released the method waits up to 3 seconds for the append response and maps its
 * code to a {@code PutMessageStatus}.
 *
 * @param msg the message to append
 * @return the put result; its status is PUT_OK only when the DLedger response code is SUCCESS
 */
@Override
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    String topic = msg.getTopic();
    setMessageInfo(msg,tranType);

    // Back to Results
    AppendMessageResult appendResult;
    AppendFuture<AppendEntryResponse> dledgerFuture;
    EncodeResult encodeResult;

    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    long elapsedTimeInLock;
    long queueOffset;
    try {
        beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
        encodeResult = this.messageSerializer.serialize(msg);
        queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
        encodeResult.setQueueOffsetKey(queueOffset);
        // A failed serialization is reported as an illegal message.
        if (encodeResult.status != AppendMessageStatus.PUT_OK) {
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
        }
        AppendEntryRequest request = new AppendEntryRequest();
        request.setGroup(dLedgerConfig.getGroup());
        request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
        request.setBody(encodeResult.getData());
        // Hand the entry to dLedgerServer#handleAppend. Per the original author's note, the leader
        // appends and replicates the entry and reports success only after more than half of the
        // group's nodes have written it; on success the future carries the entry's start position (pos).
        dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
        // A position of -1 is reported to the caller as OS_PAGECACHE_BUSY.
        if (dledgerFuture.getPos() == -1) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
        }
        // Derive the message's physical offset from the DLedger entry position: the offset returned
        // to the client is the start of the entry's body field. It therefore has the same meaning as
        // the offset returned without DLedger: reading from that offset yields the message itself.
        long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;

        // Message id buffer length: 4 + 4 + 8 bytes unless the V6 store-host flag is set, then 16 + 4 + 8.
        int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
        ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);

        String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
        appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
        // Only non-transactional and committed messages advance the queue offset.
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // The next update ConsumeQueue information
                DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);
                break;
            default:
                break;
        }
    } catch (Exception e) {
        log.error("Put message error", e);
        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
    } finally {
        // Always reset the lock timer and release the lock, including on early returns.
        beginTimeInDledgerLock = 0;
        putMessageLock.unlock();
    }

    // Warn when more than 500 ms were spent inside the lock.
    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult);
    }

    // Stays UNKNOWN_ERROR if the wait below fails or the response code is not handled.
    PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
    try {
        // Wait up to 3 seconds for the append response.
        AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS);
        switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
            case SUCCESS:
                putMessageStatus = PutMessageStatus.PUT_OK;
                break;
            case INCONSISTENT_LEADER:
            case NOT_LEADER:
            case LEADER_NOT_READY:
            case DISK_FULL:
                putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
                break;
            case WAIT_QUORUM_ACK_TIMEOUT:
                //Do not return flush_slave_timeout to the client, for the ons client will ignore it.
                putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
                break;
            case LEADER_PENDING_FULL:
                putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
                break;
        }
    } catch (Throwable t) {
        log.error("Failed to get dledger append result", t);
    }

    PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
    if (putMessageStatus == PutMessageStatus.PUT_OK) {
        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(appendResult.getWroteBytes());
    }
    return putMessageResult;
}

DLedgerCommitLog#getMessage()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Reads {@code size} bytes of message data starting at the given physical offset.
 * Offsets below {@code dividedCommitlogOffset} are served by the parent commit log; all other
 * offsets are served from the DLedger data files.
 *
 * @param offset physical offset to read from
 * @param size number of bytes to read
 * @return the selected buffer, or {@code null} when no DLedger data file contains the offset
 */
@Override
public SelectMappedBufferResult getMessage(final long offset, final int size) {
    if (offset >= dividedCommitlogOffset) {
        // The offset belongs to the DLedger-managed files.
        final int entryFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData();
        final MmapFile dataFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0);
        if (dataFile == null) {
            return null;
        }
        // Position of the offset inside the file that contains it.
        final int positionInFile = (int) (offset % entryFileSize);
        return convertSbr(dataFile.selectMappedBuffer(positionInFile, size));
    }
    // The offset precedes the DLedger data: read it from the old commit log files.
    return super.getMessage(offset, size);
}

总结

  • DLedger在整合时,使用DLedger 条目包裹RocketMQ 中的CommitLog 条目,即在DLedger 条目的body字段来存储整条CommitLog 条目;
  • 引入dividedCommitlogOffset 变量,表示物理偏移量小于该值的消息存在于旧的CommitLog 文件中,实现升级DLedger 集群后能访问到旧的数据;
  • 新DLedger 集群启动后,会将最后一个旧的CommitLog 文件填充满,即新的数据不会再写入到原先的CommitLog 文件;
  • 消息追加到DLedger 数据日志文件中,返回的偏移量不是DLedger 条目的起始偏移量,而是DLedger 条目中body 字段的起始偏移量,即真实消息的起始偏移量,保证消息物理偏移量的语义与RocketMQ CommitLog 一样。