RocketMQ-08丨存储文件

Posted by jiefang on March 24, 2021

存储文件

  • commitlog :消息存储目录;

  • config :运行期间一些配置信息,主要包括下列信息;
    • consumerFilter.json : 主题消息过滤信息;
    • consumerOffset.json : 集群消费模式消息消费进度;
    • delayOffset.json :延时消息队列拉取进度;
    • subscriptionGroup.json : 消息消费组配置信息;
    • topics.json: topic 配置属性;
  • consumequeue :消息消费队列存储目录;
  • index :消息索引文件存储目录;
  • abort :如果存在abort 文件说明Broker 非正常关闭,该文件默认启动时创建,正常退出之前删除;
  • checkpoint :文件检测点,存储commitlog 文件最后一次刷盘时间戳、consumequeue最后一次刷盘时间、index 索引文件最后一次刷盘时间戳;

CommitLog文件

CommitLog文件的存储目录默认为${ROCKET_HOME}/store/commitlog ,可以通过在broker 配置文件中设置storePathRootDir 属性来改变默认路径。CommitLog文件默认大小为1G ,可通过在broker 配置文件中设置mapedFileSizeCommitLog `属性来改变默认大小。

1
2
3
4
5
6
7
8
9
10
//根据偏移量与消息长度查找消息
public SelectMappedBufferResult getMessage(final long offset, final int size) {
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
    if (mappedFile != null) {
        int pos = (int) (offset % mappedFileSize);
        return mappedFile.selectMappedBuffer(pos, size);
    }
    return null;
}

MappedFile#selectMappedBuffer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
    int readPosition = getReadPosition();
    if ((pos + size) <= readPosition) {
        if (this.hold()) {
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            byteBuffer.position(pos);
            ByteBuffer byteBufferNew = byteBuffer.slice();
            byteBufferNew.limit(size);
            return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
        } else {
            log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
                + this.fileFromOffset);
        }
    } else {
        log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
            + ", fileFromOffset: " + this.fileFromOffset);
    }

    return null;
}

ConsumeQueue 文件

RocketMQ 基于主题订阅模式实现消息消费,消费者关心的是一个主题下的所有消息,但由于同一主题的消息不连续地存储在commitlog 文件中,试想一下如果消息消费者直接从消息存储文件(commitlog)中去遍历查找订阅主题下的消息,效率将极其低下,RocketMQ 为了适应消息消费的检索需求,设计了消息消费队列文件(Consumequeue),该文件可以看成是Commitlog 关于消息消费的“索引”文件, consumequeue 的第一级目录为消息主题,第二级目录为主题的消息队列。为了加速ConsumeQueue 消息条目的检索速度与节省磁盘空间,每一个Consumequeue条目不会存储消息的全量信息。

单个ConsumeQueue 文件中默认包含30 万个条目,单个文件的长度为30W × 20 字节,单个ConsumeQueue 文件可以看出是一个ConsumeQueue 条目的数组,其下标为ConsumeQueue的逻辑偏移量,消息消费进度存储的偏移量即逻辑偏移量。ConsumeQueue 即为Commitlog 文件的索引文件, 其构建机制是当消息到达Commitlog 文件后, 由专门的线程产生消息转发任务,从而构建消息消费队列文件与下文提到的索引文件。

ConsumeQueue#getIndexBuffer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//根据sta rtlnde x 获取消息消费队列条目
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    int mappedFileSize = this.mappedFileSize;
    //startIndex乘以20
    long offset = startIndex * CQ_STORE_UNIT_SIZE;
    //如果该offset小于minLogicOffset ,则返回null,说明该消息已被删除
    //如果大于minLogicOffset,则根据偏移量定位到具体的物理文件,然后通过offset与物理文大小取模获取在该文件的偏移量,从而从偏移量开始连续读取20个字节即可
    if (offset >= this.getMinLogicOffset()) {
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
        if (mappedFile != null) {
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
            return result;
        }
    }
    return null;
}

MappedFile#selectMappedBuffer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public SelectMappedBufferResult selectMappedBuffer(int pos) {
    int readPosition = getReadPosition();
    if (pos < readPosition && pos >= 0) {
        if (this.hold()) {
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            byteBuffer.position(pos);
            int size = readPosition - pos;
            ByteBuffer byteBufferNew = byteBuffer.slice();
            byteBufferNew.limit(size);
            return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
        }
    }
    return null;
}

根据时间查找

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public long getOffsetInQueueByTime(final long timestamp) {
    //根据时间戳定位到物理文件
    MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
    if (mappedFile != null) {
        long offset = 0;
        //采用二分查找来加速检索。首先计算最低查找偏移量,取消息队列最小偏移量与该文件最小偏移量二者中的最小偏移量为low 。
        //获取当前存储文件中有效的最小消息物理偏移量minPhysicOffset ,如果查找到消息偏移量小于该物理偏移量, 则结束该查找过程
        int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
        int high = 0;
        int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
        long leftIndexValue = -1L, rightIndexValue = -1L;
        long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
        SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
        if (null != sbr) {
            ByteBuffer byteBuffer = sbr.getByteBuffer();
            high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
            try {
                while (high >= low) {
                    midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
                    byteBuffer.position(midOffset);
                    long phyOffset = byteBuffer.getLong();
                    int size = byteBuffer.getInt();
                    if (phyOffset < minPhysicOffset) {
                        low = midOffset + CQ_STORE_UNIT_SIZE;
                        leftOffset = midOffset;
                        continue;
                    }

                    long storeTime =
                        this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                    if (storeTime < 0) {
                        return 0;
                    } else if (storeTime == timestamp) {
                        targetOffset = midOffset;
                        break;
                    } else if (storeTime > timestamp) {
                        high = midOffset - CQ_STORE_UNIT_SIZE;
                        rightOffset = midOffset;
                        rightIndexValue = storeTime;
                    } else {
                        low = midOffset + CQ_STORE_UNIT_SIZE;
                        leftOffset = midOffset;
                        leftIndexValue = storeTime;
                    }
                }
				//如果targetOffset 不等于- 1 表示找到了存储时间戳等于待查找时间戳的消息;


                if (targetOffset != -1) {

                    offset = targetOffset;
                //如果leftIndexValue等于-l,表示返回当前时间戳大并且最接近待查找的偏移量; 
                } else {
                    if (leftIndexValue == -1) {

                        offset = rightOffset;
                    //如果rightIndexValue等于-1,表示返回的消息比待查找时间戳小并且最接近查找的偏移量
                    } else if (rightIndexValue == -1) {

                        offset = leftOffset;
                    } else {
                        offset =
                            Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
                                - rightIndexValue) ? rightOffset : leftOffset;
                    }
                }

                return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
            } finally {
                sbr.release();
            }
        }
    }
    return 0;
}

Index索引文件

RocketMQ 引入了 Hash 索引机制为消息建立索引, HashMap 包含两个基本点 Hash 槽与 Hash 的链表结构。

lndexFile 总共包含 lndexHeader、Hash槽、 Hash条目(数据)。

  • IndexHeader :头部,包含 40 个字节,记录该 IndexFile 的统计信息,其结构如下:
    • beginTimestamp:该索引文件中包含消息的最小存储时间;
    • endTimes tamp:该索引文件中包含消息的最大存储时间;
    • beginPhyoffset:该索引文件中包含消息的最小物理偏移量( commitlog 文件偏移量);
    • endPhyoffset :该索引文件中包含消息的最大物理偏移量( commitlog 文件偏移量);
    • hashslotCount: hashslot 数,并不是 hash 槽使用的个数,在这里意义不大;
    • indexCount: Index 条目列表当前已使用的个数, Index 条目在 Index 条目列表中按顺序存储;
  • Hash 槽: 一个IndexFile 默认包含 500 万个 Hash 槽,每个 Hash 槽存储的是落在该 Hash 槽的 hashcode 最新的 Index 的索引;

  • Index 条目列表:默认一个索引文件包含 2000 万个条目,每 Index 条目结构如下
  • hashcode: key 的hashcode;
  • phyoffset:消息对应的物理偏移量;
  • timedif:该消息存储时间与第 条消息的时间戳的差值,小于 该消息无效;
  • prelndexNo :该条目的前一条 Index 索, 出现 hash冲突时,构建的链表结构;

IndexFile#putKey()

RocketMQ 将消息索引键与消息偏移量映射关系写入到 IndexFile 的实现方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    //已使用条目大于等于允许最大条目数,返回false,表示当前索引文件已写满
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        //获取hashcode绝对值
        int keyHash = indexKeyHashMethod(key);
        //取余,定位下标 maxHashSlotNum = 5000000
        int slotPos = keyHash % this.hashSlotNum;
        //hash槽的物理地址
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;

        try {

            // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
            // false);
            //读取hash槽中存储的数据,如果hash槽存储的数据小于0或大于当前索中的索引条目格式, slotValue置为0
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }

            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

            timeDiff = timeDiff / 1000;
			//计算待存储消息的时间戳与第一条消息时间戳的差值,并转换成秒
            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }
            //这里是Hash冲突链式解决方案的关键实现, Hash槽中存储的是该HashCode所对应的
			//最新的Index条目的下标,新的Index条目的最后4个字节存储该HashCode上一个条目的
			//Index下标。如果Hash槽中存储的值为0或大于当前IndexFile最大条目数或小于-1,
			//表示该Hash槽当前并没有与之对应的Index条目。值得关注的是IndexFile条目中存储的不
			//是消息索引key而是消息属性key的HashCode,在根据key查找时需要根据消息物理偏移
			//量找到消息进而再验证消息key的值,之所以只存储HashCode不存储具体的key是为
			//了将 Index 目设计为定长结构,才能方便地检索与定位条目。
            //将条目信息存储在IndexFile,计算物理偏移量=头部字节长度+hash槽数量*单个hash槽大小+当前index条目个数*单个Index条目大小
            //maxIndexNum = 5000000 * 4
            int absIndexPos =
                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                    + this.indexHeader.getIndexCount() * indexSize;
			//hashcode,消息物理偏移量,消息存储时间戳,索引文件时间戳,当前hash槽的值存入mappedByteBuffer
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            //上一个同一个hash槽里存放的值,形成链表
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
			//覆盖之前hash槽的值
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
			//更新索引头信息
            //如果当前文件只有一个条目,设置起始条目偏移量和起始时间
            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }

            if (invalidIndex == slotValue) {
                this.indexHeader.incHashSlotCount();
            }
            //条目index下标++
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);

            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e);
                }
            }
        }
    } else {
        log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
            + "; index max num = " + this.indexNum);
    }

    return false;
}

IndexFile#selectPhyOffset()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
    final long begin, final long end, boolean lock) {
    if (this.mappedFile.hold()) {
        //计算hashcode
        int keyHash = indexKeyHashMethod(key);
        //取余,定位hash槽下标
        int slotPos = keyHash % this.hashSlotNum;
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;
        try {
            if (lock) {
                // fileLock = this.fileChannel.lock(absSlotPos,
                // hashSlotSize, true);
            }

            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            // if (fileLock != null) {
            // fileLock.release();
            // fileLock = null;
            // }
			//如果对应的Hash槽中存储的数据小于1大于当前索引条目个数则表示HashCode没有对应的条目, 
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                || this.indexHeader.getIndexCount() <= 1) {
            } else {
                //循环查找最多maxNum数量的消息物理偏移量
                for (int nextIndexToRead = slotValue; ; ) {
                    if (phyOffsets.size() >= maxNum) {
                        break;
                    }

                    int absIndexPos =
                        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                            + nextIndexToRead * indexSize;

                    int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                    long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                    long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                    int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
					//时间校验
                    if (timeDiff < 0) {
                        break;
                    }

                    timeDiff *= 1000L;
					//时间匹配
                    long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                    boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

                    if (keyHash == keyHashRead && timeMatched) {
                        phyOffsets.add(phyOffsetRead);
                    }

                    if (prevIndexRead <= invalidIndex
                        || prevIndexRead > this.indexHeader.getIndexCount()
                        || prevIndexRead == nextIndexToRead || timeRead < begin) {
                        break;
                    }

                    nextIndexToRead = prevIndexRead;
                }
            }
        } catch (Exception e) {
            log.error("selectPhyOffset exception ", e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e);
                }
            }
            this.mappedFile.release();
        }
    }
}

checkpoint 文件

checkpoint 作用是记录 Comitlog、ConsumeQueue/Index 文件的刷盘时间点, 文件固定长度为4k ,其中只用该文件的前 24 字节。

  • physicMsgTimestamp:commitLog文件刷盘时间点;
  • logicMsgTimestamp:消息消费队列文件刷盘时间点;
  • indexMsgTimestamp:索引文件刷盘时间点;