TechBlog
首页分类标签搜索关于

© 2025 TechBlog. All rights reserved.

深入剖析AQSJava并发编程的基石与灵魂

11/22/2025
未分类#并发编程#同步器#Juc#Java#Aqs

深入剖析AQS:Java并发编程的基石与灵魂

[ AI的出现,是否能替代IT从业者? 10w+人浏览 1.2k人参与

](https://activity.csdn.net/topic?id=10811)

CSDN Java并发编程深度解析系列:AQS(AbstractQueuedSynchronizer)是JUC包的核心基石,理解AQS是掌握Java高并发编程的关键钥匙。本文从设计思想、实现原理、源码分析、实战应用四个维度,全方位解析AQS的队列管理、状态控制、模板方法模式等核心技术。通过ReentrantLock、CountDownLatch等源码剖析,揭示AQS如何成为Java并发组件的统一抽象层。

🏗️ 一、AQS框架概览:并发组件的基石

1.1 AQS在JUC中的核心地位

JUC并发工具

同步器层

并发集合

原子变量

AQS AbstractQueuedSynchronizer

ReentrantLock

CountDownLatch

Semaphore

CyclicBarrier

ReentrantReadWriteLock

1.2 AQS解决的核心问题

同步器开发的共性需求:

// 所有同步器都需要解决的基本问题
public interface SyncCommonRequirements {
    // 1. 状态管理:记录锁的状态(是否被持有、重入次数等)
    int getState();
    void setState(int newState);
    boolean compareAndSetState(int expect, int update);
    
    // 2. 队列管理:管理等待线程的排队和唤醒
    void enqueue(Thread thread);    // 入队
    Thread dequeue();               // 出队
    
    // 3. 阻塞/唤醒:线程的挂起和恢复
    void parkThread(Thread thread);  // 阻塞
    void unparkThread(Thread thread);// 唤醒
    
    // 4. 超时/中断:支持限时等待和中断响应
    boolean await(long time, TimeUnit unit) throws InterruptedException;
}

AQS的设计目标:将通用逻辑(队列管理、阻塞唤醒)与特定逻辑(状态判断、获取释放策略)分离。


🔍 二、AQS核心架构解析

2.1 AQS的三大核心组件

2.1.1 状态管理(State)
/**
 * AQS状态管理源码分析
 */
public abstract class AbstractQueuedSynchronizer {
    
    // 核心:volatile保证状态可见性,CAS保证原子更新
    private volatile int state;
    
    /**
     * 获取当前状态
     */
    protected final int getState() {
        return state;
    }
    
    /**
     * 设置状态
     */
    protected final void setState(int newState) {
        state = newState;
    }
    
    /**
     * CAS原子更新状态 - AQS的原子操作基石
     */
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    
    // 状态的不同语义(由子类定义)
    // ReentrantLock: state表示锁的重入次数(0-未锁定,1-被持有,>1-重入次数)
    // Semaphore: state表示可用许可证数量
    // CountDownLatch: state表示剩余计数
}

2.1.2 CLH队列(等待队列)
/**
 * CLH队列节点定义
 */
static final class Node {
    // 节点状态
    volatile int waitStatus;
    
    // 前驱节点
    volatile Node prev;
    
    // 后继节点  
    volatile Node next;
    
    // 节点关联的线程
    volatile Thread thread;
    
    // 下一个等待节点(Condition队列用)
    Node nextWaiter;
    
    // 等待状态常量
    static final int CANCELLED = 1;   // 取消状态
    static final int SIGNAL = -1;     // 需要唤醒后继节点
    static final int CONDITION = -2;  // 在条件队列中等待
    static final int PROPAGATE = -3;  // 共享模式下需要传播
}

2.1.3 模板方法模式
/**
 * AQS模板方法设计
 */
public abstract class AbstractQueuedSynchronizer {
    
    // 模板方法:供外部调用的入口
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && // 尝试获取(由子类实现)
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { // 加入队列并等待
            // 响应中断
            Thread.currentThread().interrupt();
        }
    }
    
    // 需要子类实现的钩子方法
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
}

2.2 AQS的两种同步模式

模式特点适用场景典型实现
独占模式(Exclusive)同一时刻只有一个线程能获取资源互斥锁、重入锁ReentrantLock
共享模式(Shared)同一时刻多个线程可以获取资源信号量、闭锁Semaphore, CountDownLatch

⚙️ 三、源码深度解析:以ReentrantLock为例

3.1 ReentrantLock与AQS的协作

/**
 * ReentrantLock中AQS的具体实现
 */
public class ReentrantLock {
    private final Sync sync;
    
    // 内部同步器,继承AQS
    abstract static class Sync extends AbstractQueuedSynchronizer {
        
        // 实现AQS的tryAcquire方法
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            
            if (c == 0) { // 锁未被占用
                if (!hasQueuedPredecessors() && // 公平性检查
                    compareAndSetState(0, acquires)) { // CAS获取锁
                    setExclusiveOwnerThread(current); // 设置当前线程为锁持有者
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) { // 重入
                int nextc = c + acquires;
                if (nextc < 0) throw new Error("Maximum lock count exceeded");
                setState(nextc); // 更新重入次数
                return true;
            }
            return false;
        }
        
        // 实现tryRelease方法
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            boolean free = false;
            if (c == 0) { // 完全释放
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    }
}

3.2 获取锁的完整流程

/**
 * acquire方法的完整执行流程
 */
public final void acquire(int arg) {
    // 1. 先尝试快速获取(子类实现)
    if (!tryAcquire(arg)) {
        // 2. 快速获取失败,创建节点加入队列
        Node node = addWaiter(Node.EXCLUSIVE);
        
        // 3. 在队列中自旋等待
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { // 轮到当前节点且获取成功
                setHead(node); // 成为新的头节点
                p.next = null; // 帮助GC
                return interrupted;
            }
            
            // 4. 检查是否需要阻塞
            if (shouldParkAfterFailedAcquire(p, node)) {
                // 5. 阻塞当前线程
                interrupted |= parkAndCheckInterrupt();
            }
        }
    }
}

/**
 * 加入等待队列
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) { // 队列非空,快速入队
        node.prev = pred;
        if (compareAndSetTail(pred, node)) { // CAS设置尾节点
            pred.next = node;
            return node;
        }
    }
    enq(node); // 完整入队流程
    return node;
}

3.3 等待队列的可视化过程

线程1(持有锁)线程2(等待)线程3(等待)AQS队列持有锁(state=1)尝试获取锁(tryAcquire)获取失败创建节点加入队列队列: [头节点] ->> [T2]尝试获取锁获取失败加入队列尾部队列: [头节点] ->> [T2] ->> [T3]释放锁(tryRelease)唤醒T2获取锁成功队列: [头节点(T2)] ->> [T3]线程1(持有锁)线程2(等待)线程3(等待)AQS队列


🔄 四、共享模式详解:CountDownLatch源码分析

4.1 CountDownLatch的AQS实现

/**
 * CountDownLatch的同步器实现
 */
public class CountDownLatch {
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count); // 初始化状态为计数
        }
        
        // 共享模式获取
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
            // 状态为0时返回1(成功),否则返回-1(失败)
        }
        
        // 共享模式释放
        protected boolean tryReleaseShared(int releases) {
            for (;;) { // 自旋CAS
                int c = getState();
                if (c == 0) return false; // 已经为0
                int nextc = c - 1;
                if (compareAndSetState(c, nextc)) { // CAS减1
                    return nextc == 0; // 减到0时返回true,唤醒所有等待线程
                }
            }
        }
    }
}

4.2 共享模式的传播机制

/**
 * 共享模式获取资源
 */
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0) { // 尝试获取
        doAcquireShared(arg); // 加入队列等待
    }
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED); // 共享模式节点
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) { // 获取成功
                    setHeadAndPropagate(node, r); // 关键:设置头节点并传播
                    p.next = null;
                    if (interrupted) Thread.currentThread().interrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node)) {
                interrupted |= parkAndCheckInterrupt();
            }
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}

/**
 * 传播唤醒后续共享节点
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);
    
    // 如果还有剩余资源,或者原头节点需要传播,或者新头节点需要传播
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared()) {
            doReleaseShared(); // 唤醒后续共享节点
        }
    }
}


🎯 五、AQS在JUC中的典型应用

5.1 各种同步器的AQS实现对比

同步器同步模式状态state含义tryAcquire逻辑特点
ReentrantLock独占重入次数CAS获取,支持重入可重入、可中断、可超时
Semaphore共享许可证数量CAS减少许可证控制并发线程数
CountDownLatch共享剩余计数检查是否为0一次性屏障
ReentrantReadWriteLock两者高16位读锁,低16位写锁复杂的位运算读写锁分离
CyclicBarrier共享+独占代次+计数计数到达后重置可重复使用的屏障

5.2 自定义同步器实战

/**
 * 基于AQS实现简单的互斥锁
 */
public class SimpleMutex {
    private final Sync sync = new Sync();
    
    public void lock() {
        sync.acquire(1);
    }
    
    public void unlock() {
        sync.release(1);
    }
    
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    
    // 内部同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // 状态0->1表示获取锁
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        
        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // 无需CAS,因为只有持有者能释放
            return true;
        }
        
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }
}


⚡ 六、AQS的高级特性与优化

6.1 公平性与非公平性

/**
 * ReentrantLock中的公平性实现
 */
public class ReentrantLock {
    // 非公平锁
    static final class NonfairSync extends Sync {
        protected final boolean tryAcquire(int acquires) {
            // 直接尝试获取,不检查队列
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    }
    
    // 公平锁
    static final class FairSync extends Sync {
        protected final boolean tryAcquire(int acquires) {
            // 先检查队列中是否有等待线程
            if (!hasQueuedPredecessors() && // 关键区别
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    }
    
    // 检查是否有前驱节点在等待
    public final boolean hasQueuedPredecessors() {
        Node h, s;
        if ((h = head) != null) {
            if ((s = h.next) == null || s.waitStatus > 0) {
                s = null;
                for (Node p = tail; p != h && p != null; p = p.prev) {
                    if (p.waitStatus <= 0) s = p;
                }
            }
            if (s != null && s.thread != Thread.currentThread()) {
                return true; // 有前驱节点在等待
            }
        }
        return false;
    }
}

6.2 条件变量(Condition)实现

/**
 * AQS条件变量实现
 */
public class ConditionObject implements Condition {
    // 条件队列(单向链表)
    private transient Node firstWaiter;
    private transient Node lastWaiter;
    
    public final void await() throws InterruptedException {
        if (Thread.interrupted()) throw new InterruptedException();
        
        // 1. 加入条件队列
        Node node = addConditionWaiter();
        
        // 2. 完全释放锁
        int savedState = fullyRelease(node);
        
        // 3. 等待被唤醒或中断
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if (Thread.interrupted()) break;
        }
        
        // 4. 重新获取锁
        if (acquireQueued(node, savedState)) {
            throw new InterruptedException();
        }
    }
    
    public final void signal() {
        if (!isHeldExclusively()) {
            throw new IllegalMonitorStateException();
        }
        Node first = firstWaiter;
        if (first != null) {
            doSignal(first); // 唤醒条件队列中的第一个节点
        }
    }
}


📊 七、AQS性能分析与最佳实践

7.1 性能优化策略

1. 减少CAS竞争

// 不好的做法:频繁CAS
while (!compareAndSetState(0, 1)) {
    // 空旋或让步
}

// 好的做法:先检查再CAS
if (getState() == 0 && compareAndSetState(0, 1)) {
    // 快速路径成功
}

2. 合理使用自旋

// 在入队前短暂自旋
for (int i = 0; i < MAX_SPIN; i++) {
    if (tryAcquire(arg)) return true; // 自旋期间成功获取
    if (hasQueuedPredecessors()) break; // 有排队线程,停止自旋
}
// 自旋失败,加入队列

7.2 最佳实践指南

/**
 * 基于AQS开发同步器的最佳实践
 */
public abstract class BestPracticeSync extends AbstractQueuedSynchronizer {
    
    // 1. 明确状态语义
    @Override
    protected boolean isHeldExclusively() {
        // 必须正确实现
        return getExclusiveOwnerThread() == Thread.currentThread();
    }
    
    // 2. 合理使用公平性
    protected boolean fairTryAcquire(int acquires) {
        // 根据场景选择公平策略
        return isFair() ? fairAcquire(acquires) : nonfairAcquire(acquires);
    }
    
    // 3. 正确实现Condition
    public Condition newCondition() {
        return new ConditionObject(); // 使用AQS提供的实现
    }
    
    // 4. 性能监控点
    protected void acquisitionSucceeded() {
        // 可以添加监控逻辑
        monitor.recordAcquisition();
    }
}


💎 总结与核心价值

8.1 AQS的设计哲学

  1. 模板方法模式:分离变与不变,AQS处理通用逻辑,子类实现特定逻辑
  2. CLH队列变体:高效的FIFO等待队列,减少竞争
  3. 状态机抽象:通过int state抽象各种同步状态
  4. 双模式支持:独占与共享模式覆盖大部分同步需求

8.2 AQS的核心价值

价值点说明影响
代码复用同步器开发者只需关注状态管理大幅降低开发难度
性能优化队列管理、CAS操作等经过深度优化提供高性能基础
可靠性经过严格测试和大量生产验证工业级质量
扩展性支持各种复杂的同步需求适应性强

8.3 学习路径建议

入门阶段:

  • ✅ 理解AQS的基本工作原理
  • ✅ 掌握ReentrantLock的使用和源码

进阶阶段:

  • ✅ 分析CountDownLatch、Semaphore等组件的AQS实现
  • ✅ 理解共享模式的传播机制

高级阶段:

  • ✅ 掌握Condition的实现原理
  • ✅ 能够基于AQS自定义同步器
  • ✅ 理解AQS的性能优化技巧

💡 核心认知:AQS不仅是技术实现,更是并发设计的典范。理解AQS有助于培养良好的并发编程思维。


📚 资源推荐

实用工具:

  • JStack:查看线程堆栈和锁状态
  • JConsole:监控线程阻塞情况
  • Arthas:在线诊断工具

💬 互动话题:你在项目中使用过哪些基于AQS的组件?遇到过哪些并发问题?欢迎分享你的实战经验!