锁是为了保护竞争资源,防止多个线程同时操作共享资源而发生数据不一致的问题。本文介绍了三种 JUC 包提供的锁:ReentrantLock、ReentrantReadWriteLock、StampedLock。基于 AQS 实现的有 ReentrantLock、ReentrantReadWriteLock,而 StampedLock 实现了自己的队列。
ReentrantLock
ReentrantLock 是一个可重入的互斥锁,又被称为“独占锁”,即在同一个时间点只能被一个线程锁持有。它分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。ReentraantLock 是通过一个 FIFO 的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。下图是 ReentrantLock 的 UML 图。
上图可看出 ReentrantLock 与 Sync 是组合关系。并且,Sync 是 AQS 的子类,FairSync(公平锁)和 NonFairSync(非公平锁)。ReentrantLock 是一个独占锁,至于它具体是公平锁还是非公平锁,就取决于 Sync 对象是 FairSync 的实例还是 NonFairSync 的实例。
ReentrantLock API
ReentrantLock 线程状态
函数 | 解释 | 返回值 |
---|---|---|
getHoldCount | 查询当前线程保持此锁的次数 | int |
getQueueLength | 查询等待此锁释放的线程估计数 | int |
getWaitQueueLength(Condition condition) | 返回Condition所辖的等待唤醒的线程估计数(或者是已经await的线程数) | int |
getOwner | 返回目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回 null。 | Thread |
getQueuedThreads() | 返回一个 collection,它包含可能正等待获取此锁的线程。 | Collection |
getWaitingThreads(Condition condition) | 返回一个 collection,它包含可能正在等待与此锁相关给定条件的那些线程。 | Collection |
hasQueuedThread(Thread thread) | 查询指定线程是否正在等待获取此锁 | boolean |
hasQueuedThreads | 查询ReentrantLock所辖线程是否有线程等待获取此锁 | boolean |
hasWaiters(Condition condition) | 查询Condition所辖线程是否有线程等待获取此锁 | boolean |
isFair | 判断是不是公平锁 | boolean |
isHeldByCurrentThread | 判断当前线程是否获取此锁 | boolean |
isLocked | 判断是否有任意线程获取此锁 | boolean |
ReentrantLock 锁功能
函数 | 解释 | 返回值 |
---|---|---|
lock | 获取锁 | void |
lockInterruptibly | 如果当前线程未被中断则获取锁定,已经被中断则出现异常 | void |
tryLock | 在调用时,尝试获取锁。如果获取到锁,返回true;否则直接返回false,不等待 | boolean |
tryLock(long timeout, TimeUnit unit) | 如果锁在给定等待时间内没有被另一个线程获得,且当前线程未被中断,则获取该锁定。 | boolean |
unlock | 释放锁 | void |
ReentrantLock 条件变量
函数 | 解释 | 返回值 |
---|---|---|
newCondition | 返回用来与此Lock实例一起使用的Condition实例 | Condition |
await | 等待条件的发生,线程在await状态interrupt会报错 | void |
awaitUninterruptibly | 等待条件的发生。与await()方法不同,它的调用不可能被中断的。 | void |
awaitNanos(long nanosTimeout) | 等待条件的发生。然而,如果通知没有在nanosTimeout所指定的纳秒内出现的话,它还是会返回。返回值是时限所剩的估计值,等于或小于零的返回值表示这个返回是因为时限到了。跟其他的一样,这个method的实际分辨率与平台所定有关,通常是毫秒。 | long |
awaitUntil(Date deadline) | 等待条件的发生。然而,如果通知没有在指定的绝对时间前出现,它会以 false值返回。 | boolean |
signal | 通知某个等待使用Condition对象的thread此条件已经发生。 | void |
signalAll | 通知所有等待使用Condition对象的thread此条件已经发生。 | void |
条件变量
条件变量(condition variable)是一种由许多其他 threading 系统所提供的同步类型。条件变量非常类似 Java 的等待-通知机制,事实上,在大部分情况下它们的功能是相同的。
POSIX 条件变量的四个基本函数 wait、timed_wait、signal 与 broadcast,直接对应到 Java 所提供的 method(wait()
、wait(long timeout)
、notify()
、notifyAll()
)。它们的实现在逻辑上也是相同的。条件变量的 wait 操作需要持有 mutex 的 lock。它在返回到调用者之前的等待与重新取得 lock 的时候会释放掉 lock。signal 这个 function 会唤醒一个 thread,而 broadcast 会唤醒所有在等待的 thread,这些 function 同样也需要在调用的时候持有 mutex。条件变量的 race condition 会以与 Java 的等待-通知机制相同的方式来解决。
然而其中还是有些许的不同之处,等待-通知机制是与所关联的 lock 高度地整合过,这会让此机制比条件变量的相对应功能更易于使用。从 synchronized 程序代码区段调用 wait 与 notify 方法就是它们自然的运用方式。然而,使用条件变量需要创建独立的 mutex lock、存储此 mutex,最终在不需要的时候将 mutex 销毁。
很不幸地,便利性是有代价的。一个 POSIX 的条件变量与它所关联的 mutex lock 是单独的同步实体。可以对同一个 mutex 使用两个不同的条件变量,或是在任何的 scope 内将 mutex 与条件变量混合配对。虽然等待-通知机制更容易使用且在大多数以信号为基础的同步范例上很适用,但它无法指派任何的同步 lock 给任意的通知对象。当你在请求相同的同步 lock 以保护共享数据而需要送信号给两个不同的通知对象时,条件变量是比较有效率的。
J2SE 加入了提供条件变量功能的 class。此 class 是与 Lock interface 并用。因为这个新的 interface(及由此来的对象)是与调用的对象以及 lock 对象分开的,它在使用方法上的灵活性就如同其他 threading 系统中的条件变量一样。在 Java 中,条件变量是实现出 Condition interface 的对象。Condition interface 是绑在 Lock interface 上,就好像等待-通知机制是绑在同步 lock 上一样。
可重入的互斥锁
//在公平锁和非公平锁都调用了该代码获取锁
acquire(1);
“当前线程”实际上是通过acquire(1)
获取锁的。
这里说明一下“1”的含义,它是设置“锁的状态”的参数。对于“独占锁”而言,锁处于可获取状态时,它的状态值是 0;锁被线程初次获取到了,它的状态值就变成了 1。
public final void acquire(int arg) {
//调用tryAcquire方法:尝试获取锁资源(非公平、公平),拿到锁资源,返回true,直接结束方法
if (!tryAcquire(arg) &&
//当没有获取锁资源后,会先调用addWaiter:会将没有获取到锁资源的线程封装为Node对象,
//并且插入到AQS的队列的末尾.
//继续调用acquireQueued方法,查看当前排队的Node是否在队列的前面,如果在前面,尝试获取锁资源
//如果没在前面,尝试将线程挂起,阻塞起来!
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
可重入性和公平性
由于 ReentrantLock(公平锁/非公平锁)是可重入锁,所以“独占锁”可以被单个线程多次获取,每获取 1 次就将锁的状态加一。也就是说,初次获取锁时,通过acquire(1)
将锁的状态值设为 1;再次获取锁时,将锁的状态值设为 2;依次类推…这就是为什么获取锁时,传入的参数是 1 的原因了。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
//拿到当前线程
final Thread current = Thread.currentThread();
//拿到AQS的state
int c = getState();
// 如果state == 0,说明没有线程占用着当前的锁资源
if (c == 0) {
//如果没有线程排队,直接直接CAS尝试获取锁资源
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//如果获取资源成功,将当前线程设置为持有锁资源的线程
setExclusiveOwnerThread(current);
return true;
}
}
//如果有线程持有锁资源,判断持有锁资源的线程是否是当前线程
else if (current == getExclusiveOwnerThread()) {
//增加AQS的state的值
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}
获取锁资源失败进队列
在获取锁资源失败后,需要将当前线程封装为 Node 对象,并且插入到 AQS 队列的末尾。
private Node addWaiter(Node mode) {
// 将当前线程封装为Node对象,mode为null,代表互斥锁
Node node = new Node(Thread.currentThread(), mode);
// pred是tail节点
Node pred = tail;
// 如果pred不为null,有线程正在排队
if (pred != null) {
// 将当前节点的prev,指定tail尾节点
node.prev = pred;
// 以CAS的方式,将当前节点变为tail节点
if (compareAndSetTail(pred, node)) {
// 之前的tail的next指向当前节点
pred.next = node;
return node;
}
}
// 添加的流程为, 自己prev指向、tail指向自己、前节点next指向我
// 如果上述方式,CAS操作失败,导致加入到AQS末尾失败,如果失败,就基于enq的方式添加到AQS队列
enq(node);
return node;
}
// enq,无论怎样都添加进入
private Node enq(final Node node) {
for (;;) {
// 拿到tail
Node t = tail;
// 如果tail为null,说明当前没有Node在队列中
if (t == null) {
// 创建一个新的Node作为head,并且将tail和head指向一个Node
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 和上述代码一致!
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
释放锁
释放锁资源,将 state 减 1, 如果 state 减为 0 了,唤醒在队列中排队的 Node。
public final boolean release(int arg) {
// 核心的释放锁资源方法
if (tryRelease(arg)) {
// 释放锁资源释放干净了。 (state == 0)
Node h = head;
// 如果头节点不为null,并且头节点的状态不为0,唤醒排队的线程
if (h != null && h.waitStatus != 0)
// 唤醒线程
unparkSuccessor(h);
return true;
}
// 释放锁成功,但是state != 0
return false;
}
// 核心的释放锁资源方法
protected final boolean tryRelease(int releases) {
// 获取state - 1
int c = getState() - releases;
// 如果释放锁的线程不是占用锁的线程,抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否成功的将锁资源释放利索 (state == 0)
boolean free = false;
if (c == 0) {
// 锁资源释放干净。
free = true;
// 将占用锁资源的属性设置为null
setExclusiveOwnerThread(null);
}
// 将state赋值
setState(c);
// 返回true,代表释放干净了
return free;
}
// 唤醒节点
private void unparkSuccessor(Node node) {
// 拿到头节点状态
int ws = node.waitStatus;
// 如果头节点状态小于0,换为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 拿到当前节点的next
Node s = node.next;
// 如果s == null ,或者s的状态为1
if (s == null || s.waitStatus > 0) {
// next节点不需要唤醒,需要唤醒next的next
s = null;
// 从尾部往前找,找到状态正常的节点。(小于等于0代表正常状态)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 经过循环的获取,如果拿到状态正常的节点,并且不为null
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
ReentrantReadWriteLock
ReentrantReadWriteLock,是可重入读写锁。它维护了一对相关的锁 —— 读锁和写锁。
- 读锁用于只读操作,它是共享锁,能同时被多个线程获取。
- 写锁用于写入操作,它是独占锁,写锁只能被一个线程锁获取。
ReentrantReadWriteLock API
// 创建一个新的 ReentrantReadWriteLock,默认是采用“非公平策略”。
ReentrantReadWriteLock()
// 创建一个新的 ReentrantReadWriteLock,fair是“公平策略”。fair为true,意味着公平策略;否则,意味着非公平策略。
ReentrantReadWriteLock(boolean fair)
// 返回当前拥有写入锁的线程,如果没有这样的线程,则返回 null。
protected Thread getOwner()
// 返回一个 collection,它包含可能正在等待获取读取锁的线程。
protected Collection<Thread> getQueuedReaderThreads()
// 返回一个 collection,它包含可能正在等待获取读取或写入锁的线程。
protected Collection<Thread> getQueuedThreads()
// 返回一个 collection,它包含可能正在等待获取写入锁的线程。
protected Collection<Thread> getQueuedWriterThreads()
// 返回等待获取读取或写入锁的线程估计数目。
int getQueueLength()
// 查询当前线程在此锁上保持的重入读取锁数量。
int getReadHoldCount()
// 查询为此锁保持的读取锁数量。
int getReadLockCount()
// 返回一个 collection,它包含可能正在等待与写入锁相关的给定条件的那些线程。
protected Collection<Thread> getWaitingThreads(Condition condition)
// 返回正等待与写入锁相关的给定条件的线程估计数目。
int getWaitQueueLength(Condition condition)
// 查询当前线程在此锁上保持的重入写入锁数量。
int getWriteHoldCount()
// 查询是否给定线程正在等待获取读取或写入锁。
boolean hasQueuedThread(Thread thread)
// 查询是否所有的线程正在等待获取读取或写入锁。
boolean hasQueuedThreads()
// 查询是否有些线程正在等待与写入锁有关的给定条件。
boolean hasWaiters(Condition condition)
// 如果此锁将公平性设置为 ture,则返回 true。
boolean isFair()
// 查询是否某个线程保持了写入锁。
boolean isWriteLocked()
// 查询当前线程是否保持了写入锁。
boolean isWriteLockedByCurrentThread()
// 返回用于读取操作的锁。
ReentrantReadWriteLock.ReadLock readLock()
// 返回用于写入操作的锁。
ReentrantReadWriteLock.WriteLock writeLock()
AQS状态
AQS 的状态 state 是 32 位(int 类型)的,辦成两份,读锁用高 16 位,表示持有读锁的线程数(sharedCount),写锁低 16 位,表示写锁的重入次数 (exclusiveCount)。状态值为 0 表示锁空闲,sharedCount 不为 0 表示分配了读锁,exclusiveCount 不为 0 表示分配了写锁,sharedCount 和 exclusiveCount 一般不会同时不为 0,只有当线程占用了写锁,该线程可以重入获取读锁,反之不成立。
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
// 由于读锁用高位部分,所以读锁个数加1,其实是状态值加 2^16
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 写锁的可重入的最大次数、读锁允许的最大数量
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 写锁的掩码,用于状态的低16位有效值
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 读锁计数,当前持有读锁的线程数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 写锁的计数,也就是它的重入次数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}
重入计数:
abstract static class Sync extends AbstractQueuedSynchronizer {
//每个线程特定的 read 持有计数。存放在ThreadLocal,不需要是线程安全的。
static final class HoldCounter {
int count = 0;
// 使用id而不是引用是为了避免保留垃圾。注意这是个常量。
final long tid = Thread.currentThread().getId();
}
/**
* 采用继承是为了重写 initialValue 方法,这样就不用进行这样的处理:
* 如果ThreadLocal没有当前线程的计数,则new一个,再放进ThreadLocal里。
* 可以直接调用 get。
**/
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
//保存当前线程重入读锁的次数的容器。在读锁重入次数为 0 时移除。
private transient ThreadLocalHoldCounter readHolds;
/**
* 最近一个成功获取读锁的线程的计数。这省却了ThreadLocal查找,
* 通常情况下,下一个释放线程是最后一个获取线程。这不是 volatile 的,
* 因为它仅用于试探的,线程进行缓存也是可以的
* (因为判断是否是当前线程是通过线程id来比较的)。
*/
private transient HoldCounter cachedHoldCounter;
/**
* firstReader是这样一个特殊线程:它是最后一个把 共享计数 从 0 改为 1 的
* (在锁空闲的时候),而且从那之后还没有释放读锁的。如果不存在则为null。
* firstReaderHoldCount 是 firstReader 的重入计数。
*
* firstReader 不能导致保留垃圾,因此在 tryReleaseShared 里设置为null,
* 除非线程异常终止,没有释放读锁。
*
* 作用是在跟踪无竞争的读锁计数时非常便宜。
*
* firstReader及其计数firstReaderHoldCount是不会放入 readHolds 的。
*/
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // 确保 readHolds 的内存可见性,利用 volatile 写的内存语义。
}
}
共享读锁的实现原理分析
//ReadLock
public void lock() {
sync.acquireShared(1);
}
//AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
acquireShared()
首先会通过tryAcquireShared()
来尝试获取锁。尝试成功的话,则不再做任何动作(因为已经成功获取到锁了)。尝试失败的话,则通过doAcquireShared()
来获取锁。doAcquireShared()
会获取到锁了才返回。
其中int tryAcquireShared(int unused)
的具体实现如下:
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
//持有写锁的线程可以获取读锁,如果获取锁的线程不是当前线程,则返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);//获取共享读锁的数量
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
//如果首次获取锁,则初始化firstReader和firstReaderHoldCount
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//如果当前线程是首次获取读锁的线程
firstReaderHoldCount++;
} else {
//更新HoldCounter
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
整个函数的工作流程如下:
- 如果写锁已经被持有了,但是持有写锁的不是当前写出,那么就直接返回-1(体现写锁的排他性).
- 如果在尝试获取锁时不需要阻塞等待(由锁的公平性决定),并且读锁的共享计数小于最大值,那么就直接通过CAS更新读锁数量,获取读锁。
- 如果第二步执行失败了,那么就会调用
fullTryAcquireShared(current)
fullTryAcquireShared(current)
的具体实现如下:
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) { //自旋
int c = getState();
if (exclusiveCount(c) != 0) { //写锁已经被持有了
if (getExclusiveOwnerThread() != current) //持有写锁的不是单线程
return -1; //其它线程持有读锁后,就不能在获取写锁了
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {//由公平性决定需要阻塞
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
//更新锁计数(可重入的体现)
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
//如果当前线程的持有读锁数为0,那么就没必要使用计数器,直接移除
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT) //如果读锁的数量超过最大值了
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { //CAS更新读锁数量
if (sharedCount(c) == 0) {
//首次获取读锁
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//当前线程是首次获取读锁的线程,直接更新持有数
firstReaderHoldCount++;
} else {
//当前线程是后来共享读锁的线程
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();//更新为当前线程的计数器
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
可以看出其实int fullTryAcquireShared(Thread current)
也每什么特别,它的代码和int tryAcquireShared(int unused)
差不多。只不过是增加了自旋重试,和“持有读锁数的延迟读取”
回到acquireShared(int arg)
方法,如果tryAcquireShared(arg)
获取读锁失败后,会调用doAcquireShared(arg)
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); //添加一个共享模式的Node到等待队列尾部
boolean failed = true;
try {
boolean interrupted = false; //获取前驱节点
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//如果前驱节点,尝试获取资源
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取成功,更新等待队列,并唤醒下一个等待的节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && //检查获取失败后是否可以阻塞
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
整个获取共享读锁的流程总结出来,获取锁一般的流程就是首先尝试去直接获取,如果获取不到了,那么尝试自旋获取,如果还是获取不到,那么就去等待队列排队,排队的时候,如果发现自己是第二个那么就再次尝试获取锁,如果还是没获取到,那么就在等待队列中 park 阻塞等待了。
排他写锁的实现原理分析
排他写锁的实现原理与 ReentrantLock 一致。直接参照上节所讲即可。
StampedLock
ReentrantReadWriteLock 使得多个读线程同时持有读锁(只要写锁未被占用),而写锁是独占的。它拥有的是悲观的读锁,如果在读多写少的情况下,使用不当很容易产生“饥饿”问题,虽然使用“公平”策略可以一定程度上缓解这个问题,但是“公平”策略是以牺牲系统吞吐量为代价的。
要进一步提升并发执行效率,Java 8 引入了新的读写锁:StampedLock。StampedLock 和 ReadWriteLock 相比,改进之处在于:读的过程中也允许获取写锁后写入。这样一来,读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。
乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。
首先明确下,该类的设计初衷是作为一个内部工具类,用于辅助开发其它线程安全组件,用得好,该类可以提升系统性能,用不好,容易产生死锁和其它莫名其妙的问题。
StampedLock的特点
try 系列获取锁的函数,当获取锁失败后会返回为 0 的 stamp 值。当调用释放锁和转换锁的方法时候需要传入获取锁时候返回的 stamp 值。
StampedLock 的内部实现是基于 CLH 锁的,CLH锁原理:锁维护着一个等待线程队列,所有申请锁且失败的线程都记录在队列。一个节点代表一个线程,保存着一个标记位 locked,用以判断当前线程是否已经释放锁。当一个线程试图获取锁时,从队列尾节点作为前序节点,循环判断所有的前序节点是否已经成功释放锁。
StampedLock 的等待队列与 AQS 的 CLH 队列对比
- 当入队一个线程时,如果队尾是读结点,不会直接链接到队尾,而是链接到该读结点的 cowait 链中,cowait 链本质是一个栈;
- 当入队一个线程时,如果队尾是写结点,则直接链接到队尾;
- AQS 类似唤醒线程的规则,都是首先唤醒队首结点。区别是 StampedLock 中,当唤醒的结点是读结点时,会唤醒该读结点的 cowait 链中的所有读结点(顺序和入栈顺序相反,也就是后进先出)。
另外,StampedLock使用时要特别小心,避免锁重入的操作,在使用乐观读锁时也需要遵循相应的调用模板,防止出现数据不一致的问题。
StampedLock使用示例
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) {
long stamp = sl.writeLock(); //涉及对共享资源的修改,使用写锁-独占操作
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
/**
* 使用乐观读锁访问共享资源
* 注意:乐观读锁在保证数据一致性上需要拷贝一份要操作的变量到方法栈,并且在操作数据时候可能其他写线程已经修改了数据,
* 而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的。
*
* @return
*/
double distanceFromOrigin() {
double currentX = x, currentY = y;
// 此处已读取到y,如果没有写入,读取是正确的(100,200)
// 如果有写入,读取是错误的(100,400)
if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
stamp = stampedLock.readLock(); // 获取一个悲观读锁
try {
long stamp = sl.tryOptimisticRead(); // 使用乐观读锁
// 注意下面这行代码不是原子操作
// 假设x,y = (100,200), 但x,y可能被写线程修改为(300,400)
double currentX = x, currentY = y; // 拷贝共享资源到本地方法栈中
// 检查乐观读锁后是否有其他写锁发生
if (!sl.validate(stamp)) {
// 如果有写锁被占用,可能造成数据不一致,所以要切换到悲观读锁模式
stamp = sl.readLock(); // 获取悲观锁
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp); // 释放悲观锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp); //读锁转换为写锁
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
} else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
}
由上面的例子,我们可以总结出 StampedLock 的用法
long stamp = lock.tryOptimisticRead(); // 非阻塞获取版本信息
copyVaraibale2ThreadMemory(); // 拷贝变量到线程本地堆栈
if(!lock.validate(stamp)){ // 校验
long stamp = lock.readLock(); // 获取读锁
try {
copyVaraibale2ThreadMemory(); // 拷贝变量到线程本地堆栈
} finally {
lock.unlock(stamp); // 释放悲观锁
}
}
useThreadMemoryVarables(); // 使用线程本地堆栈里面的数据进行操作
和 ReadWriteLock 相比,写入的加锁是完全一样的,不同的是读取。注意到首先我们通过tryOptimisticRead()
获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过validate()
去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据。
可见,StampedLock把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是 StampedLock 是不可重入锁,不能在一个线程中反复获取同一个锁。StampedLock 还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在 if-then-update 的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。
StampedLock的内部常量
StampedLock 虽然不像其它锁一样定义了内部类来实现 AQS 框架,但是 StampedLock 的基本实现思路还是利用 CLH 队列进行线程的管理,通过同步状态值来表示锁的状态和类型。
StampedLock 内部定义了很多常量,定义这些常量的根本目的还是和 ReentrantReadWriteLock 一样,对同步状态值按位切分,以通过位运算对 State 进行操作:
对于 StampedLock 来说,写锁被占用的标志是第 8 位为 1,读锁使用 0-7 位,正常情况下读锁数目为 1-126,超过 126 时,使用一个名为
readerOverflow
的int整型保存超出数。
/** The period for yielding when waiting for overflow spinlock */
private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1
/** The number of bits to use for reader count before overflowing */
private static final int LG_READERS = 7;
// Values for lock state and stamp operations
private static final long RUNIT = 1L; //一单位读锁 0000 0001
private static final long WBIT = 1L << LG_READERS; //写锁标志位 1000 0000
private static final long RBITS = WBIT - 1L; //读状态标识 0111 1111
private static final long RFULL = RBITS - 1L; //读锁的最大数量 0111 1110
private static final long ABITS = RBITS | WBIT; //用于获取读写状态 1111 1111
private static final long SBITS = ~RBITS; // note overlap with ABITS
// 初始status值
private static final long ORIGIN = WBIT << 1;
/** 同步状态 State,处于写锁使用第 8 位(为表示占用),读锁使用前7位(为 1-126,附加的 readerOverflow 用于当读锁超过 126 时) */
private transient volatile long state;
/** 因为读锁只使用前7位,所以当超过了 128 之后将使用一个 int 变量来记录 */
private transient int readerOverflow;
部分常量的比特位表示如下:
另外,StampedLock 相比 ReentrantReadWriteLock,对多核 CPU 进行了优化,可以看到,当 CPU 核数超过 1 时,会有一些自旋操作:
/** CPU 核数,用于控制自旋次数 */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/** 尝试获取锁时,如果超过该值仍未获得锁,则加入等待队列 */
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
/** 等待队列的首结点,自旋获取锁失败超过该值时,会继续阻塞 */
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
/** 再次进入阻塞之前的最大重试次数 */
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
StampedLock实现原理
等待队列节点状态
// 节点状态
// 初始状态 0
private static final int WAITING = -1;
private static final int CANCELLED = 1;
// 节点类型
private static final int RMODE = 0;
private static final int WMODE = 1;
/** Wait nodes */
static final class WNode {
volatile WNode prev;
volatile WNode next;
volatile WNode cowait; // list of linked readers
volatile Thread thread; // non-null while possibly parked
volatile int status; // 0, WAITING, or CANCELLED
final int mode; // RMODE or WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}
/** Head of CLH queue */
private transient volatile WNode whead;
/** Tail (last) of CLH queue */
private transient volatile WNode wtail;
StampedLock对象的创建
public StampedLock() {
state = ORIGIN; //state 1 0000 0000
}
StampedLock 提供了三类视图:
// views
transient ReadLockView readLockView;
transient WriteLockView writeLockView;
transient ReadWriteLockView readWriteLockView;
这些视图其实是对 StamedLock 方法的封装,便于习惯了 ReentrantReadWriteLock 的用户使用:
例如,ReadLockView 其实相当于ReentrantReadWriteLock.readLock()
返回的读锁
final class ReadLockView implements Lock {
public void lock() { readLock(); }
public void lockInterruptibly() throws InterruptedException {
readLockInterruptibly();
}
public boolean tryLock() { return tryReadLock() != 0L; }
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return tryReadLock(time, unit) != 0L;
}
public void unlock() { unstampedUnlockRead(); }
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
调用writeLock获取写锁
获取写锁成功
获取写锁成功,将调用 writeLock 方法
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
//(s = state) & ABITS == 0L 表示读锁和写锁都未被使用
return ((((s = state) & ABITS) == 0L &&
//CAS操作:将第8位置为1,表示写锁被占用
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));//获取失败则调用 acquireWrite,加入到等待队列
}
说明:上述代码获取写锁,如果获取失败,则进入阻塞,注意该方法不响应中断,返回非0表示获取成功。
写锁被其他线程持有,写锁获取失败
自旋入队操作,如果没用任何锁被占用,则立即尝试获取写锁,获取成功则返回,如果存在锁被使用,则将当前线程包装成独占节点,并插入等待队列尾部
/**
* 尝试自旋的获取写锁, 获取不到则阻塞线程
*
* @param interruptible true 表示检测中断, 如果线程被中断过, 则最终返回INTERRUPTED
* @param deadline 如果非0, 则表示限时获取
* @return 非0表示获取成功, INTERRUPTED表示中途被中断过
*/
private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p;
/**
* 自旋入队操作
* 如果没有任何锁被占用, 则立即尝试获取写锁, 获取成功则返回.
* 如果存在锁被使用, 则将当前线程包装成独占结点, 并插入等待队列尾部
*/
for (int spins = -1; ; ) {
long m, s, ns;
if ((m = (s = state) & ABITS) == 0L) { // 没有任何锁被占用
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) // 尝试立即获取写锁
return ns; // 获取成功直接返回
} else if (spins < 0)
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
else if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
} else if ((p = wtail) == null) { // 队列为空, 则初始化队列, 构造队列的头结点
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
} else if (node == null) // 将当前线程包装成写结点
node = new WNode(WMODE, p);
else if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) { // 链接结点至队尾
p.next = node;
break;
}
}
for (int spins = -1; ; ) {
WNode h, np, pp;
int ps;
if ((h = whead) == p) { // 如果当前结点是队首结点, 则立即尝试获取写锁
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins; ; ) { // spin at head
long s, ns;
if (((s = state) & ABITS) == 0L) { // 写锁未被占用
if (U.compareAndSwapLong(this, STATE, s,
ns = s + WBIT)) { // CAS修改State: 占用写锁
// 将队首结点从队列移除
whead = node;
node.prev = null;
return ns;
}
} else if (LockSupport.nextSecondarySeed() >= 0 &&
--k <= 0)
break;
}
} else if (h != null) { // 唤醒头结点的栈中的所有读线程
WNode c;
Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
} else if ((ps = p.status) == 0) // 将当前结点的前驱置为WAITING, 表示当前结点会进入阻塞, 前驱将来需要唤醒我
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
} else { // 阻塞当前调用线程
long time; // 0 argument to park means no timeout
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p)
U.park(false, time); // emulate LockSupport.park
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
调用readLock获取读锁
写锁被其他线程持有,读锁获取失败
获取读锁失败,将调用 acquireRead 方法,加入等待队列:
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
return ((whead == wtail && (s & ABITS) < RFULL &&//表示写锁未被占用,且读锁数量没用超限
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
说明:上述代码获取读锁,如果写锁被占用,线程会阻塞,注意该方法不响应中断,返回非0表示获取成功。
/**
* 尝试自旋的获取读锁, 获取不到则加入等待队列, 并阻塞线程
*
* @param interruptible true 表示检测中断, 如果线程被中断过, 则最终返回INTERRUPTED
* @param deadline 如果非0, 则表示限时获取
* @return 非0表示获取成功, INTERRUPTED表示中途被中断过
*/
private long acquireRead(boolean interruptible, long deadline) {
WNode node = null, p; // node指向入队结点, p指向入队前的队尾结点
/**
* 自旋入队操作
* 如果写锁未被占用, 则立即尝试获取读锁, 获取成功则返回.
* 如果写锁被占用, 则将当前读线程包装成结点, 并插入等待队列(如果队尾是写结点,直接链接到队尾;否则,链接到队尾读结点的栈中)
*/
for (int spins = -1; ; ) {
WNode h;
if ((h = whead) == (p = wtail)) { // 如果队列为空或只有头结点, 则会立即尝试获取读锁
for (long m, s, ns; ; ) {
if ((m = (s = state) & ABITS) < RFULL ? // 判断写锁是否被占用
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : //写锁未占用,且读锁数量未超限, 则更新同步状态
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) //写锁未占用,但读锁数量超限, 超出部分放到readerOverflow字段中
return ns; // 获取成功后, 直接返回
else if (m >= WBIT) { // 写锁被占用,以随机方式探测是否要退出自旋
if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
} else {
if (spins == 0) {
WNode nh = whead, np = wtail;
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
}
spins = SPINS;
}
}
}
}
//首先自旋的尝试获取读锁,获取成功后,就直接返回;否则,会将当前线程包装成一个读结点,
//插入到等待队列。 由于,如果目前等待队列还是空会初始化队列,然后将自身包装成一个读结点,
//插入队尾,然后在下面这个地方跳出自旋:
if (p == null) { // p == null表示队列为空, 则初始化队列(构造头结点)
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
} else if (node == null) { // 将当前线程包装成读结点
node = new WNode(RMODE, p);
} else if (h == p || p.mode != RMODE) { // 如果队列只有一个头结点, 或队尾结点不是读结点, 则直接将结点链接到队尾, 链接完成后退出自旋
if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
// 队列不为空, 且队尾是读结点, 则将添加当前结点链接到队尾结点的cowait链中(实际上构成一个栈, p是栈顶指针 )
else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) { // CAS操作队尾结点p的cowait字段,实际上就是头插法插入结点
node.cowait = null;
} else {
for (; ; ) {
WNode pp, c;
Thread w;
// 尝试唤醒头结点的cowait中的第一个元素, 假如是读锁会通过循环释放cowait链
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null) // help release
U.unpark(w);
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
do {
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) :
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L))
return ns;
} while (m < WBIT);
}
if (whead == h && p.prev == pp) {
long time;
if (pp == null || h == p || p.status > 0) {
node = null; // throw away
break;
}
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, p, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) {
// 写锁被占用, 且当前结点不是队首结点, 则阻塞当前线程
U.park(false, time);
}
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
}
}
}
for (int spins = -1; ; ) {
WNode h, np, pp;
int ps;
if ((h = whead) == p) { // 如果当前线程是队首结点, 则尝试获取读锁
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins; ; ) { // spin at head
long m, s, ns;
if ((m = (s = state) & ABITS) < RFULL ? // 判断写锁是否被占用
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : //写锁未占用,且读锁数量未超限, 则更新同步状态
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { //写锁未占用,但读锁数量超限, 超出部分放到readerOverflow字段中
// 获取读锁成功, 释放cowait链中的所有读结点
WNode c;
Thread w;
// 释放头结点, 当前队首结点成为新的头结点
whead = node;
node.prev = null;
// 从栈顶开始(node.cowait指向的结点), 依次唤醒所有读结点, 最终node.cowait==null, node成为新的头结点
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
U.unpark(w);
}
return ns;
} else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
break;
}
} else if (h != null) { // 如果头结点存在cowait链, 则唤醒链中所有读线程
WNode c;
Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
//跳出自旋后,会继续向下执行,进入下一个自旋,在下一个自旋中,依然会再次尝试获取读锁,
//如果这次再获取不到,就会将前驱的等待状态置为WAITING,表示当前线程准备阻塞,需要去唤醒
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
} else if ((ps = p.status) == 0) // 将前驱结点的等待状态置为WAITING, 表示之后将唤醒当前结点
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
// 阻塞当前读线程
else {
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L) //限时等待超时, 取消等待
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) {
// 如果前驱的等待状态为WAITING, 且写锁被占用, 则阻塞当前调用线程
U.park(false, time);
}
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
调用unlockWrite释放写锁
通过 CAS 操作,修改 State 成功后,会调用 release 方法唤醒等待队列的队首结点:
//释放写锁
public void unlockWrite(long stamp) {
WNode h;
if (state != stamp || (stamp & WBIT) == 0L)//stamp不匹配,或者写锁未被占用,抛出异常
throw new IllegalMonitorStateException();
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;//正常情况下,stamp+=WBIT后,第8位位0,表示写锁被释放;但是溢出,则置为ORIGIN
if ((h = whead) != null && h.status != 0)
release(h);//唤醒等待队列中的队首节点
}
release 方法非常简单,先将头结点的等待状态置为 0,表示即将唤醒后继结点,然后立即唤醒队首结点:
//唤醒等待队列的队首节点(即头结点whead的后继节点)
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);//将头结点的等待状态从-1置为0,表示将要唤醒后继节点
if ((q = h.next) == null || q.status == CANCELLED) {//从队尾开始查找距离头结点最近的WAITING节点
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
if (q != null && (w = q.thread) != null)
U.unpark(w);//唤醒队首节点
}
}
调用unlockRead释放读锁
CAS 操作 State 将读锁数量减 1
//释放读锁
public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) {
if (((s = state) & SBITS) != (stamp & SBITS) ||//stamp不匹配,或没用任何锁被占用,都会抛出异常
(stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
throw new IllegalMonitorStateException();
if (m < RFULL) {//读锁数量未超限
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {//读锁数量-1
if (m == RUNIT && (h = whead) != null && h.status != 0)//如果当前读锁数量为1,唤醒等待队列中的队首节点
release(h);
break;
}
}
else if (tryDecReaderOverflow(s) != 0L)//读锁数量超限,则溢出字段要-1
break;
}
}
注意,当读锁的数量变为0时才会调用 release 方法,唤醒队首结点:
//唤醒等待队列中的队首节点(即头结点whead的后继节点)
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);//将头结点的等待状态从-1置为0,表示将要唤醒后继节点
if ((q = h.next) == null || q.status == CANCELLED) {//从队尾开始查找距离头结点最近的WAITING节点
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
if (q != null && (w = q.thread) != null)
U.unpark(w);//唤醒队首节点
}
}