Friday, November 20, 2015

Java ForkJoin Parallel



Executors reduce the overhead introduced by thread creation reusing the threads. Internally, it manages a pool of threads that reuses threads to execute multiple tasks.

the Fork/Join framework must be used to implement solutions to problems based on the divide and conquer technique. You have to divide the original problem into smaller problems until they are small enough to be solved directly

if ( problem.size() > DEFAULT_SIZE) {
    divideTasks();
    executeTask();
    taskResults=joinTasksResult();
    return taskResults;
} else {
    taskResults=solveBasicProblem();
    return taskResults;
}
The fork() method: This method allows you to send a child task to the Fork/Join executor
The join() method: This method allows you to wait for the finalization of a child task and returns its result

the work-stealing algorithm, which determines which tasks to be executed. When a task is waiting for the finalization of a child task using the join() method, the thread that is executing that task takes another task from the pool of tasks that are waiting and starts its execution. In this way, the threads of the Fork/Join executor are always executing a task by improving the performance of the application.

every Java application has a default ForkJoinPool named common pool. You can obtain it by calling the ForkJoinPool.commonPool() static method. You don't need to create one explicitly (although you can). This default Fork/Join executor will use by default the number of threads determined by the available processors of your computer. You can change this default behavior changing the value of the system property java.util.concurrent.ForkJoinPool.common.parallelism.

ForkJoinTask
RecursiveTask - return result
RecursiveAction - don't return result
CountedCompleter -  implement tasks that trigger other tasks when they're completed

http://sighingnow.github.io/%E7%BC%96%E7%A8%8B%E8%AF%AD%E8%A8%80/java_thread.html

Fork/Join

Fork/Join机制是JDK 7新增加的多线程框架,如果一个应用能被分解成多个子任务,并且组合多个子任务的结果就能够获得最终的答案,那么这个应用就适合用 Fork/Join 模式来解决。下图可大概说明Fork/Join模式的结构:
Fork/Join模式

Fork/Join框架的几个核心类:

  1. ForkJoinPool: 线程池实现。实现了Work-stealing(工作窃取)算法,线程会主动寻找新创建的任务去执行,从而保证较高的线程利用率。程序运行时只需要唯一的一个ForkJoinPool。
  2. ForkJoinTask: 任务类,有两个子类实现具体的Task:
    1. RecursiveAction: 无返回值;
    2. RecursiveTask: 有返回值。

使用Fork/Join

首先,我们需要为程序创建一个ForkJoinPool:
Creates a ForkJoinPool with parallelism equal to java.lang.Runtime.availableProcessors,
private static ForkJoinPool pool = new ForkJoinPool();
这儿ForkJoinPool的构造函数有重载方法,可以通过参数设置线程数量(并行级别,parallelism level)以及ThreadFactory。
Task类需要实现compute方法,ForkJoinTask的代码框架:
If (problem size > default size){
    task s = divide(task);
    execute(tasks);
} 
else {
    resolve problem using another algorithm;
}

无返回值Task的示例

通过并行,将一个数组中每个元素的值都置为其索引值,即令a[i] = i
class Task {
    // Creates a ForkJoinPool with parallelism equal to
    // java.lang.Runtime.availableProcessors,
    private static ForkJoinPool pool = new ForkJoinPool();
    private static final int default_size = 10;

    public void solve() {
        System.out.println("available processor number: " +
                java.lang.Runtime.getRuntime().availableProcessors());
        int[] a = new int[100];
        SubTask task = new SubTask(a, 0, 100);
        System.out.println("task start!");
        pool.invoke(task);
        System.out.println("task finish!");
    }

    class SubTask extends RecursiveAction {
        int[] a;
        int l, r;

        public SubTask(int[] a, int l, int r) {
            this.a = a;
            this.l = l;
            this.r = r;
        }

        @Override
        protected void compute() {
            System.out.println("Thread id: " + Thread.currentThread().getId());
            if (r - l > BB.default_size) {
                int mid = (l + r) / 2;
                invokeAll(new SubTask(a, l, mid + 1), new SubTask(a, mid, r));
            }
            else {
                for (int i = l; i < r; ++i) {
                    a[i] = i;
                }
            }
        }
    }
}

有返回值的Task

如果子任务有返回值,只需要改成继承RecursiveTask类,然后compute方法返回对应类型的返回值即可。例如:
class SubTask extends RecursiveTask<Integer> {
    public Integer compute() {
        // ...
    }
}

SubTask task = new SubTask(...);
Integer result = task.get()
在fork子任务时,只需要:
SubTask t1 = new SubTask(...);
SubTask t2 = new SubTask(...);
invokeAll(t1, t2);
try {
    result = t1.get()+t2.get();  
} catch (InterruptedException | ExecutionException e) {  
    e.printStackTrace();  
}
return result;

异步执行Task

上面两个示例都是同步执行的,invoke与invokeAll都是阻塞当前线程的。当Task线程运行时会阻塞父线程,而在很多场合,我们需要Task线程异步执行。这是需要使用到execute或者submit方法。execute方法直接执行task,而submit方法是将task提交到任务队列里边去。而shutdown方法则表示线程池不再接收新的task(ForkJoinPool是有守护线程的)(shutdown之后再submit后产生RejectedExecutionException)。ForkJoinPool线程池提供了execute()方法来异步启动任务,而作为任务本身,可以调用fork()方法异步启动新的子任务,并调用子任务的join()方法来取得计算结果。通过通过task.isDone()方法来判断任务是否结束。
public boolean allDone(List<SubTask> tasks) {
    for(SubTask task: tasks) {
        if(!task.isDone()) {
            return false;
        }
    }
    return true;
}
当使用Fork/Join框架时,如果主线程(main方法)先于task线程结束了,那么task线程也会结束,而不会等待执行完。这也是与Java中传统的Thread/Runnable有区别的地方。至于原因,应该是主线程(main方法)结束导致ForkJoinPool的守护线程结束了。此外,ForkJoinPoolawaitTermination方法也值得注意。execute/submit/fork/join也可以与invoke/invokeAll配合使用,来调整线程间的阻塞关系。
JDK 7 中的 Fork/Join 模式一文中,作者使用Fork/Join模式实现了并行快速排序算法,很值得参考。其实Fork/Join与Callable/Future/ExecutorService挺像的,ExecutorService与ForkJoinPool的API也很类似,ForkJoinPool比ExecutorService多出了Work-stealing(工作窃取)算法的调度,线程池和服务(Service)的概念对应地也能很好。在Fork/Join机制中,Task也是可以取消(cancel)的。

invoke与fork差异

这两个方法有很大的区别,当使用同步方法,调用这些方法(比如:invokeAll()方法)的任务将被阻塞,直到提交给线程池的任务完成它的执行。这允许ForkJoinPool类使用work-stealing算法,分配一个新的任务给正在执行睡眠任务的工作线程。反之,当使用异步方法(比如:fork()方法),这个任务将继续它的执行,所以ForkJoinPool类不能使用work-stealing算法来提高应用程序的性能。在这种情况下,只有当你调用join()或get()方法来等待任务的完成时,ForkJoinPool才能使用work-stealing算法。

Work-stealing

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。核心思想在于以下两点:
  1. 将一个任务分解为多个互不依赖的子任务。
  2. 当一个线程完成自己队列中的任务后,将其他线程的任务队列里的任务取出来执行。
Work-stealing算法示意图

Disruptor机制

单核情形下提高CPU利用率。

Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts