Java多线程-29丨JUC-Phaser

Posted by jiefang on December 26, 2019

Phaser

简介

Java注释

A reusable synchronization barrier, similar in functionality to {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and {@link java.util.concurrent.CountDownLatch CountDownLatch} but supporting more flexible usage.

翻译

可重用的同步屏障,其功能类似于CyclicBarrierCountDownLatch,但支持更灵活的用法。

Phaser运行机制

同步器 作用
CountDownLatch 倒数计数器,初始时设定计数器值,线程可以在计数器上等待,当计数器值归0后,所有等待的线程继续执行
CyclicBarrier 循环栅栏,初始时设定参与线程数,当线程到达栅栏后,会等待其它线程的到达,当到达栅栏的总数满足指定数后,所有等待的线程继续执行
Phaser 多阶段栅栏,可以在初始时设定参与线程数,也可以中途注册/注销参与者,当到达的参与者数量满足栅栏设定的数量后,会进行阶段升级(advance)

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class PhaserTest {
    public static final int PARTIES = 3;
    public static final int PHASES = 4;

    public static void main(String[] args) {
        Phaser phaser = new Phaser(PARTIES){
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("--------------阶段"+phase+"完成-------");
                return super.onAdvance(phase, registeredParties);
            }
        };
        for(int i=0;i<PARTIES;i++){
            new Thread(()->{
                for(int j=0;j<PHASES;j++){
                    System.out.println("--------------"+Thread.currentThread().getName()+","+j);
                    phaser.arriveAndAwaitAdvance();
                }
            }).start();
        }
    }
}

运行结果

概念

phase(阶段)

Phaser也有栅栏,在Phaser中,栅栏的名称叫做phase(阶段),在任意时间点,Phaser只处于某一个phase(阶段),初始阶段为0,最大达到Integerr.MAX_VALUE,然后再次归零。当所有parties参与者都到达后,phase值会递增。Phaser中的phase(阶段)这个概念其实和CyclicBarrier中的Generation很相似,只不过Generation没有计数。

parties(参与者)

parties(参与者)其实就是CyclicBarrier中的参与线程的概念。

CyclicBarrier中的参与者在初始构造指定后就不能变更,而Phaser既可以在初始构造时指定参与者的数量,也可以中途通过registerbulkRegisterarriveAndDeregister等方法注册/注销参与者。

arrive(到达) / advance(进阶)

Phaser注册完parties(参与者)之后,参与者的初始状态是unarrived的,当参与者到达(arrive)当前阶段(phase)后,状态就会变成arrived。当阶段的到达参与者数满足条件后(注册的数量等于到达的数量),阶段就会发生进阶(advance),也就是phase+1。 image

Termination(终止)

代表当前Phaser对象达到终止状态,有点类似于CyclicBarrier中的栅栏被破坏的概念。

Tiering(分层)

Phaser支持分层(Tiering),一种树形结构,通过构造函数可以指定当前待构造的Phaser对象的父结点。之所以引入Tiering,是因为当一个Phaser有大量参与者(parties)的时候,内部的同步操作会使性能急剧下降,而分层可以降低竞争,从而减小因同步导致的额外开销。

在一个分层Phasers的树结构中,注册和撤销子Phaser或父Phaser是自动被管理的。当一个Phaser的参与者(parties)数量变成0时,如果有该Phaser有父结点,就会将它从父结点中溢移除。

源码

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
    //状态变量,用于存储当前阶段phase、参与者数parties、未完成的参与者数unarrived_count
    private volatile long state;
    //最多可以有多少个参与者,即每个阶段最多有多少个任务
    private static final int  MAX_PARTIES     = 0xffff;
    //最多可以有多少阶段
    private static final int  MAX_PHASE       = Integer.MAX_VALUE;
    //参与者数量的偏移量
    private static final int  PARTIES_SHIFT   = 16;
    //当前阶段的偏移量
    private static final int  PHASE_SHIFT     = 32;
    //未完成的参与者数的掩码,低16位
    private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
    //参与者数,中间16位
    private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
    //counts的掩码,counts等于参与者数和未完成的参与者数的'|'操作
    private static final long COUNTS_MASK     = 0xffffffffL;
    //
    private static final long TERMINATION_BIT = 1L << 63;

    //一次一个参与者完成
    private static final int  ONE_ARRIVAL     = 1;
    //增加减少参与者时使用
    private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
    //减少参与者时使用
    private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;
    //没有参与者时使用
    private static final int  EMPTY           = 1;
    //用于求未完成参与者数量
    private static int unarrivedOf(long s) {
        int counts = (int)s;
        return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
    }
    //用于求参与者数量(中间16位)
    private static int partiesOf(long s) {
        return (int)s >>> PARTIES_SHIFT;
    }
    //用于求阶段数(高32位)
    private static int phaseOf(long s) {
        return (int)(s >>> PHASE_SHIFT);
    }
    //已完成参与者的数量
    private static int arrivedOf(long s) {
        int counts = (int)s;
        return (counts == EMPTY) ? 0 :
            (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
    }
    //父级如果没有,则为null
    private final Phaser parent;
    //树根。如果不在树中,则等于此值
    private final Phaser root;
    //用于存储已完成参与者所在的线程,根据当前阶段的奇偶性选择不同的队列
    private final AtomicReference<QNode> evenQ;
    private final AtomicReference<QNode> oddQ;    

state,状态变量,高32位存储当前阶段phase,中间16位存储参与者的数量,低16位存储未完成参与者的数量

state

内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//等待节点
static final class QNode implements ForkJoinPool.ManagedBlocker {
    final Phaser phaser;
    final int phase;
    final boolean interruptible;
    final boolean timed;
    boolean wasInterrupted;
    long nanos;
    final long deadline;
    volatile Thread thread; // nulled to cancel wait
    QNode next;

    QNode(Phaser phaser, int phase, boolean interruptible,
          boolean timed, long nanos) {
        this.phaser = phaser;
        this.phase = phase;
        this.interruptible = interruptible;
        this.nanos = nanos;
        this.timed = timed;
        this.deadline = timed ? System.nanoTime() + nanos : 0L;
        thread = Thread.currentThread();
    }
}

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
    public Phaser() {
        this(null, 0);
    }
    public Phaser(int parties) {
        this(null, parties);
    }
    public Phaser(Phaser parent) {
        this(parent, 0);
    }
    public Phaser(Phaser parent, int parties) {
        if (parties >>> PARTIES_SHIFT != 0)
            throw new IllegalArgumentException("Illegal number of parties");
        int phase = 0;
        this.parent = parent;
        //如果父阶段不为空
        if (parent != null) {
            final Phaser root = parent.root;
            this.root = root;
            this.evenQ = root.evenQ;
            this.oddQ = root.oddQ;
            if (parties != 0)
                phase = parent.doRegister(1);
        }
        else {
            this.root = this;
            //新建偶数栈
            this.evenQ = new AtomicReference<QNode>();
            //新建奇数栈
            this.oddQ = new AtomicReference<QNode>();
        }
        //设置state
        this.state = (parties == 0) ? (long)EMPTY :
            //高32位存储当前阶段phase,中间16位存储参与者的数量,低16位存储未完成参与者的数量
            ((long)phase << PHASE_SHIFT) |
            ((long)parties << PARTIES_SHIFT) |
            ((long)parties);
    }    

注册

register()

单个注册参与者

1
2
3
    public int register() {
        return doRegister(1);
    }

bulkRegister(int parties)

批量注册参与者。

1
2
3
4
5
6
7
8
    public int bulkRegister(int parties) {
        if (parties < 0)
            throw new IllegalArgumentException();
        //如果为0,返回当前phase数
        if (parties == 0)
            return getPhase();
        return doRegister(parties);
    }

doRegister(int registrations)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
    private int doRegister(int registrations) {
        //首先计算注冊后当前State要调整的值adjust
        long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
        final Phaser parent = this.parent;
        int phase;
        for (;;) {
            //state的值
            long s = (parent == null) ? state : reconcileState();
            //state的低32位,也就是parties和unarrived的值
            int counts = (int)s;
            //参与者数目
            int parties = counts >>> PARTIES_SHIFT;
            //未到达的数目  
            int unarrived = counts & UNARRIVED_MASK;
            //检查是否溢出
            if (registrations > MAX_PARTIES - parties)
                throw new IllegalStateException(badRegister(s));
            //当前阶段
            phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                break;
            //当前Phaser已经注册过参与者
            if (counts != EMPTY) {                  // not 1st registration
                if (parent == null || reconcileState() == s) {
                    //未到达数目为0,说明正在执行onAdvance()方法,等待执行完毕
                    if (unarrived == 0)             // wait out advance
                        root.internalAwaitAdvance(phase, null);
                    //否则CAS更新state的值,增加adjust,如果成功就跳出循环
                    else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                       s, s + adjust))
                        break;
                }
            }//当前Phaser未注册过参与者(第一次注册)且没有父结点
            else if (parent == null) {              // 1st root registration
                //计算state
                long next = ((long)phase << PHASE_SHIFT) | adjust;
                //CAS更新state,成功跳出
                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                    break;
            }//当前Phaser未注册过参与者(第一次注册),且有父结点
            else {
                //第一次子注册
                synchronized (this) {               // 1st sub registration
                    //再次检查状态
                    if (state == s) {               // recheck under lock
                        //向父结点注册一个参与者
                        phase = parent.doRegister(1);
                        if (phase < 0)
                            break;
                        // finish registration whenever parent registration
                        // succeeded, even when racing with termination,
                        // since these are part of the same "transaction".
                        //CAS更新phase值,即使失败也要更新
                        while (!UNSAFE.compareAndSwapLong
                               (this, stateOffset, s,
                                ((long)phase << PHASE_SHIFT) | adjust)) {
                            s = state;
                            phase = (int)(root.state >>> PHASE_SHIFT);
                            // assert (int)s == EMPTY;
                        }
                        break;
                    }
                }
            }
        }
        return phase;
    }
    //阻塞并等待阶段前进
    private int internalAwaitAdvance(int phase, QNode node) {
        // assert root == this;
        //唤醒上一代所有等待线程,确保旧队列中没有遗留的等待线程
        releaseWaiters(phase-1);          // ensure old queue clean
        //节点入队后为true
        boolean queued = false;           // true when node is enqueued
        //未到达线程数
        int lastUnarrived = 0;            // to increase spins upon change
        //自旋次数
        int spins = SPINS_PER_ARRIVAL;
        long s;
        int p;
        //检查当前阶段是否变化,如果变化了说明进入下一个阶段了,这时候就没有必要自旋了
        while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
            //节点为空
            if (node == null) {           // spinning in noninterruptible mode
                //未完成的参与者数量
                int unarrived = (int)s & UNARRIVED_MASK;
                //unarrived有变化,增加自旋次数
                if (unarrived != lastUnarrived &&
                    (lastUnarrived = unarrived) < NCPU)
                    spins += SPINS_PER_ARRIVAL;
                //线程中断状态
                boolean interrupted = Thread.interrupted();
                //如果线程已中断或自旋次数小于0,新建节点
                if (interrupted || --spins < 0) { // need node to record intr
                    node = new QNode(this, phase, false, false, 0L);
                    node.wasInterrupted = interrupted;
                }
            }//完成或放弃
            else if (node.isReleasable()) // done or aborted
                break;
            //节点入队
            else if (!queued) {           // push onto queue
                AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
                QNode q = node.next = head.get();
                //检查状态
                if ((q == null || q.phase == phase) &&
                    (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                    //更新当前节点为头节点,头结点为后继节点
                    queued = head.compareAndSet(q, node);
            }
            else {
                try {
                    //当前线程进入阻塞状态,跟调用LockSupport.park()一样,等待被唤醒
                    ForkJoinPool.managedBlock(node);
                } catch (InterruptedException ie) {
                    node.wasInterrupted = true;
                }
            }
        }
        //节点已经被唤醒
        if (node != null) {
            //线程不为空,设置为空
            if (node.thread != null)
                node.thread = null;       // avoid need for unpark()
            //处理中断
            if (node.wasInterrupted && !node.interruptible)
                Thread.currentThread().interrupt();
            if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
                return abortWait(phase); // possibly clean up on abort
        }
        //唤醒当前阶段阻塞着的线程
        releaseWaiters(phase);
        return p;
    }

到达

arrive()

使当前线程到达phaser,不等待其他任务到达。返回到达phase数量。

1
2
3
    public int arrive() {
        return doArrive(ONE_ARRIVAL);
    }

arriveAndAwaitAdvance()

使当前线程到达phaser并等待其他任务到达,等价于awaitAdvance(arrive())。如果需要等待中断或超时,可以使用awaitAdvance方法完成一个类似的构造。如果需要在到达后取消注册,可以使用awaitAdvance(arriveAndDeregister())。效果类似于CyclicBarrier.await

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
    public int arriveAndAwaitAdvance() {
        // Specialization of doArrive+awaitAdvance eliminating some reads/paths
        final Phaser root = this.root;
        for (;;) {
            //state的值
            long s = (root == this) ? state : reconcileState();
            //当前阶段
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                return phase;
            //parties的值
            int counts = (int)s;
            //unarrived的值
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            //未到达线程数目小于等于0,抛出
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            //CAS更新state的值-1
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                          s -= ONE_ARRIVAL)) {
                //如果未到达的数大于1,当前不是最后一个到达
                if (unarrived > 1)
                    //阻塞等待其他任务
                    return root.internalAwaitAdvance(phase, null);
                //子Phaser交给父节点处理
                if (root != this)
                    return parent.arriveAndAwaitAdvance();
                //n只保留了state中parties的部分,也就是中16位
                long n = s & PARTIES_MASK;  // base of next state
                //parties的值,即下一次需要到达的参与者数量
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                //执行onAdvance()方法,返回true表示下一阶段参与者数量为0了,也就是结束了
                if (onAdvance(phase, nextUnarrived))
                    n |= TERMINATION_BIT;
                else if (nextUnarrived == 0)
                    n |= EMPTY;
                else
                    //n 加上unarrived的值
                    n |= nextUnarrived;
                //下一个阶段等待当前阶段加1
                int nextPhase = (phase + 1) & MAX_PHASE;
                //n 加上下一阶段的值
                n |= (long)nextPhase << PHASE_SHIFT;
                //修改state的值为n
                if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                    return (int)(state >>> PHASE_SHIFT); // terminated
                //唤醒其它参与者并进入下一个阶段
                releaseWaiters(phase);
                //返回下一阶段值
                return nextPhase;
            }
        }
    }
  1. 修改state中unarrived部分的值减1;
  2. 如果不是最后一个到达的,则调用internalAwaitAdvance()方法自旋或排队等待;
  3. 如果为分层结构,则交由父节点处理arriveAndAwaitAdvance逻辑;
  4. 如果是最后一个到达的,则调用onAdvance()方法,然后修改state的值为下一阶段对应的值,并唤醒其它等待的线程;
  5. 返回下一阶段的值;

arriveAndDeregister()

到达这个阶段,并从它注销,而无需等待别人的到来。

1
2
3
    public int arriveAndDeregister() {
        return doArrive(ONE_DEREGISTER);
    }

doArrive(int adjust)

动调整到达数,使当前线程到达phaser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
    private int doArrive(int adjust) {
        final Phaser root = this.root;
        for (;;) {
            //state值
            long s = (root == this) ? state : reconcileState();
            //当前阶段
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                return phase;
            //parties的值
            int counts = (int)s;
            //未到达数量
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            //CAS更新state值
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
                //未到达线程数为1,当前为最后一个未到达任务
                if (unarrived == 1) {
                    long n = s & PARTIES_MASK;  // base of next state
                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                    if (root == this) {
                        //检查是否需要终止phaser
                        if (onAdvance(phase, nextUnarrived))
                            n |= TERMINATION_BIT;
                        else if (nextUnarrived == 0)
                            n |= EMPTY;
                        else
                            n |= nextUnarrived;
                        int nextPhase = (phase + 1) & MAX_PHASE;
                        n |= (long)nextPhase << PHASE_SHIFT;
                        //设置下一阶段的state
                        UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
                        //释放等待phase的线程
                        releaseWaiters(phase);
                    }分层结构使用父节点操作
                    else if (nextUnarrived == 0) { // propagate deregistration
                        phase = parent.doArrive(ONE_DEREGISTER);
                        UNSAFE.compareAndSwapLong(this, stateOffset,
                                                  s, s | EMPTY);
                    }
                    else
                        phase = parent.doArrive(ONE_ARRIVAL);
                }
                return phase;
            }
        }
    }
  1. 首先更新state(state - adjust);
  2. 如果当前不是最后一个未到达的任务,直接返回phase;
  3. 如果当前是最后一个未到达的任务:
    • a) 如果当前是root节点,判断是否需要终止phaser,CAS更新phase,最后释放等待的线程;
    • b) 如果是分层结构,并且已经没有下一代未到达的parties,则交由父节点处理doArrive逻辑,然后更新state为EMPTY。

阻塞到达

awaitAdvance(int phase)

阻塞等待线程到达,直到phase前进到下一代,返回下一代的phase number。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    public int awaitAdvance(int phase) {
        final Phaser root = this.root;
        //获取state值
        long s = (root == this) ? state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);
        //传入阶段小于0,返回
        if (phase < 0)
            return phase;
        //传入阶段和当前节点相等
        if (p == phase)
            //阻塞等待
            return root.internalAwaitAdvance(phase, null);
        return p;
    }
    

awaitAdvanceInterruptibly(int phase)

阻塞等待线程到达,响应中断版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
    public int awaitAdvanceInterruptibly(int phase)
        throws InterruptedException {
        final Phaser root = this.root;
        //获取state
        long s = (root == this) ? state : reconcileState();
        //当前阶段
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase) {
            QNode node = new QNode(this, phase, true, false, 0L);
            p = root.internalAwaitAdvance(phase, node);
            //响应中断
            if (node.wasInterrupted)
                throw new InterruptedException();
        }
        return p;
    }

awaitAdvanceInterruptibly(int phase,long timeout, TimeUnit unit)

阻塞等待线程到达,响应中断并设置超时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
    public int awaitAdvanceInterruptibly(int phase,
                                         long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        long nanos = unit.toNanos(timeout);
        final Phaser root = this.root;
        long s = (root == this) ? state : reconcileState();
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase) {
            QNode node = new QNode(this, phase, true, true, nanos);
            p = root.internalAwaitAdvance(phase, node);
            if (node.wasInterrupted)
                throw new InterruptedException();
            else if (p == phase)
                throw new TimeoutException();
        }
        return p;
    }

总结

  1. Phaser适用于多阶段多任务的场景,每个阶段的任务都可以控制得很细;
  2. Phaser内部使用state变量及队列实现整个逻辑;
  3. state的高32位存储当前阶段phase,中16位存储当前阶段参与者(任务)的数量parties,低16位存储未完成参与者的数量unarrived;
  4. 队列会根据当前阶段的奇偶性选择不同的队列;
  5. 当不是最后一个参与者到达时,会自旋或者进入队列排队来等待所有参与者完成任务;
  6. 当最后一个参与者完成任务时,会唤醒队列中的线程并进入下一个阶段;