Redisson-06丨Redisson信号量分析

Posted by jiefang on February 12, 2021

Redisson信号量分析

基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。

信号量(Semaphore)

用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.trySetPermits(100);
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();

初始化许可

trySetPermits(int permits)

1
2
3
4
@Override
public boolean trySetPermits(int permits) {
    return get(trySetPermitsAsync(permits));
}

trySetPermitsAsync(int permits)

KEYS = Arrays.asList(getName(), getChannelName())

  • KEYS[1]:信号量名称;

  • KEYS[2]:channel名称;

  • ARGV[1]:许可数量;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            //获取信号量                              
            "local value = redis.call('get', KEYS[1]); " +
            //如果此信号量不存在或其许可数等于0                              
            "if (value == false or value == 0) then "
                  //设置信号量许可数                        
                + "redis.call('set', KEYS[1], ARGV[1]); "
                  //发送消息                        
                + "redis.call('publish', KEYS[2], ARGV[1]); "
                + "return 1;"
            + "end;"
            + "return 0;",
            Arrays.<Object>asList(getName(), getChannelName()), permits);
}

获取许可

RedissonSemaphore#acquire()->acquire(int permits)->tryAcquire(int permits)->tryAcquireAsync(int permits)

tryAcquireAsync()

  • KEYS[1]:信号量名称;
  • ARGV[1]:许可数量;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
    //获取许可数量小于0抛异常,等于0,直接成功
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return RedissonPromise.newSucceededFuture(true);
    }

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
              //获取信号量                            
              "local value = redis.call('get', KEYS[1]); " +
              //如果信号量的许可数不为空且大于需要获取的数量                            
              "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
                  //当前信号量的许可数减去获取的数量                        
                  "local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
                  "return 1; " +
              "end; " +
              "return 0;",
              Collections.<Object>singletonList(getName()), permits);
}

释放许可

RedissonSemaphore#release()->release(int permits)->releaseAsync(int permits)

releaseAsync(int permits)

  • KEYS[1]:信号量名称;
  • KEYS[2]:channel的名称;
  • ARGV[1]:释放许可的数量;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public RFuture<Void> releaseAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return RedissonPromise.newSucceededFuture(null);
    }

    return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
        //增加此信号量的许可数                                  
        "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
        //发送消息                                  
        "redis.call('publish', KEYS[2], value); ",
        Arrays.<Object>asList(getName(), getChannelName()), permits);
}

可过期性信号量(PermitExpirableSemaphore)

基于Redis的Redisson可过期性信号量(PermitExpirableSemaphore)是在RSemaphore对象的基础上,为每个信号增加了一个过期时间。

用法

1
2
3
4
5
6
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);