澳门新浦京app下载带你读 ConcurrentHashMap 1.7 1.8

澳门新浦京app下载 5

总体描述:

concurrentHashmap是为了高并发而实现,内部采用分离锁的设计,有效地避开了热点访问。而对于每个分段,ConcurrentHashmap采用final和内存可见修饰符Volatile澳门新浦京app下载 ,关键字(内存立即可见:Java
的内存模型可以保证:某个写线程对 value
域的写入马上可以被后续的某个读线程“看”到。注:并不能保证对volatile变量状态有依赖的其他操作的原子性)

借用某博客对concurrentHashmap对结构图:

澳门新浦京app下载 1

不难看出,concurrenthashmap采用了二次hash的方式,第一次hash将key映射到对应的segment,而第二次hash则是映射到segment的不同桶中。

为什么要用二次hash,主要原因是为了构造分离锁,使得对于map的修改不会锁住整个容器,提高并发能力。当然,没有一种东西是绝对完美的,二次hash带来的问题是整个hash的过程比hashmap单次hash要长,所以,如果不是并发情形,不要使用concurrentHashmap。

1.
Concurrent相关历史

概述:

ConcurrentHashmap是java并发中重要的类,用来替代HashTable,实现可并发的hashtalbe。我将以1.7和1.8中的实现,分别为大家讲解ConcurrentHashMap的关键数据结构,和关键函数,看看他到底是如何实现并发的。

代码实现:

该数据结构中,最核心的部分是两个内部类,HashEntry和Segment

concurrentHashmap维护一个segment数组,将元素分成若干段(第一次hash)

/**
* The segments, each of which is a specialized hash table.
*/
final Segment<K,V>[] segments;

segments的每一个segment维护一个链表数组

代码:

再来看看构造方法

public ConcurrentHashMap(int initialCapacity,
    float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
    throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
    concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
    ++sshift;
    ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
    initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
    ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
    cap <<= 1;
    // create segments and segments[0]
    Segment<K,V> s0 =
    new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
    (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

代码28行,一旦指定了concurrencyLevel(segments数组大小)便不能改变,这样,一旦threshold超标,rehash真不会影响segments数组,这样,在大并发的情况下,只会影响某一个segment的rehash而其他segment不会受到影响

(put方法都要上锁)

JDK5中添加了新的concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能。因为同步容器将所有对容器状态的访问都串行化了,这样保证了线程的安全性,所以这种方法的代价就是严重降低了并发性,当多个线程竞争容器(bins)时,吞吐量严重降低。因此Java5.0开始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入了Java.util.concurrent包,在线程安全的基础上提供了更好的写并发能力,但同时降低了对读一致性的要求(这点挺符合CAP理论的)。与Vector和Hashtable、Collections.synchronizedXxx()同步容器等相比,util.concurrent中引入的并发容器主要解决了两个问题: 

JDK1.7中的实现

JDK1.7
中的ConcurrentHashMap采用了分段锁的设计,先来看一下它的数据结构。ConcurrentHashMap中含有一个<b>segment</b>数组。每个segment中又含有一个HashEntry数组。即每个segment中含有一个完整的hashtable。
来看看上述segment结构的定义:

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        private static final long serialVersionUID = 2249069246763182397L;
        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
        transient volatile HashEntry<K,V>[] table;
        transient int count;
        transient int modCount;
        transient int threshold;
        final float loadFactor;
        ... ...
}

去掉方法,可以看到几个熟悉的域。HashEntry(哈希数组),threshold(扩容阈值),loadFactor(负载因子)表示segment是一个完整的hashmap。
接下来我们看看ConcurrentHashMap的构造函数

   public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel)

三个参数分别代表了:
初始容量:初始容量表示所有的segment数组中,一共含有多少个hashentry。若initialCapacity不为2的幂,会取一个大于initialCapacity的2的幂。
负载因子:默认0.75。
并发级别:可以同时允许多少个线程并发。concurrencyLevel为多少,就有多少个segment,当然也会取一个大于等于这个值的2的幂。

接下来我们看一下ConcurrentHashMap中的几个关键函数,put, get,
rehash(扩容), size方法,看看他是如何实现并发的。

HashEntry

与hashmap类似,concurrentHashmap也采用了链表作为每个hash桶中的元素,不过concurrentHashmap又有些不同

static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
    this.hash = hash;
    this.key = key;
    this.value = value;
    this.next = next;
    }

    /**
    * Sets next field with volatile write semantics. (See above
    * about use of putOrderedObject.)
    */
    final void setNext(HashEntry<K,V> n) {
    UNSAFE.putOrderedObject(this, nextOffset, n);
    }

    // Unsafe mechanics
    static final sun.misc.Unsafe UNSAFE;
    static final long nextOffset;
    static {
    try {
    UNSAFE = sun.misc.Unsafe.getUnsafe();
    Class k = HashEntry.class;
    nextOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("next"));
    } catch (Exception e) {
    throw new Error(e);
    }
    }
}

HashEntry的key,hash采用final,可以避免并发修改问题,HashEntry链的尾部是不能修改的,而next和value采用volatile,可以避免使用同步造成的并发性能灾难,新版(jdk1.7)的concurrentHashmap大量使用java
Unsafe类提供的原子操作,直接调用底层操作系统,提高性能(这块我也不是特别清楚)

1)根据具体场景进行设计,尽量避免synchronized,提供并发性。

put 方法的实现

现在来跟踪一个put方法的调用过程

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

这里注意几点:
1.key和value都不能为空,value有判断,key值在计算hashcode时候,也会有判断。
2.首先并发度算出一个key值的segment号,例如并发度为16,那么hash值得高4位会被用来算segment号。
3.找到对应的segment,调用segment的put方法。

接着看看segment的put方法:

        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

流程大概是这样:
1.先对该segment进行加锁
2.找到hashentry数组中,对应的位置(索引)。用了unsafe的方法,保证得到数组元素的原子性。
3.如果找到了对应的key,修改这个值。因为value是volatile,没有使用unsafe的方法。

可以看到,put方法是需要加锁的,这个时候加的是分段锁。

get方法(1.6 vs 1.7)

1.6

V get(Object key, int hash) { 
    if (count != 0) { // read-volatile 
    HashEntry<K,V> e = getFirst(hash); 
    while (e != null) { 
    if (e.hash == hash && key.equals(e.key)) { 
    V v = e.value; 
    if (v != null) 
    return v; 
    return readValueUnderLock(e); // recheck 
    } 
    e = e.next; 
    } 
    } 
    return null; 
}

1.6的jdk采用了乐观锁的方式处理了get方法,在get的时候put方法正在new对象,而此时value并未赋值,这时判断为空则加锁访问

1.7

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
    (tab = s.table) != null) {
    for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
    (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
    e != null; e = e.next) {
    K k;
    if ((k = e.key) == key || (e.hash == h && key.equals(k)))
    return e.value;
    }
    }
    return null;
}

1.7并没有判断value=null的情况,不知为何

跟同事沟通过,无论是1.6还是1.7的实现,实际上都是一种乐观的方式,而乐观的方式带来的是性能上的提升,但同时也带来数据的弱一致性,如果你的业务是强一致性的业务,可能就要考虑另外的解决办法(用Collections包装或者像jdk6中一样二次加锁获取)

这篇文章可以很好地解释弱一致性问题

2)定义了一些并发安全的复合操作,并且保证并发环境下的迭代操作不会出错。

get方法

    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

可以看到,get方法采用了unsafe的方法,来保证线程安全,并且这里没有加锁。看了几个版本的get方法,均没有加锁,都使用各种方式来实现线程安全。

put方法

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

对于put,concurrentHashmap采用自旋锁的方式,不同于1.6的直接获取锁

注:个人理解,这里采用自旋锁可能作者是觉得在分段锁的状态下,并发的可能本来就比较小,并且锁占用时间又并不是特别长,因此自旋锁可以减小线程唤醒和切换的开销

util.concurrent中容器在迭代时,可以不封装在synchronized中,可以保证不抛异常,但是未必每次看到的都是”最新的、当前的”数据。

rehash方法:

当segment中的count(指的是segment数组中使用的个数)大于threshold的时候,需要使用rehash方法来扩容。扩容操作也在put的锁中,保证线程安全。

关于hash

private int hash(Object k) {
        int h = hashSeed;
        if ((0 != h) && (k instanceof String)) {
            return sun.misc.Hashing.stringHash32((String) k);
        }
        h ^= k.hashCode();
        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }

concurrentHashMap采用本身hashcode的同时,采用Wang/Jenkins算法对每位都做了处理,使得发生hash冲突的可能性大大减小(否则效率会很差)

而对于concurrentHashMap,segments的大小在初始时确定,此后不变,而元素所在segments桶序列由hash的高位决定

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

segmentShift为(32-segments大小的二进制长度)

并发编程实践中,ConcurrentHashMap是一个经常被使用的数据结构,它的设计与实现非常精巧,大量的利用了volatile,final,CAS等lock-free技术来减少锁竞争对于性能的影响,无论对于Java并发编程的学习还是Java内存模型的理解,ConcurrentHashMap的设计以及源码都值得非常仔细的阅读与揣摩。

size方法:

    public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

size()方法会尝试三次去获取总个数,如果其中任意连续两次的modcount相等,则,不同加锁,返回个数。若都不相等,则加锁去算出个数。

总结

concurrentHashmap主要是为并发设计,与Collections的包装不同,他不是采用全同步的方式,而是采用非锁get方式,通过数据的弱一致性带来性能上的大幅提升,同时采用分段锁的策略,提高并发能力

2.
对比说明

JDK1.8 中的实现

JDK1.8中,出现了较大的改动。没有使用段锁,改成了Node数组 + 链表 +
红黑树的方式。

其中有个重要的变量:sizeCtl

  • 负数表示正在进行初始化或者扩容,-1表示正在初始化,-N表示有N –
    1个线程正在扩容
  • 正数0,表示还没有被初始化。其他正数表示下一次扩容的大小。

我们先来看一下核心数据结构:

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
}

还有两个数据结构TreeNode、TreeBin,用来当链表的大小超过阈值的时候,将链表变作红黑树。

ForwardingNode,扩容用到的链表,扩容时,若遍历到了这个node,表示有其他在对这个node进行扩容。跳过即可。

HashTable是一个线程安全的类,它使用synchronized来锁住整张Hash表来实现线程安全,即每次锁住整张表让线程独占,相当于所有线程进行读写时都去竞争一把锁,导致效率非常低下。ConcurrentHashMap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不同部分进行的修改。ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的Hashtable,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。

CAS 操作

这一版本大量使用了CAS操作。所谓的CAS就是,比较内存对应的区域的值,和期望值是不是相等,如果相等,就设置一个新的值进去。
一般是这样使用,先获取对象中的某个域的值,并以这个值为期望值去调用CAS算法。

ConcurrentHashMap中有三个核心的CAS操作。
tabAt:获得数组中位置i上的节点
casTabAt:设置数组位置i上的节点
setTabAt:利用volatile设置位置i上的节点。

接下来介绍几个重点方法:

有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。这里“按顺序”是很重要的,否则极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的,但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证。这可以确保不会出现死锁,因为获得锁的顺序是固定的。

initTable

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

在put方法调用时,回去判断table是不是为null,若为null就调用initTable去初始化。调用initTable会判断sizeCtl的值,若正在初始化,会调用yield()去等待。若为0,这是先调用CAS算法去设置为-1,再初始化。
初始化是单线程操作。

3.
CouncurrentHashMap实现原理

扩容方法transfer

当ConcurrentHashMap中数组使用的数量大于了阈值,这时需要进行扩容。这里的扩容分为两个部分:
1.单线程处理,新建一个newTable。容量变为原来的两倍。
2.多线程处理,把原来table中的数据拷贝到newTable中。

多线程处理的逻辑:

  • 利用tabAt获取i的位置。
  • 如果这个位置为空,不用加锁,将它设置为forward节点。
  • 若为普通的Node节点,则加锁,就分析,并把他们放到newTable的i和i+n的位置上,处理完后将这个节点设置为forward。

ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment的结构,一个Segment其实就是一个类HashTable的结构,Segment内部维护了一个链表数组,我们用下面这一幅图来看下ConcurrentHashMap的内部结构详情图:

put方法

逻辑很复杂,就不贴代码了,我说一下流程。
1.若table==null,则调用上面说过的初始化。
2.若table中i位置为空,则不用加锁,新建一个node,以cas的方式放入。
3.若该Node,正在扩容,则需要帮助扩容,调用helpTransfer函数。
4.若是正常的添加,此时需要加锁。用synchronized,若存在碰撞,需要判断的,当链表的长度大于8,将链表改为红黑树。

澳门新浦京app下载 2

get方法

get方法不用加锁。利用CAS操作,可以达到无锁的访问。当map正在扩容的时候,forwardnode也提供了find()方法,保证正在扩容的时候,也能找到对应的value。

 

size()操作也只是一个大概的值。

concurrentHashMap是弱一致的,不能保证get,size得到正确的值。

从宏观上来看,大体结构图可以简单描绘为如下:

澳门新浦京app下载 3

  不难看出,ConcurrentHashMap采用了二次hash的方式,第一次hash将key映射到对应的segment,而第二次hash则是映射到segment的不同桶(bucket)中。

  为什么要用二次hash,主要原因是为了构造分离锁,使得对于map的修改不会锁住整个容器,提高并发能力。当然,没有一种东西是绝对完美的,二次hash带来的问题是整个hash的过程比hashmap单次hash要长,所以,如果不是并发情形,不要使用concurrentHashmap。

 4.
源代码解析

Segment

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile int count;    //Segment中元素的数量
    transient int modCount;          //对table的大小造成影响的操作的数量(比如put或者remove操作)
    transient int threshold;        //阈值,Segment里面元素的数量超过这个值那么就会对Segment进行扩容
    final float loadFactor;         //负载因子,用于确定threshold
    transient volatile HashEntry<K,V>[] table;    //链表数组,数组中的每一个元素代表了一个链表的头部
}

HashEntry

static final class HashEntry<K,V> {
    final K key;
    final int hash;
    volatile V value;
    final HashEntry<K,V> next;
}

  可以看到HashEntry的一个特点,除了value以外,其他的几个变量都是final的,这样做是为了防止链表结构被破坏,出现ConcurrentModification的情况。

ConcurrentHashMap构造函数

 1 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
 2     if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
 3         throw new IllegalArgumentException();
 4   
 5     if (concurrencyLevel > MAX_SEGMENTS)
 6         concurrencyLevel = MAX_SEGMENTS;
 7   
 8     // Find power-of-two sizes best matching arguments
 9     int sshift = 0;
10     int ssize = 1;
11     while (ssize < concurrencyLevel) {
12         ++sshift;
13         ssize <<= 1;
14     }
15     segmentShift = 32 - sshift;
16     segmentMask = ssize - 1;
17     this.segments = Segment.newArray(ssize);
18   
19     if (initialCapacity > MAXIMUM_CAPACITY)
20         initialCapacity = MAXIMUM_CAPACITY;
21     int c = initialCapacity / ssize;
22     if (c * ssize < initialCapacity)
23         ++c;
24     int cap = 1;
25     while (cap < c)
26         cap <<= 1;
27   
28     for (int i = 0; i < this.segments.length; ++i)
29         this.segments[i] = new Segment<K,V>(cap, loadFactor);
30 }

  CurrentHashMap的初始化一共有三个参数

  initialCapacity:表示初始的容量

  loadFactor:表示负载参数,最后一个是

  concurrencyLevel:代表ConcurrentHashMap内部的Segment的数量

  其中,concurrencyLevel
一经指定,不可改变,后续如果ConcurrentHashMap的元素数量增加导致ConrruentHashMap需要扩容,ConcurrentHashMap不会增加Segment的数量,而只会增加Segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMap做rehash,而只需要对Segment里面的元素做一次rehash就可以了。

  整个ConcurrentHashMap的初始化方法还是非常简单的,先是根据concurrencyLevel来new出Segment,这里Segment的数量是不大于concurrencyLevel的最大的2的指数,就是说Segment的数量永远是2的指数个,这样的好处是方便采用移位操作来进行hash,加快hash的过程。接下来就是根据intialCapacity确定Segment的容量的大小,每一个Segment的容量大小也是2的指数,同样使为了加快hash的过程。

  这边需要特别注意一下两个变量,分别是segmentShift和segmentMask,这两个变量在后面将会起到很大的作用,假设构造函数确定了Segment的数量是2的n次方,那么segmentShift就等于32减去n,而segmentMask就等于2的n次方减一。

ConcurrentHashMap之get操作

JDK1.6的代码如下:

V get(Object key, int hash) { 
    if (count != 0) { // read-volatile 
        HashEntry<K,V> e = getFirst(hash); 
        while (e != null) { 
            if (e.hash == hash && key.equals(e.key)) { 
                V v = e.value; 
                if (v != null) 
                    return v; 
                return readValueUnderLock(e); // recheck 
            } 
            e = e.next; 
        } 
    } 
    return null; 
}

  1.6的jdk采用了乐观锁的方式处理了get方法,取出key对应的value的值,如果拿出的value的值是null,则可能这个key–value对正在put的过程中,如果出现这种情况,那么就加锁来保证取出的value是完整的,如果不是null,则直接返回value。

JDK1.7代码如下:

 

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

 

  1.7并没有判断value=null的情况,不知为何,仔细思考了一下,个人认为无论是1.6还是1.7的实现,实际上都是一种乐观的方式,而乐观的方式带来的是性能上的提升,但同时也带来数据的弱一致性,如果你的业务是强一致性的业务,可能就要考虑另外的解决办法(用Collections包装或者像jdk6中一样二次加锁获取)

  具体ConcurrentHashMap为什么会带来数据的若一致性详情可查看博客:

  

ConcurrentHashMap之put操作

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

  对于put,ConcurrentHashMap采用自旋锁的方式,不同于1.6的直接获取锁

  注:个人理解,这里采用自旋锁可能作者是觉得在分段锁的状态下,并发的可能本来就比较小,并且锁占用时间又并不是特别长,因此自旋锁可以减小线程唤醒和切换的开销。针对自旋锁相关的详情,请参看博文(原理详情):

ConcurrentHashMap之remove操作

 
Remove操作的前面一部分和前面的get和put操作一样,都是定位Segment的过程,然后再调用Segment的remove方法:

 1 final V remove(Object key, int hash, Object value) {
 2     if (!tryLock())
 3         scanAndLock(key, hash);
 4     V oldValue = null;
 5     try {
 6         HashEntry<K,V>[] tab = table;
 7         int index = (tab.length - 1) & hash;
 8         HashEntry<K,V> e = entryAt(tab, index);
 9         HashEntry<K,V> pred = null;
10         while (e != null) {
11             K k;
12             HashEntry<K,V> next = e.next;
13             if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
14                 V v = e.value;
15                 if (value == null || value == v || value.equals(v)) {
16                     if (pred == null)
17                         setEntryAt(tab, index, next);
18                     else
19                         pred.setNext(next);
20                     ++modCount;
21                     --count;
22                     oldValue = v;
23                 }
24                 break;
25             }
26             pred = e;
27             e = next;
28         }
29     } finally {
30         unlock();
31     }
32     return oldValue;
33 }

  首先remove操作也是确定需要删除的元素的位置,不过这里删除元素的方法不是简单地把待删除元素的前面的一个元素的next指向后面一个就完事了,前面已经说过HashEntry中的next是final的,一经赋值以后就不可修改,在定位到待删除元素的位置以后,程序就将待删除元素前面的那一些元素全部复制一遍,然后再一个一个重新接到链表上去,看一下下面这一幅图来了解这个过程:

澳门新浦京app下载 4

假设链表中原来的元素如上图所示,现在要删除元素3,那么删除元素3以后的链表就如下图所示:

澳门新浦京app下载 5

 

注意:下面的图1和2的元素顺序相反了,为什么这样,不防再仔细看看源码或者再读一遍上面remove的分析过程,元素复制是从待删除元素位置起将前面的元素逐一复制的,然后再将后面的链接起来。

ConcurrentHashMap之size操作

  如果我们要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。Segment里的全局变量count是一个volatile变量,那么在多线程场景下,我们是不是直接把所有Segment的count相加就可以得到整个ConcurrentHashMap大小了呢?不是的,虽然相加时可以获取每个Segment的count的最新值,但是拿到之后可能累加前使用的count发生了变化,那么统计结果就不准了。所以最安全的做法,是在统计size的时候把所有Segment的put,remove和clean方法全部锁住,但是这种做法显然非常低效。因为在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以ConcurrentHashMap的做法是先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。

那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?

  前面我们提到了一个Segment中的有一个modCount变量,代表的是对Segment中元素的数量造成影响的操作的次数,这个值只增不减,size操作就是遍历了两次Segment,每次记录Segment的modCount值,然后将两次的modCount进行比较,如果相同,则表示期间没有发生过写入操作,就将原先遍历的结果返回,如果不相同,则把这个过程再重复做一次,如果再不相同,则就需要将所有的Segment都锁住,然后一个一个遍历了,具体的实现大家可以看ConcurrentHashMap的源码,这里就不贴了。

总结

 
  concurrentHashmap主要是为并发设计,与Collections的包装不同,他不是采用全同步的方式,而是采用非锁get方式,通过数据的弱一致性带来性能上的大幅提升,同时采用分段锁的策略,提高并发能力。

 

You can leave a response, or trackback from your own site.

Leave a Reply

网站地图xml地图