怎么理解Condition
怎么理解 Condition
在 java.util.concurrent 包中,Condition 和 ReentrantLock 是两个非常特殊的工具类。使用过的人都知道,ReentrantLock(重入锁)是 JDK 并发包提供的一种独占锁实现。它基于 Doug Lea 实现的 AbstractQueuedSynchronizer(同步器,简称 AQS)。确切地说,是 ReentrantLock 的一个内部类继承了 AbstractQueuedSynchronizer,而 ReentrantLock 本身只是代理了该内部类的一些方法。
有人可能会问,为什么要使用内部类再包装一层?主要是出于安全与封装的考虑。因为 AbstractQueuedSynchronizer 中包含很多方法,还实现了共享锁、Condition 等功能。如果直接让 ReentrantLock 继承它,很容易导致 AbstractQueuedSynchronizer 中的 API 被误用或暴露不必要的接口。
言归正传,今天我们来讨论 Condition 工具类的实现。
使用示例
ReentrantLock 和 Condition 的典型使用方式如下:
public static void main(String[] args) {
final ReentrantLock reentrantLock = new ReentrantLock();
final Condition condition = reentrantLock.newCondition();
Thread thread = new Thread(() -> {
try {
reentrantLock.lock();
System.out.println("我要等一个新信号" + this);
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("拿到一个信号!!" + this);
reentrantLock.unlock();
}, "waitThread1");
thread.start();
Thread thread1 = new Thread(() -> {
reentrantLock.lock();
System.out.println("我拿到锁了");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll();
System.out.println("我发了一个信号!!");
reentrantLock.unlock();
}, "signalThread");
thread1.start();
}运行后,结果如下:
我要等一个新信号 lock.ReentrantLockTest$1@a62fc3
我拿到锁了
我发了一个信号!!
拿到一个信号!!可以看到,Condition 的执行方式是:当在线程 1 中调用 await 方法后,线程 1 将释放锁,并且将自己挂起(沉睡),等待唤醒;线程 2 获取到锁后,开始执行任务,完毕后调用 Condition 的 signal 方法,唤醒线程 1,线程 1 恢复执行。
以上说明 Condition 是一个多线程间协调通信的工具类,使得某个或某些线程一起等待某个条件(Condition)。只有当该条件具备(signal 或者 signalAll 方法被调用)时,这些等待线程才会被唤醒,从而重新争夺锁。
实现原理
那么,它是如何实现的呢?
首先要明白,reentrantLock.newCondition() 返回的是 Condition 的一个实现。该类在 AbstractQueuedSynchronizer 中被实现,方法名为 newCondition():
public Condition newCondition() {
return sync.newCondition();
}它可以访问 AbstractQueuedSynchronizer 中的方法和其余内部类(AbstractQueuedSynchronizer 是个抽象类,至于内部类如何访问,这里涉及内部类的访问机制)。
为了方便书写,下文将 AbstractQueuedSynchronizer 缩写为 AQS。
await 方法分析
当 await 被调用时,代码如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程包装后,添加到 Condition 自己维护的一个链表中
Node node = addConditionWaiter();
// 释放当前线程占有的锁。从 demo 中看到,调用 await 前,当前线程是占有锁的
int savedState = fullyRelease(node);
int interruptMode = 0;
// 释放完毕后,遍历 AQS 的队列,看当前节点是否在队列中
while (!isOnSyncQueue(node)) {
// 不在说明它还没有竞争锁的资格,所以继续将自己沉睡
// 直到它被加入到队列中(在 signal 的时候加入)
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被唤醒后,重新开始正式竞争锁。同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}线程协作流程
回到上面的 demo,锁被释放后,线程 1 开始沉睡。这个时候,因为线程 1 沉睡时会唤醒 AQS 队列中的头结点,所以线程 2 会开始竞争锁并获取到。等待 3 秒后,线程 2 会调用 signal 方法,“发出”信号。signal 方法如下:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// firstWaiter 为 condition 自己维护的一个链表的头结点
// 取出第一个节点后开始唤醒操作
Node first = firstWaiter;
if (first != null)
doSignal(first);
}说明一下,其实 Condition 内部维护了等待队列的头结点和尾节点,该队列的作用是存放等待 signal 信号的线程。该线程被封装为 Node 节点后存放于此:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
// ...
}关键就在于此。我们知道 AQS 自己维护的队列是当前等待资源的队列,AQS 会在资源被释放后,依次唤醒队列中从前到后的所有节点,使它们对应的线程恢复执行,直到队列为空。
而 Condition 自己也维护了一个队列,该队列的作用是维护一个等待 signal 信号的队列。两个队列的作用是不同的,事实上,每个线程也仅仅会同时存在以上两个队列中的一个。流程是这样的:
- 线程 1 调用
reentrantLock.lock时,线程被加入到 AQS 的等待队列 中。 - 线程 1 调用
await方法时,该线程从 AQS 中移除(对应操作是锁的释放)。 - 接着马上被加入到 Condition 的等待队列 中,意味着该线程需要
signal信号。 - 线程 2 因为线程 1 释放锁的关系,被唤醒,并判断可以获取锁,于是线程 2 获取锁,并被加入到 AQS 的等待队列 中。
- 线程 2 调用
signal方法,这个时候 Condition 的等待队列中只有线程 1 一个节点,于是它被取出来,并被加入到 AQS 的等待队列 中。注意:这个时候,线程 1 并没有被唤醒。 signal方法执行完毕,线程 2 调用reentrantLock.unlock()方法释放锁。这个时候因为 AQS 中只有线程 1,于是 AQS 释放锁后按从头到尾的顺序唤醒线程时,线程 1 被唤醒,线程 1 恢复执行。- 直到释放锁整个过程执行完毕。
可以看到,整个协作过程是靠结点在 AQS 的等待队列 和 Condition 的等待队列 中来回移动实现的。Condition 作为一个条件类,很好地自己维护了一个等待信号的队列,并在适时的时候将结点加入到 AQS 的等待队列中来实现唤醒操作。
signal 方法深入分析
看到这里,signal 方法的代码应该不难理解了。取出头结点,然后执行 doSignal:
public final void signal() {
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if (first != null) {
doSignal(first);
}
}
private void doSignal(Node first) {
do {
// 修改头结点,完成旧头结点的移出工作
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 将老的头结点,加入到 AQS 的等待队列中
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or attempt
* to set waitStatus fails, wake up to resync (in which case the
* waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
// 如果该结点的状态为 cancel 或者修改 waitStatus 失败,则直接唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}可以看到,正常情况下 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这个判断是不会为 true 的,所以,不会在这个时候唤醒该线程。只有到发送 signal 信号的线程调用 reentrantLock.unlock() 后,因为它已经被加到 AQS 的等待队列中,所以才会被唤醒。
总结
本文从代码的角度说明了 Condition 的实现方式。其中涉及到了 AQS 的很多操作,比如 AQS 的等待队列实现独占锁功能。不过,这不是本文讨论的重点,后续有机会再将 AQS 的实现单独分享出来。
说明:本文基于 JDK 1.5+ 引入的 java.util.concurrent 包机制进行分析,核心原理在后续 JDK 版本(如 JDK 8、11 等)中保持一致。 版权声明:本文为原创文章,版权归 戴老师的博客 所有,转载请联系博主获得授权。
本文地址:https://1diff.fun/archives/zen-me-li-jie-condition.html
如果对本文有什么问题或疑问都可以在评论区留言,我看到后会尽量解答。