Redisson-05丨Redisson读写锁分析

Posted by jiefang on February 11, 2021

Redisson读写锁分析

基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。

用法

1
2
3
4
5
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();

读锁

RedissonLock#lock()->RedissonLock#lock(long leaseTime, TimeUnit unit, boolean interruptibly)->RedissonLock#tryAcquire(long leaseTime, TimeUnit unit, long threadId)->RedissonLock#tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId)->RedissonReadLock#tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command)

加锁

KEYS = Arrays.asList(getName(), getReadWriteTimeoutNamePrefix(threadId))

  • KEYS[1]:锁名称,myLock;
  • KEYS[2]:{myLock}:UUID:threadId:rwlock_timeout;

ARGV = internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId)

  • ARGV[1]:30000;
  • ARGV[2]:UUID:threadId;
  • ARGV[3]:UUID:threadId:write;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class RedissonReadLock extends RedissonLock implements RLock {
    @Override
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                                //获取当前读写锁的模式
                                "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                                //如果读写模式为空
                                "if (mode == false) then " +
                                  //设置读写锁读写模式为read
                                  "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                                  //设置hash结构,锁myLock和当前客户端+线程标识
                                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                                  //设置string结构,key:{myLock}:UUID:threadId:rwlock_timeout:1,value:1
                                  "redis.call('set', KEYS[2] .. ':1', 1); " +
                                  //设置string超时时间为30000毫秒
                                  "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
                                  //设置hash锁的超时时间为30000毫秒            
                                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                  "return nil; " +
                                "end; " +
                                //如果当前锁模式为读模式或为写模式且锁中线程为当前线程              
                                "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
                                  //设置锁中UUID:treadId的value+1
                                  "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                                  //{myLock}:UUID:threadId:rwlock_timeout:拼接+1后的value            
                                  "local key = KEYS[2] .. ':' .. ind;" +
                                  //插入string,{myLock}:UUID:threadId:rwlock_timeout:ind 1       
                                  "redis.call('set', key, 1); " +
                                  //设置刚插入的string超时时间为30000毫秒            
                                  "redis.call('pexpire', key, ARGV[1]); " +
                                  //获取锁的剩余时间        
                                  "local remainTime = redis.call('pttl', KEYS[1]); " +
                                  //重新设置锁超时时间为30000毫秒和剩余时间中的大者            
                                  "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
                                  "return nil; " +
                                "end;" +
                                "return redis.call('pttl', KEYS[1]);",
                        Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), 
                        internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
    }
}

解锁

KEYS = Arrays.asList(getName(), getChannelName(), timeoutPrefix, keyPrefix)

  • KEYS[1]:锁名称,myLock;
  • KEYS[2]:pub和sub的channel名称;
  • KEYS[3]:{myLock}:UUID:threadId:rwlock_timeout;
  • KEYS[4]:{myLock};

**ARGV **= LockPubSub.UNLOCK_MESSAGE, getLockName(threadId)

  • ARGV[1]:pub和sub发送的消息:0L;
  • ARGV[2]:UUID:threadId;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
    String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            //获取锁的读写模式                              
            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
            //模式没获取到                              
            "if (mode == false) then " +
                //发送消息                          
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end; " +
            //锁中是否存在当前客户端+线程的key                              
            "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
            //不存在返回                              
            "if (lockExists == 0) then " +
                "return nil;" +
            "end; " +
            //锁中客户端+线程的value-1    
            "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + 
            //value-1后等于则删除锁中这个线程的key                              
            "if (counter == 0) then " +
                "redis.call('hdel', KEYS[1], ARGV[2]); " + 
            "end;" +
            //删除string类型的线程                              
            "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
            //如果hash的长度大于1
            "if (redis.call('hlen', KEYS[1]) > 1) then " +
                //
                "local maxRemainTime = -3; " + 
                //取出hash中所有的key                          
                "local keys = redis.call('hkeys', KEYS[1]); " + 
                "for n, key in ipairs(keys) do " + 
                    //循环遍历hash中所有value为数字类型的filed                      
                    "counter = tonumber(redis.call('hget', KEYS[1], key)); " + 
                    "if type(counter) == 'number' then " + 
                        "for i=counter, 1, -1 do " + 
                            //遍历这些key设置的超时时间              
                            "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + 
                            //获取最大的超时时间              
                            "maxRemainTime = math.max(remainTime, maxRemainTime);" + 
                        "end; " + 
                    "end; " + 
                "end; " +
                //如果最大超时时间大于0,则设置锁的超时时间为最大超时时间        
                "if maxRemainTime > 0 then " +
                    "redis.call('pexpire', KEYS[1], maxRemainTime); " +
                    "return 0; " +
                "end;" + 
                //如果当前锁模式是写模式,直接返回0    
                "if mode == 'write' then " + 
                    "return 0;" + 
                "end; " +
            "end; " +
            //删除锁    
            "redis.call('del', KEYS[1]); " +
            //发送消息                              
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; ",
            Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix), 
            LockPubSub.UNLOCK_MESSAGE, getLockName(threadId));
}

写锁

加锁

KEYS = Arrays.asList(getName())

  • KEYS[1]:锁名称;

ARGV = internalLockLeaseTime, getLockName(threadId)

  • ARGV[1]:30000毫秒;
  • ARGV[2]:UUID:threadId:write;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class RedissonWriteLock extends RedissonLock implements RLock {
    @Override
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                            //获取锁的读写模式                  
                            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                            //如果锁不存在                  
                            "if (mode == false) then " +
                                  //设置锁读写模式为写模式            
                                  "redis.call('hset', KEYS[1], 'mode', 'write'); " +
                                  //设置锁的field的key为UUID:threadId:write,value为1            
                                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                                  //设置锁的超时时间为30000毫秒            
                                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                  "return nil; " +
                              "end; " +
                              //如果锁已存在且为写模式                
                              "if (mode == 'write') then " +
                                  //判断锁中是否是当前线程持有            
                                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                      //锁重入,加锁次数+1        
                                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                                      //获取当前锁的剩余时间        
                                      "local currentExpire = redis.call('pttl', KEYS[1]); " +
                                      //重新设置锁的超时时间为30000毫秒+剩余时间        
                                      "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
                                      "return nil; " +
                                  "end; " +
                                "end;" +
                                "return redis.call('pttl', KEYS[1]);",
                        Arrays.<Object>asList(getName()), 
                        internalLockLeaseTime, getLockName(threadId));
    }
}

解锁

KEYS = Arrays.asList(getName(), getChannelName())

  • KEYS[1]:锁名称;
  • KEYS[2]:sub和pub的channel名称;

ARGV = LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)

  • ARGV[1]:消息,1L;
  • ARGV[2]:30000毫秒;
  • ARGV[3]:UUID:threadId:write;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            //获取锁的读写模式                              
            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
            //锁不存在                              
            "if (mode == false) then " +
                //发送消息                          
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            //如果锁模式为写模式                              
            "if (mode == 'write') then " +
                //判断是否是当前客户端+线程持有写锁                          
                "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
                //不是当前客户端+线程                          
                "if (lockExists == 0) then " +
                    "return nil;" +
                "else " +
                    //是当前客户端+线程,加锁次数-1                      
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    //如果-1后仍然大于0,重新设置锁的超时时间为30000毫秒                      
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        //加锁次数-1后为0,删除锁的field                  
                        "redis.call('hdel', KEYS[1], ARGV[3]); " +
                        //如果锁的长度为1                  
                        "if (redis.call('hlen', KEYS[1]) == 1) then " +
                            //删除锁              
                            "redis.call('del', KEYS[1]); " +
                            //发送消息              
                            "redis.call('publish', KEYS[2], ARGV[1]); " + 
                        "else " +
                            // has unlocked read-locks
                            //锁的长度不为1,则当前锁降级为读模式              
                            "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                        "end; " +
                        "return 1; "+
                    "end; " +
                "end; " +
            "end; "
            + "return nil;",
    Arrays.<Object>asList(getName(), getChannelName()), 
    LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

不同线程间读写模式

读读共享

读读共享模式下Redis里的锁数据结构:

1
2
3
4
5
6
7
8
myLock: {
 “mode”: “read”,
 “UUID_01:threadId_01”: 1,
 “UUID_02:threadId_02”: 1
}
{anyLock}:UUID_01:threadId_01:rwlock_timeout:1		1

{anyLock}:UUID_02:threadId_02:rwlock_timeout:1		1

读写互斥

先读后写

已加读锁,写请求失败。

先写后读

一个线程持有写锁,其它线程读请求阻塞。

写写互斥

一个线程持有写锁,其它线程写请求阻塞。

同一线程读写模式

读锁+读锁

1
2
3
4
5
6
7
8
myLock: {
 “mode”: “read”,
 “UUID_01:threadId_01”: 2
}

{anyLock}:UUID_01:threadId_01:rwlock_timeout:1	1

{anyLock}:UUID_01:threadId_01:rwlock_timeout:2	1

读锁+写锁

同一个客户端同一个线程,先读锁再写锁,是互斥的,会导致加锁失败。

写锁+读锁

在同一个线程写锁的期间,可以多次加读锁。

1
2
3
4
5
6
7
myLock: {
 “mode”: “write”,
 “UUID_01:threadId_01:write”: 1,
 “UUID_01:threadId_01”: 1
}

{anyLock}:UUID_01:threadId_01:rwlock_timeout:1		1

写锁+写锁

同一个客户端同一个线程,多次加写锁,是可以重入加锁的。

1
2
3
4
myLock: {
  “mode”: “write”,
  “UUID_01:threadId_01:write”: 2
}