深入浅出java7中的Fork/Join框架 | Yet Another Thoughts
Fork/Join是java 7中新添加的新特性,其中ForkJoinPool用于对Jave5中添加的ExecutorService的一个补充与完善。
ExecutorService可以看做是一个线程池,而ForkJoinPool是一个运行着worker线程的线程池,worker线程用于对ForkJoinTask进行并行化的计算。ForkJoinPool使用了work-stealing算法 作为调度器实现。
ForkJoinPool常用于利用多核处理器处理可以分而治之的,可递归运算的任务。相比于『线程池』只可以在任务级别分布到多个处理器核心上计算,ForkJoinPool可以对可分解成子任务的、不同子任务之间存在依赖的偏序关系的任务的并行化处理的管理和调度,fork/join鼓励你通过task之间通过返回future的值和构造子任务的方式进行信息传递而非共享内存与资源竞争的方式。
另外一方面,ForkJoinPool在处理大量琐碎任务时也有优势。
Scala的akka库(Actor model)就是基于ForkJoinPool实现的。Java8中的java.util.Arrays.parallelSort()也是使用ForkJoinPool实现的。
How?
以下的代码可以看repo: conndots/forkjoin。
Fork/Join有两种Task:RecursiveTask< T >和RecursiveAction(无返回值的Task)。
下面的简单的例子提供了一种使用ForkJoinPool实现的快排算法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | public class QuickSortAction<T extends Comparable<T>> extends RecursiveAction { private final ArrayList<T> toSortArray; private final int start; private final int end; public QuickSortAction(ArrayList<T> toSort, int start, int end) { Preconditions.checkNotNull(toSort); Preconditions.checkArgument(start < end, "end must be larger than start"); Preconditions.checkArgument(start >= 0, "start must be larger than -1"); toSortArray = toSort; this.start = start; this.end = end; } private int pivotAndSplit() { int left = start, right = end - 1; T pivot = toSortArray.get(start); while (left < right) { while (toSortArray.get(right).compareTo(pivot) > 0 && left < right) { --right; } if (left == right) { break; } toSortArray.set(left++, toSortArray.get(right)); while (toSortArray.get(left).compareTo(pivot) <= 0 && left < right) { ++left; } if (left == right) { break; } toSortArray.set(right--, toSortArray.get(left)); } toSortArray.set(left, pivot); return left; } protected void compute() { int pivotPosition = pivotAndSplit(); List<RecursiveAction> todos = Lists.newArrayList(); if (pivotPosition > start + 1) { todos.add(new QuickSortAction(toSortArray, start, pivotPosition)); } if (end > pivotPosition + 1 + 1) { todos.add(new QuickSortAction(toSortArray, pivotPosition + 1, end)); } invokeAll(todos); //todos里面的Task依赖于当前的Task。 } } |
使用ForkJoinPool启动这个快排。
1 2 | ForkJoinPool pool = new ForkJoinPool(16, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, false); pool.invoke(new QuickSortAction(toSort, 0, toSort.size()); |
ForkJoinPool提供了几个有意思的参数:parallelism(并行化的程度,默认等于机器的可用核心数), async(异步模式)。async指定异步模式有利于事件驱动的任务,这些任务不会被join。经过试验,对于随机产生的1亿随机整型的List,异步模式与非异步模式的快排时间对比如下:
- non-async: 42.303s
- async: 16.428s
下面的章节中,将介绍异步模式的实现。
ForkJoinTask
上面已经提到了,Fork/Join框架运行在比Future 更加轻量的ForkJoinTask之上。上边提到的RecursiveTask与RecursiveAction都是ForkJoinTask的子类,ForkJoin还有一个子类CountedCompleter。
ForkJoinTask是轻量级的Future。它的高效源自于许多限制,这些限制来源于ForkJoinTask的目标:它们应当被视作计算纯函数或者是互相独立的计算任务。
用于协作的主要有两个方法:fork()(或者invoke),安排异步执行,还有join(),在被调用的task完成之前当前线程不会继续执行。
不同任务之间的信息传递通过这样的机制进行,而非对共享内存的访问,因为这样的操作会带来为了同步而产生的资源竞争和阻塞。所以在ForkJoinTask中尽量避免使用synchronized方法或代码块。
可见,传统的观点中,递归实现相比迭代实现会有更低的效率,因为方法调用的开销。然而函数式的编程范式带来的好处是显而易见的,天然适用于多核甚至分布式的运行环境之下,因为递归的方式编译器可以实现对任务的分而治之的分解,而由于各个分治的子任务是无副作用、限制访问共享资源而非竞争的,所以可以无忧地将任务调度到不同的线程甚至进程去运算;需要的消息传递通过任务初始化和任务的返回结果来承担。
work-stealing算法(TODO)
ForkJoinPool包含了一个给定数目线程数目的线程池。一个任务在fork新的任务的时候,当前的线程会将任务挂载在自己的任务队列中。而任意一个线程在做完自己队列里的全部任务后会去其他的线程的任务队列去『偷取』任务过来做。work-stealing算法将调度工作分布到了闲置下来的线程中,而只要所有的线程有工作去做,就不会发生调度过度(scheduling overhead)的现象发生。
相比之下,另一种调度方法work-sharing方法会在spawn一个新的Job时候把它调度到一个线程(processor)之上,增加了不同线程间的任务迁移负担。