专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

AQS原理详解:Java并发锁机制与CLH队列实现全过程

什么是AQS

AQS:全称是AbstractQueuedSynchronizer,是并发容器JUC下locks包内的一个类。实现了CLH同步队列【FIFO的双向链表】

AQS框架架构图:

img_1image.png

图解:

  • 图中有颜色的为方法(Method),无颜色的为属性(Attribution)
  • AQS框架共分为5层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据
  • 当自定义同步器接入时,只需要重写第一层所需要的部分方法即可,不需要关注底层具体实现流程
  • 当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着处理获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方法均依赖于第五层的基础数据提供层

AQS的原理

  • AQS的核心思想:

如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。

  • 等待唤醒机制:该唤醒机制主要是用的CLH队列的变体实现的
  • CLH队列:全称Craig-Landin-Hagersten单向队列,在AQS中实现的是虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配

img_2image.png

  • 同步状态state:
  • 高16位:记录读锁的持有次数【读线程数*每个线程的重入次数】

低16位:记录写锁的持有次数及持有线程的标识

static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    /** Returns the number of shared holds represented in count. */
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    /** Returns the number of exclusive holds represented in count. */
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
  • AQS使用一个volatile修饰的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对state值的修改
// java.util.concurrent.locks.AbstractQueuedSynchronizer#compareAndSetState
    protected final boolean compareAndSetState(int expect, int update) {
        return U.compareAndSetInt(this, STATE, expect, update);
    }
state字段结构:【在共享锁中才会有此区分】

接下来通过源码,分析获取锁的流程

非公平锁的实现

非公平锁源码的加锁流程如下:基于JDK17

//// java.util.concurrent.locks.ReentrantLock#NonfairSync
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        if (compareAndSetState(0, 1)) { // first attempt is unguarded
            setExclusiveOwnerThread(current);
            return true;
        } else if (getExclusiveOwnerThread() == current) {
            int c = getState() + 1;
            if (c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        } else
            return false;
    }
    /**
     * Acquire for non-reentrant cases after initialTryLock prescreen
     */
    protected final boolean tryAcquire(int acquires) {
        if (getState() == 0 && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
}

代码说明

  • 若通过CAS设置变量state成功,即代表获取锁成功,则将当前线程设置为独占线程;
  • 若通过CAS设置变量state失败,即代表获取锁失败,则需要判断当前线程是否已经独占线程;
  • 若是独占线程,则将state+1
  • 若获取锁失败,且当前线程非独占线程,则获取锁失败

获取锁失败就会加入到CLH队列中

@ReservedStackAccess
final void lock() {
  if (!initialTryLock())
      acquire(1);
}

公平锁的实现

公平锁源码的加锁流程如下:基于JDK17

// java.util.concurrent.locks.ReentrantLock#FairSync
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    /**
     * Acquires only if reentrant or queue is empty.
     */
    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (getExclusiveOwnerThread() == current) {
            if (++c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        }
        return false;
    }
    /**
     * Acquires only if thread is first waiter or empty
     */
    protected final boolean tryAcquire(int acquires) {
        if (getState() == 0 && !hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
}

代码说明

  • 如果锁状态为0,则表示未被任何线程持有
  • 判断是否有线程在等待队列中,如果没有等待线程且通过CAS将变量state设置为成功,则当前线程设置为独占线程【插队(有等待线程执行等待线程,当前线程会被加入到等待队列;没有等待线程执行当前线程)】
  • 若没有等待线程或通过CAS设置变量state失败,则需要判断当前线程是否已经独占线程【重入的场景】;
  • 若当前线程是独占线程【已经获取了锁】,则将state+1【尝试增加锁的持有计数】
  • 若获取锁失败,且当前线程非独占线程,则获取锁失败

获取锁失败就会加入到CLH队列中【同上】

CLH详解

上述其实对CLH有了初步的了解,接下来则是针对AQS的源码实现来分析线程加入CLH的逻辑

线程加入等待队列

当执行acquire(1);时,会通过tryAcquire(arg)尝试去获取锁。这种情况下如果获取锁失败,则会返回false。

如果尝试获取锁失败,则会触发先成功加入队列的操作(final int acquire(Node node, int arg, boolean shared,boolean interruptible, boolean timed, long time)),如下源码所示:

final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time) {
    Thread current = Thread.currentThread();
    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread【spins:自旋次数,减少不公平性;postSpins:控制自旋重试的次数】
    boolean interrupted = false, first = false;//interrupted标记当前线程是否被中断,first表示当前节点是否是队列中的第一个有效节点
    Node pred = null;                // predecessor of node when enqueued【当前节点的前驱节点】
    /*
     * Repeatedly:
     *  Check if node now first
     *    if so, ensure head stable, else ensure valid predecessor
     *  if node is first or not yet enqueued, try acquiring
     *  else if node not yet created, create it
     *  else if not yet enqueued, try once to enqueue
     *  else if woken from park, retry (up to postSpins times)
     *  else if WAITING status not set, set and retry
     *  else park and clear WAITING status, and check cancellation
     */
    //自旋循环,直到成功获取锁或取消
    for (;;) {
      //前驱节点有效性检查与队列清理
        if (!first && (pred = (node == null) ? null : node.prev) != null &&
            !(first = (head == pred))) {//如果前驱节点存在,且不是头结点
            if (pred.status < 0) {//如果前驱节点pred的状态小于0,则表示溢出
                cleanQueue();           // predecessor cancelled【清理队列,处理异常的前驱节点等情况】
                continue;
            } else if (pred.prev == null) {//如果前驱节点pred没有前驱节点==>pred就是队列的头节点,则要进行自旋等待【内存序列化】
                Thread.onSpinWait();    // ensure serialization
                continue;
            }
        }
      //尝试获取锁
        if (first || pred == null) {如果当前节点是队列的头节点,或者前驱节点pred为null【尚未入队】
            boolean acquired;//用于标记是否成功获取资源
            try {
                if (shared)//是否共享模式
                    acquired = (tryAcquireShared(arg) >= 0);//共享模式下,调用tryAcquireShared方法,arg参数控制获取共享量,返回值大于等于0则表示获取成功
                else
                    acquired = tryAcquire(arg);//独占模式下,调用tryAcquire方法尝试获取资源
            } catch (Throwable ex) {
                cancelAcquire(node, interrupted, false);//一旦发生异常,则取消当前节点的获取操作,并抛出异常
                throw ex;
            }
            if (acquired) {//如果成功获取资源
                if (first) {//如果当前节点是队列头节点
                    node.prev = null;//设置前驱指针为null,脱离原队列关系
                    head = node;//更新头节点
                    pred.next = null;//断开前驱的next指针
                    node.waiter = null;//清除当前节点的等待线程信息
                    if (shared)
                        signalNextIfShared(node);//尝试唤醒当前节点中的线程
                    if (interrupted)
                        current.interrupt();//如果当前线程处于中断状态,则恢复中断状态
                }
                return 1;//返回成功
            }
        }
      //节点初始化与线程入队
        if (node == null) {                 // allocate; retry before enqueue【节点未创建,根据模式创建】
            if (shared)
                node = new SharedNode();//创建共享结点
            else
                node = new ExclusiveNode();//创建独占结点
        } else if (pred == null) {          // try to enqueue【尝试将节点加入队列尾部】
            node.waiter = current;//设置节点的waiter字段为当前线程
            Node t = tail;
            node.setPrevRelaxed(t);         // avoid unnecessary fence【设置前驱节点(避免内存屏障)】
            if (t == null)//如果队列未初始化
                tryInitializeHead();//尝试初始化队列头节点
            else if (!casTail(t, node))//CAS更新尾结点失败
                node.setPrevRelaxed(null);  // back out【回滚前驱节点设置】
            else
                t.next = node;//前驱节点的next指向当前节点
        }
      //自旋与阻塞控制
      else if (first && spins != 0) {//减少重试时的不公平性
            --spins;                        // reduce unfairness on rewaits
            Thread.onSpinWait();//提示CPU进行忙等待优化
        } else if (node.status == 0) {//如果节点状态未设置
            node.status = WAITING;          // enable signal and recheck【标记节点为等待状态(可被唤醒)】
        } else {
            long nanos;
            spins = postSpins = (byte)((postSpins << 1) | 1);//指数退避,增加重试次数
            if (!timed)//非超时模式
                LockSupport.park(this);//阻塞当前线程
            else if ((nanos = time - System.nanoTime()) > 0L)//计算剩余纳秒
                LockSupport.parkNanos(this, nanos);//超时阻塞
            else
                break;//超时则退出循环
            node.clearStatus();//清除节点状态
            if ((interrupted |= Thread.interrupted()) && interruptible)//检查中断
                break;//中断且允许中断,退出循环
        }
    }
  //取消获取逻辑
    return cancelAcquire(node, interrupted, interruptible);
}

代码说明:

1、 前驱节点检查与队列清理:如果当前节点非头节点,且前驱存在,检查前驱状态:若已取消则清理队列;若前驱未正确链接则等待队列稳定。
2、 尝试获取锁:若当前节点是头节点或者未入队,调用ryAcquire/tryAcquireShared尝试获取锁。成功则更新队列头节点并唤醒后续线程。
3、 节点初始化与入队:若节点未创建,根据共享/独占模式创建对应节点;若未入队,通过CAS操作将节点加入队列尾部。
4、 自旋与阻塞控制:通过spinspostSpins减少锁竞争的不公平性,最终通过LockSupport.park阻塞线程,支持超时和中断响应。
5、 取消获取处理:若超时或中断,调用cancelAcquire清理节点状态并返回相应结果。

private int cancelAcquire(Node node, boolean interrupted,
                            boolean interruptible) {
  if (node != null) {//node==null,是无效节点
      node.waiter = null;
      node.status = CANCELLED;//把当前 node 的状态设置为 CANCELLED
      if (node.prev != null)
          cleanQueue();
  }
  if (interrupted) {
      if (interruptible)
          return CANCELLED;
      else
          Thread.currentThread().interrupt();
  }
  return 0;
}
//取消队列
private void cleanQueue() {
  for (;;) {                               // restart point
      for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
          if (q == null || (p = q.prev) == null)
              return;                      // end of list
          if (s == null ? tail != q : (s.prev != q || s.status < 0))
              break;                       // inconsistent
          if (q.status < 0) {              // cancelled
              if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
                  q.prev == p) {
                  p.casNext(q, s);         // OK if fails
                  if (p.prev == null)
                      signalNext(p);
              }
              break;
          }
          if ((n = p.next) != q) {         // help finish
              if (n != null && q.prev == p) {
                  p.casNext(n, q);
                  if (p.prev == null)
                      signalNext(p);
              }
              break;
          }
          s = q;
          q = q.prev;
      }
  }
}
flowchart TD
    A[开始] --> B{当前节点是否为第一个?}
    B -->|否| C{前驱节点是否被取消?}
    C -->|是| D[清理队列]
    D --> B
    C -->|否| E{是否需要尝试获取锁?}
    E -->|是| F{尝试获取锁成功?}
    F -->|是| G[更新队列状态并返回成功]
    G --> H[结束]
    F -->|否| I{节点是否为空?}
    I -->|是| J[创建新节点]
    J --> K{是否需要入队?}
    K -->|是| L[尝试入队]
    L --> M{是否需要重试或等待?}
    M -->|重试| N[减少重试次数]
    N --> M
    M -->|等待| O{是否超时或中断?}
    O -->|否| P[等待唤醒]
    P --> M
    O -->|是| Q[取消获取]
    Q --> H
    B -->|是| E

CLH的基本原理:

1、 使用FIFO保证公平性
2、 有当前节点和前置节点,当前节点不断自旋,查询前置节点的状态
3、 前置节点与当前节点构成队列
4、 当前节点运行完成后,更改自己的状态,监听当前节点状态的线程就会结束自旋

以上就是整个加锁的流程

如何解锁

由于ReentrantLock在解锁时,并不区分公平锁和非公平锁

public void unlock() {
    sync.release(1);//利用公平锁和非公平锁的父类来定义可重入锁的释放机制
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {//尝试释放锁,返回true说明该锁没有被任何线程持有
        signalNext(head);//唤醒当前节点的后继
        return true;
    }
    return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {//返回结果:方法返回当前锁是否被线程持有
    int c = getState() - releases;//减少可重入锁的次数
    //当前线程不是持有锁的线程
    if (getExclusiveOwnerThread() != Thread.currentThread())
        throw new IllegalMonitorStateException();
    //如果持有锁的线程全部释放,将当前独占锁所有现成设置为null,并更新state  
    boolean free = (c == 0);
    if (free)
        setExclusiveOwnerThread(null);
    setState(c);
    return free;
}
 private static void signalNext(Node h) {
    Node s;
    if (h != null && (s = h.next) != null && s.status != 0) {//如果后继节点存在,且状态为等待
        s.getAndUnsetStatus(WAITING);//清除其等待状态
        LockSupport.unpark(s.waiter);//唤醒该线程
    }
}

这就是加锁+解锁的全流程,具体可以参考美团技术团队的这篇文章:美团技术年货-后台篇.pdf

未经允许不得转载:搜云库 » AQS原理详解:Java并发锁机制与CLH队列实现全过程

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们