0%

JUC ConcurrentUtility

本文介绍了三种并发工具:CountDownLatch、CyclicBarrier、Semaphore。它们都直接或者间接使用了 AQS 同步器,使用了 JVM 给它提供的原语操作。

CountDownLatch

CountDownLatch 是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。它继承了 AQS 同步器,使用共享锁对计数进行维护。

CountDownLatch API

CountDownLatch(int count)
构造一个用给定计数初始化的 CountDownLatch// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
void await()
// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
boolean await(long timeout, TimeUnit unit)
// 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
void countDown()
// 返回当前计数。
long getCount()

CountDownLatch计数原理

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

该函数是创建一个 Sync 对象,而 Sync 是继承于 AQS 类。

Sync(int count) {
    setState(count);
}

setState()在AQS中实现,源码如下:

protected final void setState(long newState) {
    state = newState;
}

说明:在 AQS 中,state 是一个 private volatile long 类型的对象。对于 CountDownLatch 而言,state 表示的”锁计数器“。CountDownLatch 中的getCount()最终是调用 AQS中 的getState(),返回的 state 对象,即”锁计数器“。

await()

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

说明:该函数实际上是调用的 AQS 的acquireSharedInterruptibly(1)

public final void acquireSharedInterruptibly(long arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

说明acquireSharedInterruptibly()的作用是获取共享锁。

如果当前线程是中断状态,则抛出异常 InterruptedException。否则,调用tryAcquireShared(arg)尝试获取共享锁;尝试成功则返回,否则就调用doAcquireSharedInterruptibly()doAcquireSharedInterruptibly()会使当前线程一直等待,直到当前线程获取到共享锁(或被中断)才返回。

tryAcquireShared()在 CountDownLatch 中被重写,它的源码如下:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

说明tryAcquireShared()的作用是尝试获取共享锁。

如果”锁计数器=0”,即锁是可获取状态,则返回1;否则,锁是不可获取状态,则返回-1。

private void doAcquireSharedInterruptibly(long arg)
    throws InterruptedException {
    // 创建"当前线程"的Node节点,且Node中记录的锁是"共享锁"类型;并将该节点添加到CLH队列末尾。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 获取上一个节点。
            // 如果上一节点是CLH队列的表头,则"尝试获取共享锁"。
            final Node p = node.predecessor();
            if (p == head) {
                long r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // shouldParkAfterFailedAcquire(),如果在尝试获取锁失败之后,线程应该等待
            // ,则返回true;
            // parkAndCheckInterrupt(),当前线程会进入等待状态,直到获取到共享锁才继续运行。
            
            // (上一节点不是CLH队列的表头) 当前线程一直等待,直到获取到共享锁。
            // 如果线程在等待过程中被中断过,则再次中断该线程(还原之前的中断状态)。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

countDown()

public void countDown() {
    sync.releaseShared(1);
}

说明:该函数实际上调用releaseShared(1)释放共享锁。

releaseShared()在AQS中实现,源码如下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

说明releaseShared()的目的是让当前线程释放它所持有的共享锁。它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。

tryReleaseShared()在 CountDownLatch 中被重写,源码如下:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        // 获取“锁计数器”的状态
        int c = getState();
        if (c == 0)
            return false;
        // “锁计数器”-1
        int nextc = c-1;
        // 通过CAS函数进行赋值。
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

说明tryReleaseShared()的作用是释放共享锁,将“锁计数器”的值 - 1。

CountDownLatch使用示例

public class CountDownLatchTest {

    private static CountDownLatch doneSignal;
    public static void main(String[] args) {

        try {
            doneSignal = new CountDownLatch(5);

            // 新建5个任务
            for(int i=0; i<LATCH_SIZE; i++)
                new InnerThread().start();

            System.out.println("main await begin.");
            // "主线程"等待线程池中5个任务的完成
            doneSignal.await();

            System.out.println("main await finished.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class InnerThread extends Thread{
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");
                // 将CountDownLatch的数值减1
                doneSignal.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果

main await begin.
Thread-0 sleep 1000ms.
Thread-2 sleep 1000ms.
Thread-1 sleep 1000ms.
Thread-4 sleep 1000ms.
Thread-3 sleep 1000ms.
main await finished.

结果说明

主线程通过doneSignal.await()等待其它线程将 doneSignal 递减至 0。其它的 5 个 InnerThread 线程,每一个都通过doneSignal.countDown()将 doneSignal 的值减 1;当 doneSignal 为 0 时,main 被唤醒后继续执行。

CyclicBarrier

CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

注意比较CountDownLatch 和 CyclicBarrier:

  1. CountDownLatch 的作用是允许 1 或 N 个线程等待其他线程完成执行;而 CyclicBarrier 则是允许 N 个线程相互等待。
  2. CountDownLatch 的计数器无法被重置;CyclicBarrier 的计数器可以被重置后使用,因此它被称为是循环的 barrier。

CyclicBarrier API

CyclicBarrier(int parties)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
CyclicBarrier(int parties, Runnable barrierAction)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。

int await()
在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
int await(long timeout, TimeUnit unit)
在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
int getNumberWaiting()
返回当前在屏障处等待的参与者数目。
int getParties()
返回要求启动此 barrier 的参与者数目。
boolean isBroken()
查询此屏障是否处于损坏状态。
void reset()
将屏障重置为其初始状态。

CyclicBarrier实现原理

CyclicBarrier 是通过 ReentrantLock (独占锁)和 Condition 来实现的。

构造函数

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // parties表示“必须同时到达barrier的线程个数”。
    this.parties = parties;
    // count表示“处在等待状态的线程个数”。
    this.count = parties;
    // barrierCommand表示“parties个线程到达barrier时,会执行的动作”。
    this.barrierCommand = barrierAction;
}

await()

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen;
    }
}
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 获取“独占锁(lock)”
    lock.lock();
    try {
        // 保存“当前的generation”
        final Generation g = generation;

        // 若“当前generation已损坏”,则抛出异常。
        if (g.broken)
            throw new BrokenBarrierException();

        // 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

       // 将“count计数器”-1
       int index = --count;
       // 如果index=0,则意味着“有parties个线程到达barrier”。
       if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               // 如果barrierCommand不为null,则执行该动作。
               final Runnable command = barrierCommand;
               if (command != null)
                   command.run();
               ranAction = true;
               // 唤醒所有等待线程,并更新generation。
               nextGeneration();
               return 0;
           } finally {
               if (!ranAction)
                   breakBarrier();
           }
       }

        // 当前线程一直阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,
        // 当前线程才继续执行。
        for (;;) {
            try {
                // 如果不是“超时等待”,则调用await()进行等待;否则,调用awaitNanos()进行等待。
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 如果等待过程中,线程被中断,则执行下面的函数。
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            // 如果“当前generation已经损坏”,则抛出异常。
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果“generation已经换代”,则返回index。
            if (g != generation)
                return index;

            // 如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放“独占锁(lock)”
        lock.unlock();
    }
}

说明dowait()的作用就是让当前线程阻塞,直到“有 parties 个线程到达 barrier” 或 “当前线程被中断” 或 “超时”这三者之一发生,当前线程才继续执行。

generation 是 CyclicBarrier 的一个成员遍历,它的定义如下:

private Generation generation = new Generation();

private static class Generation {
    boolean broken = false;
}

在 CyclicBarrier 中,同一批的线程属于同一代,即同一个 Generation;CyclicBarrier 中通过 generation 对象,记录属于哪一代。当有 parties 个线程到达barrier,generation 就会被更新换代。

如果当前线程被中断,即Thread.interrupted()为 true;则通过breakBarrier()终止 CyclicBarrier。

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

breakBarrier()会设置当前中断标记 broken 为true,意味着“将该Generation中断”;同时,设置count=parties,即重新初始化 count;最后,通过signalAll()唤醒 CyclicBarrier 上所有的等待线程。

将“count计数器”-1,即 —count;然后判断是不是“有 parties 个线程到达 barrier”,即 index 是不是为0。
index=0时,如果 barrierCommand 不为 null,则执行该 barrierCommand,barrierCommand 就是我们创建 CyclicBarrier 时,传入的 Runnable对象。然后,调用nextGeneration()进行换代工作。

private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}

首先,它会调用signalAll()唤醒CyclicBarrier上所有的等待线程;接着,重新初始化 count;最后,更新 generation 的值。

在for(;;)循环中。timed 是用来表示当前是不是“超时等待”线程。如果不是,则通过trip.await()进行等待;否则,调用awaitNanos()进行超时等待。

CyclicBarrier使用示例

public class CyclicBarrierTest {

    private static CyclicBarrier cb;
    public static void main(String[] args) {

        cb = new CyclicBarrier(5);

        // 新建5个任务
        for(int i=0; i<SIZE; i++)
            new InnerThread().start();
    }

    static class InnerThread extends Thread{
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");

                // 将cb的参与者数量加1
                cb.await();

                // cb的参与者数量等于5时,才继续往后执行
                System.out.println(Thread.currentThread().getName() + " continued.");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果

Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.

结果说明

主线程中新建了 5 个线程,所有的这些线程都调用cb.await()等待。所有这些线程一直等待,直到 cb 中所有线程都达到 barrier 时,这些线程才继续运行。

Semaphore

Semaphore是一个计数信号量,它的本质是一个”共享锁”。根据共享锁的获取原则,Semaphore分为”公平信号量”和”非公平信号量”。

公平信号量和非公平信号量的区别

公平信号量和非公平信号量的释放信号量的机制是一样的。不同的是它们获取信号量的机制:线程在尝试获取信号量许可时,对于公平信号量而言,如果当前线程不在 CLH 队列的头部,则排队等候;而对于非公平信号量而言,无论当前线程是不是在 CLH 队列的头部,它都会直接获取信号量。该差异具体的体现在,它们的tryAcquireShared()函数的实现不同。

信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可。

Semaphore API

// 创建具有给定的许可数和非公平的公平设置的 Semaphore。
Semaphore(int permits)
// 创建具有给定的许可数和给定的公平设置的 Semaphore。
Semaphore(int permits, boolean fair)

// 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
void acquire()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
void acquire(int permits)
// 从此信号量中获取许可,在有可用的许可前将其阻塞。
void acquireUninterruptibly()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
void acquireUninterruptibly(int permits)
// 返回此信号量中当前可用的许可数。
int availablePermits()
// 获取并返回立即可用的所有许可。
int drainPermits()
// 返回一个 collection,包含可能等待获取的线程。
protected Collection<Thread> getQueuedThreads()
// 返回正在等待获取的线程的估计数目。
int getQueueLength()
// 查询是否有线程正在等待获取。
boolean hasQueuedThreads()
// 如果此信号量的公平设置为 true,则返回 true。
boolean isFair()
// 根据指定的缩减量减小可用许可的数目。
protected void reducePermits(int reduction)
// 释放一个许可,将其返回给信号量。
void release()
// 释放给定数目的许可,将其返回到信号量。
void release(int permits)
// 返回标识此信号量的字符串,以及信号量的状态。
String toString()
// 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
boolean tryAcquire()
// 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
boolean tryAcquire(int permits)
// 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
// 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。
boolean tryAcquire(long timeout, TimeUnit unit)

Semaphore实现原理

构造函数

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

信号量分为“公平信号量( FairSync )和非公平信号量( NonfairSync )。Semaphore(int permits)函数会默认创建“非公平信号量”。

公平信号量的获取

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

信号量中的acquire()获取函数,实际上是调用的 AQS 中的acquireSharedInterruptibly()

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 如果线程是中断状态,则抛出异常。
    if (Thread.interrupted())
        throw new InterruptedException();
    // 否则,尝试获取“共享锁”;获取成功则直接返回,获取失败,则通过doAcquireSharedInterruptibly()获取。
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

Semaphore中”公平锁“对应的tryAcquireShared()实现如下:

protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 判断“当前线程”是不是CLH队列中的第一个线程线程,
        // 若是的话,则返回-1。
        if (hasQueuedPredecessors())
            return -1;
        // 设置“可以获得的信号量的许可数”
        int available = getState();
        // 设置“获得acquires个信号量许可之后,剩余的信号量许可数”
        int remaining = available - acquires;
        // 如果“剩余的信号量许可数>=0”,则设置“可以获得的信号量许可数”为remaining。
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

说明tryAcquireShared()的作用是尝试获取 acquires 个信号量许可数。对于 Semaphore 而言,state 表示的是“当前可获得的信号量许可数”。

private void doAcquireSharedInterruptibly(long arg)
    throws InterruptedException {
    // 创建”当前线程“的Node节点,且Node中记录的锁是”共享锁“类型;并将该节点添加到CLH队列末尾。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 获取上一个节点。
            // 如果上一节点是CLH队列的表头,则”尝试获取共享锁“。
            final Node p = node.predecessor();
            if (p == head) {
                long r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 当前线程一直等待,直到获取到共享锁。
            // 如果线程在等待过程中被中断过,则再次中断该线程(还原之前的中断状态)。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

说明doAcquireSharedInterruptibly()会使当前线程一直等待,直到当前线程获取到共享锁(或被中断)才返回。

公平信号量的释放

Semaphore中公平信号量(FairSync)的释放API如下:

public void release() {
    sync.releaseShared(1);
}

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

信号量的releases()释放函数,实际上是调用的 AQS 中的releaseShared()

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

说明releaseShared()的目的是让当前线程释放它所持有的共享锁。它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。

Semaphore重写了tryReleaseShared()

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 获取“可以获得的信号量的许可数”
        int current = getState();
        // 获取“释放releases个信号量许可之后,剩余的信号量许可数”
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // 设置“可以获得的信号量的许可数”为next。
        if (compareAndSetState(current, next))
            return true;
    }
}

如果tryReleaseShared()尝试释放共享锁失败,则会调用doReleaseShared()去释放共享锁。

private void doReleaseShared() {
    for (;;) {
        // 获取CLH队列的头节点
        Node h = head;
        // 如果头节点不为null,并且头节点不等于tail节点。
        if (h != null && h != tail) {
            // 获取头节点对应的线程的状态
            int ws = h.waitStatus;
            // 如果头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。
            if (ws == Node.SIGNAL) {
                // 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 唤醒“头节点的下一个节点所对应的线程”。
                unparkSuccessor(h);
            }
            // 如果头节点对应的线程是空状态,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果头节点发生变化,则继续循环。否则,退出循环。
        if (h == head)                   // loop if head changed
            break;
    }
}

说明doReleaseShared()会释放“共享锁”。它会从前往后的遍历 CLH 队列,依次“唤醒”然后“执行”队列中每个节点对应的线程;最终的目的是让这些线程释放它们所持有的信号量。

非公平信号量获取和释放

Semaphore 中的非公平信号量是 NonFairSync。在 Semaphore 中,“非公平信号量许可的释放(release)”与“公平信号量许可的释放(release)”是一样的。不同的是它们获取“信号量许可”的机制不同,下面是非公平信号量获取信号量许可的代码。

非公平信号量的tryAcquireShared()实现如下

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

nonfairTryAcquireShared()的实现如下

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // 设置“可以获得的信号量的许可数”
        int available = getState();
        // 设置“获得acquires个信号量许可之后,剩余的信号量许可数”
        int remaining = available - acquires;
        // 如果“剩余的信号量许可数>=0”,则设置“可以获得的信号量许可数”为remaining。
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

说明:非公平信号量的tryAcquireShared()调用 AQS 中的nonfairTryAcquireShared()。而在nonfairTryAcquireShared()的 for 循环中,它都会直接判断“当前剩余的信号量许可数”是否足够;足够的话,则直接“设置可以获得的信号量许可数”,进而再获取信号量。而公平信号量的tryAcquireShared()中,在获取信号量之前会通过if (hasQueuedPredecessors())来判断“当前线程是不是在 CLH 队列的头部”,是的话,则返回 -1。

Semaphore示例

public class SemaphoreTest {
    private static final int SEM_MAX = 10;
    public static void main(String[] args) {
        Semaphore sem = new Semaphore(SEM_MAX);
        //创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        //在线程池中执行任务
        threadPool.execute(new MyThread(sem, 5));
        threadPool.execute(new MyThread(sem, 4));
        threadPool.execute(new MyThread(sem, 7));
        //关闭池
        threadPool.shutdown();
    }
}

class MyThread extends Thread {
    private volatile Semaphore sem;    // 信号量
    private int count;        // 申请信号量的大小

    MyThread(Semaphore sem, int count) {
        this.sem = sem;
        this.count = count;
    }

    public void run() {
        try {
            // 从信号量中获取count个许可
            sem.acquire(count);

            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + " acquire count=" + count);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放给定数目的许可,将其返回到信号量。
            sem.release(count);
            System.out.println(Thread.currentThread().getName() + " release " + count + "");
        }
    }
}

(某一次)运行结果

pool-1-thread-1 acquire count=5
pool-1-thread-2 acquire count=4
pool-1-thread-1 release 5
pool-1-thread-2 release 4
pool-1-thread-3 acquire count=7
pool-1-thread-3 release 7

结果说明

信号量 sem 的许可总数是 10 个;共 3 个线程,分别需要获取的信号量许可数是 5,4,7。前面两个线程获取到信号量的许可后,sem 中剩余的可用的许可数是 1;因此,最后一个线程必须等前两个线程释放了它们所持有的信号量许可之后,才能获取到 7 个信号量许可。