[
AI的出现,是否能替代IT从业者?
10w+人浏览
1.2k人参与
](https://activity.csdn.net/topic?id=10811)
CSDN Java并发编程深度解析系列:AQS(AbstractQueuedSynchronizer)是JUC包的核心基石,理解AQS是掌握Java高并发编程的关键钥匙。本文从设计思想、实现原理、源码分析、实战应用四个维度,全方位解析AQS的队列管理、状态控制、模板方法模式等核心技术。通过ReentrantLock、CountDownLatch等源码剖析,揭示AQS如何成为Java并发组件的统一抽象层。
JUC并发工具
同步器层
并发集合
原子变量
AQS AbstractQueuedSynchronizer
ReentrantLock
CountDownLatch
Semaphore
CyclicBarrier
ReentrantReadWriteLock
同步器开发的共性需求:
// 所有同步器都需要解决的基本问题
public interface SyncCommonRequirements {
// 1. 状态管理:记录锁的状态(是否被持有、重入次数等)
int getState();
void setState(int newState);
boolean compareAndSetState(int expect, int update);
// 2. 队列管理:管理等待线程的排队和唤醒
void enqueue(Thread thread); // 入队
Thread dequeue(); // 出队
// 3. 阻塞/唤醒:线程的挂起和恢复
void parkThread(Thread thread); // 阻塞
void unparkThread(Thread thread);// 唤醒
// 4. 超时/中断:支持限时等待和中断响应
boolean await(long time, TimeUnit unit) throws InterruptedException;
}
AQS的设计目标:将通用逻辑(队列管理、阻塞唤醒)与特定逻辑(状态判断、获取释放策略)分离。
/**
* AQS状态管理源码分析
*/
public abstract class AbstractQueuedSynchronizer {
// 核心:volatile保证状态可见性,CAS保证原子更新
private volatile int state;
/**
* 获取当前状态
*/
protected final int getState() {
return state;
}
/**
* 设置状态
*/
protected final void setState(int newState) {
state = newState;
}
/**
* CAS原子更新状态 - AQS的原子操作基石
*/
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// 状态的不同语义(由子类定义)
// ReentrantLock: state表示锁的重入次数(0-未锁定,1-被持有,>1-重入次数)
// Semaphore: state表示可用许可证数量
// CountDownLatch: state表示剩余计数
}
/**
* CLH队列节点定义
*/
static final class Node {
// 节点状态
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 节点关联的线程
volatile Thread thread;
// 下一个等待节点(Condition队列用)
Node nextWaiter;
// 等待状态常量
static final int CANCELLED = 1; // 取消状态
static final int SIGNAL = -1; // 需要唤醒后继节点
static final int CONDITION = -2; // 在条件队列中等待
static final int PROPAGATE = -3; // 共享模式下需要传播
}
/**
* AQS模板方法设计
*/
public abstract class AbstractQueuedSynchronizer {
// 模板方法:供外部调用的入口
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 尝试获取(由子类实现)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { // 加入队列并等待
// 响应中断
Thread.currentThread().interrupt();
}
}
// 需要子类实现的钩子方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
}
| 模式 | 特点 | 适用场景 | 典型实现 |
|---|---|---|---|
| 独占模式(Exclusive) | 同一时刻只有一个线程能获取资源 | 互斥锁、重入锁 | ReentrantLock |
| 共享模式(Shared) | 同一时刻多个线程可以获取资源 | 信号量、闭锁 | Semaphore, CountDownLatch |
/**
* ReentrantLock中AQS的具体实现
*/
public class ReentrantLock {
private final Sync sync;
// 内部同步器,继承AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
// 实现AQS的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 锁未被占用
if (!hasQueuedPredecessors() && // 公平性检查
compareAndSetState(0, acquires)) { // CAS获取锁
setExclusiveOwnerThread(current); // 设置当前线程为锁持有者
return true;
}
} else if (current == getExclusiveOwnerThread()) { // 重入
int nextc = c + acquires;
if (nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc); // 更新重入次数
return true;
}
return false;
}
// 实现tryRelease方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
boolean free = false;
if (c == 0) { // 完全释放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
}
/**
* acquire方法的完整执行流程
*/
public final void acquire(int arg) {
// 1. 先尝试快速获取(子类实现)
if (!tryAcquire(arg)) {
// 2. 快速获取失败,创建节点加入队列
Node node = addWaiter(Node.EXCLUSIVE);
// 3. 在队列中自旋等待
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 轮到当前节点且获取成功
setHead(node); // 成为新的头节点
p.next = null; // 帮助GC
return interrupted;
}
// 4. 检查是否需要阻塞
if (shouldParkAfterFailedAcquire(p, node)) {
// 5. 阻塞当前线程
interrupted |= parkAndCheckInterrupt();
}
}
}
}
/**
* 加入等待队列
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) { // 队列非空,快速入队
node.prev = pred;
if (compareAndSetTail(pred, node)) { // CAS设置尾节点
pred.next = node;
return node;
}
}
enq(node); // 完整入队流程
return node;
}
线程1(持有锁)线程2(等待)线程3(等待)AQS队列持有锁(state=1)尝试获取锁(tryAcquire)获取失败创建节点加入队列队列: [头节点] ->> [T2]尝试获取锁获取失败加入队列尾部队列: [头节点] ->> [T2] ->> [T3]释放锁(tryRelease)唤醒T2获取锁成功队列: [头节点(T2)] ->> [T3]线程1(持有锁)线程2(等待)线程3(等待)AQS队列
/**
* CountDownLatch的同步器实现
*/
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count); // 初始化状态为计数
}
// 共享模式获取
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
// 状态为0时返回1(成功),否则返回-1(失败)
}
// 共享模式释放
protected boolean tryReleaseShared(int releases) {
for (;;) { // 自旋CAS
int c = getState();
if (c == 0) return false; // 已经为0
int nextc = c - 1;
if (compareAndSetState(c, nextc)) { // CAS减1
return nextc == 0; // 减到0时返回true,唤醒所有等待线程
}
}
}
}
}
/**
* 共享模式获取资源
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) { // 尝试获取
doAcquireShared(arg); // 加入队列等待
}
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 共享模式节点
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) { // 获取成功
setHeadAndPropagate(node, r); // 关键:设置头节点并传播
p.next = null;
if (interrupted) Thread.currentThread().interrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node)) {
interrupted |= parkAndCheckInterrupt();
}
}
} finally {
if (failed) cancelAcquire(node);
}
}
/**
* 传播唤醒后续共享节点
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
// 如果还有剩余资源,或者原头节点需要传播,或者新头节点需要传播
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) {
doReleaseShared(); // 唤醒后续共享节点
}
}
}
| 同步器 | 同步模式 | 状态state含义 | tryAcquire逻辑 | 特点 |
|---|---|---|---|---|
| ReentrantLock | 独占 | 重入次数 | CAS获取,支持重入 | 可重入、可中断、可超时 |
| Semaphore | 共享 | 许可证数量 | CAS减少许可证 | 控制并发线程数 |
| CountDownLatch | 共享 | 剩余计数 | 检查是否为0 | 一次性屏障 |
| ReentrantReadWriteLock | 两者 | 高16位读锁,低16位写锁 | 复杂的位运算 | 读写锁分离 |
| CyclicBarrier | 共享+独占 | 代次+计数 | 计数到达后重置 | 可重复使用的屏障 |
/**
* 基于AQS实现简单的互斥锁
*/
public class SimpleMutex {
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
// 内部同步器
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
// 状态0->1表示获取锁
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0); // 无需CAS,因为只有持有者能释放
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
}
/**
* ReentrantLock中的公平性实现
*/
public class ReentrantLock {
// 非公平锁
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
// 直接尝试获取,不检查队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
// 公平锁
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
// 先检查队列中是否有等待线程
if (!hasQueuedPredecessors() && // 关键区别
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
// 检查是否有前驱节点在等待
public final boolean hasQueuedPredecessors() {
Node h, s;
if ((h = head) != null) {
if ((s = h.next) == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != h && p != null; p = p.prev) {
if (p.waitStatus <= 0) s = p;
}
}
if (s != null && s.thread != Thread.currentThread()) {
return true; // 有前驱节点在等待
}
}
return false;
}
}
/**
* AQS条件变量实现
*/
public class ConditionObject implements Condition {
// 条件队列(单向链表)
private transient Node firstWaiter;
private transient Node lastWaiter;
public final void await() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
// 1. 加入条件队列
Node node = addConditionWaiter();
// 2. 完全释放锁
int savedState = fullyRelease(node);
// 3. 等待被唤醒或中断
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted()) break;
}
// 4. 重新获取锁
if (acquireQueued(node, savedState)) {
throw new InterruptedException();
}
}
public final void signal() {
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if (first != null) {
doSignal(first); // 唤醒条件队列中的第一个节点
}
}
}
1. 减少CAS竞争
// 不好的做法:频繁CAS
while (!compareAndSetState(0, 1)) {
// 空旋或让步
}
// 好的做法:先检查再CAS
if (getState() == 0 && compareAndSetState(0, 1)) {
// 快速路径成功
}
2. 合理使用自旋
// 在入队前短暂自旋
for (int i = 0; i < MAX_SPIN; i++) {
if (tryAcquire(arg)) return true; // 自旋期间成功获取
if (hasQueuedPredecessors()) break; // 有排队线程,停止自旋
}
// 自旋失败,加入队列
/**
* 基于AQS开发同步器的最佳实践
*/
public abstract class BestPracticeSync extends AbstractQueuedSynchronizer {
// 1. 明确状态语义
@Override
protected boolean isHeldExclusively() {
// 必须正确实现
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 2. 合理使用公平性
protected boolean fairTryAcquire(int acquires) {
// 根据场景选择公平策略
return isFair() ? fairAcquire(acquires) : nonfairAcquire(acquires);
}
// 3. 正确实现Condition
public Condition newCondition() {
return new ConditionObject(); // 使用AQS提供的实现
}
// 4. 性能监控点
protected void acquisitionSucceeded() {
// 可以添加监控逻辑
monitor.recordAcquisition();
}
}
| 价值点 | 说明 | 影响 |
|---|---|---|
| 代码复用 | 同步器开发者只需关注状态管理 | 大幅降低开发难度 |
| 性能优化 | 队列管理、CAS操作等经过深度优化 | 提供高性能基础 |
| 可靠性 | 经过严格测试和大量生产验证 | 工业级质量 |
| 扩展性 | 支持各种复杂的同步需求 | 适应性强 |
入门阶段:
进阶阶段:
高级阶段:
💡 核心认知:AQS不仅是技术实现,更是并发设计的典范。理解AQS有助于培养良好的并发编程思维。
实用工具:
💬 互动话题:你在项目中使用过哪些基于AQS的组件?遇到过哪些并发问题?欢迎分享你的实战经验!