Tuesday, March 1, 2016

Java Concurrency Future



https://mp.weixin.qq.com/s/LN0ms-1ABLSEN629zGs8Ng
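Although CountDownLatch provides the functionality we need, it still has a problem: our business code becomes coupled to the CountDownLatch code. For example, after fetching the user information we have to call countDownLatch.countDown(). Business code clearly should not care about this logic, and if a call is accidentally left out during development, the await method will only ever be woken up by some exception.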

Use CompletableFuture.allOf to merge the futures into one large CompletableFuture; calling get() on it then blocks until all of them are done.
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
allDoneFuture.get(10, TimeUnit.SECONDS);
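The allOf pattern above can be sketched end to end as follows. This is a minimal sketch, not the original article's code: loadUser is a hypothetical stand-in for a remote call, and the class name is made up for illustration. Since allOf yields a CompletableFuture<Void>, the values are read back from the individual futures once it completes.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class AllOfExample {
    // Hypothetical stand-in for a remote call such as loading user info.
    static String loadUser(int id) {
        return "user-" + id;
    }

    static List<String> loadAll() {
        // Start one asynchronous task per id; no latch bookkeeping in the task itself.
        List<CompletableFuture<String>> futures = Stream.of(1, 2, 3)
                .map(id -> CompletableFuture.supplyAsync(() -> loadUser(id)))
                .collect(Collectors.toList());

        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        try {
            // Blocks until every future has completed or the timeout expires.
            allDone.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Not all tasks finished", e);
        }

        // allOf carries no values, so collect them from the individual futures.
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(loadAll());
    }
}
```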

list.parallelStream().forEach(integer -> {
    // System.out.println("Current thread: " + Thread.currentThread().getName());
    sum.add(integer);
});
https://dzone.com/articles/parallel-and-asynchronous-programming-in-java-8
There are two main methods that let you start the asynchronous part of your code: supplyAsync if you want to do something with the result of the method, and runAsync if you don’t.
CompletableFuture.runAsync(() -> System.out.println("Run async in completable future " + Thread.currentThread()));
CompletableFuture.supplyAsync(() -> 5);
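To make the difference between the two entry points concrete, here is a small sketch (the class and method names are illustrative only). supplyAsync takes a Supplier and yields a CompletableFuture of its result, while runAsync takes a Runnable and yields a CompletableFuture<Void>.

```java
import java.util.concurrent.CompletableFuture;

class AsyncStartExample {
    // supplyAsync: the lambda returns a value that the future carries.
    static int compute() {
        CompletableFuture<Integer> withResult = CompletableFuture.supplyAsync(() -> 5);
        return withResult.join();
    }

    // runAsync: the lambda returns nothing, so the future is a CompletableFuture<Void>.
    static boolean fireAndWait() {
        CompletableFuture<Void> noResult = CompletableFuture.runAsync(
                () -> System.out.println("Run async in " + Thread.currentThread()));
        noResult.join(); // join() returns null for a Void future
        return noResult.isDone();
    }

    public static void main(String[] args) {
        System.out.println(compute());
        System.out.println(fireAndWait());
    }
}
```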

Now you can add those callbacks to handle the result of your supplyAsync.
CompletableFuture.supplyAsync(() -> 5)
.thenApply(i -> i * 3)
.thenAccept(i -> System.out.println("The result is " + i))
.thenRun(() -> System.out.println("Finished."));

.thenApply is similar to the .map function for streams: it performs a transformation. In the example above it takes the result (5) and multiplies it by 3. It will then pass that result (15) further down the pipe.
.thenAccept performs a method on the result without transforming it. It will also not return a result. Here it will print "The result is 15" to the console. It can be compared to the .forEach method for streams.
.thenRun doesn't use the result of the async operation and also doesn't return anything; it just waits to call its Runnable until the previous step is completed.
You can chain multiple CompletableFutures by using the thenCompose method. Without it, the result would be nested CompletableFutures. This makes thenCompose and thenApply like flatMap and map for streams.
CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture
.supplyAsync(() -> s + "World"));

If you want to combine the result of two CompletableFutures, you will need a method conveniently called  thenCombine.
future.thenCombine(future2, Integer::sum)
.thenAccept(value -> System.out.println(value));

As you can see in the example above, the result of the callback in thenCombine can be handled like a normal CompletableFuture with all your favourite CompletionStage methods.
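The thenCombine snippet leaves future and future2 undefined, so here is a self-contained sketch under the assumption that both are simple Integer futures (the values 2 and 3 are made up). It also shows that the combined stage accepts further callbacks such as thenApply.

```java
import java.util.concurrent.CompletableFuture;

class CombineExample {
    static int combinedSum() {
        // Two independent asynchronous computations (illustrative values).
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 3);

        // thenCombine waits for both, then applies the BiFunction to the two results.
        return future.thenCombine(future2, Integer::sum)
                .thenApply(sum -> sum * 10) // the combined stage chains like any other
                .join();
    }

    public static void main(String[] args) {
        System.out.println(combinedSum()); // prints 50
    }
}
```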
http://www.deadcoderising.com/java8-writing-asynchronous-code-with-completablefuture/
Besides implementing the Future interface, CompletableFuture also implements the CompletionStage interface.
CompletionStage is a promise. It promises that the computation eventually will be done.
The great thing about the CompletionStage is that it offers a vast selection of methods that let you attach callbacks that will be executed on completion.
This way we can build systems in a non-blocking fashion.
CompletableFuture.supplyAsync(this::sendMsg);  

If you've worked a bit with Futures in the past, you may wonder where the Executor went. If you want to, you can still define it as a second argument. However, if you leave it out it will be submitted to the ForkJoinPool.commonPool()
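As a rough illustration of the two-argument form, the sketch below passes a custom Executor to supplyAsync. The pool size and the thread name "msg-pool" are arbitrary choices for this example, not anything prescribed by the article.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorExample {
    static String runOnCustomPool() {
        // A small pool whose threads carry a recognizable (made-up) name.
        ExecutorService pool =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "msg-pool"));
        try {
            // The second argument replaces ForkJoinPool.commonPool() for this task.
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown(); // let the JVM exit once the task is done
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnCustomPool()); // prints msg-pool
    }
}
```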
The beauty of a callback is that we can say what should happen when an asynchronous computation is done without waiting around for the result.
CompletableFuture.supplyAsync(this::sendMsg)  
                 .thenAccept(this::notify);

If you want to continue passing values from one callback to another, thenAccept won't cut it since Consumer doesn't return anything.
To keep passing values, you can simply use thenApply instead.
thenApply takes a Function, which accepts a value but also returns one.
    CompletableFuture.supplyAsync(this::findReceiver)
                     .thenApply(this::sendMsg)
                     .thenAccept(this::notify);

You'll usually want to compose new pieces of code based on smaller pieces of code. Each of these pieces would typically be asynchronous — in our case returning CompletionStages.
Until now, sendMsg has been a normal blocking function. Let's now assume that we have a sendMsgAsync method that returns a CompletionStage.
If we kept using thenApply to compose the example above, we would end up with nested CompletionStages.
CompletableFuture.supplyAsync(this::findReceiver)  
                 .thenApply(this::sendMsgAsync);

// Returns type CompletionStage<CompletionStage<String>> 
We don't want that, so instead we can use thenCompose which allows us to give a Function that returns a CompletionStage. This will have a flattening effect like a flatMap.
CompletableFuture.supplyAsync(this::findReceiver)  
                 .thenCompose(this::sendMsgAsync);

// Returns type CompletionStage<String>

CompletableFuture<String> receiver  
            = CompletableFuture.supplyAsync(this::findReceiver);

receiver.thenApplyAsync(this::sendMsg);  
receiver.thenApplyAsync(this::sendAnotherMsg);  
By using the Async suffix, each callback is submitted as a separate task to ForkJoinPool.commonPool(). As a result, both message callbacks are executed once the preceding calculation is done.
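A self-contained sketch of attaching two independent callbacks to the same future is shown below. The receiver name and message texts are hypothetical; the point is only that both dependents run once the shared future completes, each as its own task.

```java
import java.util.concurrent.CompletableFuture;

class FanOutExample {
    static String sendBoth() {
        // Hypothetical stand-in for findReceiver.
        CompletableFuture<String> receiver = CompletableFuture.supplyAsync(() -> "alice");

        // Two separate dependents of the same future; neither waits for the other.
        CompletableFuture<String> first = receiver.thenApplyAsync(r -> "hello " + r);
        CompletableFuture<String> second = receiver.thenApplyAsync(r -> "bye " + r);

        return first.join() + ", " + second.join();
    }

    public static void main(String[] args) {
        System.out.println(sendBoth()); // prints hello alice, bye alice
    }
}
```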
CompletableFuture.supplyAsync(this::failingMsg)  
                 .exceptionally(ex -> new Result(Status.FAILED))
                 .thenAccept(this::notify);
exceptionally gives us a chance to recover by taking an alternative function that will be executed if the preceding calculation fails with an exception.
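Here is a minimal runnable sketch of recovery with exceptionally. The failingMsg body and the fallback string are made up for illustration. One detail worth knowing: when the failure comes from a supplyAsync task, the Throwable handed to the function is typically a CompletionException wrapping the original exception, so the sketch unwraps it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class RecoverExample {
    // Hypothetical task that always fails.
    static String failingMsg() {
        throw new IllegalStateException("mail server down");
    }

    static String sendWithFallback() {
        return CompletableFuture.supplyAsync(RecoverExample::failingMsg)
                .exceptionally(ex -> {
                    // Unwrap the CompletionException to get at the original cause.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    return "FAILED: " + cause.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(sendWithFallback()); // prints FAILED: mail server down
    }
}
```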
CompletableFuture<String> to =  
    CompletableFuture.supplyAsync(this::findReceiver);

CompletableFuture<String> text =  
    CompletableFuture.supplyAsync(this::createContent);

to.thenCombine(text, this::sendMsg);  
First, we've started two asynchronous jobs — finding a receiver and creating some content. Then we use thenCombine to say what we want to do with the result of these two computations by defining our BiFunction.
It's worth mentioning that there is another variant of thenCombine as well — called runAfterBoth. This version takes a Runnable not caring about the actual values of the preceding computation — only that they're actually done.
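The runAfterBoth variant can be sketched like this. The two suppliers are hypothetical stand-ins for findReceiver and createContent, and the flag exists only to make the effect observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class RunAfterBothExample {
    static boolean bothFinished() {
        CompletableFuture<String> to = CompletableFuture.supplyAsync(() -> "alice");
        CompletableFuture<String> text = CompletableFuture.supplyAsync(() -> "hello");

        AtomicBoolean notified = new AtomicBoolean(false);
        // The Runnable sees neither result; it only runs after both futures are done.
        CompletableFuture<Void> done = to.runAfterBoth(text, () -> notified.set(true));
        done.join();
        return notified.get();
    }

    public static void main(String[] args) {
        System.out.println(bothFinished());
    }
}
```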
CompletableFuture<String> firstSource =  
    CompletableFuture.supplyAsync(this::findByFirstSource);

CompletableFuture<String> secondSource =  
    CompletableFuture.supplyAsync(this::findBySecondSource);

firstSource.acceptEither(secondSource, this::sendMsg);  
As you can see, it's solved easily by acceptEither taking the two awaiting calculations and a Consumer that will be executed with the result of the first one to return.
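To see acceptEither pick the first available result, the sketch below uses one source that is never completed and one that answers immediately. Both sources and the "from cache" value are assumptions made for the demonstration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class EitherExample {
    static String firstAnswer() {
        // A source that never answers in this demo (it is simply never completed).
        CompletableFuture<String> firstSource = new CompletableFuture<>();
        // A source that answers right away.
        CompletableFuture<String> secondSource =
                CompletableFuture.supplyAsync(() -> "from cache");

        AtomicReference<String> received = new AtomicReference<>();
        // The Consumer runs with whichever result arrives first.
        firstSource.acceptEither(secondSource, received::set).join();
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(firstAnswer()); // prints from cache
    }
}
```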
CompletableFuture.supplyAsync(this::getArticles)  
                 .orTimeout(1, TimeUnit.MINUTES);

CompletableFuture.supplyAsync(this::getArticles)  
                 .completeOnTimeout(getBackupArticles(), 1, TimeUnit.SECONDS);
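Note that orTimeout and completeOnTimeout were added in Java 9, so they are not available on Java 8. The sketch below, using a future that is deliberately never completed and a made-up fallback value, shows how the two differ: orTimeout fails the future with a TimeoutException, while completeOnTimeout substitutes a default value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutExample {
    // completeOnTimeout: supply a fallback value if nothing arrives in time.
    static String withFallback() {
        CompletableFuture<String> never = new CompletableFuture<>();
        return never.completeOnTimeout("backup articles", 50, TimeUnit.MILLISECONDS).join();
    }

    // orTimeout: complete exceptionally with a TimeoutException instead.
    static boolean failsWithTimeout() {
        CompletableFuture<String> never = new CompletableFuture<>();
        try {
            never.orTimeout(50, TimeUnit.MILLISECONDS).join();
            return false;
        } catch (CompletionException e) {
            // join() wraps the TimeoutException in a CompletionException.
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        System.out.println(withFallback());
        System.out.println(failsWithTimeout());
    }
}
```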
http://www.ibm.com/developerworks/library/j-jvmc2/index.html
CompletableFuture with completion handler
public DistancePair bestMatch(String target) {
    AtomicReference<DistancePair> best = new AtomicReference<>(DistancePair.worstMatch());
    CountDownLatch latch = new CountDownLatch(chunkCheckers.size());
    for (ChunkDistanceChecker checker: chunkCheckers) {
        CompletableFuture.supplyAsync(() -> checker.bestDistance(target))
            .thenAccept(result -> {
                best.accumulateAndGet(result, DistancePair::best);
                latch.countDown();
            });
    }
    try {
        latch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException("Interrupted during calculations", e);
    }
    return best.get();
}
The CountDownLatch is initialized to the number of futures created in the code. As I create each future, I attach a handler (in the form of a lambda instance of the java.util.function.Consumer<T> functional interface) using the CompletableFuture.thenAccept() method. The handler, which executes when the future completes normally, uses the AtomicReference.accumulateAndGet() method (added in Java 8) to update the best value found and then decrements the latch. Meanwhile, the main thread of execution enters the try-catch block and waits for the latch to release. After all the futures have completed, the main thread continues, returning the final best value found.

public DistancePair bestMatch(String target) {
    CompletableFuture<DistancePair> last =
        CompletableFuture.supplyAsync(bestDistanceLambda(0, target));
    for (int i = 1; i < chunkCheckers.size(); i++) {
        last = CompletableFuture.supplyAsync(bestDistanceLambda(i, target))
            .thenCombine(last, DistancePair::best);
    }
    return last.join();
}

private Supplier<DistancePair> bestDistanceLambda(int i, String target) {
    return () -> chunkCheckers.get(i).bestDistance(target);
}
This code uses the CompletableFuture.thenCombine() method to merge two futures by applying a java.util.function.BiFunction (in this case, the DistancePair.best() method) to the two results, returning a future for the result of the function.
Listing 4 is the most concise and perhaps cleanest version of the code, but it has the drawback that it creates an extra layer of CompletableFutures to represent the combination of each chunk operation with the prior operations. As of the initial Java 8 release, this has the potential to cause a StackOverflowError that is lost within the code, resulting in the final future never completing. The bug is being addressed and should be fixed in a near-term future release.
https://github.com/java8/Java8InAction
https://github.com/java8/Java8InAction/blob/master/src/main/java/lambdasinaction/chap11/BestPriceFinder.java
public Future<Double> getPrice(String product) {
/*
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread( () -> {
                try {
                    double price = calculatePrice(product);
                    futurePrice.complete(price);
                } catch (Exception ex) {
                    futurePrice.completeExceptionally(ex);
                }
    }).start();
    return futurePrice;
*/
    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
This Supplier will be run by a thread of ForkJoinPool.commonPool(), but you can specify a different Executor by passing it as the second argument to the overloaded version of supplyAsync.

MAKE YOUR CODE NON-BLOCKING
public List<String> findPricesParallel(String product) {
    return shops.parallelStream()
            .map(shop -> shop.getPrice(product))
            .map(Quote::parse)
            .map(Discount::applyDiscount)
            .collect(Collectors.toList());
}

public List<String> findPricesFuture(String product) {
    List<CompletableFuture<String>> priceFutures = findPricesStream(product)
            .collect(Collectors.<CompletableFuture<String>>toList());

    return priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

public Stream<CompletableFuture<String>> findPricesStream(String product) {
    return shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}

public void printPricesStream(String product) {
    long start = System.nanoTime();
    CompletableFuture[] futures = findPricesStream(product)
            .map(f -> f.thenAccept(s -> System.out.println(s + " (done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs)")))
            .toArray(size -> new CompletableFuture[size]);
    CompletableFuture.allOf(futures).join();
    System.out.println("All shops have now responded in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
}

CompletableFutures have an advantage because, in contrast to what’s offered by the parallel Streams API, they allow you to specify a different Executor to submit their tasks to.

public Stream<CompletableFuture<String>> findPricesStream(String product) {
    return shops.stream()
             .map(shop -> CompletableFuture.supplyAsync(
                                   () -> shop.getPrice(product), executor))
             .map(future -> future.thenApply(Quote::parse))
             .map(future -> future.thenCompose(quote ->
                      CompletableFuture.supplyAsync(
                          () -> Discount.applyDiscount(quote), executor)));
}

What we can do with CompletableFuture:
  1. Combine several asynchronous operations
  2. Wait for task completion
  3. Listen for Future completion and react to its success or failure
  4. Chain the results of dependent futures
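Item 3 in the list above, reacting to either success or failure, is commonly done with handle (or whenComplete). The following is a small sketch with made-up values and names; handle receives both the result and the error, exactly one of which is non-null unless the result itself is null.

```java
import java.util.concurrent.CompletableFuture;

class HandleExample {
    // React to either outcome in one callback.
    static String describe(CompletableFuture<Integer> future) {
        return future
                .handle((value, error) ->
                        error == null ? "ok: " + value : "failed: " + error.getMessage())
                .join();
    }

    static String success() {
        return describe(CompletableFuture.completedFuture(42));
    }

    static String failure() {
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        return describe(failed);
    }

    public static void main(String[] args) {
        System.out.println(success()); // prints ok: 42
        System.out.println(failure()); // prints failed: boom
    }
}
```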

http://colobu.com/2016/02/29/Java-CompletableFuture/
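Future is an interface added in Java 5 that describes the result of an asynchronous computation. You can use isDone to check whether the computation has finished, call get to block the calling thread until the computation completes and returns its result, or use cancel to stop the task.
Although Future and its related methods make it possible to run tasks asynchronously, retrieving the result is inconvenient: it can only be obtained by blocking or polling. Blocking clearly defeats the purpose of asynchronous programming, while polling wastes CPU cycles and still does not deliver the result promptly. Why not use the observer pattern and notify listeners as soon as the result is ready?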
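The limitation described above can be seen in a few lines of plain Future code. This is only an illustrative sketch (the computation 21 * 2 and the class name are made up): there is no way to register a callback, so the caller either polls isDone or blocks in get.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PlainFutureExample {
    static int blockingGet() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> 21 * 2);
            // Future offers no callback hook: the options are polling isDone()
            // in a loop or blocking the calling thread in get().
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(blockingGet()); // prints 42
    }
}
```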
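Many languages and platforms, such as Node.js, implement asynchronous programming with callbacks. Some Java frameworks, such as Netty, extend Java's Future interface themselves and provide addListener and several other extension methods.

Google Guava also provides general-purpose Future extensions, ListenableFuture and SettableFuture, along with the Futures helper class, to make asynchronous programming easier.

Scala also provides a simple, easy-to-use, and powerful Future/Promise model for asynchronous programming.

Shouldn't the standard Java class library do something to strengthen its own capabilities?

Java 8 added CompletableFuture, a new class with roughly 50 methods. It provides a very powerful extension of Future, helps reduce the complexity of asynchronous programming, supports a functional programming style, lets you process results through callbacks, and provides methods for transforming and combining CompletableFutures.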


