Redisson可重入锁源码分析
1
2
3
4
5
6
@Override
public RLock lock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock();
return lock;
}
构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Redisson implements RedissonClient {
@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
}
public class RedissonLock extends RedissonExpirable implements RLock {
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
//获取锁的默认锁定时间30 * 1000
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
}
加锁
1
2
3
4
5
6
7
8
9
10
public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
}
lock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//获取线程ID
long threadId = Thread.currentThread().getId();
//尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
//获取锁成功返回
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
//循环获取锁直到获取到锁
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
//其它线程持有锁,当前线程没有获取锁成功
if (ttl >= 0) {
try {
//阻塞等待ttl的时间再次获取锁
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
getEntry(threadId).getLatch().acquire();
} else {
getEntry(threadId).getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
}
tryAcquireAsync()
RedissonLock#tryAcquire->RedissonLock#tryAcquireAsync()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
//设定了锁定时间
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//没有设定就取默认设置30*1000
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
//返回为null说明锁定成功,就添加看门狗监控锁
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
tryLockInnerAsync()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
//如果锁不存在,就设置锁,对应一个hash结构,key为Redis客户端和线程组成的唯一id,value为1,表示加锁次数,然后设置锁定时间为30000毫秒
//{
// “8743c9c0-0795-4907-87fd-6c719a6b4586:1”: 1
//}
//如果锁存在且key相等,即是同一个线程再次加锁,则hash的value加1,加锁次数加1,设置锁定时间
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//KEYS[1]:锁的名称
//ARGV[2]:8743c9c0-0795-4907-87fd-6c719a6b4586:1一个Redis客户端加线程组成的唯一标识
//ARGV[1]:30000,就是锁定时间
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
evalWriteAsync()
1
2
3
4
5
6
7
8
9
public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
//使用crc16算法对16384取模,获取锁对应的hashslot,根据hashslot获取对应的Redis Master节点
NodeSource source = getNodeSource(key);
//执行lua脚本
return evalAsync(source, false, codec, evalCommandType, script, keys, params);
}
}
getNodeSource()
1
2
3
4
5
6
7
private NodeSource getNodeSource(String key) {
//使用crc16算法对16384取模,获取锁对应的hashslot
int slot = connectionManager.calcSlot(key);
//根据hashslot获取对应的Redis Master节点
MasterSlaveEntry entry = connectionManager.getEntry(slot);
return new NodeSource(entry, slot);
}
calcSlot()
1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public int calcSlot(String key) {
if (key == null) {
return 0;
}
int start = key.indexOf('{');
if (start != -1) {
int end = key.indexOf('}');
key = key.substring(start+1, end);
}
int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
log.debug("slot {} for {}", result, key);
}
锁监控
scheduleExpirationRenewal()
1
2
3
4
5
6
7
8
9
10
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
renewExpiration()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//每隔锁时间/3的时间调用看门狗为锁延长时间,默认是10000毫秒
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//如果延长锁时间成功,则递归调用自身方法,再次延长
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
//延长锁时间成功
if (res) {
// reschedule itself
//再次调用自身方法
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
renewExpirationAsync()
1
2
3
4
5
6
7
8
9
10
11
12
13
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//KEYS[1]:锁名称
//ARGV[2]:8e6b27a7-5346-483a-b9b5-0957c690c27f:1 判断这个锁hash中是否存在这个key
//如果存在,说明锁还是这个线程持有,重新设置锁定时间30000毫秒
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
锁释放
unlock()
1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void unlock() {
try {
//get包裹异步方法就会同步等待异步执行结果
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
unlockAsync()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
//解锁方法执行完成
future.onComplete((opStatus, e) -> {
//执行异常
if (e != null) {
cancelExpirationRenewal(threadId);
result.tryFailure(e);
return;
}
//返回null,当前线程不是加锁线程
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
//设置当前线程的任务为取消
cancelExpirationRenewal(threadId);
result.trySuccess(null);
});
return result;
}
unlockInnerAsync()
- KEYS[1]:锁名称;
- KEYS[2]:pub和sub的channel名称,redisson_lock__channel:{锁名称};
- ARGV[1]:pub和sub的消息,UNLOCK_MESSAGE = 0L;
- ARGV[2]:30000毫秒;
- ARGV[3]:26cebeaa-e3b0-4097-8192-d62d0d0214b8:1,当前客户端和当前线程组成的唯一标识;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//判断锁是否是当前线程持有,不是返回null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//加锁次数-1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//-1后加锁次数任然大于0,锁续期30000毫秒,返回0
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
//加锁次数等于0,说明已解锁,删除锁,发布消息,返回1
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
超时锁
1
2
3
4
5
RLock lock = redissonClient.getLock(lockKey);
//waitTime:获取锁的最长等待时间
//leaseTime:锁定时间
//unit:时间单位
lock.tryLock(waitTime, leaseTime, unit);
tryLock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
//当前锁返回的锁剩余时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
//如果ttl为空,则加锁成功
if (ttl == null) {
return true;
}
//剩余等待时间=最大等待时间-第一次获取锁耗费时间,如果剩余<=0,则获取锁失败
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
//订阅当前线程消息
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
//剩余等待时间=最大等待时间-第一次获取锁耗费时间,如果剩余<=0,则获取锁失败
if (time <= 0) {
acquireFailed(threadId);
return false;
}
//死循环
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired,获取成功
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
//时间耗尽获取失败
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
//ttl时间后尝试获取锁
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
//取消订阅消息
unsubscribe(subscribeFuture, threadId);
}
}
}
tryAcquireOnceAsync()
RedissonLock#tryAcquire()->RedissonLock#tryAcquireOnceAsync()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
//此方法会进入此if分支,获取锁设定锁定时间,不会为这个锁增加看门狗监听器,到期会被删除
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
总结
Redisson基于Redis实现的分布式锁的机制:
- 加锁:在redis里设置hash数据结构,生存周期是30000毫秒;
- 维持加锁:代码里一直加锁,redis里的key会一直保持存活,后台每隔10秒的定时任务(watchdog)不断的检查,只要客户端还在加锁,就刷新key的生存周期为30000毫秒;
- 可重入锁:同一个线程可以多次加锁,就是在hash数据结构中将加锁次数累加1;
- 锁互斥:不同客户端,或者不同线程,尝试加锁陷入死循环等待;
- 手动释放锁:可重入锁自动递减加锁次数,全部释放锁之后删除锁key;
- 宕机自动释放锁:如果持有锁的客户端宕机了,那么此时后台的watchdog定时调度任务也没了,不会刷新锁key的生存周期,此时redis里的锁key会自动释放;
- 尝试加锁超时:在指定时间内没有成功加锁就自动退出死循环,标识本次尝试加锁失败;
- 超时锁自动释放:获取锁之后,在一定时间内没有手动释放锁,则redis里的key自动过期,自动释放锁;