DelayQueue
简介
Java注释
An unbounded {@linkplain BlockingQueue blocking queue} of {@code Delayed} elements, in which an element can only be taken when its delay has expired. The head of the queue is that {@code Delayed} element whose delay expired furthest in the past. If no delay has expired there is no head and {@code poll} will return {@code null}. Expiration occurs when an element’s {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using {@code take} or {@code poll}, they are otherwise treated as normal elements. For example, the {@code size} method returns the count of both expired and unexpired elements.This queue does not permit null elements.
翻译
Delayed
元素的无界阻塞队列,其中一个元素只能在其延迟过期后才能使用。队列的head
是该Delayed
元素,其延迟在过去最远过期。如果没有延迟,则没有头,poll
将返回null
。当元素的getDelay(TimeUnit.NANOSECONDS)
方法返回的值小于或等于零时,就会发生过期。即使无法使用take
或poll
删除未过期的元素,也会将它们视为普通元素。例如,size
方法返回已过期和未过期元素的计数。 此队列不允许空元素。
DelayQueue
的延时策略:
- 存储元素必须实现
Delayed
接口; - 内部持有一个
ReentrantLock
保证线程安全; - 使用优先级队列
PriorityQueue
实现元素存储; - 持有一个优化内部阻塞通知的线程leader;
- 用于实现阻塞的
Condition
对象;
DelayQueue
其实就是在每次往优先级队列中添加元素,然后以元素的 delay(过期值)作为排序的因素,以此来达到先过期的元素会拍在队首,每次从队列里取出来都是最先要过期的元素。
类图
源码
属性
1
2
3
4
5
6
7
8
//锁
private final transient ReentrantLock lock = new ReentrantLock();
//优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
//用于标记当前是否有线程在排队
private Thread leader = null;
//条件
private final Condition available = lock.newCondition();
构造方法
1
2
3
4
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
入队
DelayQueue是阻塞队列,且优先级队列是无界的,所以入队不会阻塞不会超时,因此它的四个入队方法是一样的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
public void put(E e) {
offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
- 加锁;
- 添加元素到优先级队列;
- 如果添加的元素是堆顶元素,就把leader置为空,并唤醒等待在条件available上的线程;
- 解锁;
出队
remove(Object o)
1
2
3
4
5
6
7
8
9
10
11
12
//从该队列中移除指定元素的单个实例,如果其存在,其是否已过期
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//调用优先队列的remove方法
return q.remove(o);
} finally {
lock.unlock();
}
}
poll()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//检索并移除此队列的头部,或者返回null ,如果该队列具有已到期延迟的元素
public E poll() {
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//第一个元素
E first = q.peek();
//如果为空或者还没到期,返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else//弹出第一个元素
return q.poll();
} finally {
//解锁
lock.unlock();
}
}
poll(long timeout, TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//可中断锁
lock.lockInterruptibly();
try {
for (;;) {
//第一个元素
E first = q.peek();
if (first == null) {
//超时返回null
if (nanos <= 0)
return null;
else
//阻塞一段时间
nanos = available.awaitNanos(nanos);
} else {
//获取延迟
long delay = first.getDelay(NANOSECONDS);
//延迟时间已过去,返回元素
if (delay <= 0)
return q.poll();
//到达超时时间,返回null
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
//超时时间小于延迟时间或leader不为空
if (nanos < delay || leader != null)
//阻塞nanos时间
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//阻塞delay时间
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//唤醒阻塞线程
if (leader == null && q.peek() != null)
available.signal();
//解锁
lock.unlock();
}
}
- 加锁;
- 取出堆顶元素,如果为空,判断是否超时,如果超时直接返回null,否则阻塞nanos时间;
- 栈顶元素不为空判断延迟时间是否到期,到期取出元素返回;
- 超时时间小于延迟时间或leader不为空,阻塞nanos时间;
- 超时时间大于延迟时间且leader为空,阻塞delay时间,最后leader为null;
- 唤醒阻塞线程,解锁;
take()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//检索并移除此队列的头部,如有必要等到到期延迟的元素可以用此队列
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//可中断锁
lock.lockInterruptibly();
try {
for (;;) {
//第一个元素
E first = q.peek();
//first为空,阻塞
if (first == null)
available.await();
else {
//判断第一个元素延迟时间
long delay = first.getDelay(NANOSECONDS);
//到达延迟时间,返回弹出栈顶元素
if (delay <= 0)
return q.poll();
//方便gc
first = null; // don't retain ref while waiting
//如果前面有其它线程在等待,直接进入等待
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//等待delay时间后自动醒过来
// 醒过来后把leader置空并重新进入循环判断堆顶元素是否到期
// 这里即使醒过来后也不一定能获取到元素
// 因为有可能其它线程先一步获取了锁并弹出了堆顶元素
available.awaitNanos(delay);
} finally {
//如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
if (leader == null && q.peek() != null)
available.signal();
//解锁,这才是真正的唤醒
lock.unlock();
}
}
- 加锁;
- 判断堆顶元素是否为空,为空的话直接阻塞等待;
- 判断堆顶元素是否到期,到期了直接调用优先级队列的poll()弹出元素;
- 没到期,再判断前面是否有其它线程在等待,有则直接等待;
- 前面没有其它线程在等待,则把自己当作第一个线程等待delay时间后唤醒,再尝试获取元素;
- 获取到元素之后再唤醒下一个等待的线程;
- 解锁;
总结
DelayQueue
是阻塞队列;DelayQueue
内部存储结构使用优先级队列;DelayQueue
使用重入锁和条件来控制并发安全;DelayQueue
常用于定时任务;