Redisson读写锁分析
基于Redis的Redisson分布式可重入读写锁RReadWriteLock
Java对象实现了java.util.concurrent.locks.ReadWriteLock
接口。其中读锁和写锁都继承了RLock接口。分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
用法
1
2
3
4
5
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
读锁
RedissonLock#lock()->RedissonLock#lock(long leaseTime, TimeUnit unit, boolean interruptibly)->RedissonLock#tryAcquire(long leaseTime, TimeUnit unit, long threadId)->RedissonLock#tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId)->RedissonReadLock#tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command)
加锁
KEYS = Arrays.
- KEYS[1]:锁名称,myLock;
- KEYS[2]:{myLock}:UUID:threadId:rwlock_timeout;
ARGV = internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId)
- ARGV[1]:30000;
- ARGV[2]:UUID:threadId;
- ARGV[3]:UUID:threadId:write;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class RedissonReadLock extends RedissonLock implements RLock {
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//获取当前读写锁的模式
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
//如果读写模式为空
"if (mode == false) then " +
//设置读写锁读写模式为read
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
//设置hash结构,锁myLock和当前客户端+线程标识
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
//设置string结构,key:{myLock}:UUID:threadId:rwlock_timeout:1,value:1
"redis.call('set', KEYS[2] .. ':1', 1); " +
//设置string超时时间为30000毫秒
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
//设置hash锁的超时时间为30000毫秒
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果当前锁模式为读模式或为写模式且锁中线程为当前线程
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
//设置锁中UUID:treadId的value+1
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//{myLock}:UUID:threadId:rwlock_timeout:拼接+1后的value
"local key = KEYS[2] .. ':' .. ind;" +
//插入string,{myLock}:UUID:threadId:rwlock_timeout:ind 1
"redis.call('set', key, 1); " +
//设置刚插入的string超时时间为30000毫秒
"redis.call('pexpire', key, ARGV[1]); " +
//获取锁的剩余时间
"local remainTime = redis.call('pttl', KEYS[1]); " +
//重新设置锁超时时间为30000毫秒和剩余时间中的大者
"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
"return nil; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)),
internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}
}
解锁
KEYS = Arrays.
- KEYS[1]:锁名称,myLock;
- KEYS[2]:pub和sub的channel名称;
- KEYS[3]:{myLock}:UUID:threadId:rwlock_timeout;
- KEYS[4]:{myLock};
**ARGV **= LockPubSub.UNLOCK_MESSAGE, getLockName(threadId)
- ARGV[1]:pub和sub发送的消息:0L;
- ARGV[2]:UUID:threadId;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//获取锁的读写模式
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
//模式没获取到
"if (mode == false) then " +
//发送消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
//锁中是否存在当前客户端+线程的key
"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
//不存在返回
"if (lockExists == 0) then " +
"return nil;" +
"end; " +
//锁中客户端+线程的value-1
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
//value-1后等于则删除锁中这个线程的key
"if (counter == 0) then " +
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"end;" +
//删除string类型的线程
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
//如果hash的长度大于1
"if (redis.call('hlen', KEYS[1]) > 1) then " +
//
"local maxRemainTime = -3; " +
//取出hash中所有的key
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do " +
//循环遍历hash中所有value为数字类型的filed
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
//遍历这些key设置的超时时间
"local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
//获取最大的超时时间
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"end; " +
"end; " +
//如果最大超时时间大于0,则设置锁的超时时间为最大超时时间
"if maxRemainTime > 0 then " +
"redis.call('pexpire', KEYS[1], maxRemainTime); " +
"return 0; " +
"end;" +
//如果当前锁模式是写模式,直接返回0
"if mode == 'write' then " +
"return 0;" +
"end; " +
"end; " +
//删除锁
"redis.call('del', KEYS[1]); " +
//发送消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; ",
Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix),
LockPubSub.UNLOCK_MESSAGE, getLockName(threadId));
}
写锁
加锁
KEYS = Arrays.
- KEYS[1]:锁名称;
ARGV = internalLockLeaseTime, getLockName(threadId)
- ARGV[1]:30000毫秒;
- ARGV[2]:UUID:threadId:write;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class RedissonWriteLock extends RedissonLock implements RLock {
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//获取锁的读写模式
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
//如果锁不存在
"if (mode == false) then " +
//设置锁读写模式为写模式
"redis.call('hset', KEYS[1], 'mode', 'write'); " +
//设置锁的field的key为UUID:threadId:write,value为1
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
//设置锁的超时时间为30000毫秒
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果锁已存在且为写模式
"if (mode == 'write') then " +
//判断锁中是否是当前线程持有
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//锁重入,加锁次数+1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//获取当前锁的剩余时间
"local currentExpire = redis.call('pttl', KEYS[1]); " +
//重新设置锁的超时时间为30000毫秒+剩余时间
"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
"return nil; " +
"end; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
}
解锁
KEYS = Arrays.
- KEYS[1]:锁名称;
- KEYS[2]:sub和pub的channel名称;
ARGV = LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)
- ARGV[1]:消息,1L;
- ARGV[2]:30000毫秒;
- ARGV[3]:UUID:threadId:write;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//获取锁的读写模式
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
//锁不存在
"if (mode == false) then " +
//发送消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
//如果锁模式为写模式
"if (mode == 'write') then " +
//判断是否是当前客户端+线程持有写锁
"local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
//不是当前客户端+线程
"if (lockExists == 0) then " +
"return nil;" +
"else " +
//是当前客户端+线程,加锁次数-1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//如果-1后仍然大于0,重新设置锁的超时时间为30000毫秒
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
//加锁次数-1后为0,删除锁的field
"redis.call('hdel', KEYS[1], ARGV[3]); " +
//如果锁的长度为1
"if (redis.call('hlen', KEYS[1]) == 1) then " +
//删除锁
"redis.call('del', KEYS[1]); " +
//发送消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"else " +
// has unlocked read-locks
//锁的长度不为1,则当前锁降级为读模式
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"end; " +
"return 1; "+
"end; " +
"end; " +
"end; "
+ "return nil;",
Arrays.<Object>asList(getName(), getChannelName()),
LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
不同线程间读写模式
读读共享
读读共享模式下Redis里的锁数据结构:
1
2
3
4
5
6
7
8
myLock: {
“mode”: “read”,
“UUID_01:threadId_01”: 1,
“UUID_02:threadId_02”: 1
}
{anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
{anyLock}:UUID_02:threadId_02:rwlock_timeout:1 1
读写互斥
先读后写
已加读锁,写请求失败。
先写后读
一个线程持有写锁,其它线程读请求阻塞。
写写互斥
一个线程持有写锁,其它线程写请求阻塞。
同一线程读写模式
读锁+读锁
1
2
3
4
5
6
7
8
myLock: {
“mode”: “read”,
“UUID_01:threadId_01”: 2
}
{anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
{anyLock}:UUID_01:threadId_01:rwlock_timeout:2 1
读锁+写锁
同一个客户端同一个线程,先读锁再写锁,是互斥的,会导致加锁失败。
写锁+读锁
在同一个线程写锁的期间,可以多次加读锁。
1
2
3
4
5
6
7
myLock: {
“mode”: “write”,
“UUID_01:threadId_01:write”: 1,
“UUID_01:threadId_01”: 1
}
{anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
写锁+写锁
同一个客户端同一个线程,多次加写锁,是可以重入加锁的。
1
2
3
4
myLock: {
“mode”: “write”,
“UUID_01:threadId_01:write”: 2
}