Redisson公平锁源码分析
获取公平锁
1
2
3
4
5
RLock RedissonClient#getFairLock(String name)
@Override
public RLock getFairLock(String name) {
return new RedissonFairLock(connectionManager.getCommandExecutor(), name);
}
构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//公平锁继承自可重入锁
public class RedissonFairLock extends RedissonLock implements RLock {
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
//线程默认等待时间
this(commandExecutor, name, 5000);
}
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.threadWaitTime = threadWaitTime;
//公平锁内部持有的queue
threadsQueueName = prefixName("redisson_lock_queue", name);
//公平锁内部持有的zset
timeoutSetName = prefixName("redisson_lock_timeout", name);
}
}
加锁
RedissonLock#lock()->RedissonLock#lock()->RedissonLock#tryAcquire()->RedissonLock#tryAcquireAsync()->RedissonFairLock#tryLockInnerAsync()
RedissonFairLock#tryLockInnerAsync()
KEYS=Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName)
- KEYS[1]:锁的名称,“myLock”;
- KEYS[2]:threadsQueueName=redisson_lock_queue:{anyLock},Redis里的队列;
- KEYS[3]:timeoutSetName=redisson_lock_timeout:{anyLock},Redis里的有序集合ZSet;
ARGV = internalLockLeaseTime, getLockName(threadId), threadWaitTime, currentTime
- ARGV[1]:默认锁定时间,30000毫秒;
- ARGV[2]:UUID:threadId;
- ARGV[3]:线程等待时间,5000毫秒;
- ARGV[4]:当前时间戳;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
public class RedissonFairLock extends RedissonLock implements RLock {
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
long currentTime = System.currentTimeMillis();
if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// remove stale threads删除陈旧的线程
"while true do " +
//lindex redisson_lock_queue:{anyLock} 0 从队列中取出第一个元素,如果第一次加锁,
//队列为空,什么都获取不到,直接跳出循环
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
//如果能获取到元素,在ZSet中取出时间
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
"if timeout <= tonumber(ARGV[3]) then " +
// remove the item from the queue and timeout set
// NOTE we do not alter any other timeout
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
// decrease timeouts for all waiting in the queue
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[4]), keys[i]);" +
"end;" +
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
"return 1;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), currentTime, threadWaitTime);
}
//公平锁使用的这部分lua脚本
if (command == RedisCommands.EVAL_LONG) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// remove stale threads 删除陈旧的线程
"while true do " +
//从queue中取第一个元素
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
//如果元素为空,跳出
"if firstThreadId2 == false then " +
"break;" +
"end;" +
//如果这个元素不为空,在ZSet中取出这个元素的score分数
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
//如果时间小于当前时间戳,说明元素已过期,从queue和zset中删除
"if timeout <= tonumber(ARGV[4]) then " +
// remove the item from the queue and timeout set
// NOTE we do not alter any other timeout
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
// check if the lock can be acquired now
//当前不存在锁且(queue不存在或queue中第一个元素是当前客户端+线程)
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
// remove this thread from the queue and timeout set
//从queue弹出和zset中删除元素
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
// decrease timeouts for all waiting in the queue
//取出zset中所有元素
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
//循环zset中的元素,每个元素时间戳-5000毫秒
"for i = 1, #keys, 1 do " +
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
// acquire the lock and set the TTL for the lease
//当前线程加锁,并设置加锁次数为1,设置锁超时时间
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
// check if the lock is already held, and this is a re-entry
//锁存在且key是当前客户端+当前线程
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
//value是加锁次数+1
"redis.call('hincrby', KEYS[1], ARGV[2],1);" +
//重新设置锁超时时间
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
// the lock cannot be acquired 获取锁失败,判断当前线程是否已经在queue中存在
// check if the thread is already in the queue
//从zset中取出当前线程的元素的score
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
//超时时间不为空,则时间-5000毫秒-当前时间戳
"if timeout ~= false then " +
// the real timeout is the timeout of the prior thread
// in the queue, but this is approximately correct, and
// avoids having to traverse the queue
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
// add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
// the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the
// threadWaitTime
//队列中取出最后一个元素
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果队列中最后元素不为空且这个元素不是当前客户端+线程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//ttl=队列中最后一个元素的时间戳-当前时间戳
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
//否则设置ttl为锁的超时时间
"else " +
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//设置timeout = ttl + 5000毫秒 + 当前时间戳
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//将当前客户端+线程的元素,timeout为score放入zset中,成功以后
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
//将当前客户端+线程的元素放入queue中
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), threadWaitTime, currentTime);
}
throw new IllegalArgumentException();
}
}
原理图
解锁
RedissonLock#unlock()->RedissonLock#unlockAsync()->RedissonFairLock#unlockInnerAsync()
unlockInnerAsync()
KEYS=Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName, getChannelName())
- KEYS[1]:锁的名称,“myLock”;
- KEYS[2]:threadsQueueName=redisson_lock_queue:{anyLock},Redis里的队列;
- KEYS[3]:timeoutSetName=redisson_lock_timeout:{anyLock},Redis里的有序集合ZSet;
- KEYS[4]:pub和sub的channel名称;
ARGV = LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis()
- ARGV[1]:解锁发送的消息;
- ARGV[2]:30000毫秒;
- ARGV[3]:UUID:threadId;
- ARGV[4]:当前时间戳;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class RedissonFairLock extends RedissonLock implements RLock {
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove stale threads 删除陈旧的线程
//死循环
"while true do "
//从queue中获取第一个元素,如果为null,跳出
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
//queue中第一个元素,在zset获取这个元素的score,score即时间戳
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
//如果时间戳小于当前时间,说明已过时,从queue和zset中删除
+ "if timeout <= tonumber(ARGV[4]) then "
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
//如果锁不存在,取queue中第一个元素,不为空则发送消息
+ "if (redis.call('exists', KEYS[1]) == 0) then " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " +
"end;" +
//如果锁中当前客户端+线程的key不存在,返回nil
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//锁中当前客户端+线程的key存在,value值-1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//如果-1后加锁次数大于0,重新设置锁时间30000毫秒
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"end; " +
//删除锁
"redis.call('del', KEYS[1]); " +
//获取queue中第一个元素,不为null,就发送消息
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; ",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName, getChannelName()),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
}
}