一、AQS原理
1、AQS是什么?
它是用来构建锁或者其他同步组件的基础框架。
从它的名拆开来看:
Abstract:因为它不知道怎么上锁,所以采用
模板方法
暴露出上锁的逻辑。
AQS的主要使用方式是
继承
,子类通过继承AQS同步器并实现它的抽象方法来管理同步状态。比如:获取锁的方法(
tryAcquire()
)和释放锁的方法(tryRelease()
)- 子类调用AQS提供的模板方法时,模板方法会调用使用者重写的方法。
Queue:阻塞队列
- 通过内置的FIFO队列来完成线程的排队工作;即抢不到锁的线程来这排队。
Synchronizer:同步
- 使用一个int成员变量state表示同步状态;采用CAS完成多线程抢锁的逻辑。
子类只需要实现自己的获取锁逻辑和释放锁逻辑即可,至于排队阻塞等待、唤醒机制均由AQS完成。
和真正的锁不同的是:AQS同步器是面向锁的实现者,而锁则是面向使用者。
2、AQS的核心:CLH队列
AQS里面的CLH队列是CLH同步锁的一种变形。AQS依赖内部的这个同步队列(FIFO)来完成同步状态的管理。
AQS的核心就是用CAS去操作这个同步队列的head和tail,也就是用CAS操作代替了锁整条双向链表的操作,所以它效率高。
**1>
当线程获取同步状态失败时**,AQS会将
< 当前线程、等待状态等信息>`构建成一个Node节点通过addWaiter()方法加入到阻塞队列的尾部,同时会阻塞当前线程。Node节点的内容:获取同步状态失败的线程引用、等待状态、前驱和后继节点等。
AQS的addWaiter()方法是基于CAS的设置尾节点,其中:
compareAndSetTail(Node expext, Node update),保证节点入队的线程安全性。
最后会在enq(fianl Node node)方法中,通过“死循环”来保证节点的正确添加。
- 2> 当同步状态释放时,会把首节点的第一个活跃的后继节点线程唤醒,使其再次尝试获取同步状态。
唤醒操作只需要将首节点设置成为原首节点的后续节点,断开原节点的next引用即可。
3、以独占锁同步状态的获取和释放为例:
- 在使用tryAcquire()方法获取同步状态时,AQS维护了一个同步阻塞队列,获取同步状态失败的线程都会通过CAS被加入到队列中并在队列中进行自旋。
- 线程移出队列(停止自旋)的前提是前驱节点是头节点且成功获取了同步状态。
- 在释放同步状态时,AQS调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。
- 注意:维护同步队列的FIFO原则,只有前驱节点是头结点的节点才能获取同步状态。
4、从具体的代码实现来看
此处做一个总述,具体细节见后面的源码分析
1)互斥锁
<1>
acquire()获取互斥锁
acquire()方法中有一个模板方法tryAcquire(),用于给子类实现调用,判断是否获取锁成功。如果获取失败:
第一步先调用
addWaiter()
方法,通过CAS把当前线程节点封装成Node
阻塞节点,放到AQS的阻塞队列中;
addWaiter()
方法是基于CAS的设置尾节点,其中:
- compareAndSetTail(Node expext, Node update),保证节点入队的线程安全性。
- 最后会在enq(fianl Node node)方法中,通过“死循环”来保证节点的正确添加。
第二步,放到AQS队列中后,调用
acquireQueued()
方法尝试去获取一下锁,如果获取锁成功,则把当前线程节点设置为head节点。
- 如果获取锁失败,则调用
shouldParkAfterFailedAcquire()
方法,判断当前线程节点是否可以阻塞;- 即从当前线程节点一直往前找节点、直到找到一个SIGNAL的活跃节点,并把当前线程节点作为它的后继节点。
- 最后以响应中断的方式阻塞当前线程节点。
<2>
release()释放互斥锁
release()方法中有一个模板方法tryRelease(),用于给子类实现调用,判断是否释放锁成功,如果失败直接返回FALSE,否者:
调用
unparkSuccessor()
方法唤醒head节点的有效后继节点,即:从head节点开始往后找到第一个有效后继节点,唤醒。
- 不过,找后继节点时,如果head的第一个后继节点是个无效节点,需要从
tail
节点开始往前遍历找到第一有效的后继节点。- 因为我们往阻塞队列中添加线程节点时,首先更新的是tail引用,然后将旧的tail.next指向新tail,所以可能有一瞬间遍历不到新的tail节点。
2)共享锁
<3>
acquireShared()获取共享锁
acquireShared()方法中有一个模板方法tryAcquireShared(),用于给子类实现调用,判断是否获取锁成功,如果获取成功直接返回,否者:
调用
doAcquireShared()
方法阻塞当前线程节点,其中包括:
- 第一步:和
tryAcquire()
方法一样的调用addWaiter()
方法,通过CAS把当前线程节点封装成Node
阻塞节点,放到AQS的阻塞队列中;只是这里Node节点的类型是SHARED。第二步和
tryAcquire()
方法类似,尝试获取共享锁;
首先判断当前Node节点的前驱节点是不是头结点,如果是:
- 调用子类实现的
tryAcquireShared()
方法尝试获取锁,如果获取锁成功,则将当前Node节点设置为head节点,并且调用doReleaseShared()
方法唤醒后继的SHARED类型的共享Node节点。- 第三步:继续往下走,如果线程获取锁失败,则采用响应中断的方式使用
LockSupport.park(this)
方法阻塞当前线程。- 最后,如果在尝试获取锁的过程中失败,则调用
cancelAcquire()
方法取消当前线程尝试获取共享锁的操作。将节点从竞争队列中移除,节点状态设置为CANCELLED、引用的线程对象置为null。
<4>
releaseShared()释放共享锁
releaseShared()方法中有一个模板方法tryReleaseShared(),用于给子类实现调用,判断是否释放锁成功,如果失败直接返回FALSE,否者:
调用
doReleaseShared()
方法唤醒head节点的有效后继节点,即:从head节点开始往后找到第一个有效后继节点,唤醒。
- 不过,找后继节点时,如果head的第一个后继节点是个无效节点,需要从
tail
节点开始往前遍历找到第一有效的后继节点。- 因为我们往阻塞队列中添加线程节点时,首先更新的是tail引用,然后将旧的tail.next指向新tail,所以可能有一瞬间遍历不到新的tail节点。
5、面试题2:AQS中线程Node节点的PROPAGATE状态是什么意思?
在共享锁中,线程调用doReleaseShared()
方法执行唤醒时,方法的代码段不是互斥的,所以为了保证线程的正确唤醒,也就有了这么一个中间状态。
以一个场景为例:Semaphore有两个许可,A线程和B线程都获取到了许可,C线程和D线程在等着。
- 假如A和B同时执行、释放锁,A在doReleaseShared()中CAS成功,将head节点B状态设置为0,并通过unparkSuccessor()方法去唤醒C。
- C被唤醒之后,它去抢锁,并且抢到锁了,在doAcquireShared()中有可能执行到了setHeadAndPropagate()方法去设置头结点为自己,也有可能没执行到,即可能将head节点从B更新到了C,可能还是B。这时我们再去看线程B释放锁的情况。
- 此时线程B他获取到的head节点就有两种情况:
head还是B
- 表明线程A释放了一个共享锁唤醒了线程C,且线程C还没有将head节点更新到C上,即此时的head节点还是B,并且head节点的线程状态是0;
- B线程通过
doReleaseShared()
又释放了一个共享锁,释放锁的过程中发现head节点B的状态是0,这时就会将head节点B的状态设置为PROPAGATE状态。如果没有设置为PROPAGATE状态这一步,在线程B调用
doReleaseShared()
方法释放锁的过程,直接就h == head
break退出循环了。即D不会由B唤醒,
- 此时给人的感觉就是:原本应该有两个信号量,这时只有一个了。
- head是C就正常走了。
二、AQS源码分析
1、acquire()获取互斥锁
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 子类中判断获取锁失败返回false,这里取反则为true
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // addWaiter()方法为:获取锁失败后,加入到阻塞队列。
// 进入阻塞阻塞队列之后,考虑线程是否睡眠?
selfInterrupt();
}
// 子类实现获取锁的逻辑, AQS并不知道你用这个state来上锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
1、addWaiter()方法
获取互斥锁失败后,首先调用addWaiter()
方法,通过CAS把当前线程节点封装成Node
阻塞节点,放到AQS的阻塞队列中;
addWaiter()方法中基于CAS的设置尾节点,将阻塞线程节点放入阻塞队列;
保证节点入队的线程安全性;
并采用全路径 +
优化前置
(把enq()方法中for循环里的一些判断前置到addWaiter()方法)的技巧,实现快速入队。
private Node addWaiter(Node mode) {
// 创建等待节点:当前线程对象 + 锁模式(互斥锁?共享锁?)
Node node = new Node(Thread.currentThread(), mode);
// 快速加入到队列
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
//(面试重点,最难理解的地方)特别注意:当上面的CAS成功之后,有一瞬间pred.next并没有关联。这样会导致什么问题?
// 那一瞬间,我们通过head引用遍历的时候,是到达不了最后一个节点的,A(heade) -> B(旧tail) <- C(新tail)
// 如何获取到最新的节点? 直接通过tail指针往前遍历即可。
pred.next = node;
return node;
}
}
// "死循环"确保节点正确添加
enq(node);
return node;
}
// 全路径入队方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 懒加载时,tail 和head分别代表aqs的头尾节点
// 通过CAS实现原子初始化操作;当一个空节点实现初始化,
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// CAS更新tail节点,期望是t,想改成node
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
面试题3:为什么CAS可以保证线程安全?
正常将一个节点添加到队列尾部需要三个操作:
- 将tail节点的next引用指向newNode节点;
- 将newNode节点的prev引用指向tail节点;
- 将tail节点指针指向newNode节点。
- 那么CAS如何保证三个操作的原子性?
addWaiter()中的逻辑为:
- 先复制引用,在线程栈中引用tail节点;
- 将newNode节点的prev指向tail节点;
- CAS将tail指向newNode节点;
- 修改旧tail节点的next引用到newNode上。
- 这时,如果另外一个线程在第三步CAS失败,会for循环继续重试。直到CAS成功才会返回。
面试题4:是否存在通过head引用遍历不到最后一个节点的情况?
存在,因为:
在addWaiter()方法将newNode节点设置为新tail节点之后,才会将旧tail节点的next引用指向newNode。
- 如果在设置旧tail.next = newNode之前,我们通过head引用遍历,这一瞬间是遍历不到最后一个节点的。
如何获取到最新的节点?
- 直接通过tail指针往前遍历即可。
2、acquireQueued()方法
获取互斥锁失败后,其次调用acquireQueued()
方法尝试再获取一次锁,进而决定是否将当前线程进行阻塞。
// node节点为添加到阻塞队列中的线程节点
@ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前线程节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头结点(持有锁的线程节点),这是有可能头结点释放了锁,所以尝试tryAcquire()让子类抢一下锁
if (p == head && tryAcquire(arg)) {
// 获取锁成功,更新头结点;释放原头结点的next引用,帮助GC
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果前驱节点不是头结点 或 抢锁失败,需要先判断线程是否应该被阻塞? 因为有可能上一瞬间抢锁失败,但是到这的时候头结点已经释放了锁。
// 如何阻塞呢? 调用parkAndCheckInterrupt()方法来阻塞当前线程。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 假如上面出现了任何异常,或者 没有获取到锁;取消获取锁操作
if (failed)
cancelAcquire(node);
}
}
面试题5:为什么添加到阻塞队列后,要先尝试获取锁?
因为如果添加到阻塞队列后,当前锁状态是无锁的话,就不会有活跃线程获取到锁。
1、shouldParkAfterFailedAcquire()方法
如果前驱节点不是头结点 或 抢锁失败,需要先调用shouldParkAfterFailedAcquire()
方法判断线程是否应该被阻塞?
- 也就是找到一个可靠的(活着有效的)节点,然后将当前线程节点作为其后继节点即可。
// pred为前驱节点,node为当前线程节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点的状态
int ws = pred.waitStatus;
// 如果前驱节点是SIGNAL,此时当前线程节点可以安全的阻塞
// 为什么可以安全阻塞? 因为SIGNAL状态,代表了上一个线程是活的,它可以通知当前线程节点,所以当前节点是可以安全的阻塞了。
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 前驱节点是CALLCEL无效节点时,怎么办? 一直往前找,直到找到有效节点。
do {
// 取前驱节点的前驱节点,赋值给当前线程节点的前驱节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 更新 新的前驱节点的next指针指向
pred.next = node;
} else {
// 前驱节点是个正常节点时,使用CAS将其变为SIGNAL。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
2、parkAndCheckInterrupt()方法
如果前驱节点不是头结点 或 抢锁失败,并且可以阻塞当前线程,则调用parkAndCheckInterrupt()
方法以响应式中断的方法阻塞当前线程。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 判断是否是通过中断的方式来唤醒。 包括:1、unpark 2、interrupt
return Thread.interrupted();
}
2、release()释放互斥锁
public final boolean release(int arg) {
// 子类尝试释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null //
&& h.waitStatus != 0) // 那就是h的状态为SINGAL
// 唤醒阻塞队列中的后继线程
unparkSuccessor(h);
return true;
}
return false;
}
// 子类实现释放锁的逻辑,AQS并不知道。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
1、unparkSuccessor()方法
该方法用于释放node节点后的有效后继节点,即:从node节点开始往后找到有效后继节点,唤醒。
找后继节点时,如果后继节点是个无效节点,需要从tail开始往前遍历找到第一有效的后继节点。
因为因为我们往阻塞队列中添加线程节点时,首先更新的是tail引用,然后将旧的tail.next指向新tail,所以可能有一瞬间遍历不到新的tail节点。
private void unparkSuccessor(Node node) {
// 获取当前节点(head)的状态
int ws = node.waitStatus;
// ws的状态为SIGNAL
if (ws < 0)
// CAS将其变为0,代表了我当前已经响应了这一次的唤醒操作
compareAndSetWaitStatus(node, ws, 0);
// 取当前节点的后继节点,作为唤醒节点;但是注意条件
Node s = node.next;
if (s == null // 为什么后继节点可能为nul?因为我们往阻塞队列中添加线程节点时,首先更新的是tail引用,然后将旧的tail.next指向新tail,所以有可能一瞬间为空。
|| s.waitStatus > 0) {
// 后继节点是个无效节点!
// 将后继节点设置为空
s = null;
// 因为tail引用一定是最新的引用,所以从后往前找,一定能找到 第一个(node节点的第一个后继) 有效节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
// 更新后继节点
s = t;
}
// 此时的s就是要唤醒的节点
if (s != null)
// 唤醒后继节点
LockSupport.unpark(s.thread);
}
3、acquireShared()获取共享锁
public final void acquireShared(int arg) {
// 由子类判断是否获取锁成功,若不成功?则该阻塞阻塞去,<0表示获取锁失败
if (tryAcquireShared(arg) < 0)
// 阻塞当前线程节点
doAcquireShared(arg);
}
// 由子类实现获取锁的逻辑,返回值表示还剩几个可以获取的锁资源
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
1、doAcquireShared()方法
获取共享锁失败后,首先调用doAcquireShared()
方法,阻塞当前线程节点;其中包括:
- 和
tryAcquire()
方法一样的调用addWaiter()
方法,通过CAS把当前线程节点封装成Node
阻塞节点,放到AQS的阻塞队列中;只是这里Node节点的类型是SHARED。- 和
tryAcquire()
方法类似,尝试获取共享锁;- 继续往下走,如果线程获取锁失败,则采用响应中断的方式使用
LockSupport.park(this)
方法阻塞当前线程。- 最后,如果在尝试获取锁的过程中失败,则调用
cancelAcquire()
方法取消当前线程尝试获取共享锁的操作。
private void doAcquireShared(int arg) {
// 和互斥锁一样,只是这里的是共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 前驱节点是head节点时,可能头结点很快就释放锁了,尝试获取锁
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
// 获取锁成功;尝试唤醒后面的共享节点,因为共享锁是可以多线程共享的
if (r >= 0) {
// 将当前节点更新为head节点,然后唤醒后继节点
setHeadAndPropagate(node, r);
// p.next指针已经没有用了
p.next = null; // help GC
// 如果在等待过程中发生了中断,由于中断被唤醒,则会置位当前线程的中断标志位。
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && // 判断是否应该阻塞获取锁失败的线程节点?因为有可能上一瞬间抢锁失败,但是到这的时候头结点已经释放了锁。
parkAndCheckInterrupt()) // 以响应中断的方式阻塞当前线程节点
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1、setHeadAndPropagate()方法
获取共享锁成功后,会调用setHeadAndPropagate()
方法将当前节点更新为head节点,并唤醒后继节点;
private void setHeadAndPropagate(Node node, int propagate) {
// 保存旧head结点的快照
Node h = head;
setHead(node);
if (propagate > 0 // 信号量有多余,则直接唤醒
|| h == null // 不可能发生
|| h.waitStatus < 0 // SIGNAL,表明必须唤醒后继节点,则直接唤醒
|| (h = head) == null // 不可能发生,但是将最新的head节点引用赋给h是有意义的
|| h.waitStatus < 0) {
// SIGNAL,表明必须唤醒后继节点,则直接唤醒
// 最新head节点的后继节点
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
在调用setHeadAndPropagate()
方法将当前节点更新为head节点,并唤醒后继节点时,如果当前节点没有后继节点了,则调用doReleaseShared()
方法释放共享锁;
doReleaseShared()
方法在下面的releaseShared()
方法中解析
4、releaseShared()释放共享锁
public final boolean releaseShared(int arg) {
// tryReleaseShared方法由子类实现
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
1、doReleaseShared()方法
private void doReleaseShared() {
//
for (;;) {
// 保存head节点快照
Node h = head;
// 头结点不为null 并且头节点不等于尾节点,说明链表中还有等待节点
if (h != null && h != tail) {
// 理应要唤醒后面的节点,因为SIGNAL的语义就是必须要唤醒后面的节点
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// CAS将线程节点状态由SINGNAL改为0,表示唤醒成功,如果唤醒失败呢?
// 唤醒失败,表明多个线程想同时释放共享锁,都需要唤醒后继节点,此时state的状态为2 表示可用资源数为2,需要唤醒后面两个等待的共享节点;失败就重试
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // Node.PROPAGATE = -3,如CAS成功,表明线程A释放了一个共享锁唤醒了线程C,且线程C还没有将head节点更新到C上,B线程通过doReleaseShared()又释放了一个共享锁,释放锁的过程中发现head节点B的状态是0,这时就会将head节点B的状态设置为PROPAGATE状态。
// 假如没有这个else if片段,B就不会唤醒D。感觉我有两个信号量,然而只有一个C运行了,D木得了。
continue; // loop on failed CAS
}
// 头结点没有被改变过,且h != null && h != tail 成功,退出循环
if (h == head)
break;
}
}
5、acquireInterruptibly()以响应中断的方式获取互斥锁
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 就比正常的多这一步,如果线程被中断了,直接抛出异常返回
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
总结
子类只需要实现自己的获取锁逻辑和释放锁逻辑即可,至于排队阻塞等待、唤醒机制均由AQS完成。