前言

在 JDK 1.8 中,AbstractQueuedSynchronizer(简称 AQS)拥有众多子类,如下图所示:

AQS 子类结构

AQS 同步器架构

可以看到,公平锁与非公平锁(包括 ReentrantLock)都是基于 AQS 实现的。

  • 公平锁(Fair Lock):线程抢占锁的顺序严格按照调用 lock 方法的先后顺序,依次获取锁。
  • 非公平锁(Unfair Lock):线程抢占锁的顺序不定。线程尝试获取锁时,无论是否有其他线程在等待,都先尝试直接获取。若失败则加入等待队列。这意味着后来者可能“插队”成功(但因为抢占锁失败而加入到等待队列的线程不能参与下一次抢占,直到被 unpark)。

AQS 核心结构

AQS 通过内置的 FIFO 同步队列来完成资源获取线程的排队工作。如果当前线程获取同步状态失败(即获取锁失败),AQS 会将当前线程及等待状态等信息构造成一个节点(Node)并加入同步队列,同时阻塞当前线程。当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

类结构

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    protected AbstractQueuedSynchronizer() { }
    
    // 同步器队列头结点
    private transient volatile Node head;
    
    // 同步器队列尾结点
    private transient volatile Node tail;
    
    // 同步状态(当 state 为 0 时,无锁;当 state > 0 时,说明有锁)
    private volatile int state;
    
    // 获取锁状态
    protected final int getState() {
        return state;
    }
    
    // 设置锁状态
    protected final void setState(int newState) {
        state = newState;
    }
    // ... 其他方法
}

通过 AQS 的类结构可以看到,其内部核心包含一个队列和一个 state 变量:

  • 队列:通过双向链表实现,用于存储等待获取锁的线程。
  • state:锁的状态标志。
  • volatile 关键字headtailstate 都是 volatile 类型的变量,保证了多线程环境下的内存可见性。

同步队列的基本结构如下:

同步队列结构

Node 节点结构

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    
    // 表示当前的线程被取消
    static final int CANCELLED =  1;
    // 表示当前节点的后继节点包含的线程需要运行,也就是 unpark
    static final int SIGNAL    = -1;
    // 表示当前节点在等待 condition,也就是在 condition 队列中
    static final int CONDITION = -2;
    // 表示当前场景下后续的 acquireShared 能够得以执行
    static final int PROPAGATE = -3;
    // 表示节点的状态。默认为 0,表示当前节点在 sync 队列中,等待着获取锁
    volatile int waitStatus;
    
    // 前驱节点
    volatile Node prev;
    // 后继节点
    volatile Node next;
    // 获取锁的线程
    volatile Thread thread;
    // 存储 condition 队列中的后继节点
    Node nextWaiter;
}

Node 节点中除了存储当前线程、节点类型、队列中前后元素的引用外,还有一个关键变量 waitStatus。该变量用于描述节点的状态。由于 AQS 队列中存在并发,队列中会存取一定数量的节点,每个节点代表了一个线程的状态。有的线程可能因超时或中断需要放弃竞争(取消),有的线程在等待条件满足(Condition)。因此需要一个变量来描述这些状态,waitStatus 主要有以下几种状态:

static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

waitStatus 状态图

  • CANCELLED (1):因为超时或者中断,节点会被设置为取消状态。被取消状态的节点不应再去竞争锁,只能保持取消状态不变,不能转换为其他状态。处于这种状态的节点会被踢出队列,最终被 GC 回收。
  • SIGNAL (-1):表示这个节点的后继节点被阻塞了。当自己释放锁或取消的时候,需要通知(unpark)后继节点。
  • CONDITION (-2):表示这个节点在条件队列中,因为等待某个条件而被阻塞。它不会被作为同步节点直到其状态被置为 0。
  • PROPAGATE (-3):使用在共享模式下,头结点有可能处于这种状态,表示锁的下一次获取可以无条件传播。
  • 0 (None):新结点会处于这种状态,表示正常等待中。

应用之——公平锁

公平锁的获取锁

公平锁的 lock 方法直接调用 AQS 的 acquire 方法:

final void lock() {
    acquire(1);
}

进入 AQS 的 acquire 方法:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt(); // 自己给自己设置了一个中断标志,只有之前被中断过,前面才会返回 true,这里相当于重新设置一个中断标志
}

从方法语义上看,尝试获取一把锁。如果获取不到(tryAcquire 返回 false),则尝试将该线程封装成 Node 对象放入等待队列中(利用 && 实现了短路性质)。

tryAcquire 方法在 AQS 中是留空的,留给子类实现:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

设计理念:既然要给子类实现,通常应该用抽象方法。但是 Doug Lea 没有这么做,原因是 AQS 有两种功能(独占与共享),面向两种使用场景。如果都需要给子类定义的方法都是抽象方法,会导致子类无论如何都需要实现另外一种场景的抽象方法,这对子类来说是不友好的。

以下是 FairSync(公平锁内部类)的 tryAcquire 方法实现:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread(); // 获取当前线程
    int c = getState();  // 获取父类 AQS 中的标志位
    if (c == 0) {  // 0 意味着锁空闲
        if (!hasQueuedPredecessors() && // 如果前面没有已在队列中的线程!
            compareAndSetState(0, acquires)) { // 修改一下状态位,注意:这里的 acquires 是在 lock 的时候传递来的,值为 1
            setExclusiveOwnerThread(current); // 如果通过 CAS 操作将状态位更新成功,则代表当前线程获取锁,将当前线程设置到 AQS 的一个变量中
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { // 如果不为 0 意味着,锁已经被拿走了。因为 ReentrantLock 是重入锁,判断获取锁的线程是不是当前请求锁的线程
        int nextc = c + acquires; // 如果是的,累加在 state 字段上就可以了
        if (nextc < 0) // 重入次数太多,溢出了
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

其中 hasQueuedPredecessors 方法用于判断队列中是否有前驱节点(保证公平性):

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

到此,如果获取锁,tryAcquire 返回 true,反之返回 false。回到 AQS 的 acquire 方法,如果没有获取到锁,按照逻辑应将当前线程放到队列中去。在入队之前,需要做些包装:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

addWaiter 方法用于创建节点并加入队列:

/**
 * Creates and enqueues node for current thread and given mode.
 * 用当前线程去构造一个 Node 对象,mode 是一个表示 Node 类型的字段,
 * 仅仅表示这个节点是独占的,还是共享的。
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

创建好节点后,将节点加入到队列尾部。此处,在队列不为空的时候,先尝试通过 CAS 方式修改尾节点为最新的节点。如果修改失败,意味着有并发,这个时候才会进入 enq 中死循环,“自旋”方式修改。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

在解释 acquireQueued 之前,我们需要先看下 AQS 中队列的内存结构。队列由 Node 类型的节点组成,其中至少有两个变量,一个封装线程,一个封装节点类型。实际上,它的内存结构是这样的(第一次节点插入时,第一个节点是一个空节点,代表有一个线程已经获取锁,事实上,队列的第一个节点就是代表持有锁的节点):

队列内存结构

黄色节点为队列默认的头节点,每次有线程竞争失败,进入队列后其实都是插入到队列的尾节点(tail 节点)后面。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor(); // 前任节点
            if (p == head && tryAcquire(arg)) { // 如果当前的节点的前一个节点是 head,说明他是队列中第一个“有效的”节点,因此尝试获取锁
                setHead(node); // 成功后,将上图中的黄色节点移除,node 变成头节点
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果 p 节点不是头节点,或者 tryAcquire 返回 false,说明请求失败。
            // 那么首先需要判断请求失败后 node 节点是否应该被阻塞,如果应该
            // 被阻塞,那么阻塞 node 节点,并检测中断状态。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // 如果需要,借助 JUC 包下的 LockSupport 类的静态方法 park 挂起当前线程,直到被唤醒
                interrupted = true;
        }
    } finally {
        if (failed) // 如果有异常
            cancelAcquire(node); // 取消请求,对应到队列操作,就是将当前节点从队列中移除
    }
}

判断是否应该被挂起(当且仅当前一个节点是 SIGNAL 状态时,才会 return true,这之后才会执行挂起方法;若返回 false,则跳出上面的 if 判定,会继续自旋):

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // 如果这一步不满足,都会 return false
        /*
         * 前一节点已经设置了 signal 状态,释放锁后会通知后继节点
         * 所以可以安全挂起
         */
        return true;
    if (ws > 0) {
        /*
        判断如果前驱节点状态为 CANCELLED,那就一直往前找,直到找到最近一个正常等待的状态
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 放置在其后面
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // 如果前驱节点正常,则修改前驱节点状态为 SIGNAL 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

关于状态设置的疑问:

  • 为什么不关心是否成功却还要设置呢?
  • 如果设置失败,表示前驱已经被 signal 了。如果前驱是 head,说明有机会获取锁,所以返回 false 后还可以再次 tryAcquire
  • 如果设置成功,表示前驱等待 signal。如果再次确认 pred.waitStatus 仍然是 Node.SIGNAL,则表明前驱等待释放锁的情况下必须阻塞当前线程,所以返回 true 后即被 park。
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted(); // 返回在这过程中是否被中断过,如果是,返回 true,同时将 interrupted 标志(acquireQueued 方法中)设为 true
}

acquireQueued 方法中,当前线程通过自旋的方式来尝试获取同步状态:

  1. 当前节点的前驱节点为头节点才能尝试获得锁。如果获得成功,则把当前线程设置成头结点,把之前的头结点从队列中移除,等待垃圾回收(没有对象引用)。
  2. 如果获取锁失败,则进入 shouldParkAfterFailedAcquire 方法中检测当前节点是否可以被安全的挂起(阻塞)。如果可以安全挂起,则进入 parkAndCheckInterrupt 方法,把当前线程挂起,并将 interrupted 置为 true
  3. 挂起的线程被唤醒后,继续在 acquireQueuedfor 死循环中运行,直到获取锁后可以跳出循环。
待验证结论:没有获得锁的线程,有可能在下次获得锁之前都不会被挂起。因为无法被安全挂起时,会把节点移到前驱节点为 SIGNAL 的节点后(这样下次就可以安全挂起了),然后再进行自旋。有可能在自旋的过程中,该节点已经成为了头结点后的节点,因此可以直接尝试获取锁而避免被挂起。

公平锁的释放锁阶段

(非公平锁调用的也是同一个方法)

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 0 是初始值,不对应某个具体状态,所以不能为 0
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

unlock 方法调用了 AQS 的 release 方法,同样传入了参数 1。和获取锁相对应,获取一个锁,标志位 +1;释放一个锁,标志位 -1。

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread()) // 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { // 因为是重入的关系,不是每次释放锁 c 都等于 0,直到最后一次释放锁时,才通知 AQS 不需要再记录哪个线程正在获取锁,即当前锁是 free 状态
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

释放锁成功后,找到 AQS 的头节点,并唤醒它即可:

/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    /* 
     * 为什么不关心是否成功却还要设置呢? 
     * 
     * 注意这里的 Node 实际就是 head 
     *  
     * 如果设置成功,即 head.waitStatus=0,则可以让这时即将被阻塞的线程有机会再次调用 tryAcquire 获取锁。 
     * 也就是让 shouldParkAfterFailedAcquire 方法里的 compareAndSetWaitStatus(pred, ws, Node.SIGNAL) 执行失败返回 false,这样就能再有机会再 tryAcquire 了 
     * 
     * 如果设置失败,新跟随在 head 后面的线程被阻塞,但是没关系,下面的代码会立即将这个阻塞线程释放掉 
     */  
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从后往前找,但仍是找离 node 最近的一个有效节点(也就是它的继承者)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

应用之——非公平锁

其实,非公平锁和公平锁的唯一区别就是获取锁的方式不同:一个是按前后顺序依次获取锁,一个是抢占式的获取锁。

获取锁

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

非公平锁的 lock 方法处理方式是:在 lock 的时候先直接 CAS 修改一次 state 变量(尝试获取锁),成功就返回,不成功再排队,从而达到不排队直接抢占的目的。

关于具体场景的解析,可参考:关于具体场景的解析

当等待队列中,某个结点内的线程被唤醒后,它参与到锁竞争中。同它一起竞争的有可能是之前没有竞争失败过的线程。等待队列中的其他结点,不会参与到竞争中。

说明

  • 本文基于 JDK 1.8 版本源码进行分析。
  • 文中代码片段为核心逻辑摘录,实际源码可能包含更多细节处理。
  • AQS 是 Java 并发包(J.U.C)的基石,理解其原理有助于深入掌握 ReentrantLockCountDownLatch 等同步工具。