RocketMQ-18丨DLedger存储实现

Posted by jiefang on July 15, 2021

DLedger存储实现

存储类

DLedgerStore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
 * 存储抽象类
 */
public abstract class DLedgerStore {
    public MemberState getMemberState() {
        return null;
    }
    /**
     * 向主节点追加日志(数据)
     * @param entry 日志条目
     * @return 日志条目
     */
    public abstract DLedgerEntry appendAsLeader(DLedgerEntry entry);
    /**
     * 从节点追加日志
     * @param entry 日志条目
     * @param leaderTerm 投票轮次
     * @param leaderId Leader ID
     * @return 日志条目
     */
    public abstract DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String leaderId);
    /**
     * 根据日志索引查找日志
     * @param index 索引
     * @return 日志条目
     */
    public abstract DLedgerEntry get(Long index);
    /**
     * 获取已提交的下标
     * @return 索引
     */
    public abstract long getCommittedIndex();
    /**
     * 更新已提交日志索引
     * @param term 选举轮次
     * @param committedIndex 已提交日志索引
     */
    public void updateCommittedIndex(long term, long committedIndex) {}
    /**
     * 获取Leader当前最大的投票轮次
     * @return 投票轮次
     */
    public abstract long getLedgerEndTerm();
    /**
     * 获取Leader下一条日志写入的索引
     * @return 索引
     */
    public abstract long getLedgerEndIndex();
    /**
     * 获取Leader第一条消息的下标
     * @return 索引
     */
    public abstract long getLedgerBeginIndex();
    /**
     * 更新写入索引和选举轮次
     */
    protected void updateLedgerEndIndexAndTerm() {
        if (getMemberState() != null) {
            getMemberState().updateLedgerIndexAndTerm(getLedgerEndIndex(), getLedgerEndTerm());
        }
    }
    /**
     * 刷写
     */
    public void flush() {}
    /**
     * 删除日志
     * @param entry 日志条目
     * @param leaderTerm 选举轮次
     * @param leaderId Leader ID
     * @return long
     */
    public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) {
        return -1;
    }
    /**
     * 启动
     */
    public void startup() {}
    /**
     * 停止
     */
    public void shutdown() {}
}

DLedgerMmapFileStore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * DLedger基于文件内存映射机制的存储实现
 */
public class DLedgerMmapFileStore extends DLedgerStore {

    public static final String CHECK_POINT_FILE = "checkpoint";
    public static final String END_INDEX_KEY = "endIndex";
    public static final String COMMITTED_INDEX_KEY = "committedIndex";
    public static final int MAGIC_1 = 1;
    public static final int CURRENT_MAGIC = MAGIC_1;
    public static final int INDEX_UNIT_SIZE = 32;

    private static Logger logger = LoggerFactory.getLogger(DLedgerMmapFileStore.class);
    public List<AppendHook> appendHooks = new ArrayList<>();
    //日志的起始索引,默认为 -1
    private long ledgerBeginIndex = -1;
    //最后一条日志的索引
    private long ledgerEndIndex = -1;
    //已提交的日志索引
    private long committedIndex = -1;
    private long committedPos = -1;
    //当前最大的投票轮次
    private long ledgerEndTerm;
    //配置信息
    private DLedgerConfig dLedgerConfig;
    //状态机
    private MemberState memberState;
    //日志文件(数据文件)的内存映射Queue
    private MmapFileList dataFileList;
    //索引文件的内存映射文件集合
    private MmapFileList indexFileList;
    //本地线程变量,用来缓存索引ByteBuffer
    private ThreadLocal<ByteBuffer> localEntryBuffer;
    //本地线程变量,用来缓存数据索引ByteBuffe
    private ThreadLocal<ByteBuffer> localIndexBuffer;
    //数据文件刷盘线程
    private FlushDataService flushDataService;
    //清除过期日志文件线程
    private CleanSpaceService cleanSpaceService;
    //磁盘是否已满
    private boolean isDiskFull = false;
    //上一次检测点
    private long lastCheckPointTimeMs = System.currentTimeMillis();
    //是否已经加载,主要用来避免重复加载(初始化)日志文件
    private AtomicBoolean hasLoaded = new AtomicBoolean(false);
    //是否已恢复
    private AtomicBoolean hasRecovered = new AtomicBoolean(false);
}

DLedgerMmapFileStore#startup()

1
2
3
4
5
6
7
8
9
/**
 * 文件内存映射存储实现启动
 */
public void startup() {
    load();
    recover();
    flushDataService.start();
    cleanSpaceService.start();
}

DLedgerMmapFileStore#flush()

1
2
3
4
5
6
7
/**
 * 刷写日志条目文件和索引文件
 */
public void flush() {
    this.dataFileList.flush(0);
    this.indexFileList.flush(0);
}
MmapFileList#flush()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * 刷写日志条目文件
 * @param flushLeastPages 0
 * @return 结果
 */
public boolean flush(final int flushLeastPages) {
    boolean result = true;
    MmapFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        int offset = mappedFile.flush(flushLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
    }
    return result;
}
DefaultMmapFile#flush()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * 立即将缓存中的数据刷新到磁盘。
 * @return The current flushed position
 */
@Override
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();
            try {
                this.mappedByteBuffer.force();
            } catch (Throwable e) {
                logger.error("Error occurred when force data to disk.", e);
            }
            this.flushedPosition.set(value);
            this.release();
        } else {
            logger.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}

Dledger存储对比RocketMQ存储

功能 Dledger存储 RocketMQ存储
表示物理文件 DefaultMmapFile MappedFile
表示多个物理文件(逻辑连续) MmapFileList MappedFileQueue
封装存储逻辑 DLedgerMmapFileStore DefaultMessageStore
文件刷盘线程 DLedgerMmapFileStore$FlushDataService CommitLog$FlushCommitLogService
过期文件删除线程 DLedgerMmapFileStore$CleanSpaceService DefaultMessageStore$CleanCommitLogService

数据存储格式

日志条目

DLedgerEntryCoder#encode()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * 编码
 * @param entry 日志条目
 * @param byteBuffer 缓冲区
 */
public static void encode(DLedgerEntry entry, ByteBuffer byteBuffer) {
    byteBuffer.clear();
    int size = entry.computSizeInBytes();
    //always put magic on the first position
    //魔数,4 字节
    byteBuffer.putInt(entry.getMagic());
    //条目总长度,包含Header(协议头) + 消息体,占4 字节
    byteBuffer.putInt(size);
    //当前条目的index,占8 字节
    byteBuffer.putLong(entry.getIndex());
    //当前条目所属的投票轮次,占8 字节
    byteBuffer.putLong(entry.getTerm());
    //该条目的物理偏移量,类似于commitlog 文件的物理偏移量,占8 字节
    byteBuffer.putLong(entry.getPos());
    //保留字段,当前版本未使用,占4 字节
    byteBuffer.putInt(entry.getChannel());
    //当前版本未使用,占4 字节
    byteBuffer.putInt(entry.getChainCrc());
    //body 的CRC 校验和,用来区分数据是否损坏,占4 字节。
    byteBuffer.putInt(entry.getBodyCrc());
    //用来存储body 的长度,占4 个字节。
    byteBuffer.putInt(entry.getBody().length);
    //具体消息的内容。
    byteBuffer.put(entry.getBody());
    byteBuffer.flip();
}

日志索引

DLedgerEntryCoder#encodeIndex()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * 日志索引编码
 * @param pos 日志条目在文件的偏移量
 * @param size 条目大小
 * @param magic 魔数
 * @param index 索引
 * @param term 投票轮次
 * @param byteBuffer 缓冲区
 */
public static void encodeIndex(long pos, int size, int magic, long index, long term, ByteBuffer byteBuffer) {
    byteBuffer.clear();
    //魔数,4 字节
    byteBuffer.putInt(magic);
    //日志条目在文件的偏移量,8字节
    byteBuffer.putLong(pos);
    //条目大小,4字节
    byteBuffer.putInt(size);
    //日志条目索引,8字节
    byteBuffer.putLong(index);
    //投票轮次,8字节
    byteBuffer.putLong(term);
    byteBuffer.flip();
}