Fork丨Join框架
简介
Fork/Join框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork/Join 技术是分治算法(Divide-and-Conquer)的并行实现,它是一项可以获得良好的并行性能的简单且高效的设计技术。
Fork/Join框架基础类
- ForkJoinPool:用来执行Task,或生成新的
ForkJoinWorkerThread
,执行ForkJoinWorkerThread
间的work-stealing
逻辑。ForkJoinPool
不是为了替代ExecutorService
,而是它的补充,在某些应用场景下性能比ExecutorService
更好。 - ForkJoinTask:执行具体的分支逻辑,声明以同步/异步方式进行执行
- ForkJoinWorkerThread: 是
ForkJoinPool
内的worker thread
,执行ForkJoinTask
,内部有ForkJoinPool.WorkQueue
来保存要执行的ForkJoinTask
。 - ForkJoinPool.WorkQueue:任务队列保存要执行的
ForkJoinTask
。
分治算法
分治算法(Divide-and-Conquer):把任务递归的拆分为各个子任务,这样可以更好的利用系统资源,尽可能的使用所有可用的计算能力来提升应用性能。
基本思想
把一个规模大的问题划分为规模较小的子问题,然后分而治之,最后合并子问题的解得到原问题的解。
步骤:
- 分割原问题;
- 求解子问题;
- 合并子问题的解为原问题的解。
典型应用场景
- 二分搜索
- 大整数乘法
- Strassen矩阵乘法
- 棋盘覆盖
- 归并排序
- 快速排序
- 线性时间选择
- 汉诺塔
工作窃取
ForkJoinPool
使用了work-stealing
(工作窃取)算法:线程池内的所有工作线程都尝试找到并执行已经提交的任务,或者是被其他活动任务创建的子任务(如果不存在就阻塞等待)。这种特性使得 ForkJoinPool
在运行多个可以产生子任务的任务,或者是提交的许多小任务时效率更高。尤其是构建异步模型的ForkJoinPool
时,对不需要合并(join)的事件类型任务也非常适用。
在 ForkJoinPool
中,线程池中每个工作线程(ForkJoinWorkerThread
)都对应一个任务队列(WorkQueue
),工作线程优先处理来自自身队列的任务(LIFO或FIFO顺序,参数mode决定),然后以FIFO的顺序随机窃取其他队列中的任务。
流程:
- 每个工作线程都有自己的工作队列
WorkQueue
; - 这是一个双端队列,它是线程私有的;
ForkJoinTask
中fork的子任务,将放入运行该任务的工作线程的队头,工作线程将以LIFO的顺序来处理工作队列中的任务;- 为了最大化地利用CPU,空闲的线程将从其它线程的队列中“窃取”任务来执行;
- 从工作队列的尾部窃取任务,以减少竞争;
- 双端队列的操作:
push()/pop()
仅在其所有者工作线程中调用,poll()
是由其它线程窃取任务时调用的; - 当只剩下最后一个任务时,还是会存在竞争,是通过CAS来实现的;
ForkJoinPool
ForkJoinPool
的主要工作如下:
- 接受外部任务的提交(外部调用
ForkJoinPool
的invoke/execute/submit
方法提交任务); - 接受
ForkJoinTask
自身fork
出的子任务的提交; - 任务队列数组(
WorkQueue[]
)的初始化和管理; - 工作线程(
Worker
)的创建/管理。
invoke、execute、submit
的区别:
- 通过invoke方法提交的任务,调用线程直到任务执行完成才会返回,也就是说这是一个同步方法,且有返回结果;
- 通过execute方法提交的任务,调用线程会立即返回,也就是说这是一个异步方法,且没有返回结果;
- 通过submit方法提交的任务,调用线程会立即返回,也就是说这是一个异步方法,且有返回结果(返回Future实现类,可以通过get获取结果)。
ForkJoinPool
支持两种模式:
- 同步模式(默认方式):对于工作线程(Worker)自身队列中的任务,采用后进先出(
LIFO
)的方式执行; - 异步模式:对于工作线程(Worker)自身队列中的任务,采用先进先出(
FIFO
)的方式执行。
ForkJoinTask
ForkJoinTask
实现了Future
接口,是一个异步任务,我们在使用Fork/Join框架时,一般需要使用线程池来调度任务,线程池内部调度的其实都是ForkJoinTask
任务(即使提交的是一个Runnable
或Callable
任务,也会被适配成ForkJoinTask
)。
子类实现:
- RecursiveAction:表示具有返回结果的ForkJoin任务;
- RecursiveTask:表示没有返回结果的ForkJoin任务;
ForkJoinWorkerThread
Fork/Join框架中,每个工作线程(Worker)都有一个自己的任务队列(WorkerQueue
), 所以需要对一般的Thread做些特性化处理,J.U.C提供了ForkJoinWorkerThread
类作为ForkJoinPool
中的工作线程。
ForkJoinWorkerThread
在构造过程中,会保存所属线程池信息和与自己绑定的任务队列信息。同时,它会通过ForkJoinPool
的registerWorker
方法将自己注册到线程池中。
WorkQueue
任务队列(WorkQueue
)是ForkJoinPool
与其它线程池区别最大的地方,在ForkJoinPool
内部,维护着一个WorkQueue[]
数组,它会在外部首次提交任务)时进行初始化。
WorkQueue
作为ForkJoinPool
的内部类,表示一个双端队列。双端队列既可以作为栈使用(LIFO),也可以作为队列使用(FIFO)。ForkJoinPool
的“工作窃取”正是利用了这个特点,当工作线程从自己的队列中获取任务时,默认总是以栈操作(LIFO)的方式从栈顶取任务;当工作线程尝试窃取其它任务队列中的任务时,则是FIFO的方式。
ForkJoinPool
中的工作队列分类:
- 有工作线程(Worker)绑定的任务队列:数组下标始终是奇数,称为
task queue
,该队列中的任务均由工作线程调用产生(工作线程调用FutureTask.fork方法); - 没有工作线程(Worker)绑定的任务队列:数组下标始终是偶数,称为
submissions queue
,该队列中的任务全部由其它线程提交(也就是非工作线程调用execute/submit/invoke或者FutureTask.fork方法)。
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class ForkJoinPoolTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int length = 100000000;
long[] arr = new long[length];
for (int i = 0; i < length; i++) {
arr[i] = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
}
forkJoinSum(arr);
}
private static void forkJoinSum(long[] arr) throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
//构建线程池
ForkJoinPool pool = ForkJoinPool.commonPool();
//提交任务
ForkJoinTask<Long> forkJoinTask = pool.submit(new SumTask(arr, 0, arr.length));
//获取结果
Long sum = forkJoinTask.get();
//关闭线程池
pool.shutdown();
System.out.println("sum: " + sum);
System.out.println("fork join elapse: " + (System.currentTimeMillis() - start));
}
private static class SumTask extends RecursiveTask<Long>{
private long[] array;
private int from;
private int to;
public SumTask(long[] array, int from, int to) {
this.array = array;
this.from = from;
this.to = to;
}
@Override
protected Long compute() {
if(to-from<=1000){
long sum = 0;
for(int i=from;i<to;i++){
sum += (array[i]/3*3/3*3/3*3/3*3/3*3);
}
return sum;
}
//分成两个任务
int middle = (from + to)/2;
SumTask left = new SumTask(array,from,middle);
SumTask right = new SumTask(array,middle,to);
//提交左边任务
left.fork();
//右边的任务直接利用当前线程计算
Long rightResult = right.compute();
//获取左边任务结果
Long leftResult = left.join();
return leftResult + rightResult;
}
}
}