RocketMQ-07丨文件与内存映射

Posted by jiefang on March 23, 2021

文件与内存映射

RocketMQ 通过内存映射文件来提高 IO 访问性能。无论是 CommitLog、ConsumeQueue 还是 IndexFile,单个文件都被设计为固定长度,一个文件写满后再创建一个新文件。以 CommitLog 为例,文件名就是该文件中第一条消息对应的全局物理偏移量。

CommitLog文件的组织方式:

使用 MappedFile、MappedFileQueue 来封装存储文件:

MappedFileQueue

MappedFileQueue 是 MappedFile 的管理容器,也是对存储目录的封装。例如 CommitLog 文件的存储路径为 ${ROCKET_HOME}/store/commitlog/,该目录下会存在多个内存映射文件。

public class MappedFileQueue {

    private static final int DELETE_FILES_BATCH_MAX = 10;
    // Storage directory wrapped by this queue.
    private final String storePath;
    // Size in bytes of every single mapped file.
    private final int mappedFileSize;
    // MappedFiles of this queue; index 0 is treated as the first (lowest-offset) file.
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
    // Service used to create MappedFile instances.
    private final AllocateMappedFileService allocateMappedFileService;
    // Flush pointer: all data before it has been persisted to disk.
    private long flushedWhere = 0;
    // Commit pointer: current write pointer of the in-memory ByteBuffer.
    private long committedWhere = 0;

    private volatile long storeTimestamp = 0;

    /**
     * Finds a MappedFile by message store timestamp.
     *
     * <p>Scans from the first file and returns the first one whose last-modified timestamp is
     * {@code >=} the requested timestamp. If no file qualifies, the last file is returned.
     *
     * @param timestamp timestamp to look up, compared against
     *     {@code MappedFile#getLastModifiedTimestamp()}
     * @return the matching file, the last file as a fallback, or {@code null} if there is no file
     */
    public MappedFile getMappedFileByTime(final long timestamp) {
        Object[] mfs = this.copyMappedFiles(0);
        // copyMappedFiles is not shown here. Also reject an empty snapshot; otherwise the
        // fallback mfs[mfs.length - 1] below would throw ArrayIndexOutOfBoundsException.
        if (null == mfs || mfs.length == 0) {
            return null;
        }
        for (Object candidate : mfs) {
            MappedFile mappedFile = (MappedFile) candidate;
            if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
                return mappedFile;
            }
        }
        return (MappedFile) mfs[mfs.length - 1];
    }

    /**
     * Finds the MappedFile that contains the given global offset.
     *
     * <p>{@code offset % mappedFileSize} alone is not enough. Stored files are deleted on a
     * schedule, so the first file is not necessarily {@code 00000000000000000000}. The index is
     * therefore computed relative to the first file:
     * {@code (int) (offset / mappedFileSize - firstMappedFile.getFileFromOffset() / mappedFileSize)}.
     * If the file at that index does not cover the offset (presumably because the list has
     * changed), every file is scanned linearly.
     *
     * @param offset global offset to look up
     * @param returnFirstOnNotFound if true, return the first file when no file covers the offset
     * @return the covering file, the first file (see {@code returnFirstOnNotFound}), or
     *     {@code null}
     */
    public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
            MappedFile firstMappedFile = this.getFirstMappedFile();
            MappedFile lastMappedFile = this.getLastMappedFile();
            if (firstMappedFile != null && lastMappedFile != null) {
                if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                    LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                        offset,
                        firstMappedFile.getFileFromOffset(),
                        lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                        this.mappedFileSize,
                        this.mappedFiles.size());
                } else {
                    int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                    MappedFile targetFile = null;
                    try {
                        targetFile = this.mappedFiles.get(index);
                    } catch (Exception ignored) {
                        // Index may be out of range; fall back to the linear scan below.
                    }

                    if (targetFile != null && offset >= targetFile.getFileFromOffset()
                        && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                        return targetFile;
                    }

                    for (MappedFile tmpMappedFile : this.mappedFiles) {
                        if (offset >= tmpMappedFile.getFileFromOffset()
                            && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                            return tmpMappedFile;
                        }
                    }
                }

                if (returnFirstOnNotFound) {
                    return firstMappedFile;
                }
            }
        } catch (Exception e) {
            log.error("findMappedFileByOffset Exception", e);
        }
        return null;
    }

    /**
     * Returns the minimum offset held by this queue.
     *
     * <p>This is the first file's {@code fileFromOffset} rather than a fixed 0, because earlier
     * files may have been deleted.
     *
     * @return the first file's starting offset, or -1 if there is no file
     */
    public long getMinOffset() {
        if (!this.mappedFiles.isEmpty()) {
            try {
                return this.mappedFiles.get(0).getFileFromOffset();
            } catch (IndexOutOfBoundsException e) {
                // The list may have been emptied between isEmpty() and get(0); fall through to -1.
            } catch (Exception e) {
                log.error("getMinOffset has exception.", e);
            }
        }
        return -1;
    }

    /**
     * Returns the maximum readable offset: the last file's {@code fileFromOffset} plus its read
     * position.
     *
     * @return the maximum readable offset, or 0 if there is no file
     */
    public long getMaxOffset() {
        MappedFile mappedFile = getLastMappedFile();
        if (mappedFile != null) {
            return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
        }
        return 0;
    }

    /**
     * Returns the current global write position: the last file's {@code fileFromOffset} plus its
     * write pointer.
     *
     * @return the maximum written offset, or 0 if there is no file
     */
    public long getMaxWrotePosition() {
        MappedFile mappedFile = getLastMappedFile();
        if (mappedFile != null) {
            return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
        }
        return 0;
    }
}

MappedFile

public class MappedFile extends ReferenceResource {
    /** Page size assumed for the operating system: 4 KiB. */
    public static final int OS_PAGE_SIZE = 4 * 1024;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    /** Total bytes mapped by all MappedFile instances in this JVM. */
    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0L);
    /** Number of MappedFile instances currently mapped in this JVM. */
    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);

    /** Write pointer inside this file; starts at 0. */
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    /**
     * Commit pointer. With transientStorePoolEnable, data is first stored in writeBuffer, then
     * committed to the FileChannel, and finally flushed to disk.
     */
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    /** Flush pointer: everything before it has been persisted to disk. */
    private final AtomicInteger flushedPosition = new AtomicInteger(0);

    /** Size of this file in bytes. */
    protected int fileSize;
    /** Channel of the underlying file. */
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     *
     * <p>Off-heap (direct) buffer borrowed from {@link TransientStorePool} when
     * transientStorePoolEnable is true; {@code null} otherwise.
     */
    protected ByteBuffer writeBuffer = null;
    /** Pool that writeBuffer was borrowed from; set only when transientStorePoolEnable is true. */
    protected TransientStorePool transientStorePool = null;
    /** Path of the file, as passed to init. */
    private String fileName;
    /** Starting offset of this file, parsed from the file name. */
    private long fileFromOffset;
    /** The physical file. */
    private File file;
    /** Memory-mapped buffer covering the whole file. */
    private MappedByteBuffer mappedByteBuffer;
    /** Store timestamp; it is not written anywhere in this excerpt. */
    private volatile long storeTimestamp = 0;
    /** Whether this is the first file created in its MappedFileQueue (per the original note). */
    private boolean firstCreateInQueue = false;
}

初始化

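根据是否开启 transientStorePoolEnable,存在两种初始化情况。transientStorePoolEnable 为 true 表示内容先写入从 TransientStorePool 借出的堆外内存(writeBuffer),然后由 Commit 线程将数据写入 FileChannel,再由 Flush 线程调用 FileChannel.force 将数据持久化到磁盘;为 false 时数据直接写入内存映射 Buffer(MappedByteBuffer),刷盘时对其调用 force。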

/**
 * Maps the file and, because a transient store pool is in use (transientStorePoolEnable is
 * true), borrows a writeBuffer from that pool.
 *
 * @param fileName path of the file; its name is parsed as the starting offset
 * @param fileSize size of the file in bytes
 * @param transientStorePool pool to borrow writeBuffer from
 * @throws IOException if the file cannot be created or mapped
 */
public void init(final String fileName, final int fileSize,
    final TransientStorePool transientStorePool) throws IOException {
    // Map the file exactly as the pool-less overload does.
    init(fileName, fileSize);
    // Writes will land in this borrowed buffer first and be committed to the FileChannel later.
    final ByteBuffer borrowed = transientStorePool.borrowBuffer();
    this.writeBuffer = borrowed;
    this.transientStorePool = transientStorePool;
}

/**
 * Opens the file read-write and memory-maps its first {@code fileSize} bytes.
 *
 * <p>On success the JVM-wide mapped-memory and mapped-file counters are increased. On failure
 * the channel, if it was opened, is closed and the exception is rethrown.
 *
 * @param fileName path of the file; its name must parse as a long (the starting offset)
 * @param fileSize number of bytes to map
 * @throws IOException if the file cannot be created or mapped
 */
private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    // The file name encodes the starting offset of this file.
    this.fileFromOffset = Long.parseLong(this.file.getName());

    ensureDirOK(this.file.getParent());

    boolean mapped = false;
    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        mapped = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        // Do not leak the channel when mapping failed after it was opened.
        final boolean openButUnmapped = !mapped && this.fileChannel != null;
        if (openButUnmapped) {
            this.fileChannel.close();
        }
    }
}

提交

/**
 * Commits pending data from writeBuffer to the FileChannel.
 *
 * <p>When writeBuffer is null there is nothing to commit, so wrotePosition is returned directly.
 * Once the whole file has been committed, the borrowed writeBuffer is returned to the pool.
 *
 * @param commitLeastPages minimum number of dirty pages required for this commit; if fewer are
 *     pending, the commit is deferred to a later call
 * @return the committed position (or the write position when no writeBuffer is used)
 */
public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }

    final boolean shouldCommit = this.isAbleToCommit(commitLeastPages);
    if (shouldCommit) {
        if (!this.hold()) {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        } else {
            commit0(commitLeastPages);
            this.release();
        }
    }

    // All dirty data has been committed to FileChannel: give the borrowed buffer back.
    final boolean fullyCommitted = writeBuffer != null && this.transientStorePool != null
        && this.fileSize == this.committedPosition.get();
    if (fullyCommitted) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }
    return this.committedPosition.get();
}
/**
 * Decides whether a commit should run now.
 *
 * @param commitLeastPages minimum number of dirty pages; a value {@code <= 0} means commit
 *     whenever any uncommitted data exists
 * @return true if the file is full; otherwise, when commitLeastPages is positive, whether the
 *     write and commit pointers are at least commitLeastPages pages apart ({@code >=}); otherwise
 *     whether anything is uncommitted
 */
protected boolean isAbleToCommit(final int commitLeastPages) {
    final int committed = this.committedPosition.get();
    final int written = this.wrotePosition.get();

    if (this.isFull()) {
        return true;
    }
    if (commitLeastPages <= 0) {
        return written > committed;
    }
    // Page counts come from integer division, so only fully crossed page boundaries count.
    final int dirtyPages = (written / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE);
    return dirtyPages >= commitLeastPages;
}
/**
 * Writes the uncommitted range of writeBuffer, [committedPosition, wrotePosition), to the
 * FileChannel and then advances committedPosition to the write position read at entry.
 *
 * <p>Whether enough data is pending is decided by isAbleToCommit, which counts pages. This
 * method only requires that at least one byte is pending. Comparing the byte delta against
 * commitLeastPages (a page count) would mix units. It could skip a small tail, such as the last
 * few bytes of a full file; committedPosition would then stay below fileSize and the borrowed
 * writeBuffer would never be returned to the pool.
 *
 * @param commitLeastPages kept for interface compatibility; the page threshold is enforced by
 *     isAbleToCommit, so this method does not use it
 */
protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos > lastCommittedPosition) {
        try {
            // slice() shares writeBuffer's content but has its own position and limit, so
            // writeBuffer itself is not moved. NOTE(review): slice offsets equal file offsets
            // only if writeBuffer's position is 0 - confirm nothing advances it.
            ByteBuffer byteBuffer = writeBuffer.slice();
            // Select exactly the uncommitted range.
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}

刷盘

/**
 * Forces data up to the current read position to disk.
 *
 * @param flushLeastPages threshold passed to isAbleToFlush
 * @return the flushed position after this call
 */
public int flush(final int flushLeastPages) {
    if (!this.isAbleToFlush(flushLeastPages)) {
        return this.getFlushedPosition();
    }

    if (!this.hold()) {
        log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
        // NOTE(review): the position advances here without any force() call - confirm intended.
        this.flushedPosition.set(getReadPosition());
        return this.getFlushedPosition();
    }

    // The read position captured before forcing is what gets recorded as flushed.
    final int readableEnd = getReadPosition();
    try {
        //We only append data to fileChannel or mappedByteBuffer, never both.
        final boolean appendedViaChannel = writeBuffer != null || this.fileChannel.position() != 0;
        if (appendedViaChannel) {
            this.fileChannel.force(false);
        } else {
            this.mappedByteBuffer.force();
        }
    } catch (Throwable e) {
        log.error("Error occurred when force data to disk.", e);
    }
    // NOTE(review): recorded even if force() threw above - confirm intended.
    this.flushedPosition.set(readableEnd);
    this.release();
    return this.getFlushedPosition();
}
/**
 * Returns the highest readable position of this MappedFile.
 *
 * @return wrotePosition when no writeBuffer is used; otherwise committedPosition, because data
 *     still held in writeBuffer has not been written to the FileChannel yet
 */
public int getReadPosition() {
    if (this.writeBuffer != null) {
        return this.committedPosition.get();
    }
    return this.wrotePosition.get();
}

销毁

/**
 * Shuts the file down and, if cleanup has completed, closes its channel and deletes it from disk.
 *
 * @param intervalForcibly milliseconds after the first shutdown after which outstanding
 *     references are forcibly dropped (passed to shutdown)
 * @return true if cleanup was over and close/delete was attempted (even if deletion failed or
 *     an exception was logged); false if cleanup has not finished yet
 */
public boolean destroy(final long intervalForcibly) {
    this.shutdown(intervalForcibly);

    if (!this.isCleanupOver()) {
        log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
            + " Failed. cleanupOver: " + this.cleanupOver);
        return false;
    }

    try {
        this.fileChannel.close();
        log.info("close file channel " + this.fileName + " OK");

        final long startedAt = System.currentTimeMillis();
        final boolean deleted = this.file.delete();
        log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
            + (deleted ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
            + this.getFlushedPosition() + ", "
            + UtilAll.computeElapsedTimeMilliseconds(startedAt));
    } catch (Exception e) {
        log.warn("close file channel " + this.fileName + " Failed. ", e);
    }
    return true;
}
/**
 * Marks the file unavailable and drops the initial reference.
 *
 * <p>On later calls, if references are still held and at least {@code intervalForcibly}
 * milliseconds have passed since the first call, the reference count is forced below zero and
 * released so that cleanup runs.
 *
 * @param intervalForcibly grace period in milliseconds before references are forcibly dropped
 */
public void shutdown(final long intervalForcibly) {
    if (this.available) {
        // First call: stop new users and release the initial reference.
        this.available = false;
        this.firstShutdownTimestamp = System.currentTimeMillis();
        this.release();
        return;
    }

    if (this.getRefCount() <= 0) {
        return;
    }

    final long sinceFirstShutdown = System.currentTimeMillis() - this.firstShutdownTimestamp;
    if (sinceFirstShutdown >= intervalForcibly) {
        // Push the count well below zero so the following release() reaches cleanup.
        this.refCount.set(-1000 - this.getRefCount());
        this.release();
    }
}
/**
 * Drops one reference. When the count reaches zero or below, runs cleanup while holding this
 * object's monitor and records whether cleanup completed.
 */
public void release() {
    final long remaining = this.refCount.decrementAndGet();
    if (remaining <= 0) {
        synchronized (this) {
            this.cleanupOver = this.cleanup(remaining);
        }
    }
}
/**
 * Unmaps the file's buffer and updates the JVM-wide mapping counters.
 *
 * @param currentRef reference count at the time of the call, used only for logging
 * @return false if the file is still available (not shut down); true if it was already cleaned
 *     up or has just been cleaned up
 */
@Override
public boolean cleanup(final long currentRef) {
    // Still available: it has not been shut down, so refuse to unmap.
    if (this.isAvailable()) {
        log.error("this file[REF:" + currentRef + "] " + this.fileName
                  + " have not shutdown, stop unmapping.");
        return false;
    }

    // Already cleaned up: report success without repeating the work.
    if (this.isCleanupOver()) {
        log.error("this file[REF:" + currentRef + "] " + this.fileName
                  + " have cleanup, do not do it again.");
        return true;
    }

    // Release the mapped buffer via clean() (helper not shown here), then update the counters.
    clean(this.mappedByteBuffer);
    TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(-this.fileSize);
    TOTAL_MAPPED_FILES.decrementAndGet();
    log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
    return true;
}

TransientStorePool

TransientStorePool:短暂的存储池。RocketMQ 单独创建一个由堆外内存(ByteBuffer.allocateDirect)组成的缓存池,用来临时存储数据:数据先写入该堆外内存,然后由 commit 线程定时将数据从该内存写入目标物理文件的 FileChannel。RocketMQ 引入该机制的主要原因是提供一种内存锁定:通过 mlock 将这批堆外内存一直锁定在物理内存中,避免被操作系统交换到磁盘。

public class TransientStorePool {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    /** Number of buffers in the pool (the original note says it defaults to 5 - confirm). */
    private final int poolSize;
    /** Size of each buffer in bytes (per the original note, the CommitLog file size). */
    private final int fileSize;
    /** Buffers that can currently be borrowed (double-ended queue). */
    private final Deque<ByteBuffer> availableBuffers;
    private final MessageStoreConfig storeConfig;

    /**
     * Allocates {@code poolSize} direct (off-heap) buffers of {@code fileSize} bytes each, locks
     * each one with mlock through the JNA LibC binding, and adds it to the available queue.
     */
    public void init() {
        int created = 0;
        while (created < poolSize) {
            ByteBuffer buffer = ByteBuffer.allocateDirect(fileSize);

            // Ask the OS to keep these pages resident so they are not swapped out.
            // NOTE(review): the mlock result is ignored here - confirm failures are acceptable.
            long address = ((DirectBuffer) buffer).address();
            LibC.INSTANCE.mlock(new Pointer(address), new NativeLong(fileSize));

            availableBuffers.offer(buffer);
            created++;
        }
    }
}