0%

JUC ConcurrentLock

锁是为了保护竞争资源,防止多个线程同时操作共享资源而发生数据不一致的问题。本文介绍了三种 JUC 包提供的锁:ReentrantLock、ReentrantReadWriteLock、StampedLock。基于 AQS 实现的有 ReentrantLock、ReentrantReadWriteLock,而 StampedLock 实现了自己的队列。

ReentrantLock

ReentrantLock 是一个可重入的互斥锁,又被称为“独占锁”,即在同一个时间点只能被一个线程锁持有。它分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。ReentraantLock 是通过一个 FIFO 的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。下图是 ReentrantLock 的 UML 图。

上图可看出 ReentrantLock 与 Sync 是组合关系。并且,Sync 是 AQS 的子类,FairSync(公平锁)和 NonFairSync(非公平锁)。ReentrantLock 是一个独占锁,至于它具体是公平锁还是非公平锁,就取决于 Sync 对象是 FairSync 的实例还是 NonFairSync 的实例。

ReentrantLock API

ReentrantLock 线程状态

函数 解释 返回值
getHoldCount 查询当前线程保持此锁的次数 int
getQueueLength 查询等待此锁释放的线程估计数 int
getWaitQueueLength(Condition condition) 返回Condition所辖的等待唤醒的线程估计数(或者是已经await的线程数) int
getOwner 返回目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回 null。 Thread
getQueuedThreads() 返回一个 collection,它包含可能正等待获取此锁的线程。 Collection
getWaitingThreads(Condition condition) 返回一个 collection,它包含可能正在等待与此锁相关给定条件的那些线程。 Collection
hasQueuedThread(Thread thread) 查询指定线程是否正在等待获取此锁 boolean
hasQueuedThreads 查询ReentrantLock所辖线程是否有线程等待获取此锁 boolean
hasWaiters(Condition condition) 查询Condition所辖线程是否有线程等待获取此锁 boolean
isFair 判断是不是公平锁 boolean
isHeldByCurrentThread 判断当前线程是否获取此锁 boolean
isLocked 判断是否有任意线程获取此锁 boolean

ReentrantLock 锁功能

函数 解释 返回值
lock 获取锁 void
lockInterruptibly 如果当前线程未被中断则获取锁定,已经被中断则出现异常 void
tryLock 在调用时,尝试获取锁。如果获取到锁,返回true;否则直接返回false,不等待 boolean
tryLock(long timeout, TimeUnit unit) 如果锁在给定等待时间内没有被另一个线程获得,且当前线程未被中断,则获取该锁定。 boolean
unlock 释放锁 void

ReentrantLock 条件变量

函数 解释 返回值
newCondition 返回用来与此Lock实例一起使用的Condition实例 Condition
await 等待条件的发生,线程在await状态interrupt会报错 void
awaitUninterruptibly 等待条件的发生。与await()方法不同,它的调用不可能被中断的。 void
awaitNanos(long nanosTimeout) 等待条件的发生。然而,如果通知没有在nanosTimeout所指定的纳秒内出现的话,它还是会返回。返回值是时限所剩的估计值,等于或小于零的返回值表示这个返回是因为时限到了。跟其他的一样,这个method的实际分辨率与平台所定有关,通常是毫秒。 long
awaitUntil(Date deadline) 等待条件的发生。然而,如果通知没有在指定的绝对时间前出现,它会以 false值返回。 boolean
signal 通知某个等待使用Condition对象的thread此条件已经发生。 void
signalAll 通知所有等待使用Condition对象的thread此条件已经发生。 void

条件变量

条件变量(condition variable)是一种由许多其他 threading 系统所提供的同步类型。条件变量非常类似 Java 的等待-通知机制,事实上,在大部分情况下它们的功能是相同的。

POSIX 条件变量的四个基本函数 wait、timed_wait、signal 与 broadcast,直接对应到 Java 所提供的 method(wait()wait(long timeout)notify()notifyAll())。它们的实现在逻辑上也是相同的。条件变量的 wait 操作需要持有 mutex 的 lock。它在返回到调用者之前的等待与重新取得 lock 的时候会释放掉 lock。signal 这个 function 会唤醒一个 thread,而 broadcast 会唤醒所有在等待的 thread,这些 function 同样也需要在调用的时候持有 mutex。条件变量的 race condition 会以与 Java 的等待-通知机制相同的方式来解决。

然而其中还是有些许的不同之处,等待-通知机制是与所关联的 lock 高度地整合过,这会让此机制比条件变量的相对应功能更易于使用。从 synchronized 程序代码区段调用 wait 与 notify 方法就是它们自然的运用方式。然而,使用条件变量需要创建独立的 mutex lock、存储此 mutex,最终在不需要的时候将 mutex 销毁。

很不幸地,便利性是有代价的。一个 POSIX 的条件变量与它所关联的 mutex lock 是单独的同步实体。可以对同一个 mutex 使用两个不同的条件变量,或是在任何的 scope 内将 mutex 与条件变量混合配对。虽然等待-通知机制更容易使用且在大多数以信号为基础的同步范例上很适用,但它无法指派任何的同步 lock 给任意的通知对象。当你在请求相同的同步 lock 以保护共享数据而需要送信号给两个不同的通知对象时,条件变量是比较有效率的。

J2SE 加入了提供条件变量功能的 class。此 class 是与 Lock interface 并用。因为这个新的 interface(及由此来的对象)是与调用的对象以及 lock 对象分开的,它在使用方法上的灵活性就如同其他 threading 系统中的条件变量一样。在 Java 中,条件变量是实现出 Condition interface 的对象。Condition interface 是绑在 Lock interface 上,就好像等待-通知机制是绑在同步 lock 上一样。

可重入的互斥锁

//在公平锁和非公平锁都调用了该代码获取锁
acquire(1);

“当前线程”实际上是通过acquire(1)获取锁的。

这里说明一下“1”的含义,它是设置“锁的状态”的参数。对于“独占锁”而言,锁处于可获取状态时,它的状态值是 0;锁被线程初次获取到了,它的状态值就变成了 1。

public final void acquire(int arg) {
    //调用tryAcquire方法:尝试获取锁资源(非公平、公平),拿到锁资源,返回true,直接结束方法
    if (!tryAcquire(arg) &&
    //当没有获取锁资源后,会先调用addWaiter:会将没有获取到锁资源的线程封装为Node对象,
    //并且插入到AQS的队列的末尾.
   //继续调用acquireQueued方法,查看当前排队的Node是否在队列的前面,如果在前面,尝试获取锁资源
    //如果没在前面,尝试将线程挂起,阻塞起来!
     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
     selfInterrupt();
}

可重入性和公平性

由于 ReentrantLock(公平锁/非公平锁)是可重入锁,所以“独占锁”可以被单个线程多次获取,每获取 1 次就将锁的状态加一。也就是说,初次获取锁时,通过acquire(1)将锁的状态值设为 1;再次获取锁时,将锁的状态值设为 2;依次类推…这就是为什么获取锁时,传入的参数是 1 的原因了。

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

/**
 * Sync object for fair locks
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        //拿到当前线程
        final Thread current = Thread.currentThread();
        //拿到AQS的state
        int c = getState();
        // 如果state == 0,说明没有线程占用着当前的锁资源
        if (c == 0) {
            //如果没有线程排队,直接直接CAS尝试获取锁资源
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                //如果获取资源成功,将当前线程设置为持有锁资源的线程
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //如果有线程持有锁资源,判断持有锁资源的线程是否是当前线程
        else if (current == getExclusiveOwnerThread()) {
            //增加AQS的state的值
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
  }
}

获取锁资源失败进队列

在获取锁资源失败后,需要将当前线程封装为 Node 对象,并且插入到 AQS 队列的末尾。

private Node addWaiter(Node mode) {
    // 将当前线程封装为Node对象,mode为null,代表互斥锁
    Node node = new Node(Thread.currentThread(), mode);
    // pred是tail节点
    Node pred = tail;
    // 如果pred不为null,有线程正在排队
    if (pred != null) {
        // 将当前节点的prev,指定tail尾节点
        node.prev = pred;
        // 以CAS的方式,将当前节点变为tail节点
        if (compareAndSetTail(pred, node)) {
            // 之前的tail的next指向当前节点
            pred.next = node;
            return node;
        }
    }
    // 添加的流程为,  自己prev指向、tail指向自己、前节点next指向我
    // 如果上述方式,CAS操作失败,导致加入到AQS末尾失败,如果失败,就基于enq的方式添加到AQS队列
    enq(node);
    return node;
}
// enq,无论怎样都添加进入
private Node enq(final Node node) {
    for (;;) {
        // 拿到tail
        Node t = tail;
        // 如果tail为null,说明当前没有Node在队列中
        if (t == null) { 
            // 创建一个新的Node作为head,并且将tail和head指向一个Node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 和上述代码一致!
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

释放锁

释放锁资源,将 state 减 1, 如果 state 减为 0 了,唤醒在队列中排队的 Node。

public final boolean release(int arg) {
    // 核心的释放锁资源方法
    if (tryRelease(arg)) {
        // 释放锁资源释放干净了。  (state == 0)
        Node h = head;
        // 如果头节点不为null,并且头节点的状态不为0,唤醒排队的线程
        if (h != null && h.waitStatus != 0)
            // 唤醒线程
            unparkSuccessor(h);
        return true;
    }
    // 释放锁成功,但是state != 0
    return false;
}
// 核心的释放锁资源方法
protected final boolean tryRelease(int releases) {
    // 获取state - 1
    int c = getState() - releases;
    // 如果释放锁的线程不是占用锁的线程,抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 是否成功的将锁资源释放利索 (state == 0)
    boolean free = false;
    if (c == 0) {
        // 锁资源释放干净。
        free = true;
        // 将占用锁资源的属性设置为null
        setExclusiveOwnerThread(null);
    }
    // 将state赋值
    setState(c);
    // 返回true,代表释放干净了
    return free;
}


// 唤醒节点
private void unparkSuccessor(Node node) {
    // 拿到头节点状态
    int ws = node.waitStatus;
    // 如果头节点状态小于0,换为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 拿到当前节点的next
    Node s = node.next;
    // 如果s == null ,或者s的状态为1
    if (s == null || s.waitStatus > 0) {
        // next节点不需要唤醒,需要唤醒next的next
        s = null;
        // 从尾部往前找,找到状态正常的节点。(小于等于0代表正常状态)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 经过循环的获取,如果拿到状态正常的节点,并且不为null
    if (s != null)
        // 唤醒线程
        LockSupport.unpark(s.thread);
}

ReentrantReadWriteLock

ReentrantReadWriteLock,是可重入读写锁。它维护了一对相关的锁 —— 读锁和写锁。

  • 读锁用于只读操作,它是共享锁,能同时被多个线程获取。
  • 写锁用于写入操作,它是独占锁,写锁只能被一个线程锁获取。

ReentrantReadWriteLock API

// 创建一个新的 ReentrantReadWriteLock,默认是采用“非公平策略”。
ReentrantReadWriteLock()
// 创建一个新的 ReentrantReadWriteLock,fair是“公平策略”。fair为true,意味着公平策略;否则,意味着非公平策略。
ReentrantReadWriteLock(boolean fair)

// 返回当前拥有写入锁的线程,如果没有这样的线程,则返回 null。
protected Thread getOwner()
// 返回一个 collection,它包含可能正在等待获取读取锁的线程。
protected Collection<Thread> getQueuedReaderThreads()
// 返回一个 collection,它包含可能正在等待获取读取或写入锁的线程。
protected Collection<Thread> getQueuedThreads()
// 返回一个 collection,它包含可能正在等待获取写入锁的线程。
protected Collection<Thread> getQueuedWriterThreads()
// 返回等待获取读取或写入锁的线程估计数目。
int getQueueLength()
// 查询当前线程在此锁上保持的重入读取锁数量。
int getReadHoldCount()
// 查询为此锁保持的读取锁数量。
int getReadLockCount()
// 返回一个 collection,它包含可能正在等待与写入锁相关的给定条件的那些线程。
protected Collection<Thread> getWaitingThreads(Condition condition)
// 返回正等待与写入锁相关的给定条件的线程估计数目。
int getWaitQueueLength(Condition condition)
// 查询当前线程在此锁上保持的重入写入锁数量。
int getWriteHoldCount()
// 查询是否给定线程正在等待获取读取或写入锁。
boolean hasQueuedThread(Thread thread)
// 查询是否所有的线程正在等待获取读取或写入锁。
boolean hasQueuedThreads()
// 查询是否有些线程正在等待与写入锁有关的给定条件。
boolean hasWaiters(Condition condition)
// 如果此锁将公平性设置为 ture,则返回 true。
boolean isFair()
// 查询是否某个线程保持了写入锁。
boolean isWriteLocked()
// 查询当前线程是否保持了写入锁。
boolean isWriteLockedByCurrentThread()
// 返回用于读取操作的锁。
ReentrantReadWriteLock.ReadLock readLock()
// 返回用于写入操作的锁。
ReentrantReadWriteLock.WriteLock writeLock()

AQS状态

AQS 的状态 state 是 32 位(int 类型)的,辦成两份,读锁用高 16 位,表示持有读锁的线程数(sharedCount),写锁低 16 位,表示写锁的重入次数 (exclusiveCount)。状态值为 0 表示锁空闲,sharedCount 不为 0 表示分配了读锁,exclusiveCount 不为 0 表示分配了写锁,sharedCount 和 exclusiveCount 一般不会同时不为 0,只有当线程占用了写锁,该线程可以重入获取读锁,反之不成立。

abstract static class Sync extends AbstractQueuedSynchronizer {

    static final int SHARED_SHIFT   = 16;

    // 由于读锁用高位部分,所以读锁个数加1,其实是状态值加 2^16
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);

    // 写锁的可重入的最大次数、读锁允许的最大数量
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

    // 写锁的掩码,用于状态的低16位有效值
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    // 读锁计数,当前持有读锁的线程数
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

    // 写锁的计数,也就是它的重入次数
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}

重入计数:

abstract static class Sync extends AbstractQueuedSynchronizer {



    //每个线程特定的 read 持有计数。存放在ThreadLocal,不需要是线程安全的。
    static final class HoldCounter {
        int count = 0;
        
        // 使用id而不是引用是为了避免保留垃圾。注意这是个常量。
        final long tid = Thread.currentThread().getId();
    }

    /**
     * 采用继承是为了重写 initialValue 方法,这样就不用进行这样的处理:
     * 如果ThreadLocal没有当前线程的计数,则new一个,再放进ThreadLocal里。
     * 可以直接调用 get。
     **/
    static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {

        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }



    //保存当前线程重入读锁的次数的容器。在读锁重入次数为 0 时移除。
    private transient ThreadLocalHoldCounter readHolds;



    /**
     * 最近一个成功获取读锁的线程的计数。这省却了ThreadLocal查找,
     * 通常情况下,下一个释放线程是最后一个获取线程。这不是 volatile 的,
     * 因为它仅用于试探的,线程进行缓存也是可以的
     * (因为判断是否是当前线程是通过线程id来比较的)。
     */
    private transient HoldCounter cachedHoldCounter;



    /**
     * firstReader是这样一个特殊线程:它是最后一个把 共享计数 从 0 改为 1 的
     * (在锁空闲的时候),而且从那之后还没有释放读锁的。如果不存在则为null。
     * firstReaderHoldCount 是 firstReader 的重入计数。
     *
     * firstReader 不能导致保留垃圾,因此在 tryReleaseShared 里设置为null,
     * 除非线程异常终止,没有释放读锁。
     *
     * 作用是在跟踪无竞争的读锁计数时非常便宜。
     *
     * firstReader及其计数firstReaderHoldCount是不会放入 readHolds 的。
     */
    private transient Thread firstReader = null;

    private transient int firstReaderHoldCount;

    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // 确保 readHolds 的内存可见性,利用 volatile 写的内存语义。
    }
}

共享读锁的实现原理分析

//ReadLock
public void lock() {
    sync.acquireShared(1);
}
//AQS
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

acquireShared()首先会通过tryAcquireShared()来尝试获取锁。尝试成功的话,则不再做任何动作(因为已经成功获取到锁了)。尝试失败的话,则通过doAcquireShared()来获取锁。doAcquireShared()会获取到锁了才返回。

其中int tryAcquireShared(int unused)的具体实现如下:

protected final int tryAcquireShared(int unused) {
   /*
    * Walkthrough:
    * 1. If write lock held by another thread, fail.
    * 2. Otherwise, this thread is eligible for
    *    lock wrt state, so ask if it should block
    *    because of queue policy. If not, try
    *    to grant by CASing state and updating count.
    *    Note that step does not check for reentrant
    *    acquires, which is postponed to full version
    *    to avoid having to check hold count in
    *    the more typical non-reentrant case.
    * 3. If step 2 fails either because thread
    *    apparently not eligible or CAS fails or count
    *    saturated, chain to version with full retry loop.
    */
    Thread current = Thread.currentThread();
    int c = getState();
    //持有写锁的线程可以获取读锁,如果获取锁的线程不是当前线程,则返回-1
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);//获取共享读锁的数量
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            //如果首次获取锁,则初始化firstReader和firstReaderHoldCount
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            //如果当前线程是首次获取读锁的线程
            firstReaderHoldCount++;
        } else {
            //更新HoldCounter
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

整个函数的工作流程如下:

  • 如果写锁已经被持有了,但是持有写锁的不是当前写出,那么就直接返回-1(体现写锁的排他性).
  • 如果在尝试获取锁时不需要阻塞等待(由锁的公平性决定),并且读锁的共享计数小于最大值,那么就直接通过CAS更新读锁数量,获取读锁。
  • 如果第二步执行失败了,那么就会调用fullTryAcquireShared(current)

fullTryAcquireShared(current)的具体实现如下:

final int fullTryAcquireShared(Thread current) {
   /*
    * This code is in part redundant with that in
    * tryAcquireShared but is simpler overall by not
    * complicating tryAcquireShared with interactions between
    * retries and lazily reading hold counts.
    */
    HoldCounter rh = null;
    for (;;) { //自旋
        int c = getState();
        if (exclusiveCount(c) != 0) { //写锁已经被持有了
            if (getExclusiveOwnerThread() != current) //持有写锁的不是单线程
                return -1; //其它线程持有读锁后,就不能在获取写锁了
           // else we hold the exclusive lock; blocking here
           // would cause deadlock.
        } else if (readerShouldBlock()) {//由公平性决定需要阻塞
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) { 
                // assert firstReaderHoldCount > 0;
            } else {
                //更新锁计数(可重入的体现)
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            //如果当前线程的持有读锁数为0,那么就没必要使用计数器,直接移除
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT) //如果读锁的数量超过最大值了
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) { //CAS更新读锁数量
            if (sharedCount(c) == 0) {
                //首次获取读锁
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                //当前线程是首次获取读锁的线程,直接更新持有数
                firstReaderHoldCount++;
            } else {
                //当前线程是后来共享读锁的线程
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();//更新为当前线程的计数器 
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

可以看出其实int fullTryAcquireShared(Thread current)也每什么特别,它的代码和int tryAcquireShared(int unused)差不多。只不过是增加了自旋重试,和“持有读锁数的延迟读取”

回到acquireShared(int arg)方法,如果tryAcquireShared(arg)获取读锁失败后,会调用doAcquireShared(arg)

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED); //添加一个共享模式的Node到等待队列尾部
    boolean failed = true;
    try {
        boolean interrupted = false; //获取前驱节点
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                //如果前驱节点,尝试获取资源
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //获取成功,更新等待队列,并唤醒下一个等待的节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && //检查获取失败后是否可以阻塞
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

整个获取共享读锁的流程总结出来,获取锁一般的流程就是首先尝试去直接获取,如果获取不到了,那么尝试自旋获取,如果还是获取不到,那么就去等待队列排队,排队的时候,如果发现自己是第二个那么就再次尝试获取锁,如果还是没获取到,那么就在等待队列中 park 阻塞等待了。

排他写锁的实现原理分析

排他写锁的实现原理与 ReentrantLock 一致。直接参照上节所讲即可。

StampedLock

ReentrantReadWriteLock 使得多个读线程同时持有读锁(只要写锁未被占用),而写锁是独占的。它拥有的是悲观的读锁,如果在读多写少的情况下,使用不当很容易产生“饥饿”问题,虽然使用“公平”策略可以一定程度上缓解这个问题,但是“公平”策略是以牺牲系统吞吐量为代价的。

要进一步提升并发执行效率,Java 8 引入了新的读写锁:StampedLock。StampedLock 和 ReadWriteLock 相比,改进之处在于:读的过程中也允许获取写锁后写入。这样一来,读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。

乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。

首先明确下,该类的设计初衷是作为一个内部工具类,用于辅助开发其它线程安全组件,用得好,该类可以提升系统性能,用不好,容易产生死锁和其它莫名其妙的问题。

StampedLock的特点

try 系列获取锁的函数,当获取锁失败后会返回为 0 的 stamp 值。当调用释放锁和转换锁的方法时候需要传入获取锁时候返回的 stamp 值。

StampedLock 的内部实现是基于 CLH 锁的,CLH锁原理:锁维护着一个等待线程队列,所有申请锁且失败的线程都记录在队列。一个节点代表一个线程,保存着一个标记位 locked,用以判断当前线程是否已经释放锁。当一个线程试图获取锁时,从队列尾节点作为前序节点,循环判断所有的前序节点是否已经成功释放锁。

StampedLock 的等待队列与 AQS 的 CLH 队列对比

  1. 当入队一个线程时,如果队尾是读结点,不会直接链接到队尾,而是链接到该读结点的 cowait 链中,cowait 链本质是一个栈;
  2. 当入队一个线程时,如果队尾是写结点,则直接链接到队尾;
  3. AQS 类似唤醒线程的规则,都是首先唤醒队首结点。区别是 StampedLock 中,当唤醒的结点是读结点时,会唤醒该读结点的 cowait 链中的所有读结点(顺序和入栈顺序相反,也就是后进先出)。

另外,StampedLock使用时要特别小心,避免锁重入的操作,在使用乐观读锁时也需要遵循相应的调用模板,防止出现数据不一致的问题。

StampedLock使用示例

class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();    //涉及对共享资源的修改,使用写锁-独占操作
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    /**
     * 使用乐观读锁访问共享资源
     * 注意:乐观读锁在保证数据一致性上需要拷贝一份要操作的变量到方法栈,并且在操作数据时候可能其他写线程已经修改了数据,
     * 而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的。
     *
     * @return
     */
    double distanceFromOrigin() {
        
        
        double currentX = x, currentY = y;
        // 此处已读取到y,如果没有写入,读取是正确的(100,200)
        // 如果有写入,读取是错误的(100,400)
        if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
            stamp = stampedLock.readLock(); // 获取一个悲观读锁
            try {
        
        long stamp = sl.tryOptimisticRead();    // 使用乐观读锁
        // 注意下面这行代码不是原子操作
        // 假设x,y = (100,200), 但x,y可能被写线程修改为(300,400)
        double currentX = x, currentY = y;      // 拷贝共享资源到本地方法栈中
        // 检查乐观读锁后是否有其他写锁发生
        if (!sl.validate(stamp)) {  
            // 如果有写锁被占用,可能造成数据不一致,所以要切换到悲观读锁模式
            stamp = sl.readLock(); // 获取悲观锁            
            try {
                currentX = x;
                currentY = y;
            } finally {
                sl.unlockRead(stamp); // 释放悲观锁
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    void moveIfAtOrigin(double newX, double newY) { // upgrade
        // Could instead start with optimistic, not read mode
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                long ws = sl.tryConvertToWriteLock(stamp);  //读锁转换为写锁
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    sl.unlockRead(stamp);
                    stamp = sl.writeLock();
                }
            }
        } finally {
            sl.unlock(stamp);
        }
    }
}

由上面的例子,我们可以总结出 StampedLock 的用法

long stamp = lock.tryOptimisticRead();  // 非阻塞获取版本信息
copyVaraibale2ThreadMemory();           // 拷贝变量到线程本地堆栈
if(!lock.validate(stamp)){              // 校验
    long stamp = lock.readLock();       // 获取读锁
    try {
        copyVaraibale2ThreadMemory();   // 拷贝变量到线程本地堆栈
     } finally {
       lock.unlock(stamp);              // 释放悲观锁
    }

}
useThreadMemoryVarables();              // 使用线程本地堆栈里面的数据进行操作

和 ReadWriteLock 相比,写入的加锁是完全一样的,不同的是读取。注意到首先我们通过tryOptimisticRead()获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过validate()去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据。

可见,StampedLock把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是 StampedLock 是不可重入锁,不能在一个线程中反复获取同一个锁。StampedLock 还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在 if-then-update 的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。

StampedLock的内部常量

StampedLock 虽然不像其它锁一样定义了内部类来实现 AQS 框架,但是 StampedLock 的基本实现思路还是利用 CLH 队列进行线程的管理,通过同步状态值来表示锁的状态和类型。

StampedLock 内部定义了很多常量,定义这些常量的根本目的还是和 ReentrantReadWriteLock 一样,对同步状态值按位切分,以通过位运算对 State 进行操作:

对于 StampedLock 来说,写锁被占用的标志是第 8 位为 1,读锁使用 0-7 位,正常情况下读锁数目为 1-126,超过 126 时,使用一个名为readerOverflow的int整型保存超出数。

/** The period for yielding when waiting for overflow spinlock */
private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1

/** The number of bits to use for reader count before overflowing */
private static final int LG_READERS = 7;

// Values for lock state and stamp operations
private static final long RUNIT = 1L; //一单位读锁  0000 0001
private static final long WBIT  = 1L << LG_READERS; //写锁标志位 1000 0000
private static final long RBITS = WBIT - 1L; //读状态标识 0111 1111
private static final long RFULL = RBITS - 1L; //读锁的最大数量 0111 1110
private static final long ABITS = RBITS | WBIT; //用于获取读写状态 1111 1111
private static final long SBITS = ~RBITS; // note overlap with ABITS

// 初始status值
private static final long ORIGIN = WBIT << 1;

/** 同步状态 State,处于写锁使用第 8 位(为表示占用),读锁使用前7位(为 1-126,附加的 readerOverflow 用于当读锁超过 126 时) */
private transient volatile long state;
/** 因为读锁只使用前7位,所以当超过了 128 之后将使用一个 int 变量来记录 */
private transient int readerOverflow;

部分常量的比特位表示如下:

另外,StampedLock 相比 ReentrantReadWriteLock,对多核 CPU 进行了优化,可以看到,当 CPU 核数超过 1 时,会有一些自旋操作:

/** CPU 核数,用于控制自旋次数 */
private static final int NCPU = Runtime.getRuntime().availableProcessors();

/** 尝试获取锁时,如果超过该值仍未获得锁,则加入等待队列 */
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;

/** 等待队列的首结点,自旋获取锁失败超过该值时,会继续阻塞 */
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;

/** 再次进入阻塞之前的最大重试次数 */
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;

StampedLock实现原理

等待队列节点状态

// 节点状态
// 初始状态 0
private static final int WAITING   = -1;
private static final int CANCELLED =  1;

// 节点类型
private static final int RMODE = 0;
private static final int WMODE = 1;

/** Wait nodes */
static final class WNode {
    volatile WNode prev;
    volatile WNode next;
    volatile WNode cowait;    // list of linked readers
    volatile Thread thread;   // non-null while possibly parked
    volatile int status;      // 0, WAITING, or CANCELLED
    final int mode;           // RMODE or WMODE
    WNode(int m, WNode p) { mode = m; prev = p; }
}

/** Head of CLH queue */
private transient volatile WNode whead;
/** Tail (last) of CLH queue */
private transient volatile WNode wtail;

StampedLock对象的创建

public StampedLock() {
    state = ORIGIN; //state 1 0000 0000
}

StampedLock 提供了三类视图:

// views
transient ReadLockView readLockView;
transient WriteLockView writeLockView;
transient ReadWriteLockView readWriteLockView;

这些视图其实是对 StamedLock 方法的封装,便于习惯了 ReentrantReadWriteLock 的用户使用:
例如,ReadLockView 其实相当于ReentrantReadWriteLock.readLock()返回的读锁

final class ReadLockView implements Lock {
    public void lock() { readLock(); }
    public void lockInterruptibly() throws InterruptedException {
        readLockInterruptibly();
    }
    public boolean tryLock() { return tryReadLock() != 0L; }
    public boolean tryLock(long time, TimeUnit unit)
        throws InterruptedException {
        return tryReadLock(time, unit) != 0L;
    }
    public void unlock() { unstampedUnlockRead(); }
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
}

调用writeLock获取写锁

获取写锁成功

获取写锁成功,将调用 writeLock 方法

public long writeLock() {
    long s, next;  // bypass acquireWrite in fully unlocked case only
    //(s = state) & ABITS == 0L 表示读锁和写锁都未被使用
    return ((((s = state) & ABITS) == 0L &&
             //CAS操作:将第8位置为1,表示写锁被占用
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            next : acquireWrite(false, 0L));//获取失败则调用 acquireWrite,加入到等待队列
}

说明:上述代码获取写锁,如果获取失败,则进入阻塞,注意该方法不响应中断,返回非0表示获取成功。

写锁被其他线程持有,写锁获取失败

自旋入队操作,如果没用任何锁被占用,则立即尝试获取写锁,获取成功则返回,如果存在锁被使用,则将当前线程包装成独占节点,并插入等待队列尾部

/**
 * 尝试自旋的获取写锁, 获取不到则阻塞线程
 *
 * @param interruptible true 表示检测中断, 如果线程被中断过, 则最终返回INTERRUPTED
 * @param deadline      如果非0, 则表示限时获取
 * @return 非0表示获取成功, INTERRUPTED表示中途被中断过
 */
private long acquireWrite(boolean interruptible, long deadline) {
    WNode node = null, p;

    /**
     * 自旋入队操作
     * 如果没有任何锁被占用, 则立即尝试获取写锁, 获取成功则返回.
     * 如果存在锁被使用, 则将当前线程包装成独占结点, 并插入等待队列尾部
     */
    for (int spins = -1; ; ) {
        long m, s, ns;
        if ((m = (s = state) & ABITS) == 0L) {      // 没有任何锁被占用
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))    // 尝试立即获取写锁
                return ns;                                                 // 获取成功直接返回
        } else if (spins < 0)
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        } else if ((p = wtail) == null) {       // 队列为空, 则初始化队列, 构造队列的头结点
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        } else if (node == null)               // 将当前线程包装成写结点
            node = new WNode(WMODE, p);
        else if (node.prev != p)
            node.prev = p;
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {    // 链接结点至队尾
            p.next = node;
            break;
        }
    }

    for (int spins = -1; ; ) {
        WNode h, np, pp;
        int ps;
        if ((h = whead) == p) {     // 如果当前结点是队首结点, 则立即尝试获取写锁
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            for (int k = spins; ; ) { // spin at head
                long s, ns;
                if (((s = state) & ABITS) == 0L) {      // 写锁未被占用
                    if (U.compareAndSwapLong(this, STATE, s,
                        ns = s + WBIT)) {               // CAS修改State: 占用写锁
                        // 将队首结点从队列移除
                        whead = node;
                        node.prev = null;
                        return ns;
                    }
                } else if (LockSupport.nextSecondarySeed() >= 0 &&
                    --k <= 0)
                    break;
            }
        } else if (h != null) {  // 唤醒头结点的栈中的所有读线程
            WNode c;
            Thread w;
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        if (whead == h) {
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            } else if ((ps = p.status) == 0)        // 将当前结点的前驱置为WAITING, 表示当前结点会进入阻塞, 前驱将来需要唤醒我
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            } else {        // 阻塞当前调用线程
                long time;  // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p)
                    U.park(false, time);    // emulate LockSupport.park
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

调用readLock获取读锁

写锁被其他线程持有,读锁获取失败

获取读锁失败,将调用 acquireRead 方法,加入等待队列:

public long readLock() {
    long s = state, next;  // bypass acquireRead on common uncontended case
    return ((whead == wtail && (s & ABITS) < RFULL &&//表示写锁未被占用,且读锁数量没用超限
             U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
            next : acquireRead(false, 0L));
}

说明:上述代码获取读锁,如果写锁被占用,线程会阻塞,注意该方法不响应中断,返回非0表示获取成功。

/**
 * 尝试自旋的获取读锁, 获取不到则加入等待队列, 并阻塞线程
 *
 * @param interruptible true 表示检测中断, 如果线程被中断过, 则最终返回INTERRUPTED
 * @param deadline      如果非0, 则表示限时获取
 * @return 非0表示获取成功, INTERRUPTED表示中途被中断过
 */
private long acquireRead(boolean interruptible, long deadline) {
    WNode node = null, p;   // node指向入队结点, p指向入队前的队尾结点

    /**
     * 自旋入队操作
     * 如果写锁未被占用, 则立即尝试获取读锁, 获取成功则返回.
     * 如果写锁被占用, 则将当前读线程包装成结点, 并插入等待队列(如果队尾是写结点,直接链接到队尾;否则,链接到队尾读结点的栈中)
     */
    for (int spins = -1; ; ) {
        WNode h;
        if ((h = whead) == (p = wtail)) {   // 如果队列为空或只有头结点, 则会立即尝试获取读锁
            for (long m, s, ns; ; ) {
                if ((m = (s = state) & ABITS) < RFULL ?     // 判断写锁是否被占用
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :  //写锁未占用,且读锁数量未超限, 则更新同步状态
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))        //写锁未占用,但读锁数量超限, 超出部分放到readerOverflow字段中
                    return ns;          // 获取成功后, 直接返回
                else if (m >= WBIT) {   // 写锁被占用,以随机方式探测是否要退出自旋
                    if (spins > 0) {
                        if (LockSupport.nextSecondarySeed() >= 0)
                            --spins;
                    } else {
                        if (spins == 0) {
                            WNode nh = whead, np = wtail;
                            if ((nh == h && np == p) || (h = nh) != (p = np))
                                break;
                        }
                        spins = SPINS;
                    }
                }
            }
        }
        //首先自旋的尝试获取读锁,获取成功后,就直接返回;否则,会将当前线程包装成一个读结点,
        //插入到等待队列。 由于,如果目前等待队列还是空会初始化队列,然后将自身包装成一个读结点,
        //插入队尾,然后在下面这个地方跳出自旋:
        if (p == null) {                            // p == null表示队列为空, 则初始化队列(构造头结点)
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        } else if (node == null) {                  // 将当前线程包装成读结点
            node = new WNode(RMODE, p);
        } else if (h == p || p.mode != RMODE) {     // 如果队列只有一个头结点, 或队尾结点不是读结点, 则直接将结点链接到队尾, 链接完成后退出自旋
            if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                break;
            }
        }
        // 队列不为空, 且队尾是读结点, 则将添加当前结点链接到队尾结点的cowait链中(实际上构成一个栈, p是栈顶指针 )
        else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) {    // CAS操作队尾结点p的cowait字段,实际上就是头插法插入结点
            node.cowait = null;
        } else {
            for (; ; ) {
                WNode pp, c;
                Thread w;
                // 尝试唤醒头结点的cowait中的第一个元素, 假如是读锁会通过循环释放cowait链
                if ((h = whead) != null && (c = h.cowait) != null &&
                    U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null) // help release
                    U.unpark(w);
                if (h == (pp = p.prev) || h == p || pp == null) {
                    long m, s, ns;
                    do {
                        if ((m = (s = state) & ABITS) < RFULL ?
                            U.compareAndSwapLong(this, STATE, s,
                                ns = s + RUNIT) :
                            (m < WBIT &&
                                (ns = tryIncReaderOverflow(s)) != 0L))
                            return ns;
                    } while (m < WBIT);
                }
                if (whead == h && p.prev == pp) {
                    long time;
                    if (pp == null || h == p || p.status > 0) {
                        node = null; // throw away
                        break;
                    }
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, p, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) {
                        // 写锁被占用, 且当前结点不是队首结点, 则阻塞当前线程
                        U.park(false, time);
                    }
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, p, true);
                }
            }
        }
    }

    for (int spins = -1; ; ) {
        WNode h, np, pp;
        int ps;
        if ((h = whead) == p) {     // 如果当前线程是队首结点, 则尝试获取读锁
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            for (int k = spins; ; ) { // spin at head
                long m, s, ns;
                if ((m = (s = state) & ABITS) < RFULL ?     // 判断写锁是否被占用
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :  //写锁未占用,且读锁数量未超限, 则更新同步状态
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {      //写锁未占用,但读锁数量超限, 超出部分放到readerOverflow字段中
                    // 获取读锁成功, 释放cowait链中的所有读结点
                    WNode c;
                    Thread w;

                    // 释放头结点, 当前队首结点成为新的头结点
                    whead = node;
                    node.prev = null;

                    // 从栈顶开始(node.cowait指向的结点), 依次唤醒所有读结点, 最终node.cowait==null, node成为新的头结点
                    while ((c = node.cowait) != null) {
                        if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
                            U.unpark(w);
                    }
                    return ns;
                } else if (m >= WBIT &&
                    LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                    break;
            }
        } else if (h != null) {     // 如果头结点存在cowait链, 则唤醒链中所有读线程
            WNode c;
            Thread w;
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        //跳出自旋后,会继续向下执行,进入下一个自旋,在下一个自旋中,依然会再次尝试获取读锁,
        //如果这次再获取不到,就会将前驱的等待状态置为WAITING,表示当前线程准备阻塞,需要去唤醒
        if (whead == h) {
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            } else if ((ps = p.status) == 0)        // 将前驱结点的等待状态置为WAITING, 表示之后将唤醒当前结点
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }                
            } 
            // 阻塞当前读线程
            else {        
                long time;
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)   //限时等待超时, 取消等待
                    return cancelWaiter(node, node, false);

                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) {
                    // 如果前驱的等待状态为WAITING, 且写锁被占用, 则阻塞当前调用线程
                    U.park(false, time);
                }
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

调用unlockWrite释放写锁

通过 CAS 操作,修改 State 成功后,会调用 release 方法唤醒等待队列的队首结点:

//释放写锁
public void unlockWrite(long stamp) {
    WNode h;
    if (state != stamp || (stamp & WBIT) == 0L)//stamp不匹配,或者写锁未被占用,抛出异常
        throw new IllegalMonitorStateException();
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;//正常情况下,stamp+=WBIT后,第8位位0,表示写锁被释放;但是溢出,则置为ORIGIN
    if ((h = whead) != null && h.status != 0)
        release(h);//唤醒等待队列中的队首节点
}

release 方法非常简单,先将头结点的等待状态置为 0,表示即将唤醒后继结点,然后立即唤醒队首结点:

//唤醒等待队列的队首节点(即头结点whead的后继节点)
private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);//将头结点的等待状态从-1置为0,表示将要唤醒后继节点
        if ((q = h.next) == null || q.status == CANCELLED) {//从队尾开始查找距离头结点最近的WAITING节点
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        if (q != null && (w = q.thread) != null)
            U.unpark(w);//唤醒队首节点
    }
}

调用unlockRead释放读锁

CAS 操作 State 将读锁数量减 1

//释放读锁
public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) {
        if (((s = state) & SBITS) != (stamp & SBITS) ||//stamp不匹配,或没用任何锁被占用,都会抛出异常
            (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            throw new IllegalMonitorStateException();
        if (m < RFULL) {//读锁数量未超限
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {//读锁数量-1
                if (m == RUNIT && (h = whead) != null && h.status != 0)//如果当前读锁数量为1,唤醒等待队列中的队首节点
                    release(h);
                break;
            }
        }
        else if (tryDecReaderOverflow(s) != 0L)//读锁数量超限,则溢出字段要-1
            break;
    }
}

注意,当读锁的数量变为0时才会调用 release 方法,唤醒队首结点:

//唤醒等待队列中的队首节点(即头结点whead的后继节点)
private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);//将头结点的等待状态从-1置为0,表示将要唤醒后继节点
        if ((q = h.next) == null || q.status == CANCELLED) {//从队尾开始查找距离头结点最近的WAITING节点
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        if (q != null && (w = q.thread) != null)
            U.unpark(w);//唤醒队首节点
    }
}