ConcurrentHashMap 之实现细节

ConcurrentHashMap 是 Java 5 中引入的支持高并发、高吞吐量的线程安全 HashMap 实现。通过深入阅读源代码,我们可以理解其精巧的实现机制。本文将与大家共享 ConcurrentHashMap 的核心实现细节。

实现原理

锁分离 (Lock Stripping)

ConcurrentHashMap 允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用多个锁来控制对 hash 表不同部分的修改。ConcurrentHashMap 内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的 hash table,它们拥有独立的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。

有些方法需要跨段操作,比如 size()containsValue()。它们可能需要锁定整个表而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,再按顺序释放所有段的锁。这里“按顺序”非常重要,否则极有可能出现死锁。在 ConcurrentHashMap 内部,段数组是 final 的,并且其成员变量实际上也是 final 的。但是,仅仅是将数组声明为 final 并不保证数组成员也是 final 的,这需要实现上的保证。这可以确保不会出现死锁,因为获得锁的顺序是固定的。不变性在多线程编程中占有重要地位,下文还将谈到。

/**
 * The segments, each of which is a specialized hash table
 */
final Segment<K,V>[] segments;

不变 (Immutable) 和易变 (Volatile)

ConcurrentHashMap 完全允许多个读操作并发进行,读操作并不需要加锁。如果使用传统的技术,如 HashMap 中的实现,如果允许在 hash 链的中间添加或删除元素,读操作不加锁将得到不一致的数据。ConcurrentHashMap 的实现技术是保证 HashEntry 几乎是不可变的。HashEntry 代表每个 hash 链中的一个节点,其结构如下所示:

static final class HashEntry<K,V> {
    final K key;
    final int hash;
    volatile V value;
    final HashEntry<K,V> next;
}

可以看到,除了 value 不是 final 的,其它值都是 final 的。这意味着不能从 hash 链的中间或尾部添加或删除节点,因为这需要修改 next 引用值,所有的节点修改只能从头部开始。对于 put 操作,可以一律添加到 Hash 链的头部。但是对于 remove 操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。这在讲解删除操作时还会详述。为了确保读操作能够看到最新的值,将 value 设置成 volatile,这避免了加锁。

哈希与定位

为了加快定位段以及段中 hash 槽的速度,每个段 hash 槽的个数都是 2^n,这使得通过位运算就可以定位段和段中 hash 槽的位置。当并发级别为默认值 16 时,也就是段的个数,hash 值的高 4 位决定分配在哪个段中。但是我们也不要忘记《算法导论》给我们的教训:hash 槽的个数不应该是 2^n,这可能导致 hash 槽分配不均,这需要对 hash 值重新再 hash 一次。

以下是重新 hash 的算法:

private static int hash(int h) {
    // Spread bits to regularize both segment and index locations,
    // using variant of single-word Wang/Jenkins hash.
    h += (h <<  15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h <<   3);
    h ^= (h >>>  6);
    h += (h <<   2) + (h << 14);
    return h ^ (h >>> 16);
}

这是定位段的方法:

final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}

数据结构

关于 Hash 表的基础数据结构,这里不做过多探讨。Hash 表的一个重要方面就是如何解决 hash 冲突,ConcurrentHashMapHashMap 使用相同的方式,都是将 hash 值相同的节点放在一个 hash 链中。与 HashMap 不同的是,ConcurrentHashMap 使用多个子 Hash 表,也就是段(Segment)。下面是 ConcurrentHashMap 的数据成员:

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable {
    /**
     * Mask value for indexing into segments. The upper bits of a
     * key's hash code are used to choose the segment.
     */
    final int segmentMask;

    /**
     * Shift value for indexing within segments.
     */
    final int segmentShift;

    /**
     * The segments, each of which is a specialized hash table
     */
    final Segment<K,V>[] segments;
}

所有的成员都是 final 的,其中 segmentMasksegmentShift 主要是为了定位段,参见上面的 segmentFor 方法。

每个 Segment 相当于一个子 Hash 表,它的数据成员如下:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    private static final long serialVersionUID = 2249069246763182397L;
    /**
     * The number of elements in this segment's region.
     */
    transient volatile int count;

    /**
     * Number of updates that alter the size of the table. This is
     * used during bulk-read methods to make sure they see a
     * consistent snapshot: If modCounts change during a traversal
     * of segments computing size or checking containsValue, then
     * we might have an inconsistent view of state so (usually)
     * must retry.
     */
    transient int modCount;

    /**
     * The table is rehashed when its size exceeds this threshold.
     * (The value of this field is always <tt>(int)(capacity *
     * loadFactor)</tt>.)
     */
    transient int threshold;

    /**
     * The per-segment table.
     */
    transient volatile HashEntry<K,V>[] table;

    /**
     * The load factor for the hash table.  Even though this value
     * is same for all segments, it is replicated to avoid needing
     * links to outer object.
     * @serial
     */
    final float loadFactor;
}

count 用来统计该段数据的个数,它是 volatile,用来协调修改和读取操作,以保证读取操作能够读取到几乎最新的修改。协调方式是这样的:每次修改操作做了结构上的改变,如增加/删除节点(修改节点的值不算结构上的改变),都要写 count 值;每次读取操作开始都要读取 count 的值。这利用了 Java 5 中对 volatile 语义的增强,对同一个 volatile 变量的写和读存在 happens-before 关系。modCount 统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变,在讲述跨段操作时还会详述。threshold 用来表示需要进行 rehash 的界限值。table 数组存储段中节点,每个数组元素是个 hash 链,用 HashEntry 表示。table 也是 volatile,这使得能够读取到最新的 table 值而不需要同步。loadFactor 表示负载因子。

实现细节

修改操作

先来看下删除操作 remove(key)

public V remove(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).remove(key, hash, null);
}

整个操作是先定位到段,然后委托给段的 remove 操作。当多个删除操作并发进行时,只要它们所在的段不相同,它们就可以同时进行。下面是 Segmentremove 方法实现:

V remove(Object key, int hash, Object value) {
    lock();
    try {
        int c = count - 1;
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;

        V oldValue = null;
        if (e != null) {
            V v = e.value;
            if (value == null || value.equals(v)) {
                oldValue = v;
                // All entries following removed node can stay
                // in list, but all preceding ones need to be
                // cloned.
                ++modCount;
                HashEntry<K,V> newFirst = e.next;
                for (HashEntry<K,V> p = first; p != e; p = p.next)
                    newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                  newFirst, p.value);
                tab[index] = newFirst;
                count = c; // write-volatile
            }
        }
        return oldValue;
    } finally {
        unlock();
    }
}

整个操作是在持有段锁的情况下执行的,空白行之前的行主要是定位到要删除的节点 e。接下来,如果不存在这个节点就直接返回 null,否则就要将 e 前面的结点复制一遍,尾结点指向 e 的下一个结点。e 后面的结点不需要复制,它们可以重用。下面是个示意图:

删除元素之前:
这里写图片描述
删除元素 3 之后:
这里写图片描述

第二个图其实有点问题,复制的结点中应该是值为 2 的结点在前面,值为 1 的结点在后面,也就是刚好和原来结点顺序相反,还好这不影响我们的讨论。

整个 remove 实现并不复杂,但是需要注意如下几点:

  1. 当要删除的结点存在时,删除的最后一步操作要将 count 的值减一。这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。
  2. remove 执行的开始就将 table 赋给一个局部变量 tab,这是因为 tablevolatile 变量,读写 volatile 变量的开销很大。编译器也不能对 volatile 变量的读写做任何优化,直接多次访问非 volatile 实例变量没有多大影响,编译器会做相应优化。

接下来看 put 操作,同样地 put 操作也是委托给段的 put 方法。下面是段的 put 方法:

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    lock();
    try {
        int c = count;
        if (c++ > threshold) // ensure capacity
            rehash();
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;

        V oldValue;
        if (e != null) {
            oldValue = e.value;
            if (!onlyIfAbsent)
                e.value = value;
        }
        else {
            oldValue = null;
            ++modCount;
            tab[index] = new HashEntry<K,V>(key, hash, first, value);
            count = c; // write-volatile
        }
        return oldValue;
    } finally {
        unlock();
    }
}

该方法也是在持有段锁的情况下执行的,首先判断是否需要 rehash,需要就先 rehash。接着是找是否存在同样一个 key 的结点,如果存在就直接替换这个结点的值。否则创建一个新的结点并添加到 hash 链的头部,这时一定要修改 modCountcount 的值,同样修改 count 的值一定要放在最后一步。put 方法调用了 rehash 方法,rehash 方法实现得也很精巧,主要利用了 table 的大小为 2^n,这里就不介绍了。

修改操作还有 putAllreplaceputAll 就是多次调用 put 方法。replace 甚至不用做结构上的更改,实现要比 putremove 简单得多,理解了 putremove,理解 replace 就不在话下了,这里也不介绍了。

获取操作

首先看下 get 操作,同样 ConcurrentHashMapget 操作是直接委托给 Segmentget 方法,直接看 Segmentget 方法:

V get(Object key, int hash) {
    if (count != 0) { // read-volatile
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    return null;
}

get 操作不需要锁。第一步是访问 count 变量,这是一个 volatile 变量,由于所有的修改操作在进行结构修改时都会在最后一步写 count 变量,通过这种机制保证 get 操作能够得到几乎最新的结构更新。对于非结构更新,也就是结点值的改变,由于 HashEntryvalue 变量是 volatile 的,也能保证读取到最新的值。接下来就是对 hash 链进行遍历找到要获取的结点,如果没有找到,直接返回 null。对 hash 链进行遍历不需要加锁的原因在于链指针 nextfinal 的。但是头指针却不是 final 的,这是通过 getFirst(hash) 方法返回,也就是存在 table 数组中的值。这使得 getFirst(hash) 可能返回过时的头结点,例如,当执行 get 方法时,刚执行完 getFirst(hash) 之后,另一个线程执行了删除操作并更新头结点,这就导致 get 方法中返回的头结点不是最新的。这是可以允许的,通过对 count 变量的协调机制,get 能读取到几乎最新的数据,虽然可能不是最新的。要得到最新的数据,只有采用完全的同步。

最后,如果找到了所求的结点,判断它的值如果非空就直接返回,否则在有锁的状态下再读一次。这似乎有些费解,理论上结点的值不可能为空,这是因为 put 的时候就进行了判断,如果为空就要抛 NullPointerException。空值的唯一源头就是 HashEntry 中的默认值,因为 HashEntry 中的 value 不是 final 的,非同步读取有可能读取到空值。仔细看下 put 操作的语句:tab[index] = new HashEntry

V readValueUnderLock(HashEntry<K,V> e) {
    lock();
    try {
        return e.value;
    } finally {
        unlock();
    }
}

另一个操作是 containsKey,这个实现就要简单得多了,因为它不需要读取值:

boolean containsKey(Object key, int hash) {
    if (count != 0) { // read-volatile
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key))
                return true;
            e = e.next;
        }
    }
    return false;
}

跨段操作

有些操作需要涉及到多个段,比如说 size(), containsValue()。先来看下 size() 方法:

public int size() {
    final Segment<K,V>[] segments = this.segments;
    long sum = 0;
    long check = 0;
    int[] mc = new int[segments.length];
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
        check = 0;
        sum = 0;
        int mcsum = 0;
        for (int i = 0; i < segments.length; ++i) {
            sum += segments[i].count;
            mcsum += mc[i] = segments[i].modCount;
        }
        if (mcsum != 0) {
            for (int i = 0; i < segments.length; ++i) {
                check += segments[i].count;
                if (mc[i] != segments[i].modCount) {
                    check = -1; // force retry
                    break;
                }
            }
        }
        if (check == sum)
            break;
    }
    if (check != sum) { // Resort to locking all segments
        sum = 0;
        for (int i = 0; i < segments.length; ++i)
            segments[i].lock();
        for (int i = 0; i < segments.length; ++i)
            sum += segments[i].count;
        for (int i = 0; i < segments.length; ++i)
            segments[i].unlock();
    }
    if (sum > Integer.MAX_VALUE)
        return Integer.MAX_VALUE;
    else
        return (int)sum;
}

size 方法主要思路是先在没有锁的情况下对所有段大小求和,如果不能成功(这是因为遍历过程中可能有其它线程正在对已经遍历过的段进行结构性更新),最多执行 RETRIES_BEFORE_LOCK 次,如果还不成功就在持有所有段锁的情况下再对所有段大小求和。在没有锁的情况下主要是利用 Segment 中的 modCount 进行检测,在遍历过程中保存每个 SegmentmodCount,遍历完成之后再检测每个 SegmentmodCount 有没有改变,如果有改变表示有其它线程正在对 Segment 进行结构性并发更新,需要重新计算。

其实这种方式是存在问题的,在第一个内层 for 循环中,在这两条语句 sum += segments[i].count;mcsum += mc[i] = segments[i].modCount; 之间,其它线程可能正在对 Segment 进行结构性的修改,导致 segments[i].countsegments[i].modCount 读取的数据并不一致。这可能使 size() 方法返回任何时候都不曾存在的大小,很奇怪 javadoc 居然没有明确标出这一点,可能是因为这个时间窗口太小了吧。size() 的实现还有一点需要注意,必须要先访问 segments[i].count,才能访问 segments[i].modCount,这是因为 segment[i].count 是对 volatile 变量的访问,接下来 segments[i].modCount 才能得到几乎最新的值。这点在 containsValue 方法中得到了淋漓尽致的展现:

public boolean containsValue(Object value) {
    if (value == null)
        throw new NullPointerException();

    // See explanation of modCount use above

    final Segment<K,V>[] segments = this.segments;
    int[] mc = new int[segments.length];

    // Try a few times without locking
    for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
        int sum = 0;
        int mcsum = 0;
        for (int i = 0; i < segments.length; ++i) {
            int c = segments[i].count;
            mcsum += mc[i] = segments[i].modCount;
            if (segments[i].containsValue(value))
                return true;
        }
        boolean cleanSweep = true;
        if (mcsum != 0) {
            for (int i = 0; i < segments.length; ++i) {
                int c = segments[i].count;
                if (mc[i] != segments[i].modCount) {
                    cleanSweep = false;
                    break;
                }
            }
        }
        if (cleanSweep)
            return false;
    }
    // Resort to locking all segments
    for (int i = 0; i < segments.length; ++i)
        segments[i].lock();
    boolean found = false;
    try {
        for (int i = 0; i < segments.length; ++i) {
            if (segments[i].containsValue(value)) {
                found = true;
                break;
            }
        }
    } finally {
        for (int i = 0; i < segments.length; ++i)
            segments[i].unlock();
    }
    return found;
}

同样注意内层的第一个 for 循环,里面有语句 int c = segments[i].count; 但是 c 却从来没有被使用过,即使如此,编译器也不能做优化将这条语句去掉,因为存在对 volatile 变量 count 的读取,这条语句存在的唯一目的就是保证 segments[i].modCount 读取到几乎最新的值。关于 containsValue 方法的其它部分就不分析了,它和 size 方法差不多。

跨段方法中还有一个 isEmpty() 方法,其实现比 size() 方法还要简单,也不介绍了。最后简单地介绍下迭代方法,如 keySet(), values(), entrySet() 方法,这些方法都返回相应的迭代器,所有迭代器都继承于 HashIterator 类,里实现了主要的方法。其结构是:

abstract class HashIterator {
    int nextSegmentIndex;
    int nextTableIndex;
    HashEntry<K,V>[] currentTable;
    HashEntry<K, V> nextEntry;
    HashEntry<K, V> lastReturned;
}

nextSegmentIndex 是段的索引,nextTableIndexnextSegmentIndex 对应段中 hash 链的索引,currentTablenextSegmentIndex 对应段的 table。调用 next 方法时主要是调用了 advance 方法:

final void advance() {
    if (nextEntry != null && (nextEntry = nextEntry.next) != null)
        return;

    while (nextTableIndex >= 0) {
        if ( (nextEntry = currentTable[nextTableIndex--]) != null)
            return;
    }

    while (nextSegmentIndex >= 0) {
        Segment<K,V> seg = segments[nextSegmentIndex--];
        if (seg.count != 0) {
            currentTable = seg.table;
            for (int j = currentTable.length - 1; j >= 0; --j) {
                if ( (nextEntry = currentTable[j]) != null) {
                    nextTableIndex = j - 1;
                    return;
                }
            }
        }
    }
}

不想再多介绍了,唯一需要注意的是跳到下一个段时,一定要先读取下一个段的 count 变量。

这种迭代方式的主要效果是不会抛出 ConcurrentModificationException。一旦获取到下一个段的 table,也就意味着这个段的头结点在迭代过程中就确定了,在迭代过程中就不能反映对这个段节点并发的删除和添加,对于节点的更新是能够反映的,因为节点的值是一个 volatile 变量。

结束语

ConcurrentHashMap 是一个支持高并发的高性能的 HashMap 实现,它支持完全并发的读以及一定程度并发的写。ConcurrentHashMap 的实现也是很精巧,充分利用了最新的 JMM 规范,值得学习,却不值得模仿。最后由于本人水平有限,对大师的作品难免有误解,如果存在,还望大牛们不吝指出。

装载自:http://www.iteye.com/topic/344876

版本说明

说明:本文内容基于 Java 7 及以下版本的 ConcurrentHashMap 实现(基于 Segment 分段锁机制)。Java 8 及之后版本已对该类进行了重构,改为使用 CAS + synchronized + 红黑树结构,不再使用 Segment 类,具体实现细节与本文所述有所不同。