Java多线程-30丨JUC-Fork丨Join框架

Posted by jiefang on December 26, 2019

Fork丨Join框架

简介

Fork/Join框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork/Join 技术是分治算法(Divide-and-Conquer)的并行实现,它是一项可以获得良好的并行性能的简单且高效的设计技术。

Fork/Join框架基础类

  • ForkJoinPool:用来执行Task,或生成新的ForkJoinWorkerThread,执行ForkJoinWorkerThread间的 work-stealing逻辑。ForkJoinPool不是为了替代ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。
  • ForkJoinTask:执行具体的分支逻辑,声明以同步/异步方式进行执行
  • ForkJoinWorkerThread: 是 ForkJoinPool 内的 worker thread,执行ForkJoinTask,内部有ForkJoinPool.WorkQueue来保存要执行的ForkJoinTask
  • ForkJoinPool.WorkQueue:任务队列保存要执行的ForkJoinTask

分治算法

分治算法(Divide-and-Conquer):把任务递归的拆分为各个子任务,这样可以更好的利用系统资源,尽可能的使用所有可用的计算能力来提升应用性能。

image

基本思想

把一个规模大的问题划分为规模较小的子问题,然后分而治之,最后合并子问题的解得到原问题的解。

步骤:

  • 分割原问题;
  • 求解子问题;
  • 合并子问题的解为原问题的解。

典型应用场景

  • 二分搜索
  • 大整数乘法
  • Strassen矩阵乘法
  • 棋盘覆盖
  • 归并排序
  • 快速排序
  • 线性时间选择
  • 汉诺塔

工作窃取

image

ForkJoinPool使用了work-stealing工作窃取)算法:线程池内的所有工作线程都尝试找到并执行已经提交的任务,或者是被其他活动任务创建的子任务(如果不存在就阻塞等待)。这种特性使得 ForkJoinPool 在运行多个可以产生子任务的任务,或者是提交的许多小任务时效率更高。尤其是构建异步模型的ForkJoinPool时,对不需要合并(join)的事件类型任务也非常适用。

ForkJoinPool中,线程池中每个工作线程(ForkJoinWorkerThread)都对应一个任务队列(WorkQueue),工作线程优先处理来自自身队列的任务(LIFO或FIFO顺序,参数mode决定),然后以FIFO的顺序随机窃取其他队列中的任务。

流程:

  1. 每个工作线程都有自己的工作队列WorkQueue
  2. 这是一个双端队列,它是线程私有的;
  3. ForkJoinTask中fork的子任务,将放入运行该任务的工作线程的队头,工作线程将以LIFO的顺序来处理工作队列中的任务;
  4. 为了最大化地利用CPU,空闲的线程将从其它线程的队列中“窃取”任务来执行;
  5. 从工作队列的尾部窃取任务,以减少竞争;
  6. 双端队列的操作:push()/pop()仅在其所有者工作线程中调用,poll()是由其它线程窃取任务时调用的;
  7. 当只剩下最后一个任务时,还是会存在竞争,是通过CAS来实现的;

ForkJoinPool

ForkJoinPool的主要工作如下:

  1. 接受外部任务的提交(外部调用ForkJoinPoolinvoke/execute/submit方法提交任务);
  2. 接受ForkJoinTask自身fork出的子任务的提交;
  3. 任务队列数组(WorkQueue[])的初始化和管理;
  4. 工作线程(Worker)的创建/管理。

invoke、execute、submit的区别:

  • 通过invoke方法提交的任务,调用线程直到任务执行完成才会返回,也就是说这是一个同步方法,且有返回结果;
  • 通过execute方法提交的任务,调用线程会立即返回,也就是说这是一个异步方法,且没有返回结果;
  • 通过submit方法提交的任务,调用线程会立即返回,也就是说这是一个异步方法,且有返回结果(返回Future实现类,可以通过get获取结果)。

ForkJoinPool支持两种模式:

  • 同步模式(默认方式):对于工作线程(Worker)自身队列中的任务,采用后进先出LIFO)的方式执行;
  • 异步模式:对于工作线程(Worker)自身队列中的任务,采用先进先出FIFO)的方式执行。

ForkJoinTask

ForkJoinTask实现了Future接口,是一个异步任务,我们在使用Fork/Join框架时,一般需要使用线程池来调度任务,线程池内部调度的其实都是ForkJoinTask任务(即使提交的是一个RunnableCallable任务,也会被适配成ForkJoinTask)。

子类实现:

  • RecursiveAction:表示具有返回结果的ForkJoin任务;
  • RecursiveTask:表示没有返回结果的ForkJoin任务;

ForkJoinWorkerThread

Fork/Join框架中,每个工作线程(Worker)都有一个自己的任务队列(WorkerQueue), 所以需要对一般的Thread做些特性化处理,J.U.C提供了ForkJoinWorkerThread类作为ForkJoinPool中的工作线程。

ForkJoinWorkerThread在构造过程中,会保存所属线程池信息和与自己绑定的任务队列信息。同时,它会通过ForkJoinPoolregisterWorker方法将自己注册到线程池中。

WorkQueue

任务队列(WorkQueue)是ForkJoinPool与其它线程池区别最大的地方,在ForkJoinPool内部,维护着一个WorkQueue[]数组,它会在外部首次提交任务)时进行初始化。

WorkQueue作为ForkJoinPool的内部类,表示一个双端队列。双端队列既可以作为使用(LIFO),也可以作为队列使用(FIFO)。ForkJoinPool的“工作窃取”正是利用了这个特点,当工作线程从自己的队列中获取任务时,默认总是以栈操作(LIFO)的方式从栈顶取任务;当工作线程尝试窃取其它任务队列中的任务时,则是FIFO的方式。

ForkJoinPool中的工作队列分类:

  • 有工作线程(Worker)绑定的任务队列:数组下标始终是奇数,称为task queue,该队列中的任务均由工作线程调用产生(工作线程调用FutureTask.fork方法);
  • 没有工作线程(Worker)绑定的任务队列:数组下标始终是偶数,称为submissions queue,该队列中的任务全部由其它线程提交(也就是非工作线程调用execute/submit/invoke或者FutureTask.fork方法)。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class ForkJoinPoolTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int length = 100000000;
        long[] arr = new long[length];
        for (int i = 0; i < length; i++) {
            arr[i] = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
        }
        forkJoinSum(arr);
    }
    private static void forkJoinSum(long[] arr) throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        //构建线程池
        ForkJoinPool pool = ForkJoinPool.commonPool();
        //提交任务
        ForkJoinTask<Long> forkJoinTask = pool.submit(new SumTask(arr, 0, arr.length));
        //获取结果
        Long sum = forkJoinTask.get();
        //关闭线程池
        pool.shutdown();
        System.out.println("sum: " + sum);
        System.out.println("fork join elapse: " + (System.currentTimeMillis() - start));
    }
    private static class SumTask extends RecursiveTask<Long>{

        private long[] array;

        private int from;

        private int to;

        public SumTask(long[] array, int from, int to) {
            this.array = array;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if(to-from<=1000){
                long sum = 0;
                for(int i=from;i<to;i++){
                    sum += (array[i]/3*3/3*3/3*3/3*3/3*3);
                }
                return sum;
            }
            //分成两个任务
            int middle = (from + to)/2;
            SumTask left = new SumTask(array,from,middle);
            SumTask right = new SumTask(array,middle,to);
            //提交左边任务
            left.fork();
            //右边的任务直接利用当前线程计算
            Long rightResult = right.compute();
            //获取左边任务结果
            Long leftResult = left.join();
            return leftResult + rightResult;
        }
    }
}