Friday, November 20, 2015

Java ForkJoin Parallel

Executors reduce the overhead of thread creation by reusing threads. Internally, an executor manages a pool of worker threads, and each worker runs many tasks over its lifetime.
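To make the thread reuse concrete, here is a minimal sketch. The class name ThreadReuseDemo and the method distinctWorkers are made up for this illustration. It runs more tasks than there are threads on a fixed-size pool and counts how many distinct worker threads did the work.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadReuseDemo {
    // Runs `tasks` tiny jobs on a fixed pool of `threads` workers and
    // returns how many distinct worker threads actually executed them.
    static int distinctWorkers(int threads, int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> names.add(Thread.currentThread().getName()));
        }
        executor.shutdown(); // accept no new tasks, let queued ones finish
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names.size();
    }

    public static void main(String[] args) {
        // Six tasks, but never more than two worker threads.
        System.out.println("distinct workers: " + distinctWorkers(2, 6));
    }
}
```

The count can never exceed the pool size, however many tasks are submitted.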

The Fork/Join framework is designed for problems that fit the divide-and-conquer technique. You divide the original problem into smaller subproblems until they are small enough to be solved directly:

if (problem.size() > DEFAULT_SIZE) {
    // divide the problem into subtasks, execute them, and join their results
    return taskResults;
} else {
    // solve the problem directly
    return taskResults;
}

The fork() method: sends a child task to the Fork/Join executor so that it runs asynchronously.
The join() method: waits for a child task to finish and returns its result.
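As a sketch of how fork() and join() fit together, the following task sums an array. The names SumTask and THRESHOLD and the cut-off value are assumptions made for this example, not part of the framework.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // hypothetical cut-off
    private final int[] data;
    private final int from, to; // half-open range [from, to)

    SumTask(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0; // small enough: solve directly
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                     // hand the left half to the pool
        long rightSum = right.compute(); // work on the right half in this thread
        return left.join() + rightSum;   // wait for the left half and combine
    }

    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) data[i] = i;
        System.out.println(sum(data));
    }
}
```

Computing one half directly instead of forking both halves keeps the current thread busy and saves one task submission per split.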

The framework relies on the work-stealing algorithm to decide which task runs next. When a task is waiting in join() for a child task to finish, the thread executing it takes another task from those still waiting and starts executing it. This keeps the threads of the Fork/Join executor busy, which improves the performance of the application.
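Work stealing can be observed, at least roughly, through ForkJoinPool.getStealCount(). The sketch below uses made-up names (StealDemo, CountTask) and a trivial task. The reported number is only an estimate and changes from run to run, so no particular value should be expected.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class StealDemo {
    // Counts the integers in [from, to) by splitting the range recursively.
    static class CountTask extends RecursiveTask<Integer> {
        final int from, to;

        CountTask(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Integer compute() {
            if (to - from <= 16) return to - from;
            int mid = (from + to) >>> 1;
            CountTask left = new CountTask(from, mid);
            left.fork(); // queued on this worker; an idle worker may steal it
            int right = new CountTask(mid, to).compute();
            return left.join() + right;
        }
    }

    static int count(ForkJoinPool pool, int n) {
        return pool.invoke(new CountTask(0, n));
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        System.out.println("result: " + count(pool, 1 << 16));
        System.out.println("steals (estimate): " + pool.getStealCount());
        pool.shutdown();
    }
}
```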

Since Java 8, every Java application has a default ForkJoinPool called the common pool. You obtain it by calling the static method ForkJoinPool.commonPool(), so you don't need to create a pool explicitly (although you can). By default its parallelism is derived from the number of available processors of your computer (typically that number minus one). You can change this default by setting the system property java.util.concurrent.ForkJoinPool.common.parallelism.
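A small sketch for inspecting the common pool follows. CommonPoolDemo is a made-up name. The property is read when the common pool is initialized, so the usual way to set it is on the command line, for example `java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 ...`.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolDemo {
    static int commonParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    // commonPool() always hands back the same shared instance.
    static boolean samePool() {
        return ForkJoinPool.commonPool() == ForkJoinPool.commonPool();
    }

    public static void main(String[] args) {
        System.out.println("available processors: "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + commonParallelism());
        System.out.println("same instance: " + samePool());
    }
}
```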

RecursiveTask - returns a result
RecursiveAction - does not return a result
CountedCompleter - for tasks that trigger other tasks when they complete
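CountedCompleter is the least familiar of the three, so here is a minimal sketch modeled on the pattern in its Javadoc. FillCompleter, the threshold of 8, and the finished counter are assumptions made for this example. Each task sets its pending count to the number of children it forks, and tryComplete() propagates completion up to the parent.

```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.atomic.AtomicInteger;

class FillCompleter extends CountedCompleter<Void> {
    private final int[] a;
    private final int lo, hi;             // half-open range [lo, hi)
    private final AtomicInteger finished; // counts onCompletion callbacks

    FillCompleter(CountedCompleter<?> parent, int[] a, int lo, int hi,
                  AtomicInteger finished) {
        super(parent);
        this.a = a;
        this.lo = lo;
        this.hi = hi;
        this.finished = finished;
    }

    @Override
    public void compute() {
        if (hi - lo > 8) {
            int mid = (lo + hi) >>> 1;
            setPendingCount(2); // must be set before forking the two children
            new FillCompleter(this, a, mid, hi, finished).fork();
            new FillCompleter(this, a, lo, mid, finished).fork();
        } else {
            for (int i = lo; i < hi; i++) a[i] = i;
        }
        tryComplete(); // completes this task once its pending count is used up
    }

    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        finished.incrementAndGet(); // triggered when this task completes
    }

    // Fills a[i] = i and returns how many completion callbacks ran.
    static int fill(int[] a) {
        AtomicInteger finished = new AtomicInteger();
        new FillCompleter(null, a, 0, a.length, finished).invoke();
        return finished.get();
    }

    public static void main(String[] args) {
        int[] a = new int[100];
        System.out.println("callbacks: " + fill(a) + ", a[99] = " + a[99]);
    }
}
```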


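The Fork/Join mechanism is a multithreading framework added in JDK 7. If an application can be decomposed into multiple subtasks, and combining the results of those subtasks yields the final answer, then the application is a good fit for the Fork/Join pattern. The figure below roughly illustrates the structure of the Fork/Join pattern: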


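  1. ForkJoinPool: the thread pool implementation. It implements the work-stealing algorithm: threads actively look for newly created tasks to execute, which keeps thread utilization high. A running program needs only a single ForkJoinPool.
  2. ForkJoinTask: the task class, with two subclasses for concrete tasks:
    1. RecursiveAction: no return value;
    2. RecursiveTask: has a return value.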


The no-argument constructor creates a ForkJoinPool with parallelism equal to java.lang.Runtime.availableProcessors():
private static ForkJoinPool pool = new ForkJoinPool();
The ForkJoinPool constructor is overloaded; its parameters let you set the number of threads (the parallelism level) and the thread factory.
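A sketch of the overloaded constructor, assuming a made-up class PoolConfigDemo and a made-up thread name prefix "demo-worker-". The custom factory wraps the default one only to rename the worker threads.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConfigDemo {
    static ForkJoinPool newPool(int parallelism) {
        AtomicInteger counter = new AtomicInteger();
        // Delegate to the default factory, then give the thread a readable name.
        ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
            ForkJoinWorkerThread worker =
                    ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("demo-worker-" + counter.incrementAndGet());
            return worker;
        };
        return new ForkJoinPool(
                parallelism, // number of worker threads to aim for
                factory,
                null,        // no UncaughtExceptionHandler
                false);      // asyncMode off, the default used for fork/join tasks
    }

    // Runs a tiny job in the pool and reports which worker executed it.
    static String workerName(ForkJoinPool pool) {
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        return pool.submit(whoAmI).join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = newPool(4);
        System.out.println("parallelism: " + pool.getParallelism());
        System.out.println("ran on: " + workerName(pool));
        pool.shutdown();
    }
}
```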
if (problem size > default size) {
    tasks = divide(task);
    execute(tasks);
} else {
    resolve problem using another algorithm;
}


Example: in parallel, set every element of an array to its own index, i.e. a[i] = i.
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class Task {
    // Creates a ForkJoinPool with parallelism equal to
    // java.lang.Runtime.availableProcessors().
    private static ForkJoinPool pool = new ForkJoinPool();
    private static final int default_size = 10;

    public void solve() {
        System.out.println("available processor number: " +
                Runtime.getRuntime().availableProcessors());
        int[] a = new int[100];
        SubTask task = new SubTask(a, 0, 100);
        System.out.println("task start!");
        pool.invoke(task); // blocks until the whole range has been filled
        System.out.println("task finish!");
    }

    class SubTask extends RecursiveAction {
        int[] a;
        int l, r; // half-open range [l, r)

        public SubTask(int[] a, int l, int r) {
            this.a = a;
            this.l = l;
            this.r = r;
        }

        @Override
        protected void compute() {
            System.out.println("Thread id: " + Thread.currentThread().getId());
            if (r - l > Task.default_size) {
                int mid = (l + r) / 2;
                // Split into two non-overlapping halves and run both.
                invokeAll(new SubTask(a, l, mid), new SubTask(a, mid, r));
            } else {
                // Small enough: fill the range directly.
                for (int i = l; i < r; ++i) {
                    a[i] = i;
                }
            }
        }
    }
}

class SubTask extends RecursiveTask<Integer> {
    @Override
    public Integer compute() {
        // ...
    }
}

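SubTask task = new SubTask(...);
pool.execute(task);          // hand the task to the pool first
Integer result = task.get(); // then block until the result is ready

// Inside compute(): run two subtasks and combine their results.
SubTask t1 = new SubTask(...);
SubTask t2 = new SubTask(...);
invokeAll(t1, t2);
try {
    result = t1.get() + t2.get();
} catch (InterruptedException | ExecutionException e) {
    // handle or rethrow; join() would avoid these checked exceptions
}
return result;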
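The invokeAll pattern above can be turned into a complete task. The sketch below finds the maximum of a non-empty array. MaxTask and its THRESHOLD are made up for this example, and it reads the subtask results with join(), which throws no checked exceptions, instead of get().

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class MaxTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 4; // hypothetical cut-off
    private final int[] a;
    private final int l, r; // half-open range [l, r), never empty

    MaxTask(int[] a, int l, int r) {
        this.a = a;
        this.l = l;
        this.r = r;
    }

    @Override
    protected Integer compute() {
        if (r - l <= THRESHOLD) {
            int max = a[l];
            for (int i = l + 1; i < r; i++) max = Math.max(max, a[i]);
            return max;
        }
        int mid = (l + r) >>> 1;
        MaxTask t1 = new MaxTask(a, l, mid);
        MaxTask t2 = new MaxTask(a, mid, r);
        invokeAll(t1, t2);                     // returns once both subtasks are done
        return Math.max(t1.join(), t2.join()); // results are ready, join() just reads them
    }

    // Assumes a.length > 0.
    static int max(int[] a) {
        return ForkJoinPool.commonPool().invoke(new MaxTask(a, 0, a.length));
    }

    public static void main(String[] args) {
        System.out.println(max(new int[] {3, 9, -2, 7, 11, 0, 5, 8, 1}));
    }
}
```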


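public boolean allDone(List<SubTask> tasks) {
    for (SubTask task : tasks) {
        if (!task.isDone()) {
            return false;
        }
    }
    return true;
}

In the article "JDK 7 中的 Fork/Join 模式" (The Fork/Join Pattern in JDK 7), the author implements a parallel quicksort with the Fork/Join pattern; it is well worth reading. Fork/Join is in fact quite similar to Callable/Future/ExecutorService, and the APIs of ExecutorService and ForkJoinPool are also very much alike. What ForkJoinPool adds over a plain ExecutorService is scheduling by the work-stealing algorithm; the concepts of thread pool and service map onto each other well. In the Fork/Join mechanism, a task can also be cancelled.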
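To illustrate the similarity, the sketch below drives a ForkJoinPool purely through the ExecutorService interface and then cancels a task before it runs. The class and method names (ExecutorStyleDemo, square, cancelBeforeRun) are made up for this example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveAction;

class ExecutorStyleDemo {
    // Works with any ExecutorService, including a ForkJoinPool.
    static int square(ExecutorService executor, int x) {
        Future<Integer> future = executor.submit(() -> x * x); // a Callable<Integer>
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Cancels a task before it is ever scheduled and reports its state.
    static boolean cancelBeforeRun() {
        RecursiveAction task = new RecursiveAction() {
            @Override
            protected void compute() { /* never runs in this sketch */ }
        };
        task.cancel(false); // the flag has no effect in the default implementation
        return task.isCancelled();
    }

    public static void main(String[] args) {
        ExecutorService executor = new ForkJoinPool(2); // a ForkJoinPool is an ExecutorService
        System.out.println(square(executor, 7));
        System.out.println(cancelBeforeRun());
        executor.shutdown();
    }
}
```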




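  1. A task is decomposed into multiple subtasks that do not depend on each other.
  2. When a thread has finished the tasks in its own queue, it takes tasks from other threads' task queues and executes them.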



