AbstractQueuedSynchronizer框架
日常开发中,大多数程序员并不会直接接触AbstractQueuedSynchronizer(AQS)类,但其在并发工具中缺无处不在,并作为内部的标准同步器,如ReentrantLock,Semaphore,Java线程池中的Worker等。本文将介绍AQS相关的实现细节。


什么是AbstractQueuedSynchronizer(AQS)
AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getState,setState及compareAndSetState等方法进行操作。这个整数状态的意义由子类来赋予,如ReentrantLock中该状态值表示所有者线程已经重复获取该锁的次数,Semaphore中该状态值表示剩余的许可数量。可以看下使用的AbstractQueuedSynchronizer的并发工具类:

AbstractQueuedSynchronizer(AQS)实现
AQS定义比较简单,继承自AbstractOwnableSynchronizer接口:

AbstractOwnableSynchronizer
当一个同步器可以由单个线程独占时,AbstractOwnableSynchronizer定义了基础的创建锁和相关同步器的方法,但其本身并不管理维护这些信息,而是交由子类去实现:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } /** * 当前独占同步器的线程 */ private transient Thread exclusiveOwnerThread; /** * 设置当前独占同步器的线程 */ protected final void setExclusiveOwnerThread(Thread t) { exclusiveOwnerThread = t; } /** * 获取当前独占同步器的线程 */ protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; }} AbstractQueuedSynchronizer
AbstractQueuedSynchronizer内部使用CLH锁(CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋)的变种来实现对线程的阻塞。CLH锁的链表中的节点被抽象为Node:
static final class Node { /** * 标记节点正以共享模式等待 */ static final Node SHARED = new Node(); /** * 标记节点正以独占模式等待 */ static final Node EXCLUSIVE = null; // ===== 以下表示节点的等待状态 ===== /** * 表示当前的线程被取消 */ static final int CANCELLED = 1; /** * 表示当前节点的后继节点包含的线程需要运行,也就是unpark */ static final int SIGNAL = -1; /** * 表示当前节点在等待condition,也就是在condition队列中 */ static final int CONDITION = -2; /** * 示当前场景下后续的acquireShared能够得以执行 */ static final int PROPAGATE = -3; /** * 状态 */ volatile int waitStatus; /** * 前驱节点,比如当前节点被取消时,那就需要前驱节点和后继节点来完成连接。 */ volatile Node prev; /** * 后继结点 */ volatile Node next; /** * 入队列时的当前线程 */ volatile Thread thread; /** * 存储condition队列中的后继节点。 */ Node nextWaiter; ...} 其中AbstractQueuedSynchronizer维护的链表结构大致如下:

ReentrantLock
可以先从ReentrantLock的实现来探究AbstractQueuedSynchronizer的作用。ReentrantLock内部封装了一个Sync类,来实现基本的lock和unlock操作:
public class ReentrantLock implements Lock, java.io.Serializable { // 同步器,用于实现锁机制 private final Sync sync; /** * 基础的同步器实现 */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 由公平锁和非公平锁实现 */ abstract void lock(); /** * 非公平锁时,尝试加锁 */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 同步器状态 int c = getState(); if (c == 0) { // 若同步器状态为初始状态,则尝试加锁 if (compareAndSetState(0, acquires)) { // 设置锁的占用线程 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 当前线程已经加锁过,则设置state为锁的重入次数+1 int nextc = c + acquires; if (nextc < 0) // 超出了锁重入的最大次数 throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } /** * 尝试释放同步器 */ protected final boolean tryRelease(int releases) { // 释放后的新状态 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) // 非占用线程 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // state归零,释放成功 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } /* * 当前线程是否独占该锁 */ protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } /* * 创建一个条件对象 */ final ConditionObject newCondition() { return new ConditionObject(); } /* * 获取当前独占线程 */ final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } /* * 获取锁被重入的次数 */ final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } /* * 锁是否被占用 */ final boolean isLocked() { return getState() != 0; } /** * 从对象流中反序列化锁对象 */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); // 重置为初始状态 setState(0); } } /** * 非公平锁 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * 加锁 */ final void lock() { // 先尝试直接加锁,即抢占式 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // 失败后,就排队抢锁 acquire(1); } /** * 尝试获取锁 */ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * 公平锁 */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { // 直接进行排队抢锁,保持公平 acquire(1); } /** * 尝试获取锁 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 若没有其他线程已经在等待队列中,则尝试加锁 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 当前线程已经占有锁,则重入次数 + acquires int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } /** * 默认非公平锁 */ public ReentrantLock() { sync = new NonfairSync(); } /** * 请求锁 */ public void lock() { sync.lock(); } /** * 尝试加锁,可被中断 */ public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } /** * 尝试加锁 */ public boolean tryLock() { return sync.nonfairTryAcquire(1); } /** * 加锁,具有超时限制 */ public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } /** * 解锁 */ public void unlock() { sync.release(1); } /** * 创建一个条件对象 */ public Condition newCondition() { return sync.newCondition(); } /** * 获取锁的重入次数 */ public int getHoldCount() { return sync.getHoldCount(); } /** * 锁是否被当前线程持有 */ public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } /** * 锁是否已被持有 */ public boolean isLocked() { return sync.isLocked(); } /** * 是否是公平锁 */ public final boolean isFair() { return sync instanceof FairSync; } /** * 获取占用锁的线程 */ protected Thread getOwner() { return sync.getOwner(); } /** * 是否有等待的线程 */ public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } /** * 判断线程是否在等待队列中 */ public final boolean hasQueuedThread(Thread thread) { return sync.isQueued(thread); } /** * 获取等待队列长度,并发时,不是绝对精确 */ public final int getQueueLength() { return sync.getQueueLength(); } /** * 获取等待的线程集合,不是绝对精确 */ protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } /** * 判断是否有线程在某条件上等待 */ public boolean hasWaiters(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition); } /** * 获取在某条件上等待的线程数 */ public int getWaitQueueLength(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitQueueLength((AbstractQueuedS