同步状态

AQS(AbstractQueuedSynchronizer)基于 CLH 队列变体实现,该队列由若干结点构成。如前所述,结点中包含一个与线程状态密切相关的状态位 waitStatus。这是一个 32 位的整型常量,其取值定义如下:

static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

各状态值的含义如下:

  • CANCELLED (1):表示结点因超时或中断已被取消。处于该状态的结点不应再竞争锁,只能保持取消状态直至被踢出队列并由 GC 回收。
  • SIGNAL (-1):表示后继结点已被阻塞,当前结点释放锁时需要通知(unpark)后继结点。
  • CONDITION (-2):表示结点位于条件队列中,因等待某个条件而被阻塞。
  • PROPAGATE (-3):主要用于共享模式。表示头结点处于该状态时,锁的下一次获取可以无条件传播。
  • 0 (Default):表示新结点初始状态,无上述特殊状态。

获取

AQS 的核心操作主要包括获取锁与释放锁。以下是相关的获取操作方法:

public final void acquire(int arg);
public final void acquireInterruptibly(int arg);
public final void acquireShared(int arg);
public final void acquireSharedInterruptibly(int arg);
protected boolean tryAcquire(int arg); 
protected int tryAcquireShared(int arg);
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException;
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException;        

获取操作的流程如下所示:

  1. 尝试获取锁:首先调用 tryAcquire 方法尝试获取锁。若成功,则整个获取操作结束;若失败,则进入步骤 2。

    • tryAcquire 方法在 AQS 中仅抛出不支持操作异常,具体实现由子类完成(如 ReentrantLock)。若获取成功,该方法返回 true
  2. 加入等待队列:若获取锁失败,则创建一个代表当前线程的结点并加入等待队列尾部。此过程通过 addWaiter 方法实现:

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    该方法首先尝试快速路径入队:若尾结点不为空,则设置当前结点的前驱为尾结点,并通过 CAS 更新尾指针。若成功则返回;若失败或队列为空(pred == null),则调用 enq 方法。

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    enq 方法采用自旋方式确保结点入队:

    • 若队列为空(tail == null),则创建一个傀儡结点作为头结点,头尾指针均指向该结点(仅在初始化时执行)。
    • 若队列非空,则通过 CAS 操作将当前结点插入到尾结点之后。若插入期间尾结点发生变化,则继续自旋直到成功。
  3. 挂起或获取锁:结点入队后,线程有两种状态:获取锁或挂起。若该结点不是头结点,则通过 shouldParkAfterFailedAcquire 方法判断是否应该挂起:

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    该方法逻辑如下:

    • 若前驱结点的 waitStatusSIGNAL,表示前驱结点释放锁时会通知当前结点,当前线程可以安全地挂起。
    • 若前驱结点已被取消(waitStatus > 0),则向前遍历跳过所有被取消的结点,直到找到一个有效结点作为前驱,并返回 false(表示本次不挂起,需重试)。
    • 若前驱结点状态为 0 或 PROPAGATE,则通过 CAS 将其状态设置为 SIGNAL,返回 false(表示需要重试以确保获取锁失败后再挂起)。

    若是头结点,当前线程会再次调用 tryAcquire 尝试获取锁。若成功,则将当前结点设置为头结点并返回;若失败,则循环尝试直至成功。至此,acquire 过程分析完毕。

释放

释放操作涉及以下方法:

public final boolean release(int arg); 
protected boolean tryRelease(int arg); 
protected boolean tryReleaseShared(int arg); 

release 方法的实现流程如下:

  1. 释放锁:首先调用 tryRelease 释放锁。若释放失败,直接返回;若成功,则进入步骤 2。
  2. 唤醒后继结点:释放锁成功后,需要唤醒等待队列中的后继结点,通过 unparkSuccessor 方法实现:

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
     
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    具体逻辑如下:

    1. 传入的 node 通常为头结点。首先检查其 waitStatus,若为负值,表示需要通知后继结点。由于即将唤醒后继,此处将该标志位清零。
    2. 查找后继结点:

      • 优先查看 node.next。若后继结点不为空且未被取消(waitStatus <= 0),则直接唤醒。
      • 若后继结点为空或已被取消,则需要寻找下一个可唤醒的结点。

    为何从尾结点向前遍历寻找后继?

    在 CLH 队列中,结点可能随时因中断被取消(waitStatus 设为 CANCELLED)。被取消的结点会被逻辑上“踢出”队列:其前驱结点的 next 指针会指向下一个非取消结点,而该取消结点的 next 指针指向自身。

    若从头向尾遍历,遇到 next 指向自身的取消结点会导致无法找到正确的后继。因此,从尾结点向前遍历(tail -> prev)可以跳过取消结点,找到最近的有效后继。

    关于取消结点的 next 指针指向自身的问题:

    • 为何不指向真正的 next 结点? 被取消的结点最终会被 GC 回收。若指向后继结点,会导致后继结点无法被回收(引用链未断)。
    • 为何不为 NULL? JDK 源码注释指出:The next field of cancelled nodes is set to point to the node itself instead of null, to make life easier for isOnSyncQueue. 大致意思是为了简化 isOnSyncQueue 方法的判断逻辑。

    isOnSyncQueue 方法用于判断结点是否在同步队列中,实现如下:

    /**
     * Returns true if a node, always one that was initially placed on
     * a condition queue, is now waiting to reacquire on sync queue.
     * @param node the node
     * @return true if is reacquiring
     */
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

    逻辑表明:若结点 next 不为空,则判定其在同步队列中。若取消结点的 nextnull,则会被误判为不在队列中,这与事实矛盾。因此,将取消结点的 next 指向自身是合理的设计。

参考资料

说明:本文源码分析基于 JDK 经典实现(JDK 1.6+),AQS 核心逻辑在后续版本中保持稳定,但具体实现细节可能随 JDK 版本迭代略有调整。