java中并发包简要分析01
引言
本文参考《分布式 Java 应用》一书,对 Java 并发包(java.util.concurrent)中的核心类进行简要分析。
ConcurrentHashMap
ConcurrentHashMap 是线程安全的 HashMap 实现。
1. 添加操作(put)
ConcurrentHashMap 并没有采用 synchronized 进行整体控制,而是使用了 ReentrantLock。
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
return segmentFor(hash).put(key, hash, value, false);
}这里计算出 key 的 hash 值,根据 hash 值获取对应的数组中的 Segment 对象。接下来的工作都交由 Segment 完成。
Segment 可以看成是 HashMap 的一个部分(ConcurrentHashMap 基于 concurrencyLevel 划分出了多个 Segment 来对 key-value 进行存储)。每次操作都只对当前 Segment 进行锁定,从而避免每次 put 操作锁住整个 Map。
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash();
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
} else {
oldValue = null;
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}这个方法进来就上锁(lock),并在 finally 中确保释放锁(unlock)。
添加 key-value 的过程中,先判断当前存储对象个数加 1 后是否大于 threshold,如果大于则进行扩容(对象数组扩大两倍,进行重新 hash,转移到新数组)。
如果不大于,则进行后续操作。通过对 hash 值和对象数组大小减 1 的值进行按位与操作(取余),得到当前 key 需要放入数组的位置,接着寻找对应位置上的 HashEntry 对象链表,并进行遍历。
- 如果找到相同 key 值的 Entry,则替换该 Entry 对象的 value。
- 如果没有找到,就创建一个 Entry 对象,赋值给对应位置的数组对象,并构成链表。
注意:采用 Segment 这种方式,在并发操作过程中,可以在很多程度上减少阻塞现象。
2. 删除操作(remove)
public V remove(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).remove(key, hash, null);
}和 put 类似,删除也要根据 hash 先获得 Segment,然后在 Segment 上执行 remove 操作。
V remove(Object key, int hash, Object value) {
lock();
try {
int c = count - 1;
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue = null;
if (e != null) {
V v = e.value;
if (value == null || value.equals(v)) {
oldValue = v;
// All entries following removed node can stay
// in list, but all preceding ones need to be
// cloned.
++modCount;
HashEntry<K,V> newFirst = e.next;
for (HashEntry<K,V> p = first; p != e; p = p.next)
newFirst = new HashEntry<K,V>(p.key, p.hash,
newFirst, p.value);
tab[index] = newFirst;
count = c; // write-volatile
}
}
return oldValue;
} finally {
unlock();
}
}Segment 的 remove 操作,首先加锁,然后对 hash 值与数组大小减 1 的值按位与操作,得到数组对应位置上的 HashEntry 对象,接下来遍历此链表,查找 hash 值相等并且 key 相等(equals)的对象。
- 如果没有找到,返回 null,释放锁。
- 如果找到了,则重新创建位于删除元素之前的所有
HashEntry,位于其后的不用处理。释放锁!
3. 获取操作(get)
直接看看 Segment 中的 get 操作,如下:
V get(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}可以看出并没有加锁操作,只有 v == null 时,进入 readValueUnderLock 才有加锁操作。
这里假设一种情况:例如两条线程 A、B,A 执行 get 操作,B 执行 put 操作。
当 A 执行到 getFirst,与当前数组长度减 1 按位与操作后得到指定位置 index,此时 CPU 将执行权交给 B,B 线程 put 一对 key-value,导致扩容并重新 hash 排列,然后 CPU 又将执行权还给 A,A 然后根据之前的 index 去获取 HashEntry 就会发生问题。
当然,这种情况发生的概率很小。
4. 遍历
其实这个过程和读取过程类似,读取所有分段中的数据即可。
小结
ConcurrentHashMap 默认情况下采用将数据分为 16 个段(Segment)进行存储,并且每个段各自拥有自己的锁。锁仅用于 put 和 remove 等改变集合对象的操作,基于 volatile 及 HashEntry 链表的不变性实现读取的不加锁。
这些方式使得 ConcurrentHashMap 能够保持极好的并发操作,尤其是对于读远比插入和删除频繁的 Map 而言。它采用的这些方法也可谓是对于 Java 内存模型、并发机制深刻掌握的体现,是一个设计得非常不错的支持高并发的集合对象。
——摘自《分布式 Java 应用》
CopyOnWriteArrayList
CopyOnWriteArrayList 是一个线程安全、并且在读操作时无锁的 ArrayList。
1. 添加操作(add)
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); // 复制数组
newElements[len] = e; // 添加到末尾
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}这里同样没有使用 synchronized 关键字,而是使用 ReentrantLock。
和 ArrayList 不同的是,这里每次都会创建一个新的 Object 数组,大小比之前数组大 1。将之前的数组复制到新数组,并将新加入的元素加到数组末尾。
2. 删除操作(remove)
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
if (len != 0) {
// Copy while searching for element to remove
// This wins in the normal case of element being present
int newlen = len - 1;
Object[] newElements = new Object[newlen]; // 新建数组
for (int i = 0; i < newlen; ++i) {
if (eq(o, elements[i])) {
// found one; copy remaining and exit
for (int k = i + 1; k < len; ++k)
newElements[k - 1] = elements[k];
setArray(newElements);
return true;
} else
newElements[i] = elements[i];
}
// special handling for last cell
if (eq(o, elements[newlen])) {
setArray(newElements);
return true;
}
}
return false;
} finally {
lock.unlock();
}
}此方法为什么这么直接进行数组的复制呢?为何不使用 System 的 arrayCopy 来完成?这是一个值得思考的实现细节。
3. 获取操作(get)
public E get(int index) {
return (E) (getArray()[index]);
}这里有可能出现脏读(读取到旧数据),但是性能非常高。
通过看集合包和并发包可以看出一些不同的编程思路。这里为什么就不事先做范围的检查?
小结
从上可见,CopyOnWriteArrayList 基于 ReentrantLock 保证了增加元素和删除元素动作的互斥。在读操作上没有任何锁,这样就保证了读的性能,带来的副作用是有时候可能会读取到旧数据(弱一致性)。
CopyOnWriteArraySet
CopyOnWriteArraySet 是基于 CopyOnWriteArrayList 实现的。可以知道 Set 是不容许重复数据的,因此 add 操作和 CopyOnWriteArrayList 有所区别,它是调用 CopyOnWriteArrayList 的 addIfAbsent 方法。
public boolean addIfAbsent(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// Copy while checking if already present.
// This wins in the most common case where it is not present
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = new Object[len + 1];
for (int i = 0; i < len; ++i) {
if (eq(e, elements[i])) // 如果存在,直接返回!
return false; // exit, throwing away copy
else
newElements[i] = elements[i];
}
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}由此可见,addIfAbsent 需要每次都遍历。在 add 方面,CopyOnWriteArraySet 效率要比 CopyOnWriteArrayList 低一点。
ArrayBlockingQueue
ArrayBlockingQueue 是一个基于数组、先进先出、线程安全的集合类,其特点是实现指定时间的阻塞读写,并且容量是可以限制的。
1. 创建
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = (E[]) new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}初始化锁和两个锁上的 Condition,一个为 notEmpty,一个为 notFull。
2. 添加操作
带超时的 offer 方法
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (count != items.length) {
insert(e);
return true;
}
if (nanos <= 0)
return false;
try {
nanos = notFull.awaitNanos(nanos);
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}这个方法将元素插入数组的末尾,如果数组满,则进入等待,直到以下三种情况发生才继续:
- 被唤醒
- 达到指定的时间
- 当前线程被中断
该方法首先将等待时间转换成纳秒。然后加锁,如果数组未满,则在末尾插入数据;如果数组已满,则调用 notFull.awaitNanos 进行等待。如果被唤醒或超时,重新判断是否满。如果线程被 interrupt,则直接抛出异常。
不带超时的 offer 方法
另外一个不带时间的 offer 方法在数组满的情况下不进去等待,而是直接返回 false。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}put 方法
同时还可以选择 put 方法,此方法在数组已满的情况下会一直等待,直到数组不为空或线程被 interrupt。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == items.length)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}3. 获取操作
带超时的 poll 方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (count != 0) {
E x = extract();
return x;
}
if (nanos <= 0)
return null;
try {
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}poll 获取队列中的第一个元素,如果队列中没有元素,则进入等待。
poll 首先将指定 timeout 转换成纳秒,然后加锁。如果数组个数不为 0,则从当前对象数组中获取最后一个元素,在获取后将位置上的元素置为 null。
如果数组中的元素个数为 0,首先判断 timeout 是否小于等于 0,若小于等于 0 则直接返回 null。若大于 0 则进行等待,如果被唤醒或者超时,重新判断数据元素个数是否大于 0。
如果线程被 interrupt,则直接抛出 InterruptedException。
不带超时的 poll 方法
和 offer 一样,不带时间的 poll 方法在数组元素个数为 0 直接返回 null,不进行等待。
take 方法
take 方法在数据为空的情况下会一直等待,直到数组不为空或者 interrupt。
说明
版本时效性说明:文中关于ConcurrentHashMap的分析基于 Java 7 及之前版本 的实现(基于 Segment 分段锁)。在 Java 8 及之后版本 中,ConcurrentHashMap已废弃Segment,改用Node数组 + 链表 + 红黑树 + CAS 的方式实现,并发性能进一步优化。阅读代码时请注意 JDK 版本差异。
版权声明:本文为原创文章,版权归 戴老师的博客 所有,转载请联系博主获得授权。
本文地址:https://1diff.fun/archives/java-zhong-bing-fa-bao-jian-yao-fen-xi-01.html
如果对本文有什么问题或疑问都可以在评论区留言,我看到后会尽量解答。