Friday, December 4, 2015

Understanding ConcurrentHashMap Internal



http://www.infoq.com/cn/articles/ConcurrentHashMap

线程不安全的HashMap

因为多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap,如以下代码
final HashMap<String, String> map = new HashMap<String, String>(2);
Thread t = new Thread(new Runnable() {
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    map.put(UUID.randomUUID().toString(), "");
                }
            }, "ftf" + i).start();
        }
    }
}, "ftf");
t.start();
t.join();

锁分段技术

HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

ConcurrentHashMap的结构

我们通过ConcurrentHashMap的类图来分析ConcurrentHashMap的结构。
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

ConcurrentHashMap的初始化

ConcurrentHashMap初始化方法是通过initialCapacity,loadFactor, concurrencyLevel几个参数来初始化segments数组,段偏移量segmentShift,段掩码segmentMask和每个segment里的HashEntry数组 。
初始化segments数组。让我们来看一下初始化segmentShift,segmentMask和segments数组的源代码。
if (concurrencyLevel > MAX_SEGMENTS)
    concurrencyLevel = MAX_SEGMENTS;

// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
    ++sshift;
    ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);
由上面的代码可知segments数组的长度ssize通过concurrencyLevel计算得出。为了能通过按位与的哈希算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方(power-of-two size),所以必须计算出一个是大于或等于concurrencyLevel的最小的2的N次方值来作为segments数组的长度。假如concurrencyLevel等于14,15或16,ssize都会等于16,即容器里锁的个数也是16。注意concurrencyLevel的最大大小是65535,意味着segments数组的长度最大为65536,对应的二进制是16位。
初始化segmentShift和segmentMask。这两个全局变量在定位segment时的哈希算法里需要使用,sshift等于ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16,1需要向左移位移动4次,所以sshift等于4。segmentShift用于定位参与hash运算的位数,segmentShift等于32减sshift,所以等于28,这里之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的,后面的测试中我们可以看到这点。segmentMask是哈希运算的掩码,等于ssize减1,即15,掩码的二进制各个位的值都是1。因为ssize的最大长度是65536,所以segmentShift最大值是16,segmentMask最大值是65535,对应的二进制是16位,每个位都是1。
初始化每个Segment。输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每个segment的负载因子,在构造方法里需要通过这两个参数来初始化数组中的每个segment。
if (initialCapacity > MAXIMUM_CAPACITY)
    initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
    ++c;
int cap = 1;
while (cap < c)
    cap <<= 1;
for (int i = 0; i < this.segments.length; ++i)
    this.segments[i] = new Segment<K,V>(cap, loadFactor);
上面代码中的变量cap就是segment里HashEntry数组的长度,它等于initialCapacity除以ssize的倍数c,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。segment的容量threshold=(int)cap*loadFactor,默认情况下initialCapacity等于16,loadfactor等于0.75,通过运算cap等于1,threshold等于零。

定位Segment

既然ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过哈希算法定位到Segment。可以看到ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再哈希。
private static int hash(int h) {
        h += (h << 15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h << 3);
        h ^= (h >>> 6);
        h += (h << 2) + (h << 14);
        return h ^ (h >>> 16);
    }
之所以进行再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。我做了一个测试,不通过再哈希而直接执行哈希计算。
System.out.println(Integer.parseInt("0001111", 2) & 15);
System.out.println(Integer.parseInt("0011111", 2) & 15);
System.out.println(Integer.parseInt("0111111", 2) & 15);
System.out.println(Integer.parseInt("1111111", 2) & 15);
计算后输出的哈希值全是15,通过这个例子可以发现如果不进行再哈希,哈希冲突会非常严重,因为只要低位一样,无论高位是什么数,其哈希值总是一样。我们再把上面的二进制数据进行再哈希后结果如下,为了方便阅读,不足32位的高位补了0,每隔四位用竖线分割下。
0100|0111|0110|0111|1101|1010|0100|1110
1111|0111|0100|0011|0000|0001|1011|1000
0111|0111|0110|1001|0100|0110|0011|1110
1000|0011|0000|0000|1100|1000|0001|1010
可以发现每一位的数据都散列开了,通过这种再哈希能让数字的每一位都能参加到哈希运算当中,从而减少哈希冲突。ConcurrentHashMap通过以下哈希算法定位segment。
final Segment<K,V> segmentFor(int hash) {
        return segments[(hash >>> segmentShift) & segmentMask];
    }
默认情况下segmentShift为28,segmentMask为15,再哈希后的数最大是32位二进制数据,向右无符号移动28位,意思是让高4位参与到hash运算中, (hash >>> segmentShift) & segmentMask的运算结果分别是4,15,7和8,可以看到hash值没有发生冲突。

ConcurrentHashMap的get操作

Segment的get操作实现非常简单和高效。先经过一次再哈希,然后使用这个哈希值通过哈希运算定位到segment,再通过哈希算法定位到元素,代码如下:
public V get(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
}
get操作的高效之处在于整个get过程不需要加锁,除非读到的值是空的才会加锁重读,我们知道HashTable容器的get方法是需要加锁的,那么ConcurrentHashMap的get操作是如何做到不加锁的呢?原因是它的get方法里将要使用的共享变量都定义成volatile,如用于统计当前Segement大小的count字段和用于存储值的HashEntry的value。定义成volatile的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖于原值),在get操作里只需要读不需要写共享变量count和value,所以可以不用加锁。之所以不会读到过期的值,是根据java内存模型的happen before原则,对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的值,这是用volatile替换锁的经典应用场景。
transient volatile int count;
volatile V value;
在定位元素的代码里我们可以发现定位HashEntry和定位Segment的哈希算法虽然一样,都与数组的长度减去一相与,但是相与的值不一样,定位Segment使用的是元素的hashcode通过再哈希后得到的值的高位,而定位HashEntry直接使用的是再哈希后的值。其目的是避免两次哈希后的值一样,导致元素虽然在Segment里散列开了,但是却没有在HashEntry里散列开。
hash >>> segmentShift) & segmentMask//定位Segment所使用的hash算法
int index = hash & (tab.length - 1);// 定位HashEntry所使用的hash算法

ConcurrentHashMap的Put操作

由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须得加锁。Put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位置然后放在HashEntry数组里。
是否需要扩容。在插入元素前会先判断Segment里的HashEntry数组是否超过容量(threshold),如果超过阀值,数组进行扩容。值得一提的是,Segment的扩容判断比HashMap更恰当,因为HashMap是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容之后没有新元素插入,这时HashMap就进行了一次无效的扩容。
如何扩容。扩容的时候首先会创建一个两倍于原容量的数组,然后将原数组里的元素进行再hash后插入到新的数组里。为了高效ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。

ConcurrentHashMap的size操作

如果我们要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。Segment里的全局变量count是一个volatile变量,那么在多线程场景下,我们是不是直接把所有Segment的count相加就可以得到整个ConcurrentHashMap大小了呢?不是的,虽然相加时可以获取每个Segment的count的最新值,但是拿到之后可能累加前使用的count发生了变化,那么统计结果就不准了。所以最安全的做法,是在统计size的时候把所有Segment的put,remove和clean方法全部锁住,但是这种做法显然非常低效。 因为在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以ConcurrentHashMap的做法是先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。
那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put , remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。
forEach(parallelismThreshold, action)
forEachEntry(parallelismThreshold, action):
forEachKey(parallelismThreshold, action)
forEachValue(parallelismThreshold, action)
Current implementation uses the common ForkJoinPool instance to execute the parallel tasks.

The search() method will return the first non-null value returned by the search function.
reduce
BiFunction<String, List<ExtendedProduct>, List<ExtendedProduct>> transformer = (key, value) -> value.stream().filter(product -> product.getValue() == 1).collect(Collectors.toList());
BinaryOperator<List<ExtendedProduct>> reducer = (list1, list2) ->{
        list1.addAll(list2);
        return list1;
};
List<ExtendedProduct> badReviews=productsByBuyer.reduce(10, transformer, reducer);

ConcurrentHashMap<String, LongAdder> counter=new ConcurrentHashMap<>();
badReviews.forEach(product -> {
    counter.computeIfAbsent(product.getTitle(), title -> new LongAdder()).increment();
});
counter.forEach((title, count) -> {
    System.out.println(title+":"+count);
});

merge() method that allows you to merge a (key, value) pair into the map. If the key doesn't exist in ConcurrentHashMap, it is inserted directly. If the key exists, you have to define which will be the new value associated with that key from the old one and the new one
products1996.forEach(10,(product, reviews) -> {
    products1995.merge(product, reviews, (reviews1, reviews2) -> {
        System.out.println("Merge for: "+product.getAsin());
        reviews1.addAll(reviews2);
        return reviews1;
    });
});

productList.removeIf(product -> product.getSalesrank() > 1000);

http://www.iteye.com/topic/344876
http://www.mianwww.com/html/2012/11/17092.html
如何设计线程安全的 HashMap?
锁分离 (Lock Stripping)
ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不同部分进行的修改。ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的hash table,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。
有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。这里“按顺序”是很重要的,否则极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的,但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证。这可以确保不会出现死锁,因为获得锁的顺序是固定的。不变性是多线程编程占有很重要的地位,下面还要谈到。
    /**
     * The segments, each of which is a specialized hash table
     */
final Segment<K,V>[] segments;
不变(Immutable)和易变(Volatile)
ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。如果使用传统的技术,如HashMap中的实现,如果允许可以在hash链的中间添加或删除元素,读操作不加锁将得到不一致的数据。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry代表每个hash链中的一个节点,其结构如下所示:
    static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;
    }
可以看到除了value不是final的,其它值都是final的,这意味着不能从hash链的中间或尾部添加或删除节点,因为这需要修改next引用值,所有的节点的修改只能从头部开始。对于put操作,可以一律添加到Hash链的头部。但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。这在讲解删除操作时还会详述。为了确保读操作能够看到最新的值,将value设置成volatile,这避免了加锁。
其它
为了加快定位段以及段中hash槽的速度,每个段hash槽的的个数都是2^n,这使得通过位运算就可以定位段和段中hash槽的位置。当并发级别为默认值16时,也就是段的个数,hash值的高4位决定分配在哪个段中。但是我们也不要忘记《算法导论》给我们的教训:hash槽的的个数不应该是2^n,这可能导致hash槽分配不均,这需要对hash值重新再hash一次。(这段似乎有点多余了 )
这是重新hash的算法,还比较复杂,我也懒得去理解了。
    private static int hash(int h) {
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h <<  15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h <<   3);
h ^= (h >>>  6);
h += (h <<   2) + (h << 14);
return h ^ (h >>> 16);
}
这是定位段的方法:
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
数据结构
关于Hash表的基础数据结构,这里不想做过多的探讨。Hash表的一个很重要方面就是如何解决hash冲突,ConcurrentHashMap和HashMap使用相同的方式,都是将hash值相同的节点放在一个hash链中。与HashMap不同的是,ConcurrentHashMap使用多个子Hash表,也就是段(Segment)。下面是ConcurrentHashMap的数据成员:
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
/**
* Mask value for indexing into segments. The upper bits of a
* key’s hash code are used to choose the segment.
*/
final int segmentMask;

/**
* Shift value for indexing within segments.
*/
final int segmentShift;

/**
* The segments, each of which is a specialized hash table
*/
final Segment<K,V>[] segments;
}
所有的成员都是final的,其中segmentMask和segmentShift主要是为了定位段,参见上面的segmentFor方法。
每个Segment相当于一个子Hash表,它的数据成员如下:
    static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
/**
* The number of elements in this segment’s region.
*/
transient volatile int count;

/**
* Number of updates that alter the size of the table. This is
* used during bulk-read methods to make sure they see a
* consistent snapshot: If modCounts change during a traversal
* of segments computing size or checking containsValue, then
* we might have an inconsistent view of state so (usually)
* must retry.
*/
transient int modCount;

/**
* The table is rehashed when its size exceeds this threshold.
* (The value of this field is always <tt>(int)(capacity *
* loadFactor)</tt>.)
*/
transient int threshold;

/**
* The per-segment table.
*/
transient volatile HashEntry<K,V>[] table;

/**
* The load factor for the hash table.  Even though this value
* is same for all segments, it is replicated to avoid needing
* links to outer object.
* @serial
*/
final float loadFactor;
}
count用来统计该段数据的个数,它是volatile,它用来协调修改和读取操作,以保证读取操作能够读取到几乎最新的修改。协调方式是这样的,每次修改操作做了结构上的改变,如增加/删除节点(修改节点的值不算结构上的改变),都要写count值,每次读取操作开始都要读取count的值。这利用了Java 5中对volatile语义的增强,对同一个volatile变量的写和读存在happens-before关系。modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变,在讲述跨段操作时会还会详述。threashold用来表示需要进行rehash的界限值。table数组存储段中节点,每个数组元素是个hash链,用HashEntry表示。table也是volatile,这使得能够读取到最新的table值而不需要同步。loadFactor表示负载因子。
实现细节
修改操作
先来看下删除操作remove(key)。
    public V remove(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).remove(key, hash, null);
}
整个操作是先定位到段,然后委托给段的remove操作。当多个删除操作并发进行时,只要它们所在的段不相同,它们就可以同时进行。下面是Segment的remove方法实现:
V remove(Object key, int hash, Object value) {
lock();
try {
int c = count – 1;
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length – 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;

V oldValue = null;
if (e != null) {
V v = e.value;
if (value == null || value.equals(v)) {
oldValue = v;
// All entries following removed node can stay
// in list, but all preceding ones need to be
// cloned.
++modCount;
HashEntry<K,V> newFirst = e.next;
for (HashEntry<K,V> p = first; p != e; p = p.next)
newFirst = new HashEntry<K,V>(p.key, p.hash,
newFirst, p.value);
tab[index] = newFirst;
count = c; // write-volatile
}
}
return oldValue;
} finally {
unlock();
}
}
整个操作是在持有段锁的情况下执行的,空白行之前的行主要是定位到要删除的节点e。接下来,如果不存在这个节点就直接返回null,否则就要将e前面的结点复制一遍,尾结点指向e的下一个结点。e后面的结点不需要复制,它们可以重用。下面是个示意图,我直接从这个网站 上复制的(画这样的图实在是太麻烦了,如果哪位有好的画图工具,可以推荐一下)。


删除元素之前:




删除元素3之后:


第二个图其实有点问题,复制的结点中应该是值为2的结点在前面,值为1的结点在后面,也就是刚好和原来结点顺序相反,还好这不影响我们的讨论。

整个remove实现并不复杂,但是需要注意如下几点。第一,当要删除的结点存在时,删除的最后一步操作要将count的值减一。这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。第二,remove执行的开始就将table赋给一个局部变量tab,这是因为table是volatile变量,读写volatile变量的开销很大。编译器也不能对volatile变量的读写做任何优化,直接多次访问非volatile实例变量没有多大影响,编译器会做相应优化。


接下来看put操作,同样地put操作也是委托给段的put方法。下面是段的put方法:
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash();
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length – 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;

V oldValue;
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}
该方法也是在持有段锁的情况下执行的,首先判断是否需要rehash,需要就先rehash。接着是找是否存在同样一个key的结点,如果存在就直接替换这个结点的值。否则创建一个新的结点并添加到hash链的头部,这时一定要修改modCount和count的值,同样修改count的值一定要放在最后一步。put方法调用了rehash方法,reash方法实现得也很精巧,主要利用了table的大小为2^n,这里就不介绍了。
修改操作还有putAll和replace。putAll就是多次调用put方法,没什么好说的。replace甚至不用做结构上的更改,实现要比put和delete要简单得多,理解了put和delete,理解replace就不在话下了,这里也不介绍了。
获取操作
首先看下get操作,同样ConcurrentHashMap的get操作是直接委托给Segment的get方法,直接看Segment的get方法:
        V get(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
get操作不需要锁。第一步是访问count变量,这是一个volatile变量,由于所有的修改操作在进行结构修改时都会在最后一步写count变量,通过这种机制保证get操作能够得到几乎最新的结构更新。对于非结构更新,也就是结点值的改变,由于HashEntry的value变量是volatile的,也能保证读取到最新的值。接下来就是对hash链进行遍历找到要获取的结点,如果没有找到,直接访回null。对hash链进行遍历不需要加锁的原因在于链指针next是final的。但是头指针却不是final的,这是通过getFirst(hash)方法返回,也就是存在table数组中的值。这使得getFirst(hash)可能返回过时的头结点,例如,当执行get方法时,刚执行完getFirst(hash)之后,另一个线程执行了删除操作并更新头结点,这就导致get方法中返回的头结点不是最新的。这是可以允许,通过对count变量的协调机制,get能读取到几乎最新的数据,虽然可能不是最新的。要得到最新的数据,只有采用完全的同步。
最后,如果找到了所求的结点,判断它的值如果非空就直接返回,否则在有锁的状态下再读一次。这似乎有些费解,理论上结点的值不可能为空,这是因为put的时候就进行了判断,如果为空就要抛NullPointerException。空值的唯一源头就是HashEntry中的默认值,因为HashEntry中的value不是final的,非同步读取有可能读取到空值。仔细看下put操作的语句:tab[index] = new HashEntry<K,V>(key, hash, first, value),在这条语句中,HashEntry构造函数中对value的赋值以及对tab[index]的赋值可能被重新排序,这就可能导致结点的值为空。这种情况应当很罕见,一旦发生这种情况,ConcurrentHashMap采取的方式是在持有锁的情况下再读一遍,这能够保证读到最新的值,并且一定不会为空值。
        V readValueUnderLock(HashEntry<K,V> e) {
lock();
try {
return e.value;
} finally {
unlock();
}
}
另一个操作是containsKey,这个实现就要简单得多了,因为它不需要读取值:
boolean containsKey(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key))
return true;
e = e.next;
}
}
return false;
}
跨段操作
有些操作需要涉及到多个段,比如说size(), containsValaue()。先来看下size()方法:
    public int size() {
final Segment<K,V>[] segments = this.segments;
long sum = 0;
long check = 0;
int[] mc = new int[segments.length];
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
check = 0;
sum = 0;
int mcsum = 0;
for (int i = 0; i < segments.length; ++i) {
sum += segments[i].count;
mcsum += mc[i] = segments[i].modCount;
}
if (mcsum != 0) {
for (int i = 0; i < segments.length; ++i) {
check += segments[i].count;
if (mc[i] != segments[i].modCount) {
check = -1; // force retry
break;
}
}
}
if (check == sum)
break;
}
if (check != sum) { // Resort to locking all segments
sum = 0;
for (int i = 0; i < segments.length; ++i)
segments[i].lock();
for (int i = 0; i < segments.length; ++i)
sum += segments[i].count;
for (int i = 0; i < segments.length; ++i)
segments[i].unlock();
}
if (sum > Integer.MAX_VALUE)
return Integer.MAX_VALUE;
else
return (int)sum;
}
size方法主要思路是先在没有锁的情况下对所有段大小求和,如果不能成功(这是因为遍历过程中可能有其它线程正在对已经遍历过的段进行结构性更新),最多执行RETRIES_BEFORE_LOCK次,如果还不成功就在持有所有段锁的情况下再对所有段大小求和。在没有锁的情况下主要是利用Segment中的modCount进行检测,在遍历过程中保存每个Segment的modCount,遍历完成之后再检测每个Segment的modCount有没有改变,如果有改变表示有其它线程正在对Segment进行结构性并发更新,需要重新计算。
其实这种方式是存在问题的,在第一个内层for循环中,在这两条语句sum += segments[i].count; mcsum += mc[i] = segments[i].modCount;之间,其它线程可能正在对Segment进行结构性的修改,导致segments[i].count和segments[i].modCount读取的数据并不一致。这可能使size()方法返回任何时候都不曾存在的大小,很奇怪javadoc居然没有明确标出这一点,可能是因为这个时间窗口太小了吧。size()的实现还有一点需要注意,必须要先segments[i].count,才能segments[i].modCount,这是因为segment[i].count是对volatile变量的访问,接下来segments[i].modCount才能得到几乎最新的值(前面我已经说了为什么只是“几乎”了)。这点在containsValue方法中得到了淋漓尽致的展现:
    public boolean containsValue(Object value) {
if (value == null)
throw new NullPointerException();

// See explanation of modCount use above

final Segment<K,V>[] segments = this.segments;
int[] mc = new int[segments.length];

// Try a few times without locking
for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
int sum = 0;
int mcsum = 0;
for (int i = 0; i < segments.length; ++i) {
int c = segments[i].count;
mcsum += mc[i] = segments[i].modCount;
if (segments[i].containsValue(value))
return true;
}
boolean cleanSweep = true;
if (mcsum != 0) {
for (int i = 0; i < segments.length; ++i) {
int c = segments[i].count;
if (mc[i] != segments[i].modCount) {
cleanSweep = false;
break;
}
}
}
if (cleanSweep)
return false;
}
// Resort to locking all segments
for (int i = 0; i < segments.length; ++i)
segments[i].lock();
boolean found = false;
try {
for (int i = 0; i < segments.length; ++i) {
if (segments[i].containsValue(value)) {
found = true;
break;
}
}
} finally {
for (int i = 0; i < segments.length; ++i)
segments[i].unlock();
}
return found;
}
同样注意内层的第一个for循环,里面有语句int c = segments[i].count; 但是c却从来没有被使用过,即使如此,编译器也不能做优化将这条语句去掉,因为存在对volatile变量count的读取,这条语句存在的唯一目的就是保证segments[i].modCount读取到几乎最新的值。关于containsValue方法的其它部分就不分析了,它和size方法差不多。
跨段方法中还有一个isEmpty()方法,其实现比size()方法还要简单,也不介绍了。最后简单地介绍下迭代方法,如keySet(), values(), entrySet()方法,这些方法都返回相应的迭代器,所有迭代器都继承于Hash_Iterator类(提交时居然提醒我不能包含sh It,只得加了下划线),里实现了主要的方法。其结构是:
    abstract class Hash_Iterator{
int nextSegmentIndex;
int nextTableIndex;
HashEntry<K,V>[] currentTable;
HashEntry<K, V> nextEntry;
HashEntry<K, V> lastReturned;
}
nextSegmentIndex是段的索引,nextTableIndex是nextSegmentIndex对应段中中hash链的索引,currentTable是nextSegmentIndex对应段的table。调用next方法时主要是调用了advance方法:
        final void advance() {
if (nextEntry != null && (nextEntry = nextEntry.next) != null)
return;

while (nextTableIndex >= 0) {
if ( (nextEntry = currentTable[nextTableIndex--]) != null)
return;
}

while (nextSegmentIndex >= 0) {
Segment<K,V> seg = segments[nextSegmentIndex--];
if (seg.count != 0) {
currentTable = seg.table;
for (int j = currentTable.length – 1; j >= 0; –j) {
if ( (nextEntry = currentTable[j]) != null) {
nextTableIndex = j – 1;
return;
}
}
}
}
}
不想再多介绍了,唯一需要注意的是跳到下一个段时,一定要先读取下一个段的count变量。
这种迭代方式的主要效果是不会抛出ConcurrentModificationException。一旦获取到下一个段的table,也就意味着这个段的头结点在迭代过程中就确定了,在迭代过程中就不能反映对这个段节点并发的删除和添加,对于节点的更新是能够反映的,因为节点的值是一个volatile变量。

从volatile解读ConcurrentHashMap(jdk1.6.0)无锁读
volatile常常用于修饰多线程共享变量,用来保证该变量的可见性。volatile的语意:某个写线程对volatile变量的写入马上可以被后续的某个读线程“看”到。
volatile保证可见性的原理:volatile是通过在编译器生成字节码时,在对volatile变量进行读写指令序列的前后加入内存屏障,来禁止一些处理器重排序保证写入一定发生在读之前的这种happen-before关系。

简单理解:在本次线程内,当读取一个变量时,为提高存取速度,编译器优化时有时会先把变量读取到一个线程本地内存中;以后再取变量值时,就直接从本地内存中取值;当变量值在本线程里改变时,会同时把变量的新值copy到本地内存中,以便保持一致;在某个特定的时候,将本地内存的更改写到系统主内存中去;当变量在因别的线程等而改变了值,并且该变化没有写到系统主内存,本次线程的本地内存中的值不会相应改变,从而造成应用程序读取的值和实际的变量值不一致;但是当变量被volatile修饰后,每次更改该变量的时候会将更改结果写到系统主内存中,利用多处理器的缓存一致性,其他处理器会发现自己的缓存行对应的内存地址被修改,就会将自己处理器的缓存行设置为失效,并强制从系统主内存获取最新的数据。这样就能保证即使在别的线程中改变了该变量的值,在本线程中也能取到最新更改后的值。 ConcurrentHashMap之所以有较好的并发性是因为ConcurrentHashMap是无锁读和加锁写,并且利用了分段锁(不是在所有的entry上加锁,而是在一部分entry上加锁)。
那ConcurrentHashMap是怎么实现无锁读的呢?
这是在jdk1.6.0中的读的实现。
当执行读的时候,先判断count,count就是一个Segment(充当锁的角色)所守护HashEntry的数量。

    这里的count是被volatile修饰的。当对这段表的结构进行更改时,在退出前都会去更改count。由于volatile的语意:某个写线程对volatile变量的写入马上可以被后续的某个读线程“看”到,所以这里对count的读一定发生在对count写之后,获得是最新的count。在无锁读的方法中,首先去读取这个最近的count,保证了在执行无锁读的时候表的结构没有被改变。(利用了volatile变量写读的happen-before关系)。


   同时当把value设置为volatile时,其他线程所做的改变就能马上被当前线程感知。这样就能支持多个线程并发读了~

   不过我们也知道volatile并不能保证线程安全,它是轻量级的synchronized。

   要使 volatile变量提供理想的线程安全,必须同时满足下面两个条件:
              ● 对变量的写操作不依赖于当前值。
              ● 该变量没有包含在具有其他变量的不变式中。
举例:线程安全计数器的自增操作,其实是由3个操作读取-修改-写入操作序列组成的组合操作,volatile不能保证原子性,不能保证在操作期间该变量的值不会改变。
   其实这是一种常见的volatile的利用场景——开销较低的读-写锁策略。如果读操作远远超过写操作,您可以结合使用内部锁和 volatile变量来减少公共代码路径的开销。这样读操作只是volatile读操作,性能优于一个无竞争的锁获取的开销。但是当需要对该变量执行写操作,应该加锁。

  PS:这里ConcurrentHashMap也有加锁读的情况。利用方法  V readValueUnderLock(HashEntry<K,V> e)。只有value为空的时候,才会加锁读,这种情况就是编译器对value的赋值操作进行重排序了。
        V readValueUnderLock(HashEntry<K,V> e) {
            lock();
            try {
                return e.value;
            } finally {
                unlock();
            }
        }

Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts