Tuesday, May 10, 2016

Counting Word - Multi-Thread - ConcurrentHashMap



http://howtodoinjava.com/core-java/collections/best-practices-for-using-concurrenthashmap/
Fully parametrized constructor of ConcurrentHashMap takes 3 parameters, initialCapacity, loadFactor and concurrencyLevel.
1) initialCapacity
2) loadFactor
3) concurrencyLevel
First two are fairly simple as their name implies but last one is tricky part. This denotes the number of shards. It is used to divide the ConcurrentHashMap internally into this number of partitions and equal number of threads are created to maintain thread safety maintained at shard level.
ConcurrentHashMap
ConcurrentHashMap
 In most cases in normal application, a single shard is able to handle multiple threads with reasonable count of key-value pairs. And performance will be also optimal. Having multiple shards just makes the things complex internally and introduces a lot of un-necessary objects for garbage collection, and all this for no performance improvement.
The extra objects created per concurrent hashmap using default constructor are normally in ratio of 1 to 50 i.e. for 100 such instance of ConcurrentHashMap, there will be 5000 extra objects created.
Based on above, I will suggest to use the constructor parameters wisely to reduce the number of unnecessary objects and improving the performance.
A good approach can be having initialization like this:
ConcurrentHashMap<String, Integer> instance = new ConcurrentHashMap<String, Integer>(16, 0.9f, 1);
An initial capacity of 16 ensures a reasonably good number of elements before resizing happens. Load factor of 0.9 ensures a dense packaging inside ConcurrentHashMap which will optimize memory use. And concurrencyLevel set to 1 will ensure that only one shard is created and maintained.

The default value of “concurrencyLevel” is 16. It means 16 shards whenever we create an instance of ConcurrentHashMap using default constructor, before even adding first key-value pair. It also means the creation of instances for various inner classes like ConcurrentHashMap$Segment, ConcurrentHashMap$HashEntry[] and ReentrantLock$NonfairSync.
Please note that if you are working on very high concurrent application with very high frequency of updates in ConcurrentHashMap, you should consider increasing the concurrencyLevel more than 1, but again it should be a well calculated number to get the best results
https://dimitrisli.wordpress.com/2014/02/06/concurrenthashmap-computeifabsent-method-in-java-8/
The very nifty method computeIfAbsent has been added in the ConcurrentMap interface in Java 8 as part of the atomic operations of the ConcurrentMap interface. It’s more precisely a default method that provides an alternative to what we use to code ourselves:
1
2
3
4
5
6
if (map.get(key) == null) {
   V newValue = mappingFunction.apply(key);
   if (newValue != null)
      return map.putIfAbsent(key, newValue);
   }
}
but this time providing a function as a second argument.
private final Map counters = new ConcurrentHashMap();
 
private void accumulate(String name) {
    counters.computeIfAbsent(name, k -> new AtomicInteger()).incrementAndGet();
}

http://blog.takipi.com/java-8-longadders-the-fastest-way-to-add-numbers-concurrently/
Volatile – this fairly misunderstood keyword essentially instructs the JIT compiler to de-optimize the run-time machine code, so that any modification to the field is immediately seen by other threads.
This invalidates some of the JIT compiler favorite optimizations of playing with the order in which assignments are applied to memory. Come again you say? You heard me. The JIT compiler may change the order in which assignments to fields are made. This arcane little strategy (also known as happens-before) allows it to minimize the number of times the program needs to access global heap, while still making sure your code is unaffected by it. Pretty sneaky…
So when should I use volatile counters? If you have just one thread updating a value and multiple threads consuming it, this is a really good strategy – no contention at all.
So why not use it always you ask? Because this doesn’t work well when more than one thread is updating the field. Since A += B is not atomic, you’re running a risk of overriding somebody else’s write. 
AtomicInteger – this set of classes uses CAS (compare-and-swap) processor instructions to update the value of the counter. Sounds great, doesn’t it? Well, yes and no. This works well as it utilizes a direct machine code instruction to set the value with minimum effect on the execution of other threads. The downside is that if it fails to set the value due to a race with another thread, it has to try again. Under high contention this can turn into a spin lock, where the thread has to continuously try and set the value in an infinite loop, until it succeeds. 
Java 8 Adders – this is such a cool new API I just can’t stop gushing about it ♥♥♥. From a usage perspective it’s very similar to an AtomicInteger. Simply create a LongAdder and use intValue() and add() to get / set the value. The magic happens behind the scenes.
What this class does is when a straight CAS fails due to contention, it stores the delta in an internal cell object allocated for that thread. It then adds the value of pending cells to the sum when intValue() is called. This reduces the need to go back and CAS or block other threads. Pretty smart stuff!
With Java 8 this problem is solved at the framework level with new concurrent accumulator classes that enable you to very efficiently increase / decrease the value of a counter in a thread safe manner


LongAdder:
This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.

https://www.goodreads.com/author/quotes/73409.Brian_Goetz
https://github.com/mission-peace/interview/blob/master/src/com/interview/multithreaded/CountingWord.java
* Write a program to count words. This program should be threadsafe.
* Implement two apis
* 1) void addWord(String word) -> increment count of this word
* 2) long getCount(String word) -> get count of this word
*
* Solution
* Keep a concurrent map. Key to this map should be word while value should be AtomicLong to update it
* in threadsafe way
*
* Test cases
* One word updated by many threads
* Many words updated by many threads
public class CountingWord {

   private ConcurrentMap<String, AtomicLong> map = new ConcurrentHashMap<>();

   public void addWord(String word){
       AtomicLong l = map.get(word);
       if(l == null){
           l = new AtomicLong(1);
           l = map.putIfAbsent(word, l);
           if(l != null){
               l.incrementAndGet();
           }
       }else{
           l.incrementAndGet();
       }
   }
 
   public long getCount(String word){
       AtomicLong l = map.get(word);
       if(l != null){
           return l.longValue();
       }
       return 0;
   }
}

http://stackoverflow.com/questions/3339801/atomically-incrementing-counters-stored-in-concurrenthashmap
ConcurrentHashMap<String, LongAdder> map = new ConcurrentHashMap<>();
map.computeIfAbsent("key", k -> new LongAdder()).increment();

private ConcurrentHashMap<String,AtomicInteger> counters = new ConcurrentHashMap<String, AtomicInteger>();
private ReadWriteLock rwLock = new ReentrantReadWriteLock();

public void count(String invoker) {

    rwLock.readLock().lock();

    try{
        AtomicInteger currentValue = counters.get(invoker);
        // if entry is absent - initialize it. If other thread has added value before - we will yield and not replace existing value
        if(currentValue == null){
            // value we want to init with
            AtomicInteger newValue = new AtomicInteger(0);
            // try to put and get old
            AtomicInteger oldValue = counters.putIfAbsent(invoker, newValue);
            // if old value not null - our insertion failed, lets use old value as it's in the map
            // if old value is null - our value was inserted - lets use it
            currentValue = oldValue != null ? oldValue : newValue;
        }

        // counter +1
        currentValue.incrementAndGet();
    }finally {
        rwLock.readLock().unlock();
    }

}

/**
 * @return Map with counting results
 */
public Map<String, Integer> getCount() {
    // stop all updates (readlocks)
    rwLock.writeLock().lock();
    try{
        HashMap<String, Integer> resultMap = new HashMap<String, Integer>();
        // read all Integers to a new map
        for(Map.Entry<String,AtomicInteger> entry: counters.entrySet()){
            resultMap.put(entry.getKey(), entry.getValue().intValue());
        }
        // reset ConcurrentMap
        counters.clear();
        return resultMap;

    }finally {
        rwLock.writeLock().unlock();
    }

}

Guava's new AtomicLongMap (in release 11) might address this need.
If you invoke concurrentHashMap.get(key) and it returns an object, that object is guaranteed to be fully initialized. Each put (or putIfAbsent) will obtain a bucket specific lock and will append the element to the bucket's entries.
Now you may go through the code and notice that the get method doesnt obtain this same lock. So you can argue that there can be an out of date read, that isn't true either. The reason here is that value within the entry itself is volatile. So you will be sure to get the most up to date read.
putIfAbsent method in ConcurrentHashMap is check-if-absent-then-set method. It's an atomic operation. But to answer the following part: "Then does this ensures that myObj would be 100% intialiased when another thread does a cHashM.get() ", it would depend on when the object is put into the HashMap. Usually there is a happens-before precedence, i.e., if the caller gets first before the object is placed in the map, then null would be returned, else the value would be returned.
http://www.javamex.com/tutorials/synchronization_concurrency_8_hashmap2.shtml
  private final Map<String,Integer> queryCounts =
    Collections.synchronizedMap(new HashMap<String,Integer>(1000));

  private void incrementCount(String q) {
    Integer cnt = queryCounts.get(q);
    if (cnt == null) {
      queryCounts.put(q, 1);
    } else {
      queryCounts.put(q, cnt + 1);
    }
  }
}

Recall that wrapping the map with Collections.synchronizedMap(...) makes it safe to access the map concurrently: each call to get()put()size()containsKey() etc will synchronize on the map during the call. (One problem that we'll see in a minute is that iteratingover the map does still require explicit synchronization.)
Note that this doesn't make incrementCount() atomic, but it does make it safe. That is, concurrent calls to incrementCount() will never leave the map in a corrupted state. But they might 'miss a count' from time to time. For example, two threads could concurrently read a current value of, say, 2 for a particular query, both independently increment it to 3, and both set it to 3, when in fact two queries have been made

public final class MyServlet extends MyAbstractServlet {
  private final ConcurrentMap<String,Integer> queryCounts =
    new ConcurrentHashMap<String,Integer>(1000);

  private void incrementCount(String q) {
    Integer oldVal, newVal;
    do {
      oldVal = queryCounts.get(q);
      newVal = (oldVal == null) ? 1 : (oldVal + 1);
    } while (!queryCounts.replace(q, oldVal, newVal));
  }
}


http://www.javamex.com/tutorials/synchronization_concurrency_8_hashmap3_iteration.shtml
An additional benefit of ConcurrentHashMap is that we can iterate over the map without locking. (Indeed, it is not actually possible to lock a ConcurrentHashMap during this or any other operation.)
Recall that with an ordinary HashMap– even one wrapped in a Collections.synchronizedMap(...) wrapper!– iteration over the map must occur whilst synchronized on the map in order to be thread-safe. If, due to incorrect synchronization, one thread does update the map whilst another is iterating over it, one of the threads is liable to throw a ConcurrentModificationException.
In contrast, whilst one or more threads is iterating over a ConcurrntHashMap, other threads can concurrently access the map. Such operations will never throw a ConcurrentModificationException. In this case, the thread that is iterating over the map is not guaranteed to "see" updates since the iteration began, but it will still see the map in a "safe state", reflecting at the very least the state of the map at the time iteration began. This is both good news and bad news:
  • Good news: it is perfect for cases where we want iteration not to affect concurrency, at the expense of possibly missing an update while iterating (e.g. in our imaginary web server, while iterating in order to persist the current query counts to a database: we probably wouldn't care about missing the odd count);
  • Bad news: because there's no way to completely lock a ConcurrentHashMap, there's no easy option for taking a "snapshot" of the map as a truly atomic operation.
https://ria101.wordpress.com/2011/12/12/concurrenthashmap-avoid-a-common-misuse/
ConcurrentHashMap is often introduced to simplify code and application logic. For example:
HashMap<String, MyClass> m = new HashMap<String, MyClass>();
...
synchronized (m) {
    for each (Entry<String, MyClass> e in m.entrySet())
        system.out.println(e.getKey()+"="+e.getValue());
}
might be replaced with the following so long as a consistent snapshot is not required:
ConcurrentHashMap<String, MyClass> m = new ConcurrentHashMap<String, MyClass>();
...
for each (Entry<String, MyClass> e in m.entrySet())
    system.out.println(e.getKey()+"="+e.getValue());
More often though, ConcurrentHashMap is introduced to improve performance: internally ConcurrentHashMap allows concurrent threads to read values without locking at all, except in a minority of cases, and for concurrent writers to add and update values while only acquiring locks over localised segments of the internal data structure. Read operations are therefore mostly synchronization free and have greatly improved performance, and if there are many concurrent writers then lock contention will be reduced improving performance even further.
there is unfortunately a big caveat that 99% have missed: The designers of ConcurrentHashMap originally conceived the class for occasional use in in high concurrency areas at the center of systems and the default construction options reflect this!!
Specifically, the fully parametized constructor of ConcurrentHashMap has 3 parameters, initialCapacity, loadFactor and concurrencyLevel and this last one can cause problems.
Recall that ConcurrentHashMap shards its data into segments to reduce writer lock contention. Well, in actual fact the concurrencyLevel parameter directly specifies the number of shards that are created by the class internally. The idea is that the number of shards should equal the number of concurrent writer threads that you normally see. And, the default value for concurrencyLevel is 16!
http://www.cs.umd.edu/class/fall2013/cmsc433/examples/wordcount/WordCountParallel.java

http://ashkrit.blogspot.com/2014/12/what-is-new-in-java8-concurrenthashmap.html

return jdk8UrlContent.computeIfAbsent(s, (key) -> {
     return loadUrlContent(key);
 });

One of the most common problem with maps is when values of same keys needs to merged, one of the example is you have map of word & count.
Every time word is added to map and if it exists then count needs to be incremented
Pre-jdk8
public void mergeValue_jdk7(String word, int occurrence) {
    Integer count = wordMap.get(word);
    if(count==0) {
        count=0;
    }
    wordMap.put(word,count+occurrence);
}

public void mergeValue_jdk8(String word, int occurrence) {
   wordMap.merge(word, occurrence, (key, value) -> wordMap.getOrDefault(key, 0) + value);
}
This is also very common use case where single value is derived from values in map for eg total number of words
public int jdk7TotalWords() {
   int totalWords=0;
   for(Map.Entry<String,Integer> entry : wordMap.entrySet()) {
       totalWords+=entry.getValue();
   }
   return totalWords;
}

public int jdk8TotalWords() {
  return wordMap.reduceValues(1,(count1,count2) -> count1+count2);
}

One thing to note is that all the parallel function are executed on common forkjoin pool, there is way by which you can provide your own pool to execute task.
public void loop_jdk8() {
 urlContent.forEach((key, value) -> {
     System.out.println(Thread.currentThread().getName() + " Key " + key + " Value " + value);
 });
}

public void loop_jdk8_parallel() {
 urlContent.forEach(10,(key, value) -> {
     System.out.println(Thread.currentThread().getName() + " Key " + key + " Value " + value);
 });
}

https://blog.jooq.org/2014/02/14/java-8-friday-goodies-map-enhancements/ HashMap
map.forEach((k, v) ->
    System.out.println(k + "=" + v));
Often, we fetch a value from a map, make some calculations on it and put it back into the map. This can be verbose and hard to get right if concurrency is involved. With Java 8, we can pass a BiFunction to the new compute()computeIfAbsent(), or computeIfPresent() methods and have the Map implementation handle the semantics of replacing a value.
Map<String, Integer> map = new HashMap<>();
map.put("A", 1);
map.put("B", 2);
map.put("C", 3);
 
// Compute a new value for the existing key
System.out.println(map.compute("A",
    (k, v) -> v == null ? 42 : v + 41));
System.out.println(map);
 
// This will add a new (key, value) pair
System.out.println(map.compute("X",
    (k, v) -> v == null ? 42 : v + 41));
System.out.println(map);


Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts