Monday, October 17, 2016

Java 8 Concurrency



http://winterbe.com/posts/2015/05/22/java8-concurrency-tutorial-atomic-concurrent-map-examples/
The method updateAndGet() accepts a lambda expression in order to perform arbitrary arithmetic operations on the integer:
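AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(2); // pool size chosen for illustration

IntStream.range(0, 1000)
    .forEach(i -> {
        // each task atomically adds 2 to the shared counter
        Runnable task = () ->
            atomicInt.updateAndGet(n -> n + 2);
        executor.submit(task);
    });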
The method accumulateAndGet() accepts another kind of lambda expression, of type IntBinaryOperator, which combines the current value with a supplied argument. This method can be used, for example, to sum up all values from 0 to 1000 concurrently.
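A minimal sketch of such a concurrent sum follows. The class name AccumulateSumExample, the helper sumBelow, and the pool size of 2 are assumptions made for illustration. Like the updateAndGet snippet, it iterates with IntStream.range(0, 1000), so the upper bound is exclusive and the values added are 0 through 999.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class AccumulateSumExample {
    // Sums 0 .. n-1 by submitting one task per value to a small thread pool.
    static int sumBelow(int n) {
        AtomicInteger atomicInt = new AtomicInteger(0);
        ExecutorService executor = Executors.newFixedThreadPool(2); // assumed pool size
        IntStream.range(0, n).forEach(i -> {
            // Integer::sum is used as the IntBinaryOperator: (current, i) -> current + i
            Runnable task = () -> atomicInt.accumulateAndGet(i, Integer::sum);
            executor.submit(task);
        });
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return atomicInt.get();
    }

    public static void main(String[] args) {
        System.out.println(sumBelow(1000)); // 0 + 1 + ... + 999 = 499500
    }
}
```

Because addition is associative and commutative, the order in which the tasks run does not affect the result.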


ConcurrentHashMap


The method search() accepts a BiFunction that returns a non-null search result for the current key-value pair, or null if the current pair doesn't match the desired search criteria. As soon as a non-null result is returned, further processing is suppressed. Keep in mind that ConcurrentHashMap is unordered, so the search function should not depend on the actual processing order of the map. If multiple entries of the map match the given search function, the result may be non-deterministic.
String result = map.search(1, (key, value) -> {
    System.out.println(Thread.currentThread().getName());
    if ("foo".equals(key)) {
        return value;
    }
    return null;
});
// Variant: searchValues() inspects only the values of the map
String result = map.searchValues(1, value -> {
    System.out.println(Thread.currentThread().getName());
    if (value.length() > 3) {
        return value;
    }
    return null;
});
The method reduce(), already known from Java 8 Streams, accepts two lambda expressions of type BiFunction. The first function transforms each key-value pair into a single value of any type. The second function combines all those transformed values into a single result, ignoring any possible null values.
String result = map.reduce(1,
    (key, value) -> {
        System.out.println("Transform: " + Thread.currentThread().getName());
        return key + "=" + value;
    },
    (s1, s2) -> {
        System.out.println("Reduce: " + Thread.currentThread().getName());
        return s1 + ", " + s2;
    });
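The search() and reduce() snippets above assume an existing map. The sketch below supplies one so the calls can be run as is. The class name, helper names, and map contents are assumptions for illustration. The reduction adds up value lengths rather than concatenating strings, so its result does not depend on processing order.

```java
import java.util.concurrent.ConcurrentHashMap;

class MapSearchReduceExample {
    // Small sample map; the entries are made up for this example.
    static ConcurrentHashMap<String, String> sampleMap() {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("foo", "bar");
        map.put("han", "solo");
        map.put("r2", "d2");
        map.put("c3", "p0");
        return map;
    }

    // Returns the value stored under "foo", or null when no entry matches.
    static String findFoo(ConcurrentHashMap<String, String> map) {
        return map.search(1, (key, value) -> "foo".equals(key) ? value : null);
    }

    // Transforms each entry to the length of its value, then sums the lengths.
    // The result is null for an empty map, hence the Integer return type.
    static Integer totalValueLength(ConcurrentHashMap<String, String> map) {
        return map.<Integer>reduce(1, (key, value) -> value.length(), Integer::sum);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, String> map = sampleMap();
        System.out.println(findFoo(map));          // bar
        System.out.println(totalValueLength(map)); // 3 + 4 + 2 + 2 = 11
    }
}
```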
Java SE 8 for the Really Impatient
AtomicIntegerArray, AtomicLongArray

compare-and-swap operation
You get the value of the variable, which is the old value of the variable.
You change the value of the variable in a temporary variable, which is the new value of the variable.
You substitute the old value with the new value, if the old value is equal to the actual value of the variable. The old value may be different from the actual value if another thread has changed the value of the variable.

LongAccumulator/DoubleAccumulator (and LongAdder/DoubleAdder) are more efficient than AtomicLong under high contention. (The JDK itself has no AtomicDouble class; Guava provides one.)

ConcurrentHashMap
- search, reduce, forEach, compute and merge

public static AtomicLong largest = new AtomicLong();
largest.set(Math.max(largest.get(), observed)); // Error—race condition!
This update is not atomic. Instead, compute the new value and use compareAndSet in a loop:
long oldValue;
long newValue;
do {
   oldValue = largest.get();
   newValue = Math.max(oldValue, observed);
   if (oldValue == newValue) break; // nothing to update
} while (!largest.compareAndSet(oldValue, newValue)); // retry if another thread changed the value

In Java 8, you don’t have to write the loop boilerplate any more.
largest.updateAndGet(x -> Math.max(x, observed));
largest.accumulateAndGet(observed, Math::max);

The accumulateAndGet method takes a binary operator that is used to combine the atomic value and the supplied argument.
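To compare the two styles side by side, here is a small sketch that tracks a maximum from several threads using both the explicit compareAndSet loop and accumulateAndGet. The class and method names are assumptions for illustration, and the AtomicLong is passed in as a parameter and initialized to Long.MIN_VALUE (rather than the static field starting at 0 used above) so that negative observations are handled too.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;

class LargestValueExample {
    // Explicit compare-and-set loop: retry until no other thread interfered.
    static void observeWithLoop(AtomicLong largest, long observed) {
        long oldValue;
        long newValue;
        do {
            oldValue = largest.get();
            newValue = Math.max(oldValue, observed);
            if (oldValue == newValue) {
                return; // the stored value is already at least as large
            }
        } while (!largest.compareAndSet(oldValue, newValue));
    }

    // Java 8 shortcut: the retry loop lives inside accumulateAndGet.
    static void observe(AtomicLong largest, long observed) {
        largest.accumulateAndGet(observed, Math::max);
    }

    public static void main(String[] args) {
        AtomicLong viaLoop = new AtomicLong(Long.MIN_VALUE);
        AtomicLong viaAccumulate = new AtomicLong(Long.MIN_VALUE);
        // A parallel stream stands in for "several threads reporting values".
        LongStream.rangeClosed(1, 10_000).parallel().forEach(v -> {
            observeWithLoop(viaLoop, v);
            observe(viaAccumulate, v);
        });
        System.out.println(viaLoop.get() + " " + viaAccumulate.get()); // 10000 10000
    }
}
```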

A LongAdder is composed of multiple variables whose collective sum is the current value. Multiple threads can update different summands, and new summands are automatically provided when the number of threads increases. This is efficient in the common situation where the value of the sum is not needed until after all work has been done.
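As a rough illustration of that usage pattern, the sketch below increments a LongAdder from a parallel stream and reads the sum only once all updates are done. The class name and the countInParallel helper are assumptions for this example.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

class LongAdderExample {
    // Performs the given number of increments from multiple threads
    // and reads the total only after all of them have completed.
    static long countInParallel(int increments) {
        LongAdder adder = new LongAdder();
        IntStream.range(0, increments).parallel().forEach(i -> adder.increment());
        return adder.sum(); // forEach has returned, so every increment is included
    }

    public static void main(String[] args) {
        System.out.println(countInParallel(100_000)); // 100000
    }
}
```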

The LongAccumulator generalizes this idea to an arbitrary accumulation operation. In the constructor, you provide the operation, as well as its neutral element. To incorporate new values, call accumulate. Call get to obtain the current value.

LongAccumulator adder = new LongAccumulator(Long::sum, 0);
// In some thread...
adder.accumulate(value);
Internally, the accumulator has variables a1, a2, ..., an. Each variable is initialized with the neutral element (0 in our example).

When accumulate is called with value v, then one of them is atomically updated as ai = ai op v, where op is the accumulation operation written in infix form. In our example, a call to accumulate computes ai = ai + v for some i.

The result of get is a1 op a2 op ... op an. In our example, that is the sum of the accumulators, a1 + a2 + ... + an.

If you choose a different operation, you can compute maximum or minimum (see Exercise 4). In general, the operation must be associative and commutative. That means that the final result must be independent of the order in which the intermediate values were combined.
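For instance, a maximum can be sketched with Long::max as the operation and Long.MIN_VALUE as its neutral element. The class name and the maxOf helper are assumptions for illustration; this is one possible approach, not necessarily the solution the exercise has in mind.

```java
import java.util.concurrent.atomic.LongAccumulator;
import java.util.stream.LongStream;

class LongAccumulatorMaxExample {
    // Computes the maximum of the given values using a LongAccumulator.
    // Long.MIN_VALUE is neutral for max: max(Long.MIN_VALUE, v) == v for every v.
    static long maxOf(long... values) {
        LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
        LongStream.of(values).parallel().forEach(max::accumulate);
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(maxOf(3, 42, 7, -5)); // 42
    }
}
```

Max is both associative and commutative, so it satisfies the requirement stated above.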

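map.putIfAbsent(word, new LongAdder());
map.get(word).increment(); // do not chain on putIfAbsent: it returns the previous value, which is null for a new key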
map.compute(word, (k, v) -> v == null ? 1 : v + 1);
map.computeIfAbsent(word, k -> new LongAdder()).increment();
That is almost like the call to putIfAbsent that you saw before, but the LongAdder constructor is only called when a new counter is actually needed.

map.merge(word, 1L, (existingValue, newValue) -> existingValue + newValue);
map.merge(word, 1L, Long::sum);
If the function that is passed to compute or merge returns null, the existing entry is removed from the map.
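The sketch below puts merge and the null-removal rule together in a small word counter. The class name and the countWords and decrement helpers are hypothetical, and the map here holds Long values (as in the merge calls above) rather than LongAdder instances.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class WordCountExample {
    // Counts occurrences with merge: insert 1 for a new word, else add 1 atomically.
    static ConcurrentHashMap<String, Long> countWords(List<String> words) {
        ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();
        words.parallelStream().forEach(word -> counts.merge(word, 1L, Long::sum));
        return counts;
    }

    // Decrements a count; returning null from the function removes the entry.
    static void decrement(ConcurrentHashMap<String, Long> counts, String word) {
        counts.compute(word, (k, v) -> (v == null || v <= 1) ? null : v - 1);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Long> counts = countWords(Arrays.asList("a", "b", "a"));
        System.out.println(counts.get("a"));         // 2
        decrement(counts, "b");
        System.out.println(counts.containsKey("b")); // false
    }
}
```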

Bulk Operations
String result = map.search(threshold, (k, v) -> v > 1000 ? k : null);
map.forEach(threshold,
   (k, v) -> System.out.println(k + " -> " + v));
The second variant takes an additional transformer function, which is applied first, and its result is passed to the consumer:
map.forEach(threshold,
  (k, v) -> k + " -> " + v, // Transformer
  System.out::println); // Consumer
Long sum = map.reduceValues(threshold, Long::sum);
Integer maxlength = map.reduceKeys(threshold,
   String::length, // Transformer
   Integer::max); // Accumulator
Long count = map.reduceValues(threshold,
  v -> v > 1000 ? 1L : null,
  Long::sum);
Set<String> words = ConcurrentHashMap.<String>newKeySet();
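The bulk-operation snippets above leave map and threshold undefined. A hedged, self-contained sketch follows. The class name, helper names, and sample data are assumptions. The threshold is the parallelism threshold: 1 asks for maximal parallelism, while Long.MAX_VALUE keeps the operation in a single thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class BulkOperationsExample {
    static final long THRESHOLD = 1; // 1 = maximal parallelism; Long.MAX_VALUE = single thread

    // Counts the values greater than limit; a null from the transformer skips that value.
    // Returns null when no value qualifies.
    static Long countAbove(ConcurrentHashMap<String, Long> map, long limit) {
        return map.reduceValues(THRESHOLD, v -> v > limit ? 1L : null, Long::sum);
    }

    // Length of the longest key, or null for an empty map.
    static Integer longestKeyLength(ConcurrentHashMap<String, Long> map) {
        return map.reduceKeys(THRESHOLD, String::length, Integer::max);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
        map.put("a", 5L);
        map.put("bbb", 2000L);
        map.put("cc", 3000L);
        System.out.println(countAbove(map, 1000)); // 2
        System.out.println(longestKeyLength(map)); // 3

        // newKeySet() gives a thread-safe Set backed by a ConcurrentHashMap.
        Set<String> words = ConcurrentHashMap.<String>newKeySet();
        words.add("hello");
        words.add("hello");
        System.out.println(words.size());          // 1
    }
}
```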
