延迟消息
定时消息是指消息发送到Broker 后,并不立即被消费者消费,而是要等到特定的时间后才能被消费。RocketMQ 并不支持任意的时间精度,如果要支持任意时间精度的定时调度,不可避免地需要在Broker 层做消息排序(可以参考JDK 并发包调度线程池ScheduledExecutorService 的实现原理),再加上持久化方面的考量,将不可避免地带来巨大的性能消耗,所以RocketMQ 只支持特定级别的延迟消息。消息延迟级别在Broker 端通过messageDelayLevel 配置,默认为"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",delayLevel=1 表示延迟1s。消息重试正是借助定时任务实现的,在将消息存入commitlog 文件之前需要判断消息的重试次数,如果大于0,则会将消息的主题设置为SCHEDULE_TOPIC_XXXX。RocketMQ 定时消息实现类为org.apache.rocketmq.store.schedule.ScheduleMessageService。
设计关键点
- 定时消息单独一个主题:SCHEDULE_TOPIC_XXXX,该主题下队列数量等于MessageStoreConfig#messageDelayLevel 配置的延迟级别数量,其对应关系为queueId 等于延迟级别减1。ScheduleMessageService 为每一个延迟级别创建一个定时任务(由同一个Timer 调度),根据延迟级别对应的延迟时间进行延迟调度。在消息发送时,如果消息的延迟级别delayLevel 大于0,将消息的原主题名称、队列ID 存入消息的属性中,然后将消息的主题、队列改为延迟主题与延迟主题所属队列,消息将最终转发到延迟队列的消费队列。
- 消息存储时如果消息的延迟级别属性delayLevel 大于0,则会备份原主题、原队列到消息属性中,其键分别为PROPERTY_REAL_TOPIC、PROPERTY_REAL_QUEUE_ID。通过为不同的延迟级别创建不同的调度任务,当时间到达后执行调度任务,调度任务主要就是根据延迟拉取消息消费进度从延迟队列中拉取消息,然后从commitLog 中加载完整消息,清除延迟级别属性并恢复原先的主题、队列,再次创建一条新的消息存入到commitlog 中并转发到消息消费队列供消息消费者消费。
延迟消息流程
- 消息生产者发送消息到topicA,如果发送消息的delayLevel 大于0,则改变消息主题为SCHEDULE_TOPIC_XXXX,消息队列为delayLevel 减1;
- 消息经由commitLog 转发到延迟消息主题SCHEDULE_TOPIC_XXXX 的消息消费队列;
- 定时任务Timer 每次执行根据上次拉取偏移量从消费队列中取出所有消息;
- 根据消息的物理偏移量与消息大小从CommitLog 中拉取消息;
- 根据消息属性重新创建消息,并恢复原主题topicA 、原队列ID ,清除delayLevel属性,存入commitLog 文件。
- 转发到原主题topicA 的消息消费队列,供消息消费者消费。
ScheduleMessageService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Service for delayed (scheduled) messages. It holds the table that maps each
 * delay level to its delay, the consume progress of each level, and the timer
 * used to run the per-level delivery tasks.
 */
public class ScheduleMessageService extends ConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    /** Delay in milliseconds before the first run of a level's delivery task (1 s). */
    private static final long FIRST_DELAY_TIME = 1_000L;
    /** Pause in milliseconds before a level is put back into the timer after a normal pass. */
    private static final long DELAY_FOR_A_WHILE = 100L;
    /** Pause in milliseconds before a level takes part in scheduling again after a failure. */
    private static final long DELAY_FOR_A_PERIOD = 10_000L;

    /** Delay level mapped to its delay in milliseconds. */
    private final ConcurrentMap<Integer, Long> delayLevelTable = new ConcurrentHashMap<>(32);
    /** Delay level mapped to the consume progress (offset) reached for that level. */
    private final ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap<>(32);

    private final AtomicBoolean started = new AtomicBoolean(false);
    private final DefaultMessageStore defaultMessageStore;

    private Timer timer;
    private MessageStore writeMessageStore;
    private int maxDelayLevel;
}
ScheduleMessageService#load()
1
2
3
4
5
/**
 * Runs the parent's {@code load()} and, only if that succeeds, parses the
 * configured delay levels.
 *
 * @return {@code true} when both steps succeed, {@code false} otherwise
 */
public boolean load() {
    // && short-circuits, so parseDelayLevel() is skipped when the parent load fails.
    return super.load() && this.parseDelayLevel();
}
ScheduleMessageService#parseDelayLevel()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Builds the delay-level to delay-milliseconds table from the configured level string.
 *
 * <p>The string is split on single spaces and the n-th token (1-based) defines
 * level n. Each token is a number followed by a one-letter unit: {@code s},
 * {@code m}, {@code h} or {@code d}. The parsed levels are published to
 * {@code delayLevelTable} only after the whole string has been parsed, so a
 * malformed string leaves the table and {@code maxDelayLevel} untouched.
 *
 * @return {@code true} if every token was parsed; {@code false} if the string is
 *         missing or malformed (the reason is logged)
 */
public boolean parseDelayLevel() {
    Map<String, Long> timeUnitTable = new HashMap<String, Long>();
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);

    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();

    // Collect into a local map first so a failure half-way through does not
    // leave a partially filled delayLevelTable behind.
    Map<Integer, Long> parsedLevels = new HashMap<Integer, Long>();
    try {
        String[] levelArray = levelString.split(" ");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            String unit = value.substring(value.length() - 1);
            Long unitMillis = timeUnitTable.get(unit);
            if (unitMillis == null) {
                // Report the offending token explicitly instead of relying on the
                // NullPointerException that unboxing a missing unit would raise.
                log.error("parseDelayLevel failed, unknown time unit '{}' in level '{}', levelString = {}",
                    unit, value, levelString);
                return false;
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            parsedLevels.put(i + 1, unitMillis * num);
        }
    } catch (Exception e) {
        // Covers a null level string, empty tokens and non-numeric amounts.
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }

    this.delayLevelTable.putAll(parsedLevels);
    // Levels are numbered 1..n, so the highest parsed level equals the number of tokens.
    if (parsedLevels.size() > this.maxDelayLevel) {
        this.maxDelayLevel = parsedLevels.size();
    }
    return true;
}
ScheduleMessageService#start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * Starts the scheduling of delayed messages. Does nothing when the service is
 * already marked as started.
 */
public void start() {
    if (!started.compareAndSet(false, true)) {
        return;
    }

    this.timer = new Timer("ScheduleMessageTimerThread", true);

    // One delivery task per configured delay level; the first run of each
    // task happens after FIRST_DELAY_TIME.
    for (Map.Entry<Integer, Long> levelEntry : this.delayLevelTable.entrySet()) {
        Integer level = levelEntry.getKey();
        Long timeDelay = levelEntry.getValue();
        Long savedOffset = this.offsetTable.get(level);
        long startOffset = (savedOffset == null) ? 0L : savedOffset;
        if (timeDelay == null) {
            continue;
        }
        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, startOffset), FIRST_DELAY_TIME);
    }

    // Periodically persist the consume progress of the delay queues: first run
    // after 10 s, then at the configured flush interval.
    TimerTask persistTask = new TimerTask() {
        @Override
        public void run() {
            try {
                if (started.get()) {
                    ScheduleMessageService.this.persist();
                }
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate flush exception", e);
            }
        }
    };
    this.timer.scheduleAtFixedRate(persistTask, 10000,
        this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
DeliverDelayedMessageTimerTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/**
* Timer task that processes the entries of one delay level's consume queue,
* starting at a given offset. Each run re-puts the messages that are due and
* then schedules a new task of the same kind to continue from where it stopped.
*/
class DeliverDelayedMessageTimerTask extends TimerTask {
// Delay level served by this task.
private final int delayLevel;
// Offset in the level's consume queue at which this run starts reading.
private final long offset;
/**
* @param delayLevel delay level this task serves
* @param offset offset in the level's consume queue to start reading from
*/
public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
this.delayLevel = delayLevel;
this.offset = offset;
}
/**
* Runs one delivery pass if the service is started. If the pass throws, the
* exception is logged and a task for the same level and offset is scheduled
* again after DELAY_FOR_A_PERIOD.
*/
@Override
public void run() {
try {
if (isStarted()) {
this.executeOnTimeup();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}
/**
* Performs one delivery pass: reads consume-queue entries from {@code offset},
* re-puts every message whose deliver time has been reached, and schedules the
* follow-up task. Every path through this method ends by scheduling a new task.
*/
public void executeOnTimeup() {
// Find the consume queue of the schedule topic for this level. The queue id comes
// from delayLevel2QueueId (per the original note: queue id = delay level - 1).
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
// Offset used for the fallback re-schedule at the end when nothing could be read.
long failScheduleOffset = offset;
if (cq != null) {
// Fetch the consume-queue entries available from this offset.
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
// Walk the buffer one entry (CQ_STORE_UNIT_SIZE bytes) at a time; each entry is
// read below as long + int + long, i.e. 20 bytes.
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// Physical offset of the message.
long offsetPy = bufferCQ.getByteBuffer().getLong();
// Size of the message.
int sizePy = bufferCQ.getByteBuffer().getInt();
// Tags-code field of the entry; further down it is passed to
// correctDeliverTimestamp as the deliver timestamp.
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
// Queue offset of the entry currently being processed.
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
// Milliseconds left until the message is due; <= 0 means it is due now.
long countdown = deliverTimestamp - now;
if (countdown <= 0) {
// Look the message up in the commit log by its physical offset and size.
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
// Build a normal message from the delayed message.
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
// A rebuilt message targeting the transaction half topic is treated as a bug
// and skipped.
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
continue;
}
// Store the rebuilt message again through writeMessageStore so that it can be
// consumed by consumers.
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// Re-put failed: retry from this same entry after DELAY_FOR_A_PERIOD,
// record the progress and end this pass.
// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
// The failure is only logged; the loop then moves on to the next entry,
// so this message is dropped.
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
// Not due yet: schedule a new task for this entry to run after the remaining
// countdown, record the progress and end this pass.
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
// Every entry in the buffer was handled: continue just past the last one
// after DELAY_FOR_A_WHILE.
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
// Release the buffer on every exit path of the try block.
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
// No buffer at this offset: if the offset lies before the queue's minimum
// offset, restart from the minimum offset instead.
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
// Nothing was read (no consume queue or no buffer): try again after DELAY_FOR_A_WHILE.
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
}