AbstractQueuedSynchronizer源码解析之ReentrantLock
前言
在 JDK 1.8 中,AbstractQueuedSynchronizer(简称 AQS)拥有众多子类,如下图所示:


可以看到,公平锁与非公平锁(包括 ReentrantLock)都是基于 AQS 实现的。
- 公平锁(Fair Lock):线程抢占锁的顺序严格按照调用
lock方法的先后顺序,依次获取锁。 - 非公平锁(Unfair Lock):线程抢占锁的顺序不定。线程尝试获取锁时,无论是否有其他线程在等待,都先尝试直接获取。若失败则加入等待队列。这意味着后来者可能“插队”成功(但因为抢占锁失败而加入到等待队列的线程不能参与下一次抢占,直到被 unpark)。
AQS 核心结构
AQS 通过内置的 FIFO 同步队列来完成资源获取线程的排队工作。如果当前线程获取同步状态失败(即获取锁失败),AQS 会将当前线程及等待状态等信息构造成一个节点(Node)并加入同步队列,同时阻塞当前线程。当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
类结构
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
protected AbstractQueuedSynchronizer() { }
// 同步器队列头结点
private transient volatile Node head;
// 同步器队列尾结点
private transient volatile Node tail;
// 同步状态(当 state 为 0 时,无锁;当 state > 0 时,说明有锁)
private volatile int state;
// 获取锁状态
protected final int getState() {
return state;
}
// 设置锁状态
protected final void setState(int newState) {
state = newState;
}
// ... 其他方法
}通过 AQS 的类结构可以看到,其内部核心包含一个队列和一个 state 变量:
- 队列:通过双向链表实现,用于存储等待获取锁的线程。
- state:锁的状态标志。
- volatile 关键字:
head、tail和state都是volatile类型的变量,保证了多线程环境下的内存可见性。
同步队列的基本结构如下:
Node 节点结构
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
// 表示当前的线程被取消
static final int CANCELLED = 1;
// 表示当前节点的后继节点包含的线程需要运行,也就是 unpark
static final int SIGNAL = -1;
// 表示当前节点在等待 condition,也就是在 condition 队列中
static final int CONDITION = -2;
// 表示当前场景下后续的 acquireShared 能够得以执行
static final int PROPAGATE = -3;
// 表示节点的状态。默认为 0,表示当前节点在 sync 队列中,等待着获取锁
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 获取锁的线程
volatile Thread thread;
// 存储 condition 队列中的后继节点
Node nextWaiter;
}Node 节点中除了存储当前线程、节点类型、队列中前后元素的引用外,还有一个关键变量 waitStatus。该变量用于描述节点的状态。由于 AQS 队列中存在并发,队列中会存取一定数量的节点,每个节点代表了一个线程的状态。有的线程可能因超时或中断需要放弃竞争(取消),有的线程在等待条件满足(Condition)。因此需要一个变量来描述这些状态,waitStatus 主要有以下几种状态:
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;- CANCELLED (1):因为超时或者中断,节点会被设置为取消状态。被取消状态的节点不应再去竞争锁,只能保持取消状态不变,不能转换为其他状态。处于这种状态的节点会被踢出队列,最终被 GC 回收。
- SIGNAL (-1):表示这个节点的后继节点被阻塞了。当自己释放锁或取消的时候,需要通知(unpark)后继节点。
- CONDITION (-2):表示这个节点在条件队列中,因为等待某个条件而被阻塞。它不会被作为同步节点直到其状态被置为 0。
- PROPAGATE (-3):使用在共享模式下,头结点有可能处于这种状态,表示锁的下一次获取可以无条件传播。
- 0 (None):新结点会处于这种状态,表示正常等待中。
应用之——公平锁
公平锁的获取锁
公平锁的 lock 方法直接调用 AQS 的 acquire 方法:
final void lock() {
acquire(1);
}进入 AQS 的 acquire 方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); // 自己给自己设置了一个中断标志,只有之前被中断过,前面才会返回 true,这里相当于重新设置一个中断标志
}从方法语义上看,尝试获取一把锁。如果获取不到(tryAcquire 返回 false),则尝试将该线程封装成 Node 对象放入等待队列中(利用 && 实现了短路性质)。
tryAcquire 方法在 AQS 中是留空的,留给子类实现:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}设计理念:既然要给子类实现,通常应该用抽象方法。但是 Doug Lea 没有这么做,原因是 AQS 有两种功能(独占与共享),面向两种使用场景。如果都需要给子类定义的方法都是抽象方法,会导致子类无论如何都需要实现另外一种场景的抽象方法,这对子类来说是不友好的。
以下是 FairSync(公平锁内部类)的 tryAcquire 方法实现:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread(); // 获取当前线程
int c = getState(); // 获取父类 AQS 中的标志位
if (c == 0) { // 0 意味着锁空闲
if (!hasQueuedPredecessors() && // 如果前面没有已在队列中的线程!
compareAndSetState(0, acquires)) { // 修改一下状态位,注意:这里的 acquires 是在 lock 的时候传递来的,值为 1
setExclusiveOwnerThread(current); // 如果通过 CAS 操作将状态位更新成功,则代表当前线程获取锁,将当前线程设置到 AQS 的一个变量中
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 如果不为 0 意味着,锁已经被拿走了。因为 ReentrantLock 是重入锁,判断获取锁的线程是不是当前请求锁的线程
int nextc = c + acquires; // 如果是的,累加在 state 字段上就可以了
if (nextc < 0) // 重入次数太多,溢出了
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}其中 hasQueuedPredecessors 方法用于判断队列中是否有前驱节点(保证公平性):
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}到此,如果获取锁,tryAcquire 返回 true,反之返回 false。回到 AQS 的 acquire 方法,如果没有获取到锁,按照逻辑应将当前线程放到队列中去。在入队之前,需要做些包装:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}addWaiter 方法用于创建节点并加入队列:
/**
* Creates and enqueues node for current thread and given mode.
* 用当前线程去构造一个 Node 对象,mode 是一个表示 Node 类型的字段,
* 仅仅表示这个节点是独占的,还是共享的。
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}创建好节点后,将节点加入到队列尾部。此处,在队列不为空的时候,先尝试通过 CAS 方式修改尾节点为最新的节点。如果修改失败,意味着有并发,这个时候才会进入 enq 中死循环,“自旋”方式修改。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}在解释 acquireQueued 之前,我们需要先看下 AQS 中队列的内存结构。队列由 Node 类型的节点组成,其中至少有两个变量,一个封装线程,一个封装节点类型。实际上,它的内存结构是这样的(第一次节点插入时,第一个节点是一个空节点,代表有一个线程已经获取锁,事实上,队列的第一个节点就是代表持有锁的节点):

黄色节点为队列默认的头节点,每次有线程竞争失败,进入队列后其实都是插入到队列的尾节点(tail 节点)后面。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 前任节点
if (p == head && tryAcquire(arg)) { // 如果当前的节点的前一个节点是 head,说明他是队列中第一个“有效的”节点,因此尝试获取锁
setHead(node); // 成功后,将上图中的黄色节点移除,node 变成头节点
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果 p 节点不是头节点,或者 tryAcquire 返回 false,说明请求失败。
// 那么首先需要判断请求失败后 node 节点是否应该被阻塞,如果应该
// 被阻塞,那么阻塞 node 节点,并检测中断状态。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 如果需要,借助 JUC 包下的 LockSupport 类的静态方法 park 挂起当前线程,直到被唤醒
interrupted = true;
}
} finally {
if (failed) // 如果有异常
cancelAcquire(node); // 取消请求,对应到队列操作,就是将当前节点从队列中移除
}
}判断是否应该被挂起(当且仅当前一个节点是 SIGNAL 状态时,才会 return true,这之后才会执行挂起方法;若返回 false,则跳出上面的 if 判定,会继续自旋):
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 如果这一步不满足,都会 return false
/*
* 前一节点已经设置了 signal 状态,释放锁后会通知后继节点
* 所以可以安全挂起
*/
return true;
if (ws > 0) {
/*
判断如果前驱节点状态为 CANCELLED,那就一直往前找,直到找到最近一个正常等待的状态
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 放置在其后面
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 如果前驱节点正常,则修改前驱节点状态为 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}关于状态设置的疑问:
- 为什么不关心是否成功却还要设置呢?
- 如果设置失败,表示前驱已经被 signal 了。如果前驱是 head,说明有机会获取锁,所以返回 false 后还可以再次
tryAcquire。 - 如果设置成功,表示前驱等待 signal。如果再次确认
pred.waitStatus仍然是Node.SIGNAL,则表明前驱等待释放锁的情况下必须阻塞当前线程,所以返回 true 后即被 park。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted(); // 返回在这过程中是否被中断过,如果是,返回 true,同时将 interrupted 标志(acquireQueued 方法中)设为 true
}在 acquireQueued 方法中,当前线程通过自旋的方式来尝试获取同步状态:
- 当前节点的前驱节点为头节点才能尝试获得锁。如果获得成功,则把当前线程设置成头结点,把之前的头结点从队列中移除,等待垃圾回收(没有对象引用)。
- 如果获取锁失败,则进入
shouldParkAfterFailedAcquire方法中检测当前节点是否可以被安全的挂起(阻塞)。如果可以安全挂起,则进入parkAndCheckInterrupt方法,把当前线程挂起,并将interrupted置为true。 - 挂起的线程被唤醒后,继续在
acquireQueued的for死循环中运行,直到获取锁后可以跳出循环。
待验证结论:没有获得锁的线程,有可能在下次获得锁之前都不会被挂起。因为无法被安全挂起时,会把节点移到前驱节点为 SIGNAL 的节点后(这样下次就可以安全挂起了),然后再进行自旋。有可能在自旋的过程中,该节点已经成为了头结点后的节点,因此可以直接尝试获取锁而避免被挂起。公平锁的释放锁阶段
(非公平锁调用的也是同一个方法)
public void unlock() {
sync.release(1);
}public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 0 是初始值,不对应某个具体状态,所以不能为 0
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}unlock 方法调用了 AQS 的 release 方法,同样传入了参数 1。和获取锁相对应,获取一个锁,标志位 +1;释放一个锁,标志位 -1。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) // 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 因为是重入的关系,不是每次释放锁 c 都等于 0,直到最后一次释放锁时,才通知 AQS 不需要再记录哪个线程正在获取锁,即当前锁是 free 状态
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}释放锁成功后,找到 AQS 的头节点,并唤醒它即可:
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
/*
* 为什么不关心是否成功却还要设置呢?
*
* 注意这里的 Node 实际就是 head
*
* 如果设置成功,即 head.waitStatus=0,则可以让这时即将被阻塞的线程有机会再次调用 tryAcquire 获取锁。
* 也就是让 shouldParkAfterFailedAcquire 方法里的 compareAndSetWaitStatus(pred, ws, Node.SIGNAL) 执行失败返回 false,这样就能再有机会再 tryAcquire 了
*
* 如果设置失败,新跟随在 head 后面的线程被阻塞,但是没关系,下面的代码会立即将这个阻塞线程释放掉
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找,但仍是找离 node 最近的一个有效节点(也就是它的继承者)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}应用之——非公平锁
其实,非公平锁和公平锁的唯一区别就是获取锁的方式不同:一个是按前后顺序依次获取锁,一个是抢占式的获取锁。
获取锁
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}非公平锁的 lock 方法处理方式是:在 lock 的时候先直接 CAS 修改一次 state 变量(尝试获取锁),成功就返回,不成功再排队,从而达到不排队直接抢占的目的。
关于具体场景的解析,可参考:关于具体场景的解析
当等待队列中,某个结点内的线程被唤醒后,它参与到锁竞争中。同它一起竞争的有可能是之前没有竞争失败过的线程。等待队列中的其他结点,不会参与到竞争中。
说明
- 本文基于 JDK 1.8 版本源码进行分析。
- 文中代码片段为核心逻辑摘录,实际源码可能包含更多细节处理。
- AQS 是 Java 并发包(J.U.C)的基石,理解其原理有助于深入掌握
ReentrantLock、CountDownLatch等同步工具。
版权声明:本文为原创文章,版权归 戴老师的博客 所有,转载请联系博主获得授权。
本文地址:https://1diff.fun/archives/abstractqueuedsynchronizer-yuan-ma-jie-xi-zhi-reentrantlock.html
如果对本文有什么问题或疑问都可以在评论区留言,我看到后会尽量解答。