存储文件
-
commitlog :消息存储目录;
- config :运行期间一些配置信息,主要包括下列信息;
- consumerFilter.json : 主题消息过滤信息;
- consumerOffset.json : 集群消费模式消息消费进度;
- delayOffset.json :延时消息队列拉取进度;
- subscriptionGroup.json : 消息消费组配置信息;
- topics.json: topic 配置属性;
- consumequeue :消息消费队列存储目录;
- index :消息索引文件存储目录;
- abort :如果存在abort 文件说明Broker 非正常关闭,该文件默认启动时创建,正常退出之前删除;
- checkpoint :文件检测点,存储commitlog 文件最后一次刷盘时间戳、consumequeue最后一次刷盘时间、index 索引文件最后一次刷盘时间戳;
CommitLog文件
CommitLog文件的存储目录默认为${ROCKET_HOME}/store/commitlog
,可以通过在broker 配置文件中设置storePathRootDir 属性来改变默认路径。CommitLog文件默认大小为1G ,可通过在broker 配置文件中设置mapedFileSizeCommitLog
`属性来改变默认大小。
1
2
3
4
5
6
7
8
9
10
//根据偏移量与消息长度查找消息
public SelectMappedBufferResult getMessage(final long offset, final int size) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
return mappedFile.selectMappedBuffer(pos, size);
}
return null;
}
MappedFile#selectMappedBuffer()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
int readPosition = getReadPosition();
if ((pos + size) <= readPosition) {
if (this.hold()) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
} else {
log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
+ this.fileFromOffset);
}
} else {
log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
+ ", fileFromOffset: " + this.fileFromOffset);
}
return null;
}
ConsumeQueue 文件
RocketMQ 基于主题订阅模式实现消息消费,消费者关心的是一个主题下的所有消息,但由于同一主题的消息不连续地存储在commitlog 文件中,试想一下如果消息消费者直接从消息存储文件(commitlog)中去遍历查找订阅主题下的消息,效率将极其低下,RocketMQ 为了适应消息消费的检索需求,设计了消息消费队列文件(Consumequeue),该文件可以看成是Commitlog 关于消息消费的“索引”文件, consumequeue 的第一级目录为消息主题,第二级目录为主题的消息队列。为了加速ConsumeQueue 消息条目的检索速度与节省磁盘空间,每一个Consumequeue条目不会存储消息的全量信息。
单个ConsumeQueue 文件中默认包含30 万个条目,单个文件的长度为30W × 20 字节,单个ConsumeQueue 文件可以看出是一个ConsumeQueue 条目的数组,其下标为ConsumeQueue的逻辑偏移量,消息消费进度存储的偏移量即逻辑偏移量。ConsumeQueue 即为Commitlog 文件的索引文件, 其构建机制是当消息到达Commitlog 文件后, 由专门的线程产生消息转发任务,从而构建消息消费队列文件与下文提到的索引文件。
ConsumeQueue#getIndexBuffer()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//根据sta rtlnde x 获取消息消费队列条目
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
int mappedFileSize = this.mappedFileSize;
//startIndex乘以20
long offset = startIndex * CQ_STORE_UNIT_SIZE;
//如果该offset小于minLogicOffset ,则返回null,说明该消息已被删除
//如果大于minLogicOffset,则根据偏移量定位到具体的物理文件,然后通过offset与物理文大小取模获取在该文件的偏移量,从而从偏移量开始连续读取20个字节即可
if (offset >= this.getMinLogicOffset()) {
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
if (mappedFile != null) {
SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
return result;
}
}
return null;
}
MappedFile#selectMappedBuffer()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public SelectMappedBufferResult selectMappedBuffer(int pos) {
int readPosition = getReadPosition();
if (pos < readPosition && pos >= 0) {
if (this.hold()) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
int size = readPosition - pos;
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
}
}
return null;
}
根据时间查找
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public long getOffsetInQueueByTime(final long timestamp) {
//根据时间戳定位到物理文件
MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
if (mappedFile != null) {
long offset = 0;
//采用二分查找来加速检索。首先计算最低查找偏移量,取消息队列最小偏移量与该文件最小偏移量二者中的最小偏移量为low 。
//获取当前存储文件中有效的最小消息物理偏移量minPhysicOffset ,如果查找到消息偏移量小于该物理偏移量, 则结束该查找过程
int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
int high = 0;
int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
long leftIndexValue = -1L, rightIndexValue = -1L;
long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
if (null != sbr) {
ByteBuffer byteBuffer = sbr.getByteBuffer();
high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
try {
while (high >= low) {
midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
byteBuffer.position(midOffset);
long phyOffset = byteBuffer.getLong();
int size = byteBuffer.getInt();
if (phyOffset < minPhysicOffset) {
low = midOffset + CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
continue;
}
long storeTime =
this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
if (storeTime < 0) {
return 0;
} else if (storeTime == timestamp) {
targetOffset = midOffset;
break;
} else if (storeTime > timestamp) {
high = midOffset - CQ_STORE_UNIT_SIZE;
rightOffset = midOffset;
rightIndexValue = storeTime;
} else {
low = midOffset + CQ_STORE_UNIT_SIZE;
leftOffset = midOffset;
leftIndexValue = storeTime;
}
}
//如果targetOffset 不等于- 1 表示找到了存储时间戳等于待查找时间戳的消息;
if (targetOffset != -1) {
offset = targetOffset;
//如果leftIndexValue等于-l,表示返回当前时间戳大并且最接近待查找的偏移量;
} else {
if (leftIndexValue == -1) {
offset = rightOffset;
//如果rightIndexValue等于-1,表示返回的消息比待查找时间戳小并且最接近查找的偏移量
} else if (rightIndexValue == -1) {
offset = leftOffset;
} else {
offset =
Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
- rightIndexValue) ? rightOffset : leftOffset;
}
}
return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
} finally {
sbr.release();
}
}
}
return 0;
}
Index索引文件
RocketMQ 引入了 Hash 索引机制为消息建立索引, HashMap 包含两个基本点 Hash 槽与 Hash 的链表结构。
lndexFile 总共包含 lndexHeader、Hash槽、 Hash条目(数据)。
- IndexHeader :头部,包含 40 个字节,记录该 IndexFile 的统计信息,其结构如下:
- beginTimestamp:该索引文件中包含消息的最小存储时间;
- endTimes tamp:该索引文件中包含消息的最大存储时间;
- beginPhyoffset:该索引文件中包含消息的最小物理偏移量( commitlog 文件偏移量);
- endPhyoffset :该索引文件中包含消息的最大物理偏移量( commitlog 文件偏移量);
- hashslotCount: hashslot 数,并不是 hash 槽使用的个数,在这里意义不大;
- indexCount: Index 条目列表当前已使用的个数, Index 条目在 Index 条目列表中按顺序存储;
-
Hash 槽: 一个IndexFile 默认包含 500 万个 Hash 槽,每个 Hash 槽存储的是落在该 Hash 槽的 hashcode 最新的 Index 的索引;
- Index 条目列表:默认一个索引文件包含 2000 万个条目,每 Index 条目结构如下
- hashcode: key 的hashcode;
- phyoffset:消息对应的物理偏移量;
- timedif:该消息存储时间与第 条消息的时间戳的差值,小于 该消息无效;
- prelndexNo :该条目的前一条 Index 索, 出现 hash冲突时,构建的链表结构;
IndexFile#putKey()
RocketMQ 将消息索引键与消息偏移量映射关系写入到 IndexFile 的实现方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
//已使用条目大于等于允许最大条目数,返回false,表示当前索引文件已写满
if (this.indexHeader.getIndexCount() < this.indexNum) {
//获取hashcode绝对值
int keyHash = indexKeyHashMethod(key);
//取余,定位下标 maxHashSlotNum = 5000000
int slotPos = keyHash % this.hashSlotNum;
//hash槽的物理地址
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
//读取hash槽中存储的数据,如果hash槽存储的数据小于0或大于当前索中的索引条目格式, slotValue置为0
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
//计算待存储消息的时间戳与第一条消息时间戳的差值,并转换成秒
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
//这里是Hash冲突链式解决方案的关键实现, Hash槽中存储的是该HashCode所对应的
//最新的Index条目的下标,新的Index条目的最后4个字节存储该HashCode上一个条目的
//Index下标。如果Hash槽中存储的值为0或大于当前IndexFile最大条目数或小于-1,
//表示该Hash槽当前并没有与之对应的Index条目。值得关注的是IndexFile条目中存储的不
//是消息索引key而是消息属性key的HashCode,在根据key查找时需要根据消息物理偏移
//量找到消息进而再验证消息key的值,之所以只存储HashCode不存储具体的key是为
//了将 Index 目设计为定长结构,才能方便地检索与定位条目。
//将条目信息存储在IndexFile,计算物理偏移量=头部字节长度+hash槽数量*单个hash槽大小+当前index条目个数*单个Index条目大小
//maxIndexNum = 5000000 * 4
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
//hashcode,消息物理偏移量,消息存储时间戳,索引文件时间戳,当前hash槽的值存入mappedByteBuffer
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
//上一个同一个hash槽里存放的值,形成链表
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
//覆盖之前hash槽的值
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
//更新索引头信息
//如果当前文件只有一个条目,设置起始条目偏移量和起始时间
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
if (invalidIndex == slotValue) {
this.indexHeader.incHashSlotCount();
}
//条目index下标++
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
IndexFile#selectPhyOffset()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
if (this.mappedFile.hold()) {
//计算hashcode
int keyHash = indexKeyHashMethod(key);
//取余,定位hash槽下标
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
if (lock) {
// fileLock = this.fileChannel.lock(absSlotPos,
// hashSlotSize, true);
}
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// if (fileLock != null) {
// fileLock.release();
// fileLock = null;
// }
//如果对应的Hash槽中存储的数据小于1大于当前索引条目个数则表示HashCode没有对应的条目,
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
//循环查找最多maxNum数量的消息物理偏移量
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
//时间校验
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
//时间匹配
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
this.mappedFile.release();
}
}
}
checkpoint 文件
checkpoint 作用是记录 Comitlog、ConsumeQueue/Index 文件的刷盘时间点, 文件固定长度为4k ,其中只用该文件的前 24 字节。
- physicMsgTimestamp:commitLog文件刷盘时间点;
- logicMsgTimestamp:消息消费队列文件刷盘时间点;
- indexMsgTimestamp:索引文件刷盘时间点;