Redisson-04丨Redisson联锁与红锁源码分析

Posted by jiefang on February 10, 2021

Redisson联锁与红锁源码分析

联锁

基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。

用法

1
2
3
4
5
6
7
8
9
10
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();

构造

1
2
3
4
5
6
7
8
9
10
public class RedissonMultiLock implements RLock {
    final List<RLock> locks = new ArrayList<>();
    public RedissonMultiLock(RLock... locks) {
        if (locks.length == 0) {
            throw new IllegalArgumentException("Lock objects are not defined");
        }
        //多个RLock放到list里
        this.locks.addAll(Arrays.asList(locks));
    }
}

加锁

RedissonMultiLock#lock()->RedissonMultiLock#lockInterruptibly()->RedissonMultiLock#lockInterruptibly(long leaseTime, TimeUnit unit)->tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)

lockInterruptibly(long leaseTime, TimeUnit unit)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class RedissonMultiLock implements RLock {
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        //最大等待时间为锁数量*1500毫秒
        long baseWaitTime = locks.size() * 1500;
        long waitTime = -1;
        if (leaseTime == -1) {
            waitTime = baseWaitTime;
        } else {
            leaseTime = unit.toMillis(leaseTime);
            waitTime = leaseTime;
            if (waitTime <= 2000) {
                waitTime = 2000;
            } else if (waitTime <= baseWaitTime) {
                waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
            } else {
                waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
            }
        }
        
        while (true) {
            if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
                return;
            }
        }
    }
}

tryLock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public class RedissonMultiLock implements RLock {
	@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//        try {
//            return tryLockAsync(waitTime, leaseTime, unit).get();
//        } catch (ExecutionException e) {
//            throw new IllegalStateException(e);
//        }
        long newLeaseTime = -1;
        if (leaseTime != -1) {
            if (waitTime == -1) {
                newLeaseTime = unit.toMillis(leaseTime);
            } else {
                newLeaseTime = unit.toMillis(waitTime)*2;
            }
        }
        
        long time = System.currentTimeMillis();
        long remainTime = -1;
        if (waitTime != -1) {
            remainTime = unit.toMillis(waitTime);
        }
        long lockWaitTime = calcLockWaitTime(remainTime);
        //联锁等于0,联锁有一个锁失败就释放其它的锁,如果红锁需要锁定3个master实例,则可以失败次数为1
        int failedLocksLimit = failedLocksLimit();
        List<RLock> acquiredLocks = new ArrayList<>(locks.size());
        //遍历所有的小锁,执行小锁的加锁操作
        for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
            RLock lock = iterator.next();
            boolean lockAcquired;
            try {
                if (waitTime == -1 && leaseTime == -1) {
                    lockAcquired = lock.tryLock();
                } else {
                    long awaitTime = Math.min(lockWaitTime, remainTime);
                    lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
                }
            } catch (RedisResponseTimeoutException e) {
                unlockInner(Arrays.asList(lock));
                lockAcquired = false;
            } catch (Exception e) {
                lockAcquired = false;
            }
            //加锁成功放入list里
            if (lockAcquired) {
                acquiredLocks.add(lock);
            } else {//加锁失败
                //判断是否要跳出循环,联锁不跳出,红锁会跳出
                if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                    break;
                }
				//联锁情况下,有一个加锁失败即释放其它成功加锁的锁
                if (failedLocksLimit == 0) {
                    unlockInner(acquiredLocks);
                    if (waitTime == -1) {
                        return false;
                    }
                    failedLocksLimit = failedLocksLimit();
                    acquiredLocks.clear();
                    // reset iterator
                    while (iterator.hasPrevious()) {
                        iterator.previous();
                    }
                } else {
                    failedLocksLimit--;
                }
            }
            //剩余时间=剩余时间-加锁用时
            if (remainTime != -1) {
                remainTime -= System.currentTimeMillis() - time;
                time = System.currentTimeMillis();
                //剩余时间小于等于0,释放已经成功加的锁
                if (remainTime <= 0) {
                    unlockInner(acquiredLocks);
                    return false;
                }
            }
        }

        if (leaseTime != -1) {
            List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
            for (RLock rLock : acquiredLocks) {
                RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
                futures.add(future);
            }
            
            for (RFuture<Boolean> rFuture : futures) {
                rFuture.syncUninterruptibly();
            }
        }
        return true;
    }
}

unlockInner(Collection locks)

1
2
3
4
5
6
7
8
9
10
protected void unlockInner(Collection<RLock> locks) {
    List<RFuture<Void>> futures = new ArrayList<>(locks.size());
    for (RLock lock : locks) {
        futures.add(lock.unlockAsync());
    }

    for (RFuture<Void> unlockFuture : futures) {
        unlockFuture.awaitUninterruptibly();
    }
}

解锁

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void unlock() {
    List<RFuture<Void>> futures = new ArrayList<>(locks.size());

    for (RLock lock : locks) {
        futures.add(lock.unlockAsync());
    }

    for (RFuture<Void> future : futures) {
        future.syncUninterruptibly();
    }
}

红锁

基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。

算法

假设有一个redis cluster,有3个redis master实例,然后执行如下步骤获取一把分布式锁:

  1. 获取当前时间戳,单位是毫秒;
  2. 轮流尝试在每个master节点上创建锁,过期时间较短,一般就几十毫秒,在每个节点上创建锁的过程中,需要加一个超时时间,一般来说比如几十毫秒如果没有获取到锁就超时了,标识为获取锁失败;
  3. 尝试在大多数节点上建立一个锁,比如3个节点就要求是2个节点(n / 2 +1);
  4. 客户端计算建立好锁的时间,如果建立锁的时间小于超时时间,就算建立成功了;
  5. 要是锁建立失败了,那么就依次删除已经创建的锁;
  6. 只要别人创建了一把分布式锁,你就得不断轮询去尝试获取锁;

用法

1
2
3
4
5
6
7
8
9
10
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。3/2+1=2,2个节点上加锁成功就认为红锁加锁成功
lock.lock();
...
lock.unlock();

构造

红锁是基于联锁实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class RedissonRedLock extends RedissonMultiLock {

    public RedissonRedLock(RLock... locks) {
        super(locks);
    }
	//允许失败加锁数量
    @Override
    protected int failedLocksLimit() {
        return locks.size() - minLocksAmount(locks);
    }
    //最少成功加锁数量,如果是3个小锁就等于3/2+1=2
    protected int minLocksAmount(final List<RLock> locks) {
        return locks.size()/2 + 1;
    }

    @Override
    protected long calcLockWaitTime(long remainTime) {
        return Math.max(remainTime / locks.size(), 1);
    }
    
    @Override
    public void unlock() {
        unlockInner(locks);
    }
}