RocketMQ整合DLedger
RocketMQ 的消息存储文件主要包括 CommitLog 文件、ConsumeQueue 文件与 Index 文件。CommitLog 文件存储全量的消息,ConsumeQueue、Index 文件都是基于 CommitLog 文件构建的。因此,要使用 DLedger 实现消息存储的一致性,关键是实现 CommitLog 文件的一致性:DLedger 要整合的对象是 CommitLog 文件,只需保证 Raft 协议复制组内各个节点的 CommitLog 文件一致即可(ConsumeQueue、Index 文件可由各节点基于 CommitLog 自行构建)。DLedger 整合 CommitLog 时,把每条 RocketMQ 消息(即一个完整的 CommitLog 条目)整体作为 DLedger 条目的 body 字段进行存储。
Broker启动流程
DefaultMessageStore构造方法
1
2
3
4
5
6
7
8
9
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
//是否开启Dledger
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLedgerCommitLog(this);
} else {
this.commitLog = new CommitLog(this);
}
}
DLedgerCommitLog构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * Builds the DLedger-backed commit log.
 *
 * <p>Derives a {@code DLedgerConfig} from the broker's message-store configuration, creates the
 * {@code DLedgerServer} for this node, and installs an append hook on the DLedger file store.
 * The hook rewrites the physical-offset field inside each stored CommitLog message to the
 * message's real position in the DLedger data file.
 *
 * @param defaultMessageStore the owning message store; all configuration is read from it
 */
public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
    super(defaultMessageStore);
    dLedgerConfig = new DLedgerConfig();
    // Whether files may be deleted forcibly, taken from the broker's cleanFileForciblyEnable.
    dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable());
    // DLedger storage type: always file based here.
    dLedgerConfig.setStoreType(DLedgerConfig.FILE);
    // Id of THIS node inside the DLedger group, e.g. "n0". Everything after the first
    // character must be an integer (it is parsed into `id` below).
    dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
    // DLedger group name; recommended to be the same as the broker's brokerName.
    dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
    // All nodes of the group, separated by ';', e.g. n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
    dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
    // DLedger files live under the broker's storePathRootDir (the RocketMQ data root).
    dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    // Each DLedger data file has the same size as a CommitLog file (mappedFileSizeCommitLog).
    dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
    // When expired DLedger files are deleted, taken from the broker's deleteWhen.
    dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen());
    // DLedger file retention: the broker's fileReservedTime plus one.
    dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1);
    dLedgerConfig.setPreferredLeaderId(defaultMessageStore.getMessageStoreConfig().getPreferredLeaderId());
    dLedgerConfig.setEnableBatchPush(defaultMessageStore.getMessageStoreConfig().isEnableBatchPush());
    // Numeric suffix of selfId plus one, e.g. "n0" -> 1. parseInt avoids a needless Integer box.
    id = Integer.parseInt(dLedgerConfig.getSelfId().substring(1)) + 1;
    // Create this node of the DLedger group from the config above.
    dLedgerServer = new DLedgerServer(dLedgerConfig);
    dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
    // On append, move to the physical-position field of the CommitLog message embedded in the
    // entry body and store the entry position plus the body offset there. The stored physical
    // offset then points at the start of the CommitLog message itself.
    DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
        assert bodyOffset == DLedgerEntry.BODY_OFFSET;
        buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION);
        buffer.putLong(entry.getPos() + bodyOffset);
    };
    dLedgerFileStore.addAppendHook(appendHook);
    dLedgerFileList = dLedgerFileStore.getDataFileList();
    // Serializer that encodes messages into CommitLog format, bounded by maxMessageSize.
    this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
BrokerController#initialize()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Initializes the broker: loads the persisted topic config, consumer offsets, subscription groups
* and consumer filters, then creates the message store.
* When DLedger mode is enabled, a DLedgerRoleChangeHandler is registered on the DLedger leader
* elector of the store's DLedgerCommitLog, so that node role changes reach this controller.
* @return the accumulated initialization result (false if a load step failed or the store could
*         not be created); the remaining steps are elided in this excerpt
* @throws CloneNotSupportedException declared by this method; not thrown by the lines shown here
*/
public boolean initialize() throws CloneNotSupportedException {
boolean result = this.topicConfigManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
if (result) {
try {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
// DLedger mode enabled?
if (messageStoreConfig.isEnableDLegerCommitLog()) {
// Listen for node role (state) change events raised by the DLedger leader elector
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin: the factory may wrap the store built above
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
// Put the bitmap-calculation dispatcher at the head of the dispatcher list
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
...
}
DefaultMessageStore#load()(其中 this.commitLog.load() 会根据配置调用 CommitLog 或 DLedgerCommitLog 的 load)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Loads the message store at startup.
 *
 * <p>Loads, in order, the schedule-message service (when present), the CommitLog (a plain
 * CommitLog or a DLedgerCommitLog depending on configuration) and the consume queues. If all of
 * these succeed, it opens the store checkpoint, loads the index files and runs recovery. Whether
 * the last shutdown was clean is decided by the presence of the temp (abort) file.
 *
 * @return {@code true} if every step succeeded; on failure (including any exception) the
 *         mapped-file allocation service is shut down and {@code false} is returned
 */
public boolean load() {
    boolean loaded;
    try {
        final boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

        // Each step only runs if everything before it succeeded.
        loaded = scheduleMessageService == null || this.scheduleMessageService.load();
        loaded = loaded && this.commitLog.load();
        loaded = loaded && this.loadConsumeQueue();

        if (loaded) {
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            this.indexService.load(lastExitOK);
            this.recover(lastExitOK);
            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        loaded = false;
    }

    if (!loaded) {
        this.allocateMappedFileService.shutdown();
    }
    return loaded;
}
DLedgerCommitLog
属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * CommitLog implementation whose entries are stored through DLedger.
 * Each CommitLog-format message is written as the body of a DLedger entry; messages written
 * before the switch to DLedger stay in the old CommitLog files (see dividedCommitlogOffset).
 */
public class DLedgerCommitLog extends CommitLog {
// This broker's node in the DLedger group, represented by a DLedgerServer instance
private final DLedgerServer dLedgerServer;
// DLedger configuration derived from the broker's message-store configuration
private final DLedgerConfig dLedgerConfig;
// DLedger's memory-mapped file storage
private final DLedgerMmapFileStore dLedgerFileStore;
// Data files managed by DLedger (the counterpart of RocketMQ's MappedFileQueue)
private final MmapFileList dLedgerFileList;
// The id identifies the broker role, 0 means master, others means slave.
// NOTE(review): the constructor sets this to the numeric suffix of selfId plus one, so it is
// never 0 for a non-negative suffix; confirm the master/slave meaning against its callers.
private final int id;
// Encodes messages into CommitLog format before they are handed to DLedger
private final MessageSerializer messageSerializer;
// Store clock time at which the put-message lock was taken for the current append; reset to 0 on release.
// Used to measure how long an append holds the lock.
private volatile long beginTimeInDledgerLock = 0;
//This offset separate the old commitlog from dledger commitlog:
// physical offsets below it are read from the old CommitLog files, offsets at or above it from
// the DLedger files. -1 until recovery sets it.
private long dividedCommitlogOffset = -1;
// True only while the old CommitLog files are being recovered
private boolean isInrecoveringOldCommitlog = false;
}
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * Builds the DLedger-backed commit log.
 *
 * <p>Derives a {@code DLedgerConfig} from the broker's message-store configuration, creates the
 * {@code DLedgerServer} for this node, and installs an append hook on the DLedger file store.
 * The hook rewrites the physical-offset field inside each stored CommitLog message to the
 * message's real position in the DLedger data file.
 *
 * @param defaultMessageStore the owning message store; all configuration is read from it
 */
public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
    super(defaultMessageStore);
    dLedgerConfig = new DLedgerConfig();
    // Whether files may be deleted forcibly, taken from the broker's cleanFileForciblyEnable.
    dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable());
    // DLedger storage type: always file based here.
    dLedgerConfig.setStoreType(DLedgerConfig.FILE);
    // Id of THIS node inside the DLedger group, e.g. "n0". Everything after the first
    // character must be an integer (it is parsed into `id` below).
    dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
    // DLedger group name; recommended to be the same as the broker's brokerName.
    dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
    // All nodes of the group, separated by ';', e.g. n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
    dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
    // DLedger files live under the broker's storePathRootDir (the RocketMQ data root).
    dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    // Each DLedger data file has the same size as a CommitLog file (mappedFileSizeCommitLog).
    dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
    // When expired DLedger files are deleted, taken from the broker's deleteWhen.
    dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen());
    // DLedger file retention: the broker's fileReservedTime plus one.
    dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1);
    dLedgerConfig.setPreferredLeaderId(defaultMessageStore.getMessageStoreConfig().getPreferredLeaderId());
    dLedgerConfig.setEnableBatchPush(defaultMessageStore.getMessageStoreConfig().isEnableBatchPush());
    // Numeric suffix of selfId plus one, e.g. "n0" -> 1. parseInt avoids a needless Integer box.
    id = Integer.parseInt(dLedgerConfig.getSelfId().substring(1)) + 1;
    // Create this node of the DLedger group from the config above.
    dLedgerServer = new DLedgerServer(dLedgerConfig);
    dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
    // On append, move to the physical-position field of the CommitLog message embedded in the
    // entry body and store the entry position plus the body offset there. The stored physical
    // offset then points at the start of the CommitLog message itself.
    DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
        assert bodyOffset == DLedgerEntry.BODY_OFFSET;
        buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION);
        buffer.putLong(entry.getPos() + bodyOffset);
    };
    dLedgerFileStore.addAppendHook(appendHook);
    dLedgerFileList = dLedgerFileStore.getDataFileList();
    // Serializer that encodes messages into CommitLog format, bounded by maxMessageSize.
    this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
DLedgerCommitLog#recover()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
 * Recovers the DLedger-backed commit log during broker startup.
 *
 * <p>If DLedger data files already exist, the DLedger store is recovered and
 * {@code dividedCommitlogOffset} is set to the start offset of the first DLedger file.
 * Consume-queue data at or beyond the DLedger max wrote position is then truncated.
 *
 * <p>Otherwise this is the first start with a mixed (old CommitLog + DLedger) layout. The old
 * CommitLog is recovered normally, and the unused tail of its last file is marked with
 * {@code BLANK_MAGIC_CODE} so nothing more is written there. The DLedger file list is then
 * started right after that file ({@code dividedCommitlogOffset}).
 *
 * <p>In both paths, whenever an old CommitLog file exists, {@code disableDeleteDledger()} is
 * called so DLedger files are kept and the offset space of old CommitLog + DLedger has no gap.
 *
 * @param maxPhyOffsetOfConsumeQueue the largest physical offset referenced by the consume queues
 */
private void recover(long maxPhyOffsetOfConsumeQueue) {
    dLedgerFileStore.load();
    if (dLedgerFileList.getMappedFiles().size() > 0) {
        dLedgerFileStore.recover();
        // Smallest physical offset held by DLedger: lower offsets are read from the old
        // CommitLog, offsets at or above it from the DLedger files.
        dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset();
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        if (mappedFile != null) {
            // Old CommitLog files still exist: keep the DLedger files so that the CommitLog
            // offsets and the DLedger offsets never have a gap between them.
            disableDeleteDledger();
        }
        // Max physical offset written to DLedger
        long maxPhyOffset = dLedgerFileList.getMaxWrotePosition();
        // Clear ConsumeQueue redundant data: consume queues must not reference offsets at or
        // beyond what DLedger holds.
        if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) {
            log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset);
        }
        return;
    }
    //Indicate that, it is the first time to load mixed commitlog, need to recover the old commitlog
    isInrecoveringOldCommitlog = true;
    try {
        //No need the abnormal recover
        super.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } finally {
        // Reset the flag even if recovery throws, so it never stays stuck at true.
        isInrecoveringOldCommitlog = false;
    }
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    if (mappedFile == null) {
        // No old CommitLog file either: nothing to seal.
        return;
    }
    ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
    byteBuffer.position(mappedFile.getWrotePosition());
    boolean needWriteMagicCode = true;
    // 1 TOTAL SIZE
    byteBuffer.getInt(); //size
    // 2 MAGIC CODE: if the tail already carries the blank marker, it does not need rewriting.
    int magicCode = byteBuffer.getInt();
    if (magicCode == CommitLog.BLANK_MAGIC_CODE) {
        needWriteMagicCode = false;
    } else {
        log.info("Recover old commitlog found a illegal magic code={}", magicCode);
    }
    // An old CommitLog file exists here too, so protect the DLedger files exactly as in the
    // branch above (not only disabling forced cleaning), keeping the offset space contiguous.
    disableDeleteDledger();
    // DLedger offsets continue right after the last old CommitLog file.
    dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize();
    log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset);
    if (needWriteMagicCode) {
        // Mark the rest of the file as blank: [remaining length][BLANK_MAGIC_CODE]
        byteBuffer.position(mappedFile.getWrotePosition());
        byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());
        byteBuffer.putInt(BLANK_MAGIC_CODE);
        mappedFile.flush(0);
    }
    // Treat the old file as fully written, committed and flushed so nothing appends to it.
    mappedFile.setWrotePosition(mappedFile.getFileSize());
    mappedFile.setCommittedPosition(mappedFile.getFileSize());
    mappedFile.setFlushedPosition(mappedFile.getFileSize());
    // Presumably creates the first DLedger data file starting at dividedCommitlogOffset.
    dLedgerFileList.getLastMappedFile(dividedCommitlogOffset);
    log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset);
}
DLedgerCommitLog#putMessage
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/**
 * Appends a message through DLedger instead of writing the local CommitLog directly.
 *
 * <p>The message is encoded in CommitLog format and submitted as the body of a DLedger entry
 * while {@code putMessageLock} is held. The append result is awaited for up to 3 seconds after
 * the lock has been released.
 *
 * @param msg the message to store
 * @return the put result. Its wrote offset is the start of the DLedger entry body, i.e. the
 *         start of the CommitLog-format message. It therefore means the same as the physical
 *         offset returned by the non-DLedger CommitLog.
 */
@Override
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    setMessageInfo(msg, tranType);
    // Back to Results
    AppendMessageResult appendResult;
    AppendFuture<AppendEntryResponse> dledgerFuture;
    EncodeResult encodeResult;
    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    long elapsedTimeInLock;
    long queueOffset;
    try {
        beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
        encodeResult = this.messageSerializer.serialize(msg);
        // Reject an illegal message before touching its encoding: the queue offset is only
        // needed for messages that will be appended, and a failed encode may not carry a
        // payload that setQueueOffsetKey can patch (which would surface as UNKNOWN_ERROR).
        if (encodeResult.status != AppendMessageStatus.PUT_OK) {
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
        }
        queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
        encodeResult.setQueueOffsetKey(queueOffset);
        AppendEntryRequest request = new AppendEntryRequest();
        request.setGroup(dLedgerConfig.getGroup());
        request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
        request.setBody(encodeResult.getData());
        // The leader appends and replicates the entry. The future reports success only after a
        // majority of the group has written it. getPos() is the start offset of the new DLedger
        // entry, or -1 if the append was not accepted.
        dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
        if (dledgerFuture.getPos() == -1) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
        }
        // Report the offset of the entry body (the CommitLog-format message), not of the DLedger
        // entry header, so reading from this offset yields the message as with a plain CommitLog.
        long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;
        int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
        ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
        String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
        appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // The next update ConsumeQueue information
                DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);
                break;
            default:
                break;
        }
    } catch (Exception e) {
        log.error("Put message error", e);
        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
    } finally {
        beginTimeInDledgerLock = 0;
        putMessageLock.unlock();
    }
    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult);
    }
    PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
    try {
        AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS);
        switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
            case SUCCESS:
                putMessageStatus = PutMessageStatus.PUT_OK;
                break;
            case INCONSISTENT_LEADER:
            case NOT_LEADER:
            case LEADER_NOT_READY:
            case DISK_FULL:
                putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
                break;
            case WAIT_QUORUM_ACK_TIMEOUT:
                //Do not return flush_slave_timeout to the client, for the ons client will ignore it.
                putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
                break;
            case LEADER_PENDING_FULL:
                putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
                break;
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing it; the status stays UNKNOWN_ERROR.
        Thread.currentThread().interrupt();
        log.error("Failed to get dledger append result", e);
    } catch (Throwable t) {
        log.error("Failed to get dledger append result", t);
    }
    PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
    if (putMessageStatus == PutMessageStatus.PUT_OK) {
        // Statistics: key the put count and the put size by the same topic (the message's
        // topic after setMessageInfo), so one message is never split across two topics.
        final String statsTopic = msg.getTopic();
        storeStatsService.getSinglePutMessageTopicTimesTotal(statsTopic).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(statsTopic).addAndGet(appendResult.getWroteBytes());
    }
    return putMessageResult;
}
DLedgerCommitLog#getMessage()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Reads {@code size} bytes starting at the physical {@code offset}.
 *
 * <p>Offsets below {@code dividedCommitlogOffset} belong to the old CommitLog and are delegated
 * to the parent implementation. All other offsets are served from the DLedger data file that
 * contains them.
 *
 * @param offset physical offset to read from
 * @param size number of bytes to read
 * @return the selected buffer, or {@code null} when no DLedger data file is found for the offset
 */
@Override
public SelectMappedBufferResult getMessage(final long offset, final int size) {
    if (offset >= dividedCommitlogOffset) {
        final int fileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData();
        final MmapFile dledgerFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0);
        if (dledgerFile == null) {
            return null;
        }
        // Position of the requested offset inside the located file
        final int positionInFile = (int) (offset % fileSize);
        return convertSbr(dledgerFile.selectMappedBuffer(positionInFile, size));
    }
    // Old data written before the switch to DLedger
    return super.getMessage(offset, size);
}
总结
- DLedger在整合时,使用DLedger 条目包裹RocketMQ 中的CommitLog 条目,即在DLedger 条目的body字段来存储整条CommitLog 条目;
- 引入dividedCommitlogOffset 变量,表示物理偏移量小于该值的消息存在于旧的CommitLog 文件中,实现升级DLedger 集群后能访问到旧的数据;
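- 新 DLedger 集群首次启动时(此时尚无 DLedger 数据文件),会把旧 CommitLog 最后一个文件的剩余空间用 BLANK_MAGIC_CODE 标记填满,并将该文件的写入、提交、刷盘位置都设置为文件末尾,因此新的数据不会再写入原先的 CommitLog 文件,而是从该文件结束位置开始写入 DLedger 管理的文件;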
- 消息追加到DLedger 数据日志文件中,返回的偏移量不是DLedger 条目的起始偏移量,而是DLedger 条目中body 字段的起始偏移量,即真实消息的起始偏移量,保证消息物理偏移量的语义与RocketMQ Commitlog 一样;