文件与内存映射
RocketMQ 通过使用内存映射文件来提高 IO 访问性能,无论是 CommitLog、ConsumeQueue、IndexFile ,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量。
CommitLog文件的组织方式:
使用 MappedFile、MappedFileQueue 来封装存储文件:
MappedFileQueue
MappedFileQueue是MappedFile 管理容器,MappedFileQueue是对存储目录的封装, 例如 CommitLog 文件的存储路径${ROCKET_HOME}/store/commitlog/,该目录下会存在多个内存映射文件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
| /**
 * Container that manages the {@link MappedFile}s of one storage directory
 * (for example the CommitLog directory).
 */
public class MappedFileQueue {
    private static final int DELETE_FILES_BATCH_MAX = 10;
    /** Storage directory. */
    private final String storePath;
    /** Size of a single file. */
    private final int mappedFileSize;
    /** The managed MappedFile collection. */
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
    /** Service that creates MappedFile instances. */
    private final AllocateMappedFileService allocateMappedFileService;
    /** Flush pointer: per the original note, all data before it has been persisted to disk. */
    private long flushedWhere = 0;
    /** Commit pointer: per the original note, the current write pointer of the in-memory ByteBuffer. */
    private long committedWhere = 0;
    private volatile long storeTimestamp = 0;

    /**
     * Looks up a MappedFile by message store timestamp.
     *
     * <p>Scans the snapshot from its first file and returns the first file whose
     * last-modified timestamp is greater than or equal to {@code timestamp}; when no
     * file qualifies, the last file of the snapshot is returned.
     *
     * @param timestamp the timestamp to search for
     * @return the matching file, the last file when none matches, or {@code null}
     *         when {@code copyMappedFiles(0)} yields {@code null}
     */
    public MappedFile getMappedFileByTime(final long timestamp) {
        final Object[] snapshot = this.copyMappedFiles(0);
        if (snapshot == null) {
            return null;
        }

        for (final Object candidate : snapshot) {
            final MappedFile current = (MappedFile) candidate;
            if (current.getLastModifiedTimestamp() >= timestamp) {
                return current;
            }
        }
        return (MappedFile) snapshot[snapshot.length - 1];
    }

    /**
     * Looks up the MappedFile that contains the given physical offset.
     *
     * <p>{@code offset % mappedFileSize} alone cannot locate the file: per the original
     * note, expired files are deleted periodically, so the first file in the directory is
     * not necessarily 00000000000000000000. The index is therefore computed relative to
     * the first remaining file:
     * {@code (offset / mappedFileSize) - (first.getFileFromOffset() / mappedFileSize)}.
     *
     * @param offset                the physical offset to locate
     * @param returnFirstOnNotFound whether to fall back to the first file when no file
     *                              contains {@code offset}
     * @return the file containing {@code offset}; the first file when nothing matched and
     *         {@code returnFirstOnNotFound} is set; otherwise {@code null}
     */
    public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
            final MappedFile first = this.getFirstMappedFile();
            final MappedFile last = this.getLastMappedFile();
            if (first == null || last == null) {
                return null;
            }

            final boolean outsideQueue = offset < first.getFileFromOffset()
                || offset >= last.getFileFromOffset() + this.mappedFileSize;
            if (outsideQueue) {
                LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                    offset,
                    first.getFileFromOffset(),
                    last.getFileFromOffset() + this.mappedFileSize,
                    this.mappedFileSize,
                    this.mappedFiles.size());
            } else {
                // Fast path: jump straight to the computed slot.
                final int slot = (int) ((offset / this.mappedFileSize) - (first.getFileFromOffset() / this.mappedFileSize));
                MappedFile direct = null;
                try {
                    direct = this.mappedFiles.get(slot);
                } catch (Exception ignored) {
                    // Deliberately ignored: the linear scan below is the fallback.
                }
                if (direct != null && this.holdsOffset(direct, offset)) {
                    return direct;
                }

                // Slow path: scan every file for one whose range covers the offset.
                for (final MappedFile each : this.mappedFiles) {
                    if (this.holdsOffset(each, offset)) {
                        return each;
                    }
                }
            }

            if (returnFirstOnNotFound) {
                return first;
            }
        } catch (Exception e) {
            log.error("findMappedFileByOffset Exception", e);
        }
        return null;
    }

    /** Tells whether {@code offset} lies in {@code [fileFromOffset, fileFromOffset + mappedFileSize)}. */
    private boolean holdsOffset(final MappedFile file, final long offset) {
        return offset >= file.getFileFromOffset()
            && offset < file.getFileFromOffset() + this.mappedFileSize;
    }

    /**
     * Returns the minimum offset of the stored files. This is not simply 0: it is the
     * {@code getFileFromOffset()} of the first file in the list.
     *
     * @return the first file's starting offset, or {@code -1} when the list is empty or
     *         the lookup fails
     */
    public long getMinOffset() {
        if (this.mappedFiles.isEmpty()) {
            return -1;
        }
        try {
            return this.mappedFiles.get(0).getFileFromOffset();
        } catch (IndexOutOfBoundsException e) {
            //continue;
        } catch (Exception e) {
            log.error("getMinOffset has exception.", e);
        }
        return -1;
    }

    /**
     * Returns the maximum offset of the stored files: the last file's
     * {@code getFileFromOffset()} plus its {@code getReadPosition()}.
     *
     * @return the maximum readable offset, or {@code 0} when there is no last file
     */
    public long getMaxOffset() {
        final MappedFile tail = getLastMappedFile();
        if (tail == null) {
            return 0;
        }
        return tail.getFileFromOffset() + tail.getReadPosition();
    }

    /**
     * Returns the current write pointer of the stored files: the last file's
     * {@code getFileFromOffset()} plus its {@code getWrotePosition()}.
     *
     * @return the maximum written offset, or {@code 0} when there is no last file
     */
    public long getMaxWrotePosition() {
        final MappedFile tail = getLastMappedFile();
        if (tail == null) {
            return 0;
        }
        return tail.getFileFromOffset() + tail.getWrotePosition();
    }
}
|
MappedFile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| public class MappedFile extends ReferenceResource {
// Field-only excerpt of MappedFile: the positions, buffers and file handles that the
// init / commit / flush / destroy methods of this class operate on.

// Operating-system page size used for the dirty-page arithmetic: 4 KiB.
public static final int OS_PAGE_SIZE = 1024 * 4;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// Total virtual memory mapped by MappedFile instances in the current JVM.
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// Number of MappedFile objects in the current JVM.
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// Write pointer of this file, starting at 0.
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// Commit pointer of this file. Per the original note, when transientStorePoolEnable is on,
// data is first stored in a TransientStorePool buffer, then committed, then flushed to disk.
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// Flush pointer: data before this position has been persisted to disk.
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// File size.
protected int fileSize;
// File channel.
protected FileChannel fileChannel;
/**
 * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
 *
 * Per the original note this is non-null when transientStorePoolEnable is true.
 * NOTE(review): the original note calls this a heap ByteBuffer, while the article's
 * TransientStorePool.init snippet allocates with ByteBuffer.allocateDirect — confirm.
 */
protected ByteBuffer writeBuffer = null;
// Buffer pool; per the original note, in use when transientStorePoolEnable is true.
protected TransientStorePool transientStorePool = null;
// File name.
private String fileName;
// Starting offset of this file.
private long fileFromOffset;
// The physical file.
private File file;
// Memory-mapped buffer of the physical file.
private MappedByteBuffer mappedByteBuffer;
private volatile long storeTimestamp = 0;
// Whether this is the first file in the MappedFileQueue.
private boolean firstCreateInQueue = false;
}
|
初始化
根据是否开启 transientStorePoolEnable,存在两种初始化情况。transientStorePoolEnable 为 true 表示内容先存储在堆外内存,然后通过 Commit 线程将数据提交到内存映射 Buffer 中,再通过 Flush 线程将内存映射 Buffer 中的数据持久化到磁盘中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| /**
 * Initializes this file for use with a transient store pool: performs the plain
 * initialization first, then borrows a write buffer from the pool.
 *
 * @param fileName           path of the backing file
 * @param fileSize           size of the file / mapping in bytes
 * @param transientStorePool pool the write buffer is borrowed from
 * @throws IOException if the plain initialization fails
 */
public void init(final String fileName,
                 final int fileSize,
                 final TransientStorePool transientStorePool) throws IOException {
    this.init(fileName, fileSize);

    // With a transient store pool the MappedFile gets its own writeBuffer.
    final ByteBuffer borrowed = transientStorePool.borrowBuffer();
    this.writeBuffer = borrowed;
    this.transientStorePool = transientStorePool;
}
/**
 * Opens the backing file, maps {@code fileSize} bytes of it read-write and updates the
 * JVM-wide mapping counters.
 *
 * @param fileName path of the backing file; its simple name must parse as a long
 * @param fileSize number of bytes to map
 * @throws IOException if the file cannot be opened or mapped; the channel is closed
 *                     before the exception propagates
 */
private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    // The file name doubles as the file's starting offset.
    this.fileFromOffset = Long.parseLong(this.file.getName());

    ensureDirOK(this.file.getParent());

    boolean mapped = false;
    try {
        final RandomAccessFile backing = new RandomAccessFile(this.file, "rw");
        this.fileChannel = backing.getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        mapped = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        // Only release the channel when initialization did not complete.
        if (!mapped && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}
|
提交
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
| /**
 * Performs a commit. {@code commitLeastPages} is the minimum number of pages for this
 * commit; when less data than that is pending, nothing is committed this round.
 *
 * <p>When {@code writeBuffer} is {@code null} there is nothing to commit and the write
 * position is returned directly: the commit operation acts on {@code writeBuffer}.
 *
 * @param commitLeastPages minimum number of dirty pages required to commit
 * @return the write position when there is no write buffer, otherwise the committed position
 */
public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }

    if (this.isAbleToCommit(commitLeastPages)) {
        if (!this.hold()) {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        } else {
            commit0(commitLeastPages);
            this.release();
        }
    }

    // All dirty data has been committed to FileChannel.
    if (writeBuffer != null && this.transientStorePool != null) {
        if (this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }
    }

    return this.committedPosition.get();
}
/**
 * Decides whether a commit should run now.
 *
 * <p>A full file always commits. Otherwise, with a positive {@code commitLeastPages},
 * the number of dirty pages (page index of the write position minus page index of the
 * committed position, using {@code OS_PAGE_SIZE}) must be at least
 * {@code commitLeastPages}; with a non-positive value any uncommitted data qualifies.
 *
 * @param commitLeastPages minimum number of dirty pages, or {@code <= 0} for "any"
 * @return {@code true} when a commit should be performed
 */
protected boolean isAbleToCommit(final int commitLeastPages) {
    final int committed = this.committedPosition.get();
    final int written = this.wrotePosition.get();

    if (this.isFull()) {
        return true;
    }

    if (commitLeastPages <= 0) {
        return written > committed;
    }

    final int dirtyPages = (written / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE);
    return dirtyPages >= commitLeastPages;
}
/**
 * Writes the uncommitted range of {@code writeBuffer} (from the committed position up to
 * the write position) to {@code fileChannel} and advances the committed position.
 *
 * <p>The page threshold is enforced by {@code isAbleToCommit}; this method only checks
 * that there are uncommitted bytes. The previous condition compared a byte count against
 * {@code commitLeastPages} (a page count), which could skip the final few bytes of a
 * full file so that the committed position never reached the file size.
 *
 * @param commitLeastPages kept for interface compatibility; not used for the decision
 */
protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    // Commit whenever at least one byte is pending.
    if (writePos - lastCommittedPosition > 0) {
        try {
            // Shared view over writeBuffer's content with independent position/limit.
            ByteBuffer byteBuffer = writeBuffer.slice();
            // Rewind the view to the last committed position ...
            byteBuffer.position(lastCommittedPosition);
            // ... and bound it by the current write position.
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            // Write [committedPosition, wrotePosition) to the FileChannel.
            this.fileChannel.write(byteBuffer);
            // Advance the committed position to the write position captured above.
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}
|
刷盘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| /**
 * Forces pending data to disk when {@code isAbleToFlush} allows it and records the new
 * flushed position.
 *
 * @param flushLeastPages threshold passed to {@code isAbleToFlush}
 * @return the flushed position after this call
 */
public int flush(final int flushLeastPages) {
    if (!this.isAbleToFlush(flushLeastPages)) {
        return this.getFlushedPosition();
    }

    if (!this.hold()) {
        // No reference could be taken: nothing is forced, the pointer is still moved.
        log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
        this.flushedPosition.set(getReadPosition());
        return this.getFlushedPosition();
    }

    final int readable = getReadPosition();
    try {
        //We only append data to fileChannel or mappedByteBuffer, never both.
        final boolean wroteViaChannel = writeBuffer != null || this.fileChannel.position() != 0;
        if (wroteViaChannel) {
            this.fileChannel.force(false);
        } else {
            this.mappedByteBuffer.force();
        }
    } catch (Throwable e) {
        log.error("Error occurred when force data to disk.", e);
    }

    // The position is recorded whether or not the force call succeeded.
    this.flushedPosition.set(readable);
    this.release();
    return this.getFlushedPosition();
}
/**
 * Returns the maximum readable position of this MappedFile.
 *
 * @return the write position when {@code writeBuffer} is {@code null}, otherwise the
 *         last committed position
 */
public int getReadPosition() {
    if (this.writeBuffer == null) {
        return this.wrotePosition.get();
    }
    return this.committedPosition.get();
}
|
销毁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
| /**
 * Shuts this file down and, once cleanup has finished, closes the channel and deletes
 * the physical file.
 *
 * @param intervalForcibly interval in milliseconds passed on to {@code shutdown}
 * @return {@code true} when cleanup was over (even if closing or deleting failed),
 *         {@code false} otherwise
 */
public boolean destroy(final long intervalForcibly) {
    this.shutdown(intervalForcibly);

    // Cleanup not finished yet: report and let the caller retry.
    if (!this.isCleanupOver()) {
        log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
            + " Failed. cleanupOver: " + this.cleanupOver);
        return false;
    }

    try {
        this.fileChannel.close();
        log.info("close file channel " + this.fileName + " OK");

        final long startedAt = System.currentTimeMillis();
        final boolean deleted = this.file.delete();
        log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
            + (deleted ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
            + this.getFlushedPosition() + ", "
            + UtilAll.computeElapsedTimeMilliseconds(startedAt));
    } catch (Exception e) {
        log.warn("close file channel " + this.fileName + " Failed. ", e);
    }
    return true;
}
/**
 * Marks this resource unavailable and tries to release it. On later calls, if
 * references remain and at least {@code intervalForcibly} milliseconds have passed since
 * the first shutdown, the reference count is forced negative and released again.
 *
 * @param intervalForcibly minimum time in milliseconds since the first shutdown before
 *                         the reference count is forced
 */
public void shutdown(final long intervalForcibly) {
    if (this.available) {
        this.available = false;
        this.firstShutdownTimestamp = System.currentTimeMillis();
        // Try to release the resource.
        this.release();
        return;
    }

    if (this.getRefCount() <= 0) {
        return;
    }

    final long sinceFirstShutdown = System.currentTimeMillis() - this.firstShutdownTimestamp;
    if (sinceFirstShutdown >= intervalForcibly) {
        this.refCount.set(-1000 - this.getRefCount());
        this.release();
    }
}
/**
 * Drops one reference; when the count is no longer positive, runs {@code cleanup} under
 * this object's monitor and stores its result in {@code cleanupOver}.
 */
public void release() {
    final long remaining = this.refCount.decrementAndGet();
    if (remaining <= 0) {
        synchronized (this) {
            this.cleanupOver = this.cleanup(remaining);
        }
    }
}
/**
 * Unmaps the memory-mapped buffer and updates the JVM-wide mapping counters.
 *
 * @param currentRef reference count at the time of the call, used for logging only
 * @return {@code false} when the file is still available (nothing is cleaned),
 *         {@code true} when it was already cleaned up or has just been cleaned up
 */
@Override
public boolean cleanup(final long currentRef) {
    // Still available: the file is in use, so it must not be unmapped.
    if (this.isAvailable()) {
        log.error("this file[REF:" + currentRef + "] " + this.fileName
            + " have not shutdown, stop unmapping.");
        return false;
    }

    // Already cleaned up: nothing more to do.
    if (this.isCleanupOver()) {
        log.error("this file[REF:" + currentRef + "] " + this.fileName
            + " have cleanup, do not do it again.");
        return true;
    }

    // Release the mapped buffer through clean().
    clean(this.mappedByteBuffer);
    TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(-this.fileSize);
    TOTAL_MAPPED_FILES.decrementAndGet();
    log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
    return true;
}
|
TransientStorePool
TransientStorePool:短暂的存储池。RocketMQ 单独创建一个堆外内存(direct ByteBuffer)缓存池,用来临时存储数据,数据先写入该堆外内存中,然后由 commit 线程定时将数据从该内存复制到与目的物理文件对应的内存映射中。RocketMQ 引入该机制主要的原因是提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| /**
 * Pool of direct ByteBuffers used as temporary write buffers.
 */
public class TransientStorePool {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    /** Number of buffers in the pool (the original note gives a default of 5). */
    private final int poolSize;
    /** Size of each ByteBuffer (the original note gives mapedFileSizeCommitLog as the default). */
    private final int fileSize;
    /** Container of the available buffers, a double-ended queue. */
    private final Deque<ByteBuffer> availableBuffers;
    private final MessageStoreConfig storeConfig;

    /**
     * Allocates {@code poolSize} direct buffers of {@code fileSize} bytes, locks each one
     * in memory through {@code LibC.INSTANCE.mlock} so that, per the original note, it is
     * not swapped out, and offers it to the queue of available buffers.
     */
    public void init() {
        for (int created = 0; created < poolSize; created++) {
            final ByteBuffer buffer = ByteBuffer.allocateDirect(fileSize);

            // Lock the buffer's native memory starting at its base address.
            final long baseAddress = ((DirectBuffer) buffer).address();
            final Pointer base = new Pointer(baseAddress);
            LibC.INSTANCE.mlock(base, new NativeLong(fileSize));

            availableBuffers.offer(buffer);
        }
    }
}
|