DLedger存储实现
存储类
DLedgerStore
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 存储抽象类
*/
public abstract class DLedgerStore {
public MemberState getMemberState() {
return null;
}
/**
* 向主节点追加日志(数据)
* @param entry 日志条目
* @return 日志条目
*/
public abstract DLedgerEntry appendAsLeader(DLedgerEntry entry);
/**
* 从节点追加日志
* @param entry 日志条目
* @param leaderTerm 投票轮次
* @param leaderId Leader ID
* @return 日志条目
*/
public abstract DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String leaderId);
/**
* 根据日志索引查找日志
* @param index 索引
* @return 日志条目
*/
public abstract DLedgerEntry get(Long index);
/**
* 获取已提交的下标
* @return 索引
*/
public abstract long getCommittedIndex();
/**
* 更新已提交日志索引
* @param term 选举轮次
* @param committedIndex 已提交日志索引
*/
public void updateCommittedIndex(long term, long committedIndex) {}
/**
* 获取Leader当前最大的投票轮次
* @return 投票轮次
*/
public abstract long getLedgerEndTerm();
/**
* 获取Leader下一条日志写入的索引
* @return 索引
*/
public abstract long getLedgerEndIndex();
/**
* 获取Leader第一条消息的下标
* @return 索引
*/
public abstract long getLedgerBeginIndex();
/**
* 更新写入索引和选举轮次
*/
protected void updateLedgerEndIndexAndTerm() {
if (getMemberState() != null) {
getMemberState().updateLedgerIndexAndTerm(getLedgerEndIndex(), getLedgerEndTerm());
}
}
/**
* 刷写
*/
public void flush() {}
/**
* 删除日志
* @param entry 日志条目
* @param leaderTerm 选举轮次
* @param leaderId Leader ID
* @return long
*/
public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) {
return -1;
}
/**
* 启动
*/
public void startup() {}
/**
* 停止
*/
public void shutdown() {}
}
DLedgerMmapFileStore
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* DLedger基于文件内存映射机制的存储实现
*/
public class DLedgerMmapFileStore extends DLedgerStore {
public static final String CHECK_POINT_FILE = "checkpoint";
public static final String END_INDEX_KEY = "endIndex";
public static final String COMMITTED_INDEX_KEY = "committedIndex";
public static final int MAGIC_1 = 1;
public static final int CURRENT_MAGIC = MAGIC_1;
public static final int INDEX_UNIT_SIZE = 32;
private static Logger logger = LoggerFactory.getLogger(DLedgerMmapFileStore.class);
public List<AppendHook> appendHooks = new ArrayList<>();
//日志的起始索引,默认为 -1
private long ledgerBeginIndex = -1;
//最后一条日志的索引
private long ledgerEndIndex = -1;
//已提交的日志索引
private long committedIndex = -1;
private long committedPos = -1;
//当前最大的投票轮次
private long ledgerEndTerm;
//配置信息
private DLedgerConfig dLedgerConfig;
//状态机
private MemberState memberState;
//日志文件(数据文件)的内存映射Queue
private MmapFileList dataFileList;
//索引文件的内存映射文件集合
private MmapFileList indexFileList;
//本地线程变量,用来缓存索引ByteBuffer
private ThreadLocal<ByteBuffer> localEntryBuffer;
//本地线程变量,用来缓存数据索引ByteBuffe
private ThreadLocal<ByteBuffer> localIndexBuffer;
//数据文件刷盘线程
private FlushDataService flushDataService;
//清除过期日志文件线程
private CleanSpaceService cleanSpaceService;
//磁盘是否已满
private boolean isDiskFull = false;
//上一次检测点
private long lastCheckPointTimeMs = System.currentTimeMillis();
//是否已经加载,主要用来避免重复加载(初始化)日志文件
private AtomicBoolean hasLoaded = new AtomicBoolean(false);
//是否已恢复
private AtomicBoolean hasRecovered = new AtomicBoolean(false);
}
DLedgerMmapFileStore#startup()
1
2
3
4
5
6
7
8
9
/**
* 文件内存映射存储实现启动
*/
public void startup() {
load();
recover();
flushDataService.start();
cleanSpaceService.start();
}
DLedgerMmapFileStore#flush()
1
2
3
4
5
6
7
/**
* 刷写日志条目文件和索引文件
*/
public void flush() {
this.dataFileList.flush(0);
this.indexFileList.flush(0);
}
MmapFileList#flush()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 刷写日志条目文件
* @param flushLeastPages 0
* @return 结果
*/
public boolean flush(final int flushLeastPages) {
boolean result = true;
MmapFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
int offset = mappedFile.flush(flushLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.flushedWhere;
this.flushedWhere = where;
}
return result;
}
DefaultMmapFile#flush()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 立即将缓存中的数据刷新到磁盘。
* @return The current flushed position
*/
@Override
public int flush(final int flushLeastPages) {
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
int value = getReadPosition();
try {
this.mappedByteBuffer.force();
} catch (Throwable e) {
logger.error("Error occurred when force data to disk.", e);
}
this.flushedPosition.set(value);
this.release();
} else {
logger.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
}
}
return this.getFlushedPosition();
}
Dledger存储对比RocketMQ存储
功能 | Dledger存储 | RocketMQ存储 |
---|---|---|
表示物理文件 | DefaultMmapFile | MappedFile |
表示多个物理文件(逻辑连续) | MmapFileList | MappedFileQueue |
封装存储逻辑 | DLedgerMmapFileStore | DefaultMessageStore |
文件刷盘线程 | DLedgerMmapFileStore$FlushDataService | CommitLog$FlushCommitLogService |
过期文件删除线程 | DLedgerMmapFileStore$CleanSpaceService | DefaultMessageStore$CleanCommitLogService |
数据存储格式
日志条目
DLedgerEntryCoder#encode()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 编码
* @param entry 日志条目
* @param byteBuffer 缓冲区
*/
public static void encode(DLedgerEntry entry, ByteBuffer byteBuffer) {
byteBuffer.clear();
int size = entry.computSizeInBytes();
//always put magic on the first position
//魔数,4 字节
byteBuffer.putInt(entry.getMagic());
//条目总长度,包含Header(协议头) + 消息体,占4 字节
byteBuffer.putInt(size);
//当前条目的index,占8 字节
byteBuffer.putLong(entry.getIndex());
//当前条目所属的投票轮次,占8 字节
byteBuffer.putLong(entry.getTerm());
//该条目的物理偏移量,类似于commitlog 文件的物理偏移量,占8 字节
byteBuffer.putLong(entry.getPos());
//保留字段,当前版本未使用,占4 字节
byteBuffer.putInt(entry.getChannel());
//当前版本未使用,占4 字节
byteBuffer.putInt(entry.getChainCrc());
//body 的CRC 校验和,用来区分数据是否损坏,占4 字节。
byteBuffer.putInt(entry.getBodyCrc());
//用来存储body 的长度,占4 个字节。
byteBuffer.putInt(entry.getBody().length);
//具体消息的内容。
byteBuffer.put(entry.getBody());
byteBuffer.flip();
}
日志索引
DLedgerEntryCoder#encodeIndex()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 日志索引编码
* @param pos 日志条目在文件的偏移量
* @param size 条目大小
* @param magic 魔数
* @param index 索引
* @param term 投票轮次
* @param byteBuffer 缓冲区
*/
public static void encodeIndex(long pos, int size, int magic, long index, long term, ByteBuffer byteBuffer) {
byteBuffer.clear();
//魔数,4 字节
byteBuffer.putInt(magic);
//日志条目在文件的偏移量,8字节
byteBuffer.putLong(pos);
//条目大小,4字节
byteBuffer.putInt(size);
//日志条目索引,8字节
byteBuffer.putLong(index);
//投票轮次,8字节
byteBuffer.putLong(term);
byteBuffer.flip();
}