什么是AQS
AQS:全称是AbstractQueuedSynchronizer,是并发容器JUC下locks包内的一个类。实现了CLH同步队列【FIFO的双向链表】
AQS框架架构图:
image.png
图解:
- 图中有颜色的为方法(Method),无颜色的为属性(Attribution)
- AQS框架共分为5层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据
- 当自定义同步器接入时,只需要重写第一层所需要的部分方法即可,不需要关注底层具体实现流程
- 当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着处理获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方法均依赖于第五层的基础数据提供层
AQS的原理
- AQS的核心思想:
如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。
- 等待唤醒机制:该唤醒机制主要是用的CLH队列的变体实现的
- CLH队列:全称Craig-Landin-Hagersten单向队列,在AQS中实现的是虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配
image.png
- 同步状态state:
- 高16位:记录读锁的持有次数【读线程数*每个线程的重入次数】
低16位:记录写锁的持有次数及持有线程的标识
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count. */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count. */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
- AQS使用一个volatile修饰的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对state值的修改
// java.util.concurrent.locks.AbstractQueuedSynchronizer#compareAndSetState
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSetInt(this, STATE, expect, update);
}
state字段结构:【在共享锁中才会有此区分】
接下来通过源码,分析获取锁的流程
非公平锁的实现
非公平锁源码的加锁流程如下:基于JDK17
//// java.util.concurrent.locks.ReentrantLock#NonfairSync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) { // first attempt is unguarded
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) {
int c = getState() + 1;
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false;
}
/**
* Acquire for non-reentrant cases after initialTryLock prescreen
*/
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
代码说明
- 若通过CAS设置变量state成功,即代表获取锁成功,则将当前线程设置为独占线程;
- 若通过CAS设置变量state失败,即代表获取锁失败,则需要判断当前线程是否已经独占线程;
- 若是独占线程,则将state+1
- 若获取锁失败,且当前线程非独占线程,则获取锁失败
获取锁失败就会加入到CLH队列中
@ReservedStackAccess
final void lock() {
if (!initialTryLock())
acquire(1);
}
公平锁的实现
公平锁源码的加锁流程如下:基于JDK17
// java.util.concurrent.locks.ReentrantLock#FairSync
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
/**
* Acquires only if reentrant or queue is empty.
*/
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
/**
* Acquires only if thread is first waiter or empty
*/
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && !hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
代码说明
- 如果锁状态为0,则表示未被任何线程持有
- 判断是否有线程在等待队列中,如果没有等待线程且通过CAS将变量state设置为成功,则当前线程设置为独占线程【插队(有等待线程执行等待线程,当前线程会被加入到等待队列;没有等待线程执行当前线程)】
- 若没有等待线程或通过CAS设置变量state失败,则需要判断当前线程是否已经独占线程【重入的场景】;
- 若当前线程是独占线程【已经获取了锁】,则将state+1【尝试增加锁的持有计数】
- 若获取锁失败,且当前线程非独占线程,则获取锁失败
获取锁失败就会加入到CLH队列中【同上】
CLH详解
上述其实对CLH有了初步的了解,接下来则是针对AQS的源码实现来分析线程加入CLH的逻辑
线程加入等待队列
当执行acquire(1);时,会通过tryAcquire(arg)尝试去获取锁。这种情况下如果获取锁失败,则会返回false。
如果尝试获取锁失败,则会触发先成功加入队列的操作(final int acquire(Node node, int arg, boolean shared,boolean interruptible, boolean timed, long time)
),如下源码所示:
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread【spins:自旋次数,减少不公平性;postSpins:控制自旋重试的次数】
boolean interrupted = false, first = false;//interrupted标记当前线程是否被中断,first表示当前节点是否是队列中的第一个有效节点
Node pred = null; // predecessor of node when enqueued【当前节点的前驱节点】
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
//自旋循环,直到成功获取锁或取消
for (;;) {
//前驱节点有效性检查与队列清理
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {//如果前驱节点存在,且不是头结点
if (pred.status < 0) {//如果前驱节点pred的状态小于0,则表示溢出
cleanQueue(); // predecessor cancelled【清理队列,处理异常的前驱节点等情况】
continue;
} else if (pred.prev == null) {//如果前驱节点pred没有前驱节点==>pred就是队列的头节点,则要进行自旋等待【内存序列化】
Thread.onSpinWait(); // ensure serialization
continue;
}
}
//尝试获取锁
if (first || pred == null) {如果当前节点是队列的头节点,或者前驱节点pred为null【尚未入队】
boolean acquired;//用于标记是否成功获取资源
try {
if (shared)//是否共享模式
acquired = (tryAcquireShared(arg) >= 0);//共享模式下,调用tryAcquireShared方法,arg参数控制获取共享量,返回值大于等于0则表示获取成功
else
acquired = tryAcquire(arg);//独占模式下,调用tryAcquire方法尝试获取资源
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);//一旦发生异常,则取消当前节点的获取操作,并抛出异常
throw ex;
}
if (acquired) {//如果成功获取资源
if (first) {//如果当前节点是队列头节点
node.prev = null;//设置前驱指针为null,脱离原队列关系
head = node;//更新头节点
pred.next = null;//断开前驱的next指针
node.waiter = null;//清除当前节点的等待线程信息
if (shared)
signalNextIfShared(node);//尝试唤醒当前节点中的线程
if (interrupted)
current.interrupt();//如果当前线程处于中断状态,则恢复中断状态
}
return 1;//返回成功
}
}
//节点初始化与线程入队
if (node == null) { // allocate; retry before enqueue【节点未创建,根据模式创建】
if (shared)
node = new SharedNode();//创建共享结点
else
node = new ExclusiveNode();//创建独占结点
} else if (pred == null) { // try to enqueue【尝试将节点加入队列尾部】
node.waiter = current;//设置节点的waiter字段为当前线程
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence【设置前驱节点(避免内存屏障)】
if (t == null)//如果队列未初始化
tryInitializeHead();//尝试初始化队列头节点
else if (!casTail(t, node))//CAS更新尾结点失败
node.setPrevRelaxed(null); // back out【回滚前驱节点设置】
else
t.next = node;//前驱节点的next指向当前节点
}
//自旋与阻塞控制
else if (first && spins != 0) {//减少重试时的不公平性
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();//提示CPU进行忙等待优化
} else if (node.status == 0) {//如果节点状态未设置
node.status = WAITING; // enable signal and recheck【标记节点为等待状态(可被唤醒)】
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);//指数退避,增加重试次数
if (!timed)//非超时模式
LockSupport.park(this);//阻塞当前线程
else if ((nanos = time - System.nanoTime()) > 0L)//计算剩余纳秒
LockSupport.parkNanos(this, nanos);//超时阻塞
else
break;//超时则退出循环
node.clearStatus();//清除节点状态
if ((interrupted |= Thread.interrupted()) && interruptible)//检查中断
break;//中断且允许中断,退出循环
}
}
//取消获取逻辑
return cancelAcquire(node, interrupted, interruptible);
}
代码说明:
1、 前驱节点检查与队列清理:如果当前节点非头节点,且前驱存在,检查前驱状态:若已取消则清理队列;若前驱未正确链接则等待队列稳定。
2、 尝试获取锁:若当前节点是头节点或者未入队,调用ryAcquire
/tryAcquireShared
尝试获取锁。成功则更新队列头节点并唤醒后续线程。
3、 节点初始化与入队:若节点未创建,根据共享/独占模式创建对应节点;若未入队,通过CAS操作将节点加入队列尾部。
4、 自旋与阻塞控制:通过spins
和postSpins
减少锁竞争的不公平性,最终通过LockSupport.park
阻塞线程,支持超时和中断响应。
5、 取消获取处理:若超时或中断,调用cancelAcquire
清理节点状态并返回相应结果。
private int cancelAcquire(Node node, boolean interrupted,
boolean interruptible) {
if (node != null) {//node==null,是无效节点
node.waiter = null;
node.status = CANCELLED;//把当前 node 的状态设置为 CANCELLED
if (node.prev != null)
cleanQueue();
}
if (interrupted) {
if (interruptible)
return CANCELLED;
else
Thread.currentThread().interrupt();
}
return 0;
}
//取消队列
private void cleanQueue() {
for (;;) { // restart point
for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
if (q == null || (p = q.prev) == null)
return; // end of list
if (s == null ? tail != q : (s.prev != q || s.status < 0))
break; // inconsistent
if (q.status < 0) { // cancelled
if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
q.prev == p) {
p.casNext(q, s); // OK if fails
if (p.prev == null)
signalNext(p);
}
break;
}
if ((n = p.next) != q) { // help finish
if (n != null && q.prev == p) {
p.casNext(n, q);
if (p.prev == null)
signalNext(p);
}
break;
}
s = q;
q = q.prev;
}
}
}
flowchart TD
A[开始] --> B{当前节点是否为第一个?}
B -->|否| C{前驱节点是否被取消?}
C -->|是| D[清理队列]
D --> B
C -->|否| E{是否需要尝试获取锁?}
E -->|是| F{尝试获取锁成功?}
F -->|是| G[更新队列状态并返回成功]
G --> H[结束]
F -->|否| I{节点是否为空?}
I -->|是| J[创建新节点]
J --> K{是否需要入队?}
K -->|是| L[尝试入队]
L --> M{是否需要重试或等待?}
M -->|重试| N[减少重试次数]
N --> M
M -->|等待| O{是否超时或中断?}
O -->|否| P[等待唤醒]
P --> M
O -->|是| Q[取消获取]
Q --> H
B -->|是| E
CLH的基本原理:
1、 使用FIFO保证公平性
2、 有当前节点和前置节点,当前节点不断自旋,查询前置节点的状态
3、 前置节点与当前节点构成队列
4、 当前节点运行完成后,更改自己的状态,监听当前节点状态的线程就会结束自旋
以上就是整个加锁的流程
如何解锁
由于ReentrantLock在解锁时,并不区分公平锁和非公平锁
public void unlock() {
sync.release(1);//利用公平锁和非公平锁的父类来定义可重入锁的释放机制
}
public final boolean release(int arg) {
if (tryRelease(arg)) {//尝试释放锁,返回true说明该锁没有被任何线程持有
signalNext(head);//唤醒当前节点的后继
return true;
}
return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {//返回结果:方法返回当前锁是否被线程持有
int c = getState() - releases;//减少可重入锁的次数
//当前线程不是持有锁的线程
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
//如果持有锁的线程全部释放,将当前独占锁所有现成设置为null,并更新state
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
setState(c);
return free;
}
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {//如果后继节点存在,且状态为等待
s.getAndUnsetStatus(WAITING);//清除其等待状态
LockSupport.unpark(s.waiter);//唤醒该线程
}
}
这就是加锁+解锁的全流程,具体可以参考美团技术团队的这篇文章:美团技术年货-后台篇.pdf