ScheduledThreadPoolExecutor
简介
Java注释
A {@link ThreadPoolExecutor} that can additionally schedule commands to run after a given delay, or to execute periodically. This class is preferable to {@link java.util.Timer} when multiple worker threads are needed, or when the additional flexibility or capabilities of {@link ThreadPoolExecutor} (which this class extends) are required.
翻译
可以安排命令在给定延迟后运行或定期执行的
ThreadPoolExecutor
。当需要多个工作线程时,或者需要ThreadPoolExecutor
的附加灵活性或功能时,此类比java.util.Timer
更可取。
ScheduledThreadPoolExecutor
继承自ThreadPoolExecutor
,为任务提供延迟或周期执行,属于线程池的一种。
类图
源码
属性
1
2
3
4
5
6
7
8
//如果应在关机时取消/取消定期任务,则为False。
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
//如果应在关闭时取消非定期任务,则为False。
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
//如果ScheduledFutureTask.cancel应该从队列中删除,则为True
private volatile boolean removeOnCancel = false;
//为相同延时的任务提供的顺序编号,保证任务之间的FIFO顺序
private static final AtomicLong sequencer = new AtomicLong();
内部类
DelayedWorkQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
//为存储周期或延迟任务定义的延迟队列,继承了 AbstractQueue,实现BlockingQueue 接口。它内部只允许存储 RunnableScheduledFuture 类型的任务。
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
//初始容量
private static final int INITIAL_CAPACITY = 16;
//初始队列
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
//锁
private final ReentrantLock lock = new ReentrantLock();
//元素数量
private int size = 0;
//队列头等待任务的线程
private Thread leader = null;
//当队列头上有新任务可用时或新线程可能成为领导者时,发出条件信号。
private final Condition available = lock.newCondition();
//ThreadPoolExecutor.getTask()会调到take方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//可中断锁
lock.lockInterruptibly();
try {
for (;;) {
//堆顶任务
RunnableScheduledFuture<?> first = queue[0];
//任务为空,阻塞
if (first == null)
available.await();
else {
//返回剩余延迟时间
long delay = first.getDelay(NANOSECONDS);
//小于等于0,说明这个任务到时间了,可以从队列中出队了
if (delay <= 0)
//出队,堆化
return finishPoll(first);
//没到时间
first = null; // don't retain ref while waiting
//如果前面有线程在等待,直接进入等待
if (leader != null)
available.await();
else {
//否则当前线程是leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//等待延时时间,然后唤醒
available.awaitNanos(delay);
} finally {
//leader置空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//唤醒下一个等待的线程
if (leader == null && queue[0] != null)
available.signal();
//解锁
lock.unlock();
}
}
}
ScheduledFutureTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
//FIFO的任务编号
private final long sequenceNumber;
//延迟时间
private long time;
//重复任务的周期(以纳秒为单位)。正表示固定汇率执行。负值表示固定延迟执行。值为0表示非重复任务。
private final long period;
//实际任务
RunnableScheduledFuture<V> outerTask = this;
//队列索引
int heapIndex;
//覆盖FutureTask的run方法
public void run() {
//是否周期性任务
boolean periodic = isPeriodic();
//线程池状态判断,线程池关闭策略
if (!canRunInCurrentRunState(periodic))
//取消
cancel(false);
//一次性任务调用FutureTask的run方法
else if (!periodic)
ScheduledFutureTask.super.run();
//周期性任务调用FutureTask的runAndReset方法
else if (ScheduledFutureTask.super.runAndReset()) {
//设置下次任务执行时间
setNextRunTime();
//重新排队周期性任务
reExecutePeriodic(outerTask);
}
}
//设置下次任务执行时间
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
//重复执行周期性任务
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
//判断线程池状态
if (canRunInCurrentRunState(true)) {
//添加任务到队列
super.getQueue().add(task);
//再次判断线程池状态,队列移除任务,取消任务
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else//添加线程执行任务
ensurePrestart();
}
}
}
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
compareTo(Delayed other)
排序算法,首先按照time排序,time小的排在前面,大的排在后面,如果time相同,则使用sequenceNumber排序,小的排在前面,大的排在后面。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//设置核心线程数
public ScheduledThreadPoolExecutor(int corePoolSize) {
//调用ThreadPoolExecutor的构造方法
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
//设置核心线程数和拒绝策略
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
//设置核心线程数和线程工厂
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
//设置核心线程数、线程工厂和拒绝策略
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
定时任务
- 未来执行一次的任务,无返回值;
schedule(Runnable command,long delay, TimeUnit unit)
- 未来执行一次的任务,有返回值;
schedule(Callable<V> callable,long delay, TimeUnit unit)
- 未来按固定频率重复执行的任务;
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
- 未来按固定延时重复执行的任务;
scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
schedule(Runnable command,long delay, TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
//参数判断
if (command == null || unit == null)
throw new NullPointerException();
//普通任务包装成ScheduledFutureTask
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//延时执行
delayedExecute(t);
return t;
}
schedule(Callable callable,long delay,TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
//参数判断
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//普通任务包装成ScheduledFutureTask
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//延时执行
delayedExecute(t);
return t;
}
scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
delayedExecute(RunnableScheduledFuture<?> task)
延时执行任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void delayedExecute(RunnableScheduledFuture<?> task) {
//线程池关闭了。执行拒绝策略
if (isShutdown())
reject(task);
else {
//任务添加到队列中
super.getQueue().add(task);
//检查线程池状态,如果关闭需要判断关闭策略
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))//移除任务
//取消任务
task.cancel(false);
else//启动一个新的线程等待执行任务
ensurePrestart();
}
}
void ensurePrestart() {
//当前工作线程数
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
//即使corePoolSize为0也会新增线程
else if (wc == 0)
addWorker(null, false);
}
普通任务
execute
和submit
当做普通任务执行。
1
2
3
4
5
6
7
8
9
10
11
12
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ScheduledThreadPoolExecutorTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5);
System.out.println("currentTimeMillis: " + System.currentTimeMillis());
//执行一个无返回值的任务,5秒后执行,只执行一次
executor.schedule(()->{
System.out.println("currentTimeMillis: " + System.currentTimeMillis());
},5, TimeUnit.SECONDS);
//执行一个有返回值的任务,5秒后执行,只执行一次
ScheduledFuture<String> future = executor.schedule(()->{
System.out.println("currentTimeMillis: " + System.currentTimeMillis());
return "returnValue";
},5,TimeUnit.SECONDS);
System.out.println(future.get() + System.currentTimeMillis());
//固定频率执行任务,1秒后执行,每2秒执行一次
executor.scheduleAtFixedRate(()->{
System.out.println("固定频率: " + System.currentTimeMillis());
},1,2,TimeUnit.SECONDS);
//固定延时执行任务,1秒后执行,每2秒执行一次
executor.scheduleWithFixedDelay(()->{
System.out.println("固定延时: " + System.currentTimeMillis());
},1,2,TimeUnit.SECONDS);
}
}
总结
- 指定某个时刻执行任务,是通过延时队列的特性来解决的;
- 重复执行,是通过在任务执行后再次把任务加入到队列中来解决的。