0%

JUC AQS同步器框架

AbstractQueuedSynchronized(AQS),抽象的队列式的同步器,AQS 定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的 ReentrantLock、Semaphore、CountDownLatch…

概述

简单说来,AbstractQueuedSynchronized(AQS)会把所有的请求线程构成一个 CLH 队列,当一个线程执行完毕(lock.unlock())时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全部处于阻塞状态.

CLH 队列 — Craig, Landin, and Hagersten lock queue

CLH 队列是 AQS 中“等待锁”的线程队列。在多线程中,为了保护竞争资源不被多个线程同时操作而起来错误,我们常常需要通过锁来保护这些资源。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问;而其它线程则需要等待。CLH 就是管理这些“等待锁”的线程的队列。

CLH 是一个非阻塞的 FIFO 队列。也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和 CAS 保证节点插入和移除的原子性。

AQS 维护了一个 volatile int state(代表共享资源)和一个 FIFO 线程等待队列(多线程争用资源被阻塞时会进入此队列)。state 的访问方式有三种:

  • getState()
  • setState()
  • compareAndSetState()

AQS 定义两种资源共享方式:

  • Exclusive(独占,同一时间点只有一个线程能执行,如ReentrantLock)
  • Share(共享,同一时间点多个线程可同时执行,如Semaphore/CountDownLatch/ReadLock)

独占锁根据锁的划分机制还可分为两类:“公平锁”和“非公平锁”。

  • 公平锁:是按照通过 CLH 等待线程按照先来先得的规则,公平的获取锁。
  • 非公平锁:当线程要获取锁时,它会无视 CLH 等待队列而直接获取锁。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到 condition 才需要去实现它
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回 true
  • tryRelease(int):独占方式。尝试释放资源,成功则返回 true
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回 true

以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程lock()时,会调用tryAcquire()独占该锁并将state + 1。此后,其他线程再tryAcquire()时就会失败,直到 A 线程unlock()state = 0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。

再以 CountDownLatch 为例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后countDown()一次,state 会 CAS 减 1。等到所有子线程都执行完后(即state = 0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现 tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock。

Lock VS Synchronized

AbstractQueuedSynchronizer 通过构造一个基于阻塞的 CLH 队列容纳所有的阻塞线程,而对该队列的操作均通过 Lock-Free(CAS)操作,但对已经获得锁的线程而言,ReentrantLock 实现了偏向锁的功能。

原生的 CLH 队列是用于自旋锁,但 Doug Lea 把其改造为阻塞锁

synchronized 的底层也是一个基于 CAS 操作的等待队列,但 JVM 实现的更精细,把等待队列分为 ContentionList 和 EntryList,目的是为了降低线程的出列速度;当然也实现了偏向锁,从数据结构来说二者设计没有本质区别。但 synchronized 还实现了自旋锁,并针对不同的系统和硬件体系进行了优化,而 Lock 则完全依靠系统阻塞挂起等待线程。

当然 Lock 比 synchronized 更适合在应用层扩展,可以继承 AbstractQueuedSynchronizer 定义各种实现,比如实现读写锁(ReadWriteLock),公平或不公平锁;同时,Lock 对应的 Condition 也比 wait/notify 要方便的多、灵活的多。

源码详解

继承自AOS

AbstractOwnableSynchronizer 是一个抽象类同步器,它可能由线程独占。该类为创建可能包含所有权概念的锁和相关同步器提供了基础,但是,子类和工具可以使用适当维护的值来帮助控制和监视访问并提供诊断,定义了独占模式,设置和获取独占模式下的线程 Thread 信息。

AbstractOwnableSynchronizer 是一个抽象父类,子类有 AbstractQueuedSynchronizer 和 AbstractQueuedLongSynchronizer,它们 2 个之间的区别就是异常将所有与状态相关的参数和结果定义为long类型而不是int类型,在创建同步器(例如多级锁和需要 64 位状态的障碍)时,此类可能很有用。  

//线程对象:表示独占模式同步的当前所有者。以独占模式拥有锁的线程对象
private transient Thread exclusiveOwnerThread;


//设置拥有独占访问权的线程对象。如果为空表示没有线程拥有访问权,
//此方法不会强制执行任何同步或 volatile 字段访问。
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}
//返回上次被setExclusiveOwnerThread方法设置的线程,如果没有被设置,返回null.
//此方法不会强制执行任何同步或 volatile 字段访问。
protected final Thread getExclusiveOwnerThread() {
    return exclusiveOwnerThread;
}

结点状态

Node 结点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。

waitStatus

变量 waitStatus 则表示当前 Node 结点(对应一个线程)的等待状态,有以下几种类型:

  • SIGNAL(-1) :表示当前节点的后继节点包含的线程需要被唤醒,也就是 unpark,所以当前节点release或cancels时,必须unpark它的后继节点
  • CANCELLED(1):因为超时或中断,该线程已经被取消
  • CONDITION(-2):表明该线程被处于条件队列,将不会被用于同步队列,直到节点状态被设置为 0
  • PROPAGATE(-3):表示当前场景下后续的 acquireShared 能够得以执行,releaseShared 应该被传播到其他节点
  • 0:表示当前节点在同步队列中,等待着获取锁

注意:负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用 >0、<0 来判断结点的状态是否正常。

nextWaiter

nextWaiter 是区别当前 CLH 队列是独占锁队列还是共享锁队列的标记。

  • nextWaiter=SHARED,则 CLH 队列是共享锁队列
  • nextWaiter=EXCLUSIVE,(即nextWaiter=null),则 CLH 队列是共享锁队列

独占模式

acquire

此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

函数流程如下:

  1. tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而 CLH 队列中可能还有别的线程在等待)
  2. addWaiter()将该线程加入等待队列的尾部,并标记为独占模式
  3. acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

tryAcquire(int)

此方法尝试去获取独占资源。如果获取成功,则直接返回 true,否则直接返回false。这也正是tryLock()的语义,还是那句话,当然不仅仅只限于tryLock()。如下是tryAcquire()的源码:

protected boolean tryAcquire(int arg) {
	throw new UnsupportedOperationException();
}

AQS 只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现,自定义同步器在进行资源访问时要考虑线程安全的影响。这里之所以没有定义成 abstract,是因为独占模式下只用实现 tryAcquire-tryRelease,而共享模式下只用实现 tryAcquireShared-tryReleaseShared。下面是实现逻辑:

addWaiter(Node)

此方法用于将当前线程包装为 Node 节点加入到等待队列的队尾,并返回当前线程所在的结点。下图是流程图:

private Node addWaiter(Node mode) {
    //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    //若CLH队列不为空,则将“当前线程”添加到CLH队列末尾
    if (pred != null) {
        node.prev = pred;
        //快速尝试一次将Node加入队列
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            //操作成功后直接返回
            return node;
        }
    }
    //尝试失败代表线程竞争,以自旋的方式加入队列
    enq(node);
    return node;
}

enq(Node)

此方法用于将 node 加入队尾。源码如下:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued

acquireQueued()的作用就是“当前线程”会根据公平性原则进行阻塞等待,直到从队列中获取锁为止;并且返回当前线程在等待过程中有没有并中断过。

通过tryAcquire()addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。该线程下一部就是进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源。但是,在阻塞之前又通过 tryAccquire 重试是否能获得锁,如果重试成功能则无需阻塞。

final boolean acquireQueued(final Node node, int arg) {
    //标记是否成功拿到资源
    boolean failed = true;
    try {
        //标记等待过程中是否被中断过
        boolean interrupted = false;
        //又是一个“自旋”!
        for (;;) {
            //拿到前驱,获取上一个等待锁的线程
            final Node p = node.predecessor();
            //如果前驱是 head,即该结点已成老二,那么便有资格去尝试获取资源
            //(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
            if (p == head && tryAcquire(arg)) {
                //拿到资源后,将head指向该结点。
                //所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
                setHead(node);
                //setHead中node.prev已置为null,此处再将head.next置为null,
                //就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
                p.next = null; 
                failed = false; // 成功获取资源
                return interrupted; //返回等待过程中是否被中断过
            }
            //如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。
            //如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,
            //从而继续进入park()等待。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。
                //如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,
                //从而继续进入park()等待。
                interrupted = true;
        }
    } finally {
        //如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),
        //那么取消结点在队列中的等待。
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire

该方法的作用在于判断当前节点中的线程,是否可以安全的进入park()。返回 true,表示进程可以进入 park。

shouldParkAfterFailedAcquire()通过以下规则,判断“当前线程”是否需要被阻塞。

  1. 如果前继节点状态为 SIGNAL,表明当前节点需要被 unpark(唤醒),此时则返回 true
  2. 如果前继节点状态为 CANCELLED(ws > 0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非 CANCELLED 状态)的节点,并返回 false
  3. 如果前继节点状态为非 SIGNAL、非 CANCELLED,则设置前继的状态为 SIGNAL,并返回 false。

如果“规则1”发生,即“前继节点是 SIGNAL 状态,则意味着“当前线程”需要被阻塞。接下来会调用 parkAndCheckInterrupt() 阻塞当前线程,直到当前先被唤醒才从 parkAndCheckInterrupt() 中返回。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; //拿到前驱的状态
    //如果前驱节点的waitStatus为SIGNAL -1,则表示当前节点可以安全的park()
    if (ws == Node.SIGNAL)
        //为前驱节点设置,release后通知该节点的状态
        return true;
     // waitStatus>0,即为CANCELLED状态,此时当前节点需要找到状态不为CANCELLED状态的节点,将其设置为自己的前驱节点,并将新的前驱节点的next指向自己。
   // 注意,这样做完之后,那些当前节点的waitStatus状态为CANCELLED的前驱节点链,将成为孤链。但这个孤链仍然有指向原等待队列的prev和next指针。只是原等待队列中已经没有指向孤链的节点指针
   // 将前驱节点移出列队
    if (ws > 0) {
        //如果前驱节点取消,不断向前找到一个正常状态的前驱节点并为其设置release后通知的状态
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //走到此处,表明前驱节点的状态为0或PROPAGATE。此时可以将前驱节点的waitStatus设置为SIGNAL状态
       //注意:这里仍然要返回false,表明当前节点不能被park。我们需要在park之前,重试确认该节点不能获取到资源
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt

parkAndCheckInterrupt()的作用是阻塞当前线程,并且返回“线程被唤醒之后”的中断状态。

它会先通过LockSupport.park()阻塞“当前线程”,然后通过Thread.interrupted()返回线程的中断状态。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

线程的显式阻塞是通过调用LockSupport.park()完成,而LockSupport.park()则调用sun.misc.Unsafe.park()本地方法,再进一步,HotSpot 在 Linux 中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。

park() 会让当前线程进入 waiting 状态。在此状态下,有两种途径可以唤醒该线程:

  1. unpark()唤醒:前继节点对应的线程使用完锁之后,通过 unpark() 方式唤醒当前线程。
  2. 中断唤醒:其它线程通过 interrupt() 中断当前线程。

需要注意的是,Thread.interrupted() 会清除当前线程的中断标记位。

经过 shouldParkAfterFailedAcquire() 方法后,队列中所有节点的状态的示意图如下。

最终对于acquireQueued()方法而言,只有线程获取到了锁或者被中断,线程才会从这个方法里面返回,否则它会一直阻塞在里面。

selfInterrupt

当线程从acquireQueued()方法处返回时,返回值有两种情况,如果返回 false,表示线程不是被中断才唤醒的,所以此时在acquire()方法中,if 判断不成立,就不会执行selfInterrupt()方法,而是直接返回。如果返回 true,否则表示线程是被中断才唤醒的,由于在parkAndCheckInterrupt()方法中调用了Thread.interrupted()方法,这会将线程的中断标识重置,但也会使acquire()方法中的 if 判断成立,然后这样就会调用selfInterrupt()方法,最后acquire()方法返回。

selfInterrupt()方法主要是由于parkAndCheckInterrupt()在返回当前中断状态之后清除了中断状态。 正因为之前的中断状态被清除了,所以这里需要调用selfInterrupt()重新产生一个中断!

需要注意的是,之后自己设计的逻辑中没有如果没有处理中断的代码,那么它会一直执行下去,或者将当前线程设置为 waitiing 状态,就会抛出中断异常,并消除中断标志。

release

当持有锁的线程释放锁后,会唤醒同步队列中下一个处于等待状态的节点去尝试获取锁。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        // 当释放锁成功以后,需要唤醒同步队列中的其他线程
        Node h = head;
        // 当waitStatus!=0时,表示同步队列中还有其他线程在等待获取锁
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release()方法中会先调用 AQS 子类的tryRelease()方法,该方法会留作子类扩展使用

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

如果同步队列中有线程在等待获取锁,那么此时在release()方法中调用unparkSuccessor()方法去唤醒下一个等待状态的节点。

private void unparkSuccessor(Node node) {
    //找到头结点后第一个waitStatus小于0的节点,然后将其唤醒
    //<0 代表是非取消状态的节点 
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        /**
         * 同步队列中的节点锁代表的线程,可能被取消了,此时这个节点的waitStatus=1
         * 因此这个时候利用for循环从同步队列尾部开始向前遍历,判断节点是不是被取消了
         * 正常情况下,当头结点释放锁以后,应该唤醒同步队列中的第二个节点,但是如果第二个节点的线程被取消了,此时就不能唤醒它了,
         * 就应该判断第三个节点,如果第三个节点也被取消了,再依次往后判断,直到第一次出现没有被取消的节点。如果都被取消了,此时s==null,所以不会唤醒任何线程
         */
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //唤醒符合条件的线程
    //此时线程被唤醒后,就回到上面获取锁流程中的parkAndCheckInterrupt()方法处
    //接着就执行后面的逻辑了
    if (s != null)
        LockSupport.unpark(s.thread);
}

共享模式

acquireShared

以共享模式获取资源,如果获取成功,直接返回,否则进去 CLH 等待队列,通过自旋知道获取到资源为止,过程中忽略线程中断,获取资源后才进行自我中断(补上)

//获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

tryAcquireShared

尝试以共享的方式获取资源,成功true,失败false,该方法可以用于实现Lock中的tryLock()方法。

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

AQS 只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现,自定义同步器在进行资源访问时要考虑线程安全的影响。这里之所以没有定义成 abstract,是因为独占模式下只用实现 tryAcquire-tryRelease,而共享模式下只用实现 tryAcquireShared-tryReleaseShared。

doAcquireShared

将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

cancelAcquire

取消节点(列队等待中抛出异常会调用此方法)

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    //找到适合的前继节点,当前节点的waitStatus赋值为CANCELLED
    node.thread = null; // 释放线程

    // Skip cancelled predecessors 前驱节点已被取消  重新定义前驱节点
    Node pred = node.prev;
    //若前继节点是CANCELLED,则继续找前继节点,直至找到一个正常的前继节点赋值给node,作为node的新前继节点
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;
    //取消当前线程所属的节点(标记为取消),没有使用cas因为其他线程不会干扰这里
    node.waitStatus = Node.CANCELLED; 

    // If we are the tail, remove ourselves.
    //特殊情况:node==tail节点,将pred作为tail节点,然后将cancelledNodes节点链从CLH队列剔除
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        //正常情况:则将cancelledNodes节点链从CLH队列剔除
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            //特殊情况:如果node是head的后继节点,则直接唤醒node的后继节点 pred==head节点:尝试调用unparkSuccessor(node),尝试唤醒当前节点的后继节点
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

releaseShared

共享模式释放资源

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

tryReleaseShared

共享模式下尝试释放锁,由子类选择性实现

protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
}

fullyRelease

使用当前节点状态调用 release,成功返回状态,失败抛出异常

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

Condition条件队列

内部类 ConditionObject,它实现了 Condition 接口,主要用于实现条件锁。

ConditionObject 中也维护了一个队列,这个队列主要用于等待条件的成立,当条件成立时,其它线程将 signal 这个队列中的元素,将其移动到 CLH 的队列中,等待占有锁的线程释放锁后被唤醒。

Condition 典型的运用场景是在 BlockingQueue 中的实现,当队列为空时,获取元素的线程阻塞在 notEmpty 条件上,一旦队列中添加了一个元素,将通知 notEmpty 条件,将其队列中的元素移动到 AQS 队列中等待被唤醒。

下图是 Condition 执行过程