Redisson倒计时门闩分析
用法
1
2
3
4
5
6
7
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
设置计时数量
trySetCountAsync(long count)
- KEYS[1]:倒计时门闩的名称;
- KEYS[2]:channel的名称;
- ARGV[1]:发送的消息,1L;
- ARGV[2]:倒计时计数的数量;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public RFuture<Boolean> trySetCountAsync(long count) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//判断倒计时门闩是否存在
"if redis.call('exists', KEYS[1]) == 0 then "
//不存在则设置倒计时门闩
+ "redis.call('set', KEYS[1], ARGV[2]); "
//发送消息
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1 "
+ "else "
+ "return 0 "
+ "end",
Arrays.<Object>asList(getName(), getChannelName()), CountDownLatchPubSub.NEW_COUNT_MESSAGE, count);
}
阻塞等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void await() throws InterruptedException {
RFuture<RedissonCountDownLatchEntry> future = subscribe();
try {
commandExecutor.syncSubscription(future);
//获取此倒计时门闩的计时数量,如果大于0则阻塞等待
while (getCount() > 0) {
// waiting for open state
RedissonCountDownLatchEntry entry = getEntry();
if (entry != null) {
entry.getLatch().await();
}
}
} finally {
unsubscribe(future);
}
}
计时减1
countDownAsync()
1
2
3
4
5
6
7
8
9
10
11
@Override
public RFuture<Void> countDownAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//倒计时门闩数量-1
"local v = redis.call('decr', KEYS[1]);" +
//如果数量减到0则删除此门闩
"if v <= 0 then redis.call('del', KEYS[1]) end;" +
//发送消息
"if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;",
Arrays.<Object>asList(getName(), getChannelName()), CountDownLatchPubSub.ZERO_COUNT_MESSAGE);
}