引言

本文参考《分布式 Java 应用》一书,对 Java 并发包(java.util.concurrent)中的核心类进行简要分析。

ConcurrentHashMap

ConcurrentHashMap 是线程安全的 HashMap 实现。

1. 添加操作(put)

ConcurrentHashMap 并没有采用 synchronized 进行整体控制,而是使用了 ReentrantLock

public V put(K key, V value) {
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key.hashCode());
    return segmentFor(hash).put(key, hash, value, false);
}

这里计算出 key 的 hash 值,根据 hash 值获取对应的数组中的 Segment 对象。接下来的工作都交由 Segment 完成。

Segment 可以看成是 HashMap 的一个部分(ConcurrentHashMap 基于 concurrencyLevel 划分出了多个 Segment 来对 key-value 进行存储)。每次操作都只对当前 Segment 进行锁定,从而避免每次 put 操作锁住整个 Map。

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    lock();
    try {
        int c = count;
        if (c++ > threshold) // ensure capacity
            rehash();
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;

        V oldValue;
        if (e != null) {
            oldValue = e.value;
            if (!onlyIfAbsent)
                e.value = value;
        } else {
            oldValue = null;
            ++modCount;
            tab[index] = new HashEntry<K,V>(key, hash, first, value);
            count = c; // write-volatile
        }
        return oldValue;
    } finally {
        unlock();
    }
}

这个方法进来就上锁(lock),并在 finally 中确保释放锁(unlock)。

添加 key-value 的过程中,先判断当前存储对象个数加 1 后是否大于 threshold,如果大于则进行扩容(对象数组扩大两倍,进行重新 hash,转移到新数组)。

如果不大于,则进行后续操作。通过对 hash 值和对象数组大小减 1 的值进行按位与操作(取余),得到当前 key 需要放入数组的位置,接着寻找对应位置上的 HashEntry 对象链表,并进行遍历。

  • 如果找到相同 key 值的 Entry,则替换该 Entry 对象的 value。
  • 如果没有找到,就创建一个 Entry 对象,赋值给对应位置的数组对象,并构成链表。

注意:采用 Segment 这种方式,在并发操作过程中,可以在很多程度上减少阻塞现象。

2. 删除操作(remove)

public V remove(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).remove(key, hash, null);
}

put 类似,删除也要根据 hash 先获得 Segment,然后在 Segment 上执行 remove 操作。

V remove(Object key, int hash, Object value) {
    lock();
    try {
        int c = count - 1;
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;

        V oldValue = null;
        if (e != null) {
            V v = e.value;
            if (value == null || value.equals(v)) {
                oldValue = v;
                // All entries following removed node can stay
                // in list, but all preceding ones need to be
                // cloned.
                ++modCount;
                HashEntry<K,V> newFirst = e.next;
                for (HashEntry<K,V> p = first; p != e; p = p.next)
                    newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                  newFirst, p.value);
                tab[index] = newFirst;
                count = c; // write-volatile
            }
        }
        return oldValue;
    } finally {
        unlock();
    }
}

Segmentremove 操作,首先加锁,然后对 hash 值与数组大小减 1 的值按位与操作,得到数组对应位置上的 HashEntry 对象,接下来遍历此链表,查找 hash 值相等并且 key 相等(equals)的对象。

  • 如果没有找到,返回 null,释放锁。
  • 如果找到了,则重新创建位于删除元素之前的所有 HashEntry,位于其后的不用处理。释放锁!

3. 获取操作(get)

直接看看 Segment 中的 get 操作,如下:

V get(Object key, int hash) {
    if (count != 0) { // read-volatile
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    return null;
}

可以看出并没有加锁操作,只有 v == null 时,进入 readValueUnderLock 才有加锁操作。

这里假设一种情况:例如两条线程 A、B,A 执行 get 操作,B 执行 put 操作。

当 A 执行到 getFirst,与当前数组长度减 1 按位与操作后得到指定位置 index,此时 CPU 将执行权交给 B,B 线程 put 一对 key-value,导致扩容并重新 hash 排列,然后 CPU 又将执行权还给 A,A 然后根据之前的 index 去获取 HashEntry 就会发生问题。

当然,这种情况发生的概率很小。

4. 遍历

其实这个过程和读取过程类似,读取所有分段中的数据即可。

小结

ConcurrentHashMap 默认情况下采用将数据分为 16 个段(Segment)进行存储,并且每个段各自拥有自己的锁。锁仅用于 putremove 等改变集合对象的操作,基于 volatileHashEntry 链表的不变性实现读取的不加锁。

这些方式使得 ConcurrentHashMap 能够保持极好的并发操作,尤其是对于读远比插入和删除频繁的 Map 而言。它采用的这些方法也可谓是对于 Java 内存模型、并发机制深刻掌握的体现,是一个设计得非常不错的支持高并发的集合对象。

——摘自《分布式 Java 应用》

CopyOnWriteArrayList

CopyOnWriteArrayList 是一个线程安全、并且在读操作时无锁的 ArrayList

1. 添加操作(add)

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1); // 复制数组
        newElements[len] = e; // 添加到末尾
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

这里同样没有使用 synchronized 关键字,而是使用 ReentrantLock

ArrayList 不同的是,这里每次都会创建一个新的 Object 数组,大小比之前数组大 1。将之前的数组复制到新数组,并将新加入的元素加到数组末尾。

2. 删除操作(remove)

public boolean remove(Object o) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        if (len != 0) {
            // Copy while searching for element to remove
            // This wins in the normal case of element being present
            int newlen = len - 1;
            Object[] newElements = new Object[newlen]; // 新建数组

            for (int i = 0; i < newlen; ++i) {
                if (eq(o, elements[i])) {
                    // found one;  copy remaining and exit
                    for (int k = i + 1; k < len; ++k)
                        newElements[k - 1] = elements[k];
                    setArray(newElements);
                    return true;
                } else
                    newElements[i] = elements[i];
            }

            // special handling for last cell
            if (eq(o, elements[newlen])) {
                setArray(newElements);
                return true;
            }
        }
        return false;
    } finally {
        lock.unlock();
    }
}

此方法为什么这么直接进行数组的复制呢?为何不使用 SystemarrayCopy 来完成?这是一个值得思考的实现细节。

3. 获取操作(get)

public E get(int index) {
    return (E) (getArray()[index]);
}

这里有可能出现脏读(读取到旧数据),但是性能非常高。

通过看集合包和并发包可以看出一些不同的编程思路。这里为什么就不事先做范围的检查?

小结

从上可见,CopyOnWriteArrayList 基于 ReentrantLock 保证了增加元素和删除元素动作的互斥。在读操作上没有任何锁,这样就保证了读的性能,带来的副作用是有时候可能会读取到旧数据(弱一致性)。

CopyOnWriteArraySet

CopyOnWriteArraySet 是基于 CopyOnWriteArrayList 实现的。可以知道 Set 是不容许重复数据的,因此 add 操作和 CopyOnWriteArrayList 有所区别,它是调用 CopyOnWriteArrayListaddIfAbsent 方法。

public boolean addIfAbsent(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // Copy while checking if already present.
        // This wins in the most common case where it is not present
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = new Object[len + 1];
        for (int i = 0; i < len; ++i) {
            if (eq(e, elements[i])) // 如果存在,直接返回!
                return false; // exit, throwing away copy
            else
                newElements[i] = elements[i];
        }
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

由此可见,addIfAbsent 需要每次都遍历。在 add 方面,CopyOnWriteArraySet 效率要比 CopyOnWriteArrayList 低一点。

ArrayBlockingQueue

ArrayBlockingQueue 是一个基于数组、先进先出、线程安全的集合类,其特点是实现指定时间的阻塞读写,并且容量是可以限制的。

1. 创建

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = (E[]) new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

初始化锁和两个锁上的 Condition,一个为 notEmpty,一个为 notFull

2. 添加操作

带超时的 offer 方法

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            if (count != items.length) {
                insert(e);
                return true;
            }
            if (nanos <= 0)
                return false;
            try {
                nanos = notFull.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
        }
    } finally {
        lock.unlock();
    }
}

这个方法将元素插入数组的末尾,如果数组满,则进入等待,直到以下三种情况发生才继续:

  1. 被唤醒
  2. 达到指定的时间
  3. 当前线程被中断

该方法首先将等待时间转换成纳秒。然后加锁,如果数组未满,则在末尾插入数据;如果数组已满,则调用 notFull.awaitNanos 进行等待。如果被唤醒或超时,重新判断是否满。如果线程被 interrupt,则直接抛出异常。

不带超时的 offer 方法

另外一个不带时间的 offer 方法在数组满的情况下不进去等待,而是直接返回 false

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            insert(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

put 方法

同时还可以选择 put 方法,此方法在数组已满的情况下会一直等待,直到数组不为空或线程被 interrupt

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    final E[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        try {
            while (count == items.length)
                notFull.await();
        } catch (InterruptedException ie) {
            notFull.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        insert(e);
    } finally {
        lock.unlock();
    }
}

3. 获取操作

带超时的 poll 方法

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            if (count != 0) {
                E x = extract();
                return x;
            }
            if (nanos <= 0)
                return null;
            try {
                nanos = notEmpty.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
        }
    } finally {
        lock.unlock();
    }
}

poll 获取队列中的第一个元素,如果队列中没有元素,则进入等待。

poll 首先将指定 timeout 转换成纳秒,然后加锁。如果数组个数不为 0,则从当前对象数组中获取最后一个元素,在获取后将位置上的元素置为 null

如果数组中的元素个数为 0,首先判断 timeout 是否小于等于 0,若小于等于 0 则直接返回 null。若大于 0 则进行等待,如果被唤醒或者超时,重新判断数据元素个数是否大于 0。

如果线程被 interrupt,则直接抛出 InterruptedException

不带超时的 poll 方法

offer 一样,不带时间的 poll 方法在数组元素个数为 0 直接返回 null,不进行等待。

take 方法

take 方法在数据为空的情况下会一直等待,直到数组不为空或者 interrupt

说明

版本时效性说明:文中关于 ConcurrentHashMap 的分析基于 Java 7 及之前版本 的实现(基于 Segment 分段锁)。在 Java 8 及之后版本 中,ConcurrentHashMap 已废弃 Segment,改用 Node 数组 + 链表 + 红黑树 + CAS 的方式实现,并发性能进一步优化。阅读代码时请注意 JDK 版本差异。