Redisson-07丨Redisson倒计时门闩分析

Posted by jiefang on February 13, 2021

Redisson倒计时门闩分析

用法

1
2
3
4
5
6
7
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();

// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

设置计时数量

trySetCountAsync(long count)

  • KEYS[1]:倒计时门闩的名称;
  • KEYS[2]:channel的名称;
  • ARGV[1]:发送的消息,1L;
  • ARGV[2]:倒计时计数的数量;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public RFuture<Boolean> trySetCountAsync(long count) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
             //判断倒计时门闩是否存在                             
            "if redis.call('exists', KEYS[1]) == 0 then "
                  //不存在则设置倒计时门闩                        
                + "redis.call('set', KEYS[1], ARGV[2]); "
                  //发送消息                        
                + "redis.call('publish', KEYS[2], ARGV[1]); "
                + "return 1 "
            + "else "
                + "return 0 "
            + "end",
            Arrays.<Object>asList(getName(), getChannelName()), CountDownLatchPubSub.NEW_COUNT_MESSAGE, count);
}

阻塞等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void await() throws InterruptedException {
    RFuture<RedissonCountDownLatchEntry> future = subscribe();
    try {
        commandExecutor.syncSubscription(future);
		//获取此倒计时门闩的计时数量,如果大于0则阻塞等待
        while (getCount() > 0) {
            // waiting for open state
            RedissonCountDownLatchEntry entry = getEntry();
            if (entry != null) {
                entry.getLatch().await();
            }
        }
    } finally {
        unsubscribe(future);
    }
}

计时减1

countDownAsync()

1
2
3
4
5
6
7
8
9
10
11
@Override
public RFuture<Void> countDownAsync() {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    //倒计时门闩数量-1                      
                    "local v = redis.call('decr', KEYS[1]);" +
                    //如果数量减到0则删除此门闩                      
                    "if v <= 0 then redis.call('del', KEYS[1]) end;" +
                    //发送消息                      
                    "if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;",
                Arrays.<Object>asList(getName(), getChannelName()), CountDownLatchPubSub.ZERO_COUNT_MESSAGE);
}