Java集合-22丨DelayQueue

Posted by jiefang on December 20, 2019

DelayQueue

简介

Java注释

An unbounded {@linkplain BlockingQueue blocking queue} of {@code Delayed} elements, in which an element can only be taken when its delay has expired. The head of the queue is that {@code Delayed} element whose delay expired furthest in the past. If no delay has expired there is no head and {@code poll} will return {@code null}. Expiration occurs when an element’s {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using {@code take} or {@code poll}, they are otherwise treated as normal elements. For example, the {@code size} method returns the count of both expired and unexpired elements.This queue does not permit null elements.

翻译

Delayed元素的无界阻塞队列,其中一个元素只能在其延迟过期后才能使用。队列的head是该Delayed元素,其延迟在过去最远过期。如果没有延迟,则没有头,poll将返回null。当元素的getDelay(TimeUnit.NANOSECONDS)方法返回的值小于或等于零时,就会发生过期。即使无法使用takepoll删除未过期的元素,也会将它们视为普通元素。例如,size方法返回已过期和未过期元素的计数。 此队列不允许空元素。

DelayQueue的延时策略:

  • 存储元素必须实现Delayed接口;
  • 内部持有一个ReentrantLock保证线程安全;
  • 使用优先级队列PriorityQueue实现元素存储;
  • 持有一个优化内部阻塞通知的线程leader;
  • 用于实现阻塞的Condition对象;

DelayQueue 其实就是在每次往优先级队列中添加元素,然后以元素的 delay(过期值)作为排序的因素,以此来达到先过期的元素会拍在队首,每次从队列里取出来都是最先要过期的元素。

类图

DelayQueue

源码

属性

1
2
3
4
5
6
7
8
    //锁
    private final transient ReentrantLock lock = new ReentrantLock();
    //优先级队列
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    //用于标记当前是否有线程在排队
    private Thread leader = null;
    //条件
    private final Condition available = lock.newCondition();    

构造方法

1
2
3
4
    public DelayQueue() {}
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }    

入队

DelayQueue是阻塞队列,且优先级队列是无界的,所以入队不会阻塞不会超时,因此它的四个入队方法是一样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    public boolean add(E e) {
        return offer(e);
    }
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    public void put(E e) {
        offer(e);
    }
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }    
  1. 加锁;
  2. 添加元素到优先级队列;
  3. 如果添加的元素是堆顶元素,就把leader置为空,并唤醒等待在条件available上的线程;
  4. 解锁;

出队

remove(Object o)

1
2
3
4
5
6
7
8
9
10
11
12
    //从该队列中移除指定元素的单个实例,如果其存在,其是否已过期
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            //调用优先队列的remove方法
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

poll()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
    //检索并移除此队列的头部,或者返回null ,如果该队列具有已到期延迟的元素
    public E poll() {
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            //第一个元素
            E first = q.peek();
            //如果为空或者还没到期,返回null
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else//弹出第一个元素
                return q.poll();
        } finally {
            //解锁
            lock.unlock();
        }
    }

poll(long timeout, TimeUnit unit)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        //可中断锁
        lock.lockInterruptibly();
        try {
            for (;;) {
                //第一个元素
                E first = q.peek();
                if (first == null) {
                    //超时返回null
                    if (nanos <= 0)
                        return null;
                    else
                        //阻塞一段时间
                        nanos = available.awaitNanos(nanos);
                } else {
                    //获取延迟
                    long delay = first.getDelay(NANOSECONDS);
                    //延迟时间已过去,返回元素
                    if (delay <= 0)
                        return q.poll();
                    //到达超时时间,返回null
                    if (nanos <= 0)
                        return null;
                    first = null; // don't retain ref while waiting
                    //超时时间小于延迟时间或leader不为空
                    if (nanos < delay || leader != null)
                        //阻塞nanos时间
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            //阻塞delay时间
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            //唤醒阻塞线程
            if (leader == null && q.peek() != null)
                available.signal();
            //解锁
            lock.unlock();
        }
    }
  1. 加锁;
  2. 取出堆顶元素,如果为空,判断是否超时,如果超时直接返回null,否则阻塞nanos时间;
  3. 栈顶元素不为空判断延迟时间是否到期,到期取出元素返回;
  4. 超时时间小于延迟时间或leader不为空,阻塞nanos时间;
  5. 超时时间大于延迟时间且leader为空,阻塞delay时间,最后leader为null;
  6. 唤醒阻塞线程,解锁;

take()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
    //检索并移除此队列的头部,如有必要等到到期延迟的元素可以用此队列
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //可中断锁
        lock.lockInterruptibly();
        try {
            for (;;) {
                //第一个元素
                E first = q.peek();
                //first为空,阻塞
                if (first == null)
                    available.await();
                else {
                    //判断第一个元素延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    //到达延迟时间,返回弹出栈顶元素
                    if (delay <= 0)
                        return q.poll();
                    //方便gc
                    first = null; // don't retain ref while waiting
                    //如果前面有其它线程在等待,直接进入等待
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            //等待delay时间后自动醒过来
                            // 醒过来后把leader置空并重新进入循环判断堆顶元素是否到期
                            // 这里即使醒过来后也不一定能获取到元素
                            // 因为有可能其它线程先一步获取了锁并弹出了堆顶元素
                            available.awaitNanos(delay);
                        } finally {
                            //如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            //成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
            if (leader == null && q.peek() != null)
                available.signal();
            //解锁,这才是真正的唤醒
            lock.unlock();
        }
    }
  1. 加锁;
  2. 判断堆顶元素是否为空,为空的话直接阻塞等待;
  3. 判断堆顶元素是否到期,到期了直接调用优先级队列的poll()弹出元素;
  4. 没到期,再判断前面是否有其它线程在等待,有则直接等待;
  5. 前面没有其它线程在等待,则把自己当作第一个线程等待delay时间后唤醒,再尝试获取元素;
  6. 获取到元素之后再唤醒下一个等待的线程;
  7. 解锁;

总结

  1. DelayQueue是阻塞队列;
  2. DelayQueue内部存储结构使用优先级队列;
  3. DelayQueue使用重入锁和条件来控制并发安全;
  4. DelayQueue常用于定时任务;