Redisson信号量分析
基于Redis的Redisson的分布式信号量(Semaphore
)Java对象RSemaphore
采用了与java.util.concurrent.Semaphore
相似的接口和用法。
信号量(Semaphore)
用法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.trySetPermits(100);
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();
初始化许可
trySetPermits(int permits)
1
2
3
4
@Override
public boolean trySetPermits(int permits) {
return get(trySetPermitsAsync(permits));
}
trySetPermitsAsync(int permits)
KEYS = Arrays.
-
KEYS[1]:信号量名称;
-
KEYS[2]:channel名称;
-
ARGV[1]:许可数量;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//获取信号量
"local value = redis.call('get', KEYS[1]); " +
//如果此信号量不存在或其许可数等于0
"if (value == false or value == 0) then "
//设置信号量许可数
+ "redis.call('set', KEYS[1], ARGV[1]); "
//发送消息
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(getName(), getChannelName()), permits);
}
获取许可
RedissonSemaphore#acquire()->acquire(int permits)->tryAcquire(int permits)->tryAcquireAsync(int permits)
tryAcquireAsync()
- KEYS[1]:信号量名称;
- ARGV[1]:许可数量;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
//获取许可数量小于0抛异常,等于0,直接成功
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return RedissonPromise.newSucceededFuture(true);
}
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//获取信号量
"local value = redis.call('get', KEYS[1]); " +
//如果信号量的许可数不为空且大于需要获取的数量
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
//当前信号量的许可数减去获取的数量
"local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), permits);
}
释放许可
RedissonSemaphore#release()->release(int permits)->releaseAsync(int permits)
releaseAsync(int permits)
- KEYS[1]:信号量名称;
- KEYS[2]:channel的名称;
- ARGV[1]:释放许可的数量;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public RFuture<Void> releaseAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return RedissonPromise.newSucceededFuture(null);
}
return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
//增加此信号量的许可数
"local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
//发送消息
"redis.call('publish', KEYS[2], value); ",
Arrays.<Object>asList(getName(), getChannelName()), permits);
}
可过期性信号量(PermitExpirableSemaphore)
基于Redis的Redisson可过期性信号量(PermitExpirableSemaphore
)是在RSemaphore
对象的基础上,为每个信号增加了一个过期时间。
用法
1
2
3
4
5
6
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);