AbstractQueuedSynchronizer框架

JerryXia 发表于 , 阅读 (0)
日常开发中,大多数程序员并不会直接接触AbstractQueuedSynchronizer(AQS)类,但其在并发工具中缺无处不在,并作为内部的标准同步器,如ReentrantLockSemaphoreJava线程池中的Worker等。本文将介绍AQS相关的实现细节。

什么是AbstractQueuedSynchronizer(AQS)

AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getStatesetStatecompareAndSetState等方法进行操作。这个整数状态的意义由子类来赋予,如ReentrantLock中该状态值表示所有者线程已经重复获取该锁的次数Semaphore中该状态值表示剩余的许可数量。可以看下使用的AbstractQueuedSynchronizer的并发工具类:

aqs-type-tree.png

AbstractQueuedSynchronizer(AQS)实现

AQS定义比较简单,继承自AbstractOwnableSynchronizer接口:

abstract-queued-synchronizer.png

AbstractOwnableSynchronizer

当一个同步器可以由单个线程独占时,AbstractOwnableSynchronizer定义了基础的创建锁和相关同步器的方法,但其本身并不管理维护这些信息,而是交由子类去实现:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {    private static final long serialVersionUID = 3737899427754241961L;    protected AbstractOwnableSynchronizer() { }    /**     * 当前独占同步器的线程     */    private transient Thread exclusiveOwnerThread;    /**     * 设置当前独占同步器的线程     */    protected final void setExclusiveOwnerThread(Thread t) {        exclusiveOwnerThread = t;    }    /**     * 获取当前独占同步器的线程     */    protected final Thread getExclusiveOwnerThread() {        return exclusiveOwnerThread;    }}    

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer内部使用CLH锁(CLH锁是一种基于链表的可扩展高性能公平的自旋锁,申请线程不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋)的变种来实现对线程的阻塞。CLH锁的链表中的节点被抽象为Node

static final class Node {        /**     * 标记节点正以共享模式等待     */    static final Node SHARED = new Node();    /**     * 标记节点正以独占模式等待     */     static final Node EXCLUSIVE = null;    // ===== 以下表示节点的等待状态 =====        /**      * 表示当前的线程被取消     */    static final int CANCELLED =  1;    /**      * 表示当前节点的后继节点包含的线程需要运行,也就是unpark     */    static final int SIGNAL    = -1;    /**      * 表示当前节点在等待condition,也就是在condition队列中     */    static final int CONDITION = -2;    /**      * 示当前场景下后续的acquireShared能够得以执行     */    static final int PROPAGATE = -3;    /**      * 状态     */    volatile int waitStatus;    /**     * 前驱节点,比如当前节点被取消时,那就需要前驱节点和后继节点来完成连接。     */    volatile Node prev;    /**     * 后继结点     */    volatile Node next;    /**     * 入队列时的当前线程     */    volatile Thread thread;    /**     * 存储condition队列中的后继节点。     */    Node nextWaiter;    ...}    

其中AbstractQueuedSynchronizer维护的链表结构大致如下:

aqs-clh-list.png

ReentrantLock

可以先从ReentrantLock的实现来探究AbstractQueuedSynchronizer的作用。ReentrantLock内部封装了一个Sync类,来实现基本的lockunlock操作:

 public class ReentrantLock implements Lock, java.io.Serializable {       // 同步器,用于实现锁机制    private final Sync sync;    /**     * 基础的同步器实现     */    abstract static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = -5179523762034025860L;        /**         * 由公平锁和非公平锁实现         */        abstract void lock();        /**         * 非公平锁时,尝试加锁         */        final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            // 同步器状态            int c = getState();            if (c == 0) {            	// 若同步器状态为初始状态,则尝试加锁                if (compareAndSetState(0, acquires)) {                	// 设置锁的占用线程                    setExclusiveOwnerThread(current);                    return true;                }            } else if (current == getExclusiveOwnerThread()) {            	// 当前线程已经加锁过,则设置state为锁的重入次数+1                int nextc = c + acquires;                if (nextc < 0) // 超出了锁重入的最大次数                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }        /**         * 尝试释放同步器         */         protected final boolean tryRelease(int releases) {            // 释放后的新状态            int c = getState() - releases;            if (Thread.currentThread() != getExclusiveOwnerThread())                // 非占用线程                throw new IllegalMonitorStateException();            boolean free = false;            if (c == 0) {            	// state归零,释放成功                free = true;                setExclusiveOwnerThread(null);            }            setState(c);            return free;        }        /*          * 当前线程是否独占该锁         */        protected final boolean isHeldExclusively() {            return getExclusiveOwnerThread() == Thread.currentThread();        }        /*          * 创建一个条件对象         */        final ConditionObject newCondition() {            return new ConditionObject();        }        /*          * 获取当前独占线程         */        final Thread getOwner() {            return getState() == 0 ? null : getExclusiveOwnerThread();        }        /*         * 获取锁被重入的次数         */        final int getHoldCount() {            return isHeldExclusively() ? getState() : 0;        }        /*          * 锁是否被占用         */        final boolean isLocked() {            return getState() != 0;        }        /**         * 从对象流中反序列化锁对象         */        private void readObject(java.io.ObjectInputStream s)            throws java.io.IOException, ClassNotFoundException {            s.defaultReadObject();            // 重置为初始状态            setState(0);         }    }    /**     * 非公平锁     */    static final class NonfairSync extends Sync {        private static final long serialVersionUID = 7316153563782823691L;        /**         * 加锁         */        final void lock() {        	// 先尝试直接加锁,即抢占式            if (compareAndSetState(0, 1))                setExclusiveOwnerThread(Thread.currentThread());            else            	// 失败后,就排队抢锁                acquire(1);        }        /**         * 尝试获取锁         */        protected final boolean tryAcquire(int acquires) {            return nonfairTryAcquire(acquires);        }    }    /**     * 公平锁     */    static final class FairSync extends Sync {        private static final long serialVersionUID = -3000897897090466540L;        final void lock() {        	// 直接进行排队抢锁,保持公平            acquire(1);        }        /**         * 尝试获取锁         */        protected final boolean tryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {            	// 若没有其他线程已经在等待队列中,则尝试加锁                if (!hasQueuedPredecessors() &&                    compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            } else if (current == getExclusiveOwnerThread()) {            	// 当前线程已经占有锁,则重入次数 + acquires                int nextc = c + acquires;                if (nextc < 0)                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }    }    /**     * 默认非公平锁     */    public ReentrantLock() {        sync = new NonfairSync();    }    /**     * 请求锁     */    public void lock() {        sync.lock();    }    /**     * 尝试加锁,可被中断     */    public void lockInterruptibly() throws InterruptedException {        sync.acquireInterruptibly(1);    }    /**     * 尝试加锁     */    public boolean tryLock() {        return sync.nonfairTryAcquire(1);    }    /**     * 加锁,具有超时限制     */    public boolean tryLock(long timeout, TimeUnit unit)            throws InterruptedException {        return sync.tryAcquireNanos(1, unit.toNanos(timeout));    }    /**     * 解锁     */    public void unlock() {        sync.release(1);    }    /**     * 创建一个条件对象     */    public Condition newCondition() {        return sync.newCondition();    }    /**     * 获取锁的重入次数     */    public int getHoldCount() {        return sync.getHoldCount();    }    /**     * 锁是否被当前线程持有     */    public boolean isHeldByCurrentThread() {        return sync.isHeldExclusively();    }    /**     * 锁是否已被持有     */    public boolean isLocked() {        return sync.isLocked();    }    /**     * 是否是公平锁     */    public final boolean isFair() {        return sync instanceof FairSync;    }    /**     * 获取占用锁的线程     */    protected Thread getOwner() {        return sync.getOwner();    }    /**     * 是否有等待的线程     */    public final boolean hasQueuedThreads() {        return sync.hasQueuedThreads();    }    /**     * 判断线程是否在等待队列中     */    public final boolean hasQueuedThread(Thread thread) {        return sync.isQueued(thread);    }    /**     * 获取等待队列长度,并发时,不是绝对精确     */    public final int getQueueLength() {        return sync.getQueueLength();    }    /**     * 获取等待的线程集合,不是绝对精确     */    protected Collection<Thread> getQueuedThreads() {        return sync.getQueuedThreads();    }    /**     * 判断是否有线程在某条件上等待     */    public boolean hasWaiters(Condition condition) {        if (condition == null)            throw new NullPointerException();        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))            throw new IllegalArgumentException("not owner");        return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);    }    /**     * 获取在某条件上等待的线程数     */    public int getWaitQueueLength(Condition condition) {        if (condition == null)            throw new NullPointerException();        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))            throw new IllegalArgumentException("not owner");        return sync.getWaitQueueLength((AbstractQueuedS