除了synchronized
关键字,悲观锁还有一种实现方式,即基于Java同步器AQS
的各种实现类,其中就有我们常用的ReentrantLock
类。
今天我们就来探究AQS与ReentrantLock
类,从源码讲解它们的实现机制。
- 1.AQS抽象队列同步器
- 1.1 AQS设计思想
- 1.2 AQS原理
- 2.ReentrantLock
- 2.1 ReentrantLock实现机制
- 2.2 内部同步器syn实现可重入锁
- 2.3 子类NonfairSync与FairSync实现公平与非公平策略
- 2.4 ReentrantLock的可中断锁
- 2.5 Condition条件变量
- 2.6 synchronized和ReentrantLock的区别
1.AQS抽象队列同步器
1.1 AQS设计思想
AQS(AbstractQueuedSynchronizer
)是 java.util.concurrent.locks
包中提供的一个用于构建锁和其他同步器的基础框架类。
AQS是一个抽象类,提供了通用的锁获取、释放逻辑(如 acquire()
、release()
),所有需要用到这些方法的类都要继承AQS类,通过tryAcquire()
/ tryRelease()
等方法来实现,这种设计模式就是模版方法模式。
1.2 AQS原理
它通过内部维护一个 原子整数 state
和一个 FIFO 的双向等待队列(CLH),为独占锁(如 ReentrantLock
)、共享锁(如 ReadWriteLock
)、信号量(Semaphore
)、闭锁(CountDownLatch
)等提供统一的同步原语。
(1)状态变量 state
state
是一个 volatile int
类型的字段,用来表示同步器当前的状态,它是所有同步操作的核心:
private volatile int state;
AQS 提供了以下操作方法:
protected final int getState();
protected final void setState(int newState);
protected final boolean compareAndSetState(int expect, int update);
compareAndSetState
方法通过CAS 原子操作控制 state,保证线程安全。
这个 state
本身不包含“锁”的语义,它的具体含义是交给子类(比如 ReentrantLock、Semaphore)来定义的!在AQS的实现类中,获取锁和释放锁的方法中都会用到上面的三种方法。
不同的同步器,对 state
的解释不一样的,例如在 ReentrantLock
中,state 表示获取锁的次数(可重入),Semaphore
中则表示剩余许可证数量,CountDownLatch
中则表示倒计时计数器。
(2)双向 FIFO 同步队列(CLH)
AQS 的本质是:在获取锁失败时,将线程封装进一个队列中,有序排队挂起等待。这个“同步队列”必须:
- 支持高并发场景下的安全入队/出队
- 保证先来的线程先被唤醒(FIFO)
- 能高效地支持唤醒一个或多个线程
所以 AQS 使用了一种变种的 CLH 队列(Craig–Landin–Hagersten),实现为一个 双向链表的 FIFO 同步等待队列。
失败获取锁的线程,会被包装为 Node
对象加入队列,挂起等待;当锁释放后,会按照 FIFO 顺序唤醒队列中的下一个节点。
AQS 中的 Node
是个静态内部类,代表一个线程在等待队列中的状态。
static final class Node {
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
waitStatus
表示节点状态
状态解释:
状态 | 含义 |
---|---|
SIGNAL (-1) | 当前节点的后继节点需要被唤醒(唤醒机制核心) |
CANCELLED (1) | 线程取消排队(中断或超时) |
CONDITION (-2) | 表示在 Condition 条件队列中 |
PROPAGATE (-3) | 表示共享式同步时需要传播 |
0 | 初始状态 |
prev
/next
: 构成双向链表thread
: 当前被封装的线程对象
多线程场景中,队列会发生什么变化?
(1)初始化:head
和 tail
为 null
;第一个线程竞争失败后调用 enq()
创建一个空的 head
节点;后续失败线程将调用addWaiter()
依次追加到队尾。
(2)挂起:shouldParkAfterFailedAcquire()
判断是否可以挂起,只有前驱节点的 waitStatus == SIGNAL
才允许挂起,通过 LockSupport.park(this)
让线程挂起,等待唤醒。
(3)唤醒:释放锁后,找到队列中第一个有效的后继节点,并使用 LockSupport.unpark()
唤醒线程,被唤醒的线程从阻塞状态恢复,再次尝试获取锁。
(4)中断与取消:如果线程被中断或主动退出,状态会设置为 CANCELLED
。取消节点不会被唤醒,AQS 会跳过它们继续向后传播。
[HEAD] <-> [T1 - SIGNAL] <-> [T2 - SIGNAL] <-> [T3 - SIGNAL] <-> [TAIL]
↑ ↑
head 当前尝试加锁的线程
上面的队列变化过程有点复杂,用一段话总结一下:
AQS 的同步队列本质是一个基于 Node 的双向 FIFO 链表,线程获取锁失败后进入队列尾部排队,通过 waitStatus
和CAS操作来控制挂起与唤醒,唤醒是从 head 节点向后传播的。
AQS 是一套模板式的并发控制框架,它的设计初衷就是为了提供两种通用的线程访问控制策略:
- 独占模式(Exclusive) :一次只能有一个线程获取资源,实现类有
ReentrantLock
、FutureTask
。 - 共享模式(Shared) :多个线程可以同时获取资源,实现类有CountDownLatch、ReentrantReadWriteLock.readLock()
2.ReentrantLock
2.1 ReentrantLock实现机制
ReentrantLock
是 Java 提供的可重入显式锁,实现于 JUC 包中。它内部基于 AQS实现。
我们来看下ReentrantLock
的内部实现:
public class ReentrantLock implements Lock, java.io.Serializable {
privatefinal Sync sync;
public ReentrantLock() {
sync = new NonfairSync(); // 默认非公平
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
// 加锁
public void lock() {
sync.lock();
}
// 可中断加锁
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// 尝试加锁(立即返回 true/false)
public boolean tryLock() {
return sync.tryAcquire(1);
}
// 超时加锁
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// 解锁
public void unlock() {
sync.release(1);
}
// 查询
public boolean isLocked() {
return sync.isHeldExclusively();
}
// 当前线程持有锁的次数
public int getHoldCount() {
return sync.getHoldCount();
}
...
}
可以发现ReentrantLock
实现了Lock接口,这时候我脑子里有个疑问:既然ReentrantLock
基于AQS实现,那为什么没有继承AQS类,而是实现Lock接口呢?
(1)为什么不继承AQS?
因为 AQS 是一个同步器(同步状态管理工具),而不是一个锁本身。所以更合理的做法是把 AQS 当作一个工具类,ReentrantLock
内部封装了一个继承了AQS的内部类: Sync
,而且 Sync
还有两个子类:FairSync
和NonfairSync
分别用来支持公平锁和非公平锁,ReentrantLock
直接通过 Sync
来控制加锁、解锁等逻辑就可以了。
(2)为什么要实现Lock
接口?
因为 Lock
是 JDK 给开发者提供的一个“使用锁的标准接口”,它定义了很多锁相关的方法,所有的锁类(ReentrantLock、StampedLock、ReadWriteLock 的 writeLock 等)都 统一使用 Lock 接口来操作,这就是面向接口编程。
简单来说,它向外暴露的是 Lock 接口的语义行为, 它向内借助 AQS 实现具体的同步机制。
所以下面创建ReentrnatLock
这句代码:
Lock lock = new ReentrantLock()
本质上是“Lock 接口 → ReentrantLock → Sync(内部类)→ AQS(真正逻辑)”这样的实现机制。
2.2 内部同步器syn实现可重入锁
ReentrantLock
通过内部的 Sync
类继承 AQS,并重写了 AQS 的模板方法(如 tryAcquire/tryRelease),我们来看看Sync
的内部实现。
Sync
是一个静态内部抽象类,继承自 AQS,重写了tryRelease()
释放锁的逻辑,并定义了抽象方法 lock()
,由 FairSync
/ NonfairSync
去具体实现加锁策略。
abstract staticclass Sync extends AbstractQueuedSynchronizer {
// 加锁入口
abstract void lock();
// 释放锁的实际逻辑
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
thrownew IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 获取锁是否属于当前线程
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 当前线程的持有次数
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
// 当前是否有线程持有锁
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 创建条件变量
final ConditionObject newCondition() {
returnnew ConditionObject();
}
}
仔细分析Sync
的内部代码,我们可以发现tryRelease()
不但实现了释放锁的逻辑,还实现了可重入锁的特性。
tryRelease()
方法(1)首先检查当前线程是否是 owner(只有 owner 才能释放,否则会抛出异常),(2)然后将 state 减少一次(释放一次锁)。(3)如果减到 0,表示完全释放锁,把 owner 清空(setExclusiveOwnerThread(null)
)。
2.3 子类NonfairSync与FairSync实现公平与非公平策略
NonfairSync
与FairSync
都继承 Sync
,都重写了AQS中的 tryAcquire(int)
方法,这也是它们实现公平加锁和非公平加锁的核心逻辑。
// 非公平锁实现
staticfinalclass NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // AQS 的模板方法
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
returntrue;
}
} elseif (current == getExclusiveOwnerThread()) {
setState(c + acquires); // 可重入
returntrue;
}
returnfalse;
}
}
// 公平锁实现
staticfinalclass FairSync extends Sync {
final void lock() {
acquire(1); // 直接走 AQS 的 acquire,公平性靠 tryAcquire 保证
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 判断是否有排队线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
returntrue;
}
} elseif (current == getExclusiveOwnerThread()) {
setState(c + acquires); // 重入
returntrue;
}
returnfalse;
}
}
我们来对比这两个子类中最关键的两个方法:lock()
和 tryAcquire()
,它们的控制流程有什么不同。
lock() 方法对比
- 非公平锁先试图直接抢锁(CAS),抢不到再进入 AQS 的 acquire 流程(排队、自旋、挂起)。
- 公平锁不做任何直接 CAS 抢锁的尝试,直接进入排队机制,从头排起,不给插队机会。
tryAcquire() 方法对比
- 非公平锁中,只判断
state
是否为 0,一旦为 0,立刻尝试抢锁。 - 公平锁中,在尝试 CAS 加锁之前先调用
hasQueuedPredecessors()
来判断当前线程前面是否有前驱节点。如果有,就老实排队,不抢锁。如果没有前驱节点再用CAS 加锁。
总的来说,非公平锁会先尝试通过 CAS 抢锁,抢不到再排队;而公平锁不允许插队,始终先检查是否有排队线程,只有队首线程才允许尝试获取锁。
2.4 ReentrantLock的可中断锁
可中断锁是指当一个线程在等待获取锁时,可以通过中断(Thread.interrupt()
)将其从等待中唤醒,从而避免有些线程带锁时间过长的情况,可以避免死锁、阻塞过久。
在ReentrantLock
内部中有这样一个方法lockInterruptibly
,可以实现可中断加锁。
// 可中断加锁
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
但是在Sync
内部并没有这个方法acquireInterruptibly()
,实际上调用的是 AQS 中的模板方法,而这个模板方法又依赖子类实现的 tryAcquire()
方法来判断是否能抢锁。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg); // 核心方法
}
该方法会先检查当前线程是否已中断(已中断则抛出异常),然后尝试获取锁,如果失败则加入 AQS 队列,在挂起等待的过程中响应中断信号。一旦线程被中断,立即抛出 InterruptedException 并退出等待队列。
2.5 Condition条件变量
第一节讲AQS的时候,就提到了Node的状态为CONDITION
时,就表示线程在 Condition
条件队列中。
而条件变量就是让线程在条件队列中的“罪魁祸首”,它是配合 ReentrantLock 使用的一种线程通信机制,可以让线程在满足某个“条件”之前主动等待(await()
),等条件满足后被其他线程唤醒(signal()
)。
它是由AQS的内部类ConditionObject
实现的,每一个 Condition 对象都对应一个条件等待队列(独立于锁的同步队列)。
- 创建
Condition
对象:调用newCondition()
方法。
Lock lock = new ReentrantLock();
Condition notEmpty = lock.newCondition();
- 加入等待队列:
await()
会让当前线程加入条件等待队列,释放锁并阻塞挂起。
while (queue.isEmpty()) {
notEmpty.await(); // 等待队列非空
}
- 唤醒:
signal()
会从条件队列中取出一个节点,将它移入 AQS 同步队列,唤醒它去竞争锁。
notEmpty.signal(); // 唤醒等待者
最近有一场面试,被问到一个场景题,我觉得很适合用Condition
来实现,假设有一个仓库,只能存放5个货物(BoundedQueue<Integer>
),有一个生产线程不断送货进仓库,还有一个消费线程不断从仓库取货。
BoundedQueue queue = new BoundedQueue<>(5);
// 生产者线程
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
queue.put(i);
} catch (InterruptedException e) {}
}
}).start();
// 消费者线程
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(500); // 模拟消费慢
queue.take();
} catch (InterruptedException e) {}
}
}).start();
这个场景中,生产线程要满足仓库有空位才能送货,消费线程要满足仓库有货才能取货。
那么我们就可以给送货方法put()
和取货方法take()
加上ReentrantLock锁,并创建两个Condition
对象来让线程在不满足条件的情况下挂起阻塞。
class BoundedQueue {
privatefinal Queue queue = new LinkedList<>();
privatefinalint capacity;
privatefinal Lock lock = new ReentrantLock();
privatefinal Condition notFull = lock.newCondition();
privatefinal Condition notEmpty = lock.newCondition();
public BoundedQueue(int capacity) {
this.capacity = capacity;
}
// 生产者
public void put(T item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // 等待空间
}
queue.offer(item);
System.out.println(\"生产了: \" + item);
notEmpty.signal(); // 通知消费者
} finally {
lock.unlock();
}
}
// 消费者
public T take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待数据
}
T item = queue.poll();
System.out.println(\"消费了: \" + item);
notFull.signal(); // 通知生产者
return item;
} finally {
lock.unlock();
}
}
}
2.6 synchronized和ReentrantLock的区别
synchronized
和 ReentrantLock
是非常高频的面试题,我们可以从它们的实现机制、是否可重入、是否可中断、是否支持公平性选择等方面来回答。
维度 | synchronized | ReentrantLock |
---|---|---|
实现方式 | Java 关键字,语言级,JVM 实现(monitorenter/monitorexit) | Java 类,JUC 并发包,基于 AQS实现,自行控制 |
可重入 | 是 | 是 |
可中断 | 不支持 | 支持 lockInterruptibly() |
公平性选择 | 不支持 | 支持,构造函数可选择公平/非公平 |
是否必须手动释放 | 自动释放(代码块结束) | 必须手动 unlock() |
条件变量 | 无 | 支持多个 Condition |
性能优化 | JDK1.6 之后有锁优化(偏向锁、轻量级锁) | 性能稳定,适合复杂并发 |
使用场景 | 简单同步控制 | 更复杂的锁需求(如条件等待、可中断、非公平锁) |