JUC-ReentrantReadWriteLock(读写锁)
简介
ReentrantReadWriteLock维护了一对相关的锁:共享锁readLock和独占锁writeLock。共享锁readLock用于读操作,能同时被多个线程获取;独占锁writeLock用于写入操作,只能被一个线程持有。 读写锁的主要特性:
- 公平性:
- 非公平锁:默认模式
- 公平锁:
- 可重入性:
ReentrantReadWriteLock允许读线程和写线程重复获取读锁或写锁。当所有写锁都被释放,不可重入读线程才允许获取锁。此外,一个写入线程可以获取读锁,但是一个读线程不能获取写锁。- 锁降级:重入性允许从写锁降级到读锁:首先获取写锁,然后获取读锁,然后释放写锁。不过,从一个读锁升级到写锁是不允许的;
- Condition支持:Condition只有在写锁中用到,读锁是不支持Condition的。
原理

- ReentrantReadWriteLock实现了- ReadWriteLock接口。- ReadWriteLock是一个读写锁的接口,提供了”获取读锁的- readLock()函数” 和 “获取写锁的- writeLock()函数”。
- ReentrantReadWriteLock有三个内部类:自定义同步器- Sync,读锁- ReadLock和写锁- WriteLock。和- ReentrantLock一样,- Sync继承自AQS,也包括了两个内部实现公平锁- FairSync和非公平锁- NonfairSync。- ReadLock和- WriteLock都实现了- Lock接口,内部也都分别持有- Sync对象。所有的同步功能都是通过Sync来实现的。
源码
Sync
ReentrantReadWriteLock.Sync源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
abstract static class Sync extends AbstractQueuedSynchronizer {
    // 最多支持65535(1<<16 -1)个写锁(一个独占线程的重入次数)和65535个读锁;低16位表示写锁计数,高16位表示持有读锁的线程数
    static final int SHARED_SHIFT   = 16;
    // 读锁高16位,读锁个数加1,其实是状态值加 2^16
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 锁最大数量
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    // 写锁掩码,用于标记低16位
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    //持有读锁线程数
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    //写锁,重入次数
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }    
    //当前线程持有的读锁重入次数
    private transient ThreadLocalHoldCounter readHolds;
    //最近一个获取读锁成功的线程计数器
    private transient HoldCounter cachedHoldCounter;    
    //第一个获取读锁的线程
    private transient Thread firstReader = null;
    //firstReader的持有数
    private transient int firstReaderHoldCount;  
    //构造函数
    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // ensures visibility of readHolds
    }
    // 持有读锁的线程计数器
    static final class HoldCounter {
        int count = 0;
        final long tid = getThreadId(Thread.currentThread());
    }
    // 本地线程计数器
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }
    //尝试获取读锁
    final boolean tryReadLock() {
        Thread current = Thread.currentThread();
        for (;;) {
            //判断是被独占锁获取
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return false;
            //已获取读锁的数量
            int r = sharedCount(c);
            if (r == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            //读锁
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return true;
            }
        }
    }    
}
ReadLock
读锁为一个可重入的共享锁,它能够被多个线程同时持有,在没有其他写线程访问时,读锁总是或获取成功。 读锁源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public static class ReadLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -5992448646407690164L;
    private final Sync sync;
    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    //获取读锁
    public void lock() {
        sync.acquireShared(1);
    }
    //获取读锁响应中断
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    //尝试获取读锁
    public boolean tryLock() {
        return sync.tryReadLock();
    }
    //尝试获取读锁带超时
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    //释放读锁
    public void unlock() {
        sync.releaseShared(1);
    }
    //读锁是不支持Condition的
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
    public String toString() {
        int r = sync.getReadLockCount();
        return super.toString() +
            "[Read locks = " + r + "]";
    }
}
lock()
ReentrantReadWriteLock.ReadLock.lock()源码:
1
2
3
4
    //获取读锁
    public void lock() {
        sync.acquireShared(1);
    }
lock方法调用了同步器sync的acquireShared,实际调用的是AQS的acquireShared,tryAcquireShared的子类实现源码是:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        //持有写锁的线程可以获取读锁,如果获取锁的线程不是current线程;则返回-1
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        //高16位的值,当前持有读锁的线程数量
        int r = sharedCount(c);
        //公平锁和非公平锁判断是否需要阻塞等待
        //判断获取读锁线程是否达到最大值
        //CAS+1成功
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            //第一次获取读锁
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                //重入次数
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                //HoldCounter不存在就返回一个新的默认的
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                //如果rh.count == 0,则把这个rh设置到这个线程的ThreadLocal
                else if (rh.count == 0)
                    readHolds.set(rh);
                //计数加1
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }
    
tryAcquireShared()的作用是尝试获取“读锁/共享锁”。函数流程如下:
- 如果“写锁”已经被持有,这时候可以继续获取读锁,但如果持有写锁的线程不是当前线程,直接返回-1(表示获取失败);
- 如果在尝试获取锁时不需要阻塞等待(由公平性决定),并且读锁的共享计数小于最大数量MAX_COUNT,则直接通过CAS函数更新读取锁的共享计数,最后将当前线程获取读锁的次数+1。
- 如果第二步执行失败,则调用fullTryAcquireShared尝试获取读锁。fullTryAcquireShared源码:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } fullTryAcquireShared是获取读锁的完整版本,用于处理CAS失败、阻塞等待和重入读问题。相对于tryAcquireShared来说,执行流程上都差不多,不同的是,它增加了重试机制和对“持有读锁数的延迟读取”的处理。unlock()
ReentrantReadWriteLock.ReadLock.unlock()源码:
1
2
3
    public void unlock() {
        sync.releaseShared(1);
    }
AQS中的releaseShared()源码:
1
2
3
4
5
6
7
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
AQS中tryReleaseShared在子类ReentrantReadWriteLock.Sync的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        //是否是第一个获取读锁的线程
        if (firstReader == current) {
            //更新重入次数
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                //线程本地变量中的重入次数
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) {
            int c = getState();
            //剩余资源
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
WriteLock
写锁就是一个支持可重入的排他锁。
lock()
ReentrantReadWriteLock.WriteLock.lock()源码:
1
2
3
    public void lock() {
        sync.acquire(1);
    }
调用AQS中的acquire(),tryAcquire()由ReentrantReadWriteLock.Sync实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        //当前锁个数
        int c = getState();
        //获取低16位,即写锁的次数
        int w = exclusiveCount(c);
        //处理可重入
        if (c != 0) {
            //if c != 0 and w == 0说明读锁不为0
            //当前线程不是已获取写锁的线程
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            //if c != 0 and w != 0且当前线程是已获取写锁的线程
            //最大重入次数
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            //重入次数加1
            setState(c + acquires);
            return true;
        }
        //是否需要阻塞,非公平锁的写锁一直是false
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            //如果CAS失败返回false
            return false;
        //设置当前线程独占
        setExclusiveOwnerThread(current);
        return true;
    }
unlock()
ReentrantReadWriteLock.WriteLock.unlock()源码:
1
2
3
    public void unlock() {
        sync.release(1);
    }
调用AQS中的release(),由子类ReentrantReadWriteLock.Sync实现:
1
2
3
4
5
6
7
8
9
10
11
12
    protected final boolean tryRelease(int releases) {
        //判断当前线程是否是独占锁线程
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //判断重入次数
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }
