Redisson-02丨Redisson可重入锁源码分析

Posted by jiefang on February 8, 2021

Redisson可重入锁源码分析

1
2
3
4
5
6
@Override
public RLock lock(String lockKey) {
    RLock lock = redissonClient.getLock(lockKey);
    lock.lock();
    return lock;
}

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Redisson implements RedissonClient {
    @Override
    public RLock getLock(String name) {
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }
}
public class RedissonLock extends RedissonExpirable implements RLock {
    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        this.id = commandExecutor.getConnectionManager().getId();
        //获取锁的默认锁定时间30 * 1000
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.entryName = id + ":" + name;
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }
}

加锁

1
2
3
4
5
6
7
8
9
10
public class RedissonLock extends RedissonExpirable implements RLock {
    @Override
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }
}

lock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    //获取线程ID
    long threadId = Thread.currentThread().getId();
    //尝试获取锁
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    //获取锁成功返回
    if (ttl == null) {
        return;
    }

    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);

    try {
        //循环获取锁直到获取到锁
        while (true) {
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            //其它线程持有锁,当前线程没有获取锁成功
            if (ttl >= 0) {
                try {
                    //阻塞等待ttl的时间再次获取锁
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    getEntry(threadId).getLatch().acquire();
                } else {
                    getEntry(threadId).getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
}

tryAcquireAsync()

RedissonLock#tryAcquire->RedissonLock#tryAcquireAsync()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    //设定了锁定时间
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    //没有设定就取默认设置30*1000
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }
        // lock acquired
        //返回为null说明锁定成功,就添加看门狗监控锁
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

tryLockInnerAsync()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
	//如果锁不存在,就设置锁,对应一个hash结构,key为Redis客户端和线程组成的唯一id,value为1,表示加锁次数,然后设置锁定时间为30000毫秒
    //{
  	//	“8743c9c0-0795-4907-87fd-6c719a6b4586:1”: 1
	//}
    //如果锁存在且key相等,即是同一个线程再次加锁,则hash的value加1,加锁次数加1,设置锁定时间
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              //KEYS[1]:锁的名称
              //ARGV[2]:8743c9c0-0795-4907-87fd-6c719a6b4586:1一个Redis客户端加线程组成的唯一标识
              //ARGV[1]:30000,就是锁定时间
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

evalWriteAsync()

1
2
3
4
5
6
7
8
9
public class CommandAsyncService implements CommandAsyncExecutor {
    @Override
    public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
        //使用crc16算法对16384取模,获取锁对应的hashslot,根据hashslot获取对应的Redis Master节点
        NodeSource source = getNodeSource(key);
        //执行lua脚本
        return evalAsync(source, false, codec, evalCommandType, script, keys, params);
    }
}

getNodeSource()

1
2
3
4
5
6
7
private NodeSource getNodeSource(String key) {
    //使用crc16算法对16384取模,获取锁对应的hashslot
    int slot = connectionManager.calcSlot(key);
    //根据hashslot获取对应的Redis Master节点
    MasterSlaveEntry entry = connectionManager.getEntry(slot);
    return new NodeSource(entry, slot);
}

calcSlot()

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public int calcSlot(String key) {
    if (key == null) {
        return 0;
    }
    int start = key.indexOf('{');
    if (start != -1) {
        int end = key.indexOf('}');
        key = key.substring(start+1, end);
    }
    int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
    log.debug("slot {} for {}", result, key);
}

锁监控

scheduleExpirationRenewal()

1
2
3
4
5
6
7
8
9
10
private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}

renewExpiration()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    //每隔锁时间/3的时间调用看门狗为锁延长时间,默认是10000毫秒
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            //如果延长锁时间成功,则递归调用自身方法,再次延长
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                //延长锁时间成功
                if (res) {
                    // reschedule itself
                    //再次调用自身方法
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
}

renewExpirationAsync()

1
2
3
4
5
6
7
8
9
10
11
12
13
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            //KEYS[1]:锁名称
            //ARGV[2]:8e6b27a7-5346-483a-b9b5-0957c690c27f:1 判断这个锁hash中是否存在这个key
            //如果存在,说明锁还是这个线程持有,重新设置锁定时间30000毫秒
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()), 
        internalLockLeaseTime, getLockName(threadId));
}

锁释放

unlock()

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void unlock() {
    try {
        //get包裹异步方法就会同步等待异步执行结果
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

unlockAsync()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    RFuture<Boolean> future = unlockInnerAsync(threadId);
	//解锁方法执行完成
    future.onComplete((opStatus, e) -> {
        //执行异常
        if (e != null) {
            cancelExpirationRenewal(threadId);
            result.tryFailure(e);
            return;
        }
        //返回null,当前线程不是加锁线程
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }
        //设置当前线程的任务为取消
        cancelExpirationRenewal(threadId);
        result.trySuccess(null);
    });

    return result;
}

unlockInnerAsync()

  • KEYS[1]:锁名称;
  • KEYS[2]:pub和sub的channel名称,redisson_lock__channel:{锁名称};
  • ARGV[1]:pub和sub的消息,UNLOCK_MESSAGE = 0L;
  • ARGV[2]:30000毫秒;
  • ARGV[3]:26cebeaa-e3b0-4097-8192-d62d0d0214b8:1,当前客户端和当前线程组成的唯一标识;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            //判断锁是否是当前线程持有,不是返回null                          
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            //加锁次数-1
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            //-1后加锁次数任然大于0,锁续期30000毫秒,返回0                            
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            //加锁次数等于0,说明已解锁,删除锁,发布消息,返回1
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

超时锁

1
2
3
4
5
RLock lock = redissonClient.getLock(lockKey);
//waitTime:获取锁的最长等待时间
//leaseTime:锁定时间
//unit:时间单位
lock.tryLock(waitTime, leaseTime, unit);

tryLock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class RedissonLock extends RedissonExpirable implements RLock {
	@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        //当前锁返回的锁剩余时间
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        //如果ttl为空,则加锁成功
        if (ttl == null) {
            return true;
        }
        //剩余等待时间=最大等待时间-第一次获取锁耗费时间,如果剩余<=0,则获取锁失败
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        //订阅当前线程消息
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }
		
        try {
            time -= System.currentTimeMillis() - current;
            //剩余等待时间=最大等待时间-第一次获取锁耗费时间,如果剩余<=0,则获取锁失败
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        	//死循环
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired,获取成功
                if (ttl == null) {
                    return true;
                }
			
                time -= System.currentTimeMillis() - currentTime;
                //时间耗尽获取失败
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                //ttl时间后尝试获取锁
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            //取消订阅消息
            unsubscribe(subscribeFuture, threadId);
        }
    }
}

tryAcquireOnceAsync()

RedissonLock#tryAcquire()->RedissonLock#tryAcquireOnceAsync()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
    //此方法会进入此if分支,获取锁设定锁定时间,不会为这个锁增加看门狗监听器,到期会被删除
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    }
    RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

总结

Redisson基于Redis实现的分布式锁的机制:

  1. 加锁:在redis里设置hash数据结构,生存周期是30000毫秒;
  2. 维持加锁:代码里一直加锁,redis里的key会一直保持存活,后台每隔10秒的定时任务(watchdog)不断的检查,只要客户端还在加锁,就刷新key的生存周期为30000毫秒;
  3. 可重入锁:同一个线程可以多次加锁,就是在hash数据结构中将加锁次数累加1;
  4. 锁互斥:不同客户端,或者不同线程,尝试加锁陷入死循环等待;
  5. 手动释放锁:可重入锁自动递减加锁次数,全部释放锁之后删除锁key;
  6. 宕机自动释放锁:如果持有锁的客户端宕机了,那么此时后台的watchdog定时调度任务也没了,不会刷新锁key的生存周期,此时redis里的锁key会自动释放;
  7. 尝试加锁超时:在指定时间内没有成功加锁就自动退出死循环,标识本次尝试加锁失败;
  8. 超时锁自动释放:获取锁之后,在一定时间内没有手动释放锁,则redis里的key自动过期,自动释放锁;