怎么理解 Condition

java.util.concurrent 包中,ConditionReentrantLock 是两个非常特殊的工具类。使用过的人都知道,ReentrantLock(重入锁)是 JDK 并发包提供的一种独占锁实现。它基于 Doug Lea 实现的 AbstractQueuedSynchronizer(同步器,简称 AQS)。确切地说,是 ReentrantLock 的一个内部类继承了 AbstractQueuedSynchronizer,而 ReentrantLock 本身只是代理了该内部类的一些方法。

有人可能会问,为什么要使用内部类再包装一层?主要是出于安全与封装的考虑。因为 AbstractQueuedSynchronizer 中包含很多方法,还实现了共享锁、Condition 等功能。如果直接让 ReentrantLock 继承它,很容易导致 AbstractQueuedSynchronizer 中的 API 被误用或暴露不必要的接口。

言归正传,今天我们来讨论 Condition 工具类的实现。

使用示例

ReentrantLockCondition 的典型使用方式如下:

public static void main(String[] args) {
    final ReentrantLock reentrantLock = new ReentrantLock();
    final Condition condition = reentrantLock.newCondition();

    Thread thread = new Thread(() -> {
        try {
            reentrantLock.lock();
            System.out.println("我要等一个新信号" + this);
            condition.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("拿到一个信号!!" + this);
        reentrantLock.unlock();
    }, "waitThread1");

    thread.start();

    Thread thread1 = new Thread(() -> {
        reentrantLock.lock();
        System.out.println("我拿到锁了");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        condition.signalAll();
        System.out.println("我发了一个信号!!");
        reentrantLock.unlock();
    }, "signalThread");

    thread1.start();
}

运行后,结果如下:

我要等一个新信号 lock.ReentrantLockTest$1@a62fc3
我拿到锁了
我发了一个信号!!
拿到一个信号!!

可以看到,Condition 的执行方式是:当在线程 1 中调用 await 方法后,线程 1 将释放锁,并且将自己挂起(沉睡),等待唤醒;线程 2 获取到锁后,开始执行任务,完毕后调用 Conditionsignal 方法,唤醒线程 1,线程 1 恢复执行。

以上说明 Condition 是一个多线程间协调通信的工具类,使得某个或某些线程一起等待某个条件(Condition)。只有当该条件具备(signal 或者 signalAll 方法被调用)时,这些等待线程才会被唤醒,从而重新争夺锁。

实现原理

那么,它是如何实现的呢?

首先要明白,reentrantLock.newCondition() 返回的是 Condition 的一个实现。该类在 AbstractQueuedSynchronizer 中被实现,方法名为 newCondition()

public Condition newCondition() {
    return sync.newCondition();
}

它可以访问 AbstractQueuedSynchronizer 中的方法和其余内部类(AbstractQueuedSynchronizer 是个抽象类,至于内部类如何访问,这里涉及内部类的访问机制)。

为了方便书写,下文将 AbstractQueuedSynchronizer 缩写为 AQS

await 方法分析

await 被调用时,代码如下:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    
    // 将当前线程包装后,添加到 Condition 自己维护的一个链表中
    Node node = addConditionWaiter(); 
    
    // 释放当前线程占有的锁。从 demo 中看到,调用 await 前,当前线程是占有锁的
    int savedState = fullyRelease(node);

    int interruptMode = 0;
    // 释放完毕后,遍历 AQS 的队列,看当前节点是否在队列中
    while (!isOnSyncQueue(node)) {
        // 不在说明它还没有竞争锁的资格,所以继续将自己沉睡
        // 直到它被加入到队列中(在 signal 的时候加入)
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    // 被唤醒后,重新开始正式竞争锁。同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

线程协作流程

回到上面的 demo,锁被释放后,线程 1 开始沉睡。这个时候,因为线程 1 沉睡时会唤醒 AQS 队列中的头结点,所以线程 2 会开始竞争锁并获取到。等待 3 秒后,线程 2 会调用 signal 方法,“发出”信号。signal 方法如下:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    
    // firstWaiter 为 condition 自己维护的一个链表的头结点
    // 取出第一个节点后开始唤醒操作
    Node first = firstWaiter; 
    if (first != null)
        doSignal(first);
}

说明一下,其实 Condition 内部维护了等待队列的头结点和尾节点,该队列的作用是存放等待 signal 信号的线程。该线程被封装为 Node 节点后存放于此:

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    
    /** First node of condition queue. */
    private transient Node firstWaiter;
    
    /** Last node of condition queue. */
    private transient Node lastWaiter;
    // ...
}

关键就在于此。我们知道 AQS 自己维护的队列是当前等待资源的队列,AQS 会在资源被释放后,依次唤醒队列中从前到后的所有节点,使它们对应的线程恢复执行,直到队列为空。

Condition 自己也维护了一个队列,该队列的作用是维护一个等待 signal 信号的队列。两个队列的作用是不同的,事实上,每个线程也仅仅会同时存在以上两个队列中的一个。流程是这样的:

  1. 线程 1 调用 reentrantLock.lock 时,线程被加入到 AQS 的等待队列 中。
  2. 线程 1 调用 await 方法时,该线程从 AQS 中移除(对应操作是锁的释放)。
  3. 接着马上被加入到 Condition 的等待队列 中,意味着该线程需要 signal 信号。
  4. 线程 2 因为线程 1 释放锁的关系,被唤醒,并判断可以获取锁,于是线程 2 获取锁,并被加入到 AQS 的等待队列 中。
  5. 线程 2 调用 signal 方法,这个时候 Condition 的等待队列中只有线程 1 一个节点,于是它被取出来,并被加入到 AQS 的等待队列 中。注意:这个时候,线程 1 并没有被唤醒。
  6. signal 方法执行完毕,线程 2 调用 reentrantLock.unlock() 方法释放锁。这个时候因为 AQS 中只有线程 1,于是 AQS 释放锁后按从头到尾的顺序唤醒线程时,线程 1 被唤醒,线程 1 恢复执行。
  7. 直到释放锁整个过程执行完毕。

可以看到,整个协作过程是靠结点在 AQS 的等待队列Condition 的等待队列 中来回移动实现的。Condition 作为一个条件类,很好地自己维护了一个等待信号的队列,并在适时的时候将结点加入到 AQS 的等待队列中来实现唤醒操作。

signal 方法深入分析

看到这里,signal 方法的代码应该不难理解了。取出头结点,然后执行 doSignal

public final void signal() {
    if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    Node first = firstWaiter;
    if (first != null) {
        doSignal(first);
    }
}

private void doSignal(Node first) {
    do {
        // 修改头结点,完成旧头结点的移出工作
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) && // 将老的头结点,加入到 AQS 的等待队列中
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or attempt
     * to set waitStatus fails, wake up to resync (in which case the
     * waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    // 如果该结点的状态为 cancel 或者修改 waitStatus 失败,则直接唤醒
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

可以看到,正常情况下 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这个判断是不会为 true 的,所以,不会在这个时候唤醒该线程。只有到发送 signal 信号的线程调用 reentrantLock.unlock() 后,因为它已经被加到 AQS 的等待队列中,所以才会被唤醒。

总结

本文从代码的角度说明了 Condition 的实现方式。其中涉及到了 AQS 的很多操作,比如 AQS 的等待队列实现独占锁功能。不过,这不是本文讨论的重点,后续有机会再将 AQS 的实现单独分享出来。

说明:本文基于 JDK 1.5+ 引入的 java.util.concurrent 包机制进行分析,核心原理在后续 JDK 版本(如 JDK 8、11 等)中保持一致。