Java多线程-18丨JUC-AQS(同步器)源码分析-共享锁的获取和释放

Posted by jiefang on November 7, 2019

JUC-AQS(同步器)源码分析-共享锁的获取和释放

共享锁与独占锁的区别

共享锁与独占锁最大的区别在于,独占锁是独占的排他的,因此在独占锁中有一个exclusiveOwnerThread属性,用来记录当前持有锁的线程。当独占锁已经被某个线程持有时,其他线程只能等待它被释放后,才能去争锁,并且同一时刻只有一个线程能争锁成功

对于共享锁而言,由于锁是可以被共享的,它可以被多个线程同时持有。换句话说,如果一个线程成功获取了共享锁,那么其他等待在这个共享锁上的线程就也可以尝试去获取锁,并且极有可能获取成功。

共享锁的实现和独占锁基本上是对应的:

独占锁 共享锁
tryAcquire(int arg) tryAcquireShared(int arg)
tryAcquireNanos(int arg, long nanosTimeout) tryAcquireSharedNanos(int arg, long nanosTimeout)
acquire(int arg) acquireShared(int arg)
acquireQueued(final Node node, int arg) doAcquireShared(int arg)
acquireInterruptibly(int arg) acquireSharedInterruptibly(int arg)
doAcquireInterruptibly(int arg) doAcquireSharedInterruptibly(int arg)
doAcquireNanos(int arg, long nanosTimeout) doAcquireSharedNanos(int arg, long nanosTimeout)
release(int arg) releaseShared(int arg)
tryRelease(int arg) tryReleaseShared(int arg)
- doReleaseShared()

共享锁的获取

相对于独占的锁的tryAcquire(int arg)返回boolean类型的值,共享锁的tryAcquireShared(int acquires)返回的是一个整型值:

  • 如果该值小于0,则代表当前线程获取共享锁失败
  • 如果该值大于0,则代表当前线程获取共享锁成功,并且接下来其他线程尝试获取共享锁的行为很可能成功
  • 如果该值等于0,则代表当前线程获取共享锁成功,但是接下来其他线程尝试获取共享锁的行为会失败

因此,只要该返回值大于等于0,就表示获取共享锁成功。

1
2
3
4
5
public final void acquireShared(int arg) {
    //由子类实现,小于0说明获取共享锁失败,需要进入队列
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

tryAcquireShared有哪些子类实现:

  • Semaphore.NonfairSync.tryAcquireShared
  • Semaphore.FairSync.tryAcquireShared
  • CountDownLatch.Sync.tryAcquireShared
  • ReentrantReadWriteLock.Sync.tryAcquireShared

doAcquireShared()方法源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
 * Acquires in shared uninterruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg) {
    //将线程包装成共享模式的Node加入队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //当前节点的前置节点是head节点
            final Node p = node.predecessor();
            if (p == head) {
                //成功取得共享锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //设置head节点并向后传播
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //获取锁失败
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/** 仅用于标记节点处于共享模式下 */
static final Node SHARED = new Node();
//判断是否是共享模式
final boolean isShared() {
    return nextWaiter == SHARED;
}
//在共享锁模式下,锁可以被多个线程所共同持有,既然当前线程已经拿到共享锁了,那么就可以直接通知后继节点来拿锁,而不必等待锁被释放的时候再通知。
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

共享锁的释放

AQS.releaseShared()方法源码:

1
2
3
4
5
6
7
8
public final boolean releaseShared(int arg) {
    //尝试释放共享锁,由子类实现。
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

doReleaseShared()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//共享模式唤醒
private void doReleaseShared() {

    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}