0%

JUC ThreadPool

线程池本质上是生产者-消费者模型,任务队列中的线程会进入到线程池中,由线程池进行管理。线程池本质上是解决线程反复创建产生的性能损耗,将线程重用提高性能。当然,如果无谓的创建线程池也会导致性能的浪费,这完全取决于场景。

对于一台机器来说,CPU 的核心数有限,同时能运行的线程数有限,所以需要根据调度算法切换执行的线程,而线程的切换需要开销,比如替换寄存器的内容、高速缓存的失效等等。如果线程数太多,切换的频率就变高,可能使得多线程带来的好处抵不过线程切换带来的开销,得不偿失。因此,线程池应运而生,且其使用完全取决于场景。

JDK 提供的线程池分为三种:ThreadPoolExecutor、ForkJoinPool、ScheduledThreadPoolExecutor。分别用作不同的场景下:ThreadPoolExecutor 适用于线程调度,ScheduledThreadPoolExecutor 适用于周期或延时任务处理。

接口定义

Executor接口

定义了线程池最基本的执行接口。

void execute(Runnable command);

ExecutorService接口

扩展了对线程池的状态管理,并提供了两种执行任务方式:以集合形式提交执行以及单任务提交执行。

// 如果关闭后所有任务都已完成,则返回 true。
boolean isTerminated()
// 调用线程阻塞到线程池shutdown后执行完所有任务、函数调用超时或者当前线程中断中的任何一个事件发生后结束。
boolean awaitTermination(long timeout, TimeUnit unit)  
// 如果线程池shutdown,则返回 true。
boolean isShutdown()

// 启动一次顺序关闭,不接受新任务,执行以前提交的任务。
void shutdown()
// 试图停止所有正在执行的活动任务,停止处理正在等待的任务,并返回等待执行的任务列表。
List<Runnable> shutdownNow()
  
// 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
<T> Future<T> submit(Callable<T> task)
// 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
Future<?> submit(Runnable task)
// 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
<T> Future<T> submit(Runnable task, T result)
  
// 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
// 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
  
// 执行给定任务组,如果某个任务已成功完成(也就是未抛出异常),则返回其结果,中断其他任务。
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
// 执行给定任务组,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果,中断其他任务。
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)

ScheduledExecutorService接口

扩展了线程池执行定时任务的功能,并提供了两种执行方式:以固定延时执行(FixedDelay)和以固定时间间隔执行(FixedRate)。

//创建并执行在给定延迟后启用的单次操作
ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit)
<V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay, TimeUnit unit)
//创建并执行在给定的初始延迟之后,以给定的时间间隔执行周期性动作。
//即在 initialDelay 初始延迟后,initialDelay + period 执行第一次,initialDelay + 2 * period 执行第二次,依次类推。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit)

// 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit)

ThreadPoolExecutor

函数列表

已略过父类或接口已定义的函数。

// 在执行给定线程中的给定 Runnable 之前调用的方法。
protected void beforeExecute(Thread t, Runnable r)
// 基于完成执行给定 Runnable 所调用的方法。
protected void afterExecute(Runnable r, Throwable t)
// 如果在保持活动时间内没有任务到达,新任务到达时正在替换(如果需要),则设置控制核心线程是超时还是终止的策略。
void allowCoreThreadTimeOut(boolean value)
// 如果此池允许核心线程超时和终止,如果在 keepAlive 时间内没有任务到达,新任务到达时正在替换(如果需要),则返回 true。
boolean allowsCoreThreadTimeOut()
// 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。
boolean awaitTermination(long timeout, TimeUnit unit)
// 当不再引用此执行程序时,调用 shutdown。
protected void finalize()
// 返回曾计划执行的近似任务总数。
long getTaskCount()
// 如果此执行程序处于在 shutdown 或 shutdownNow 之后正在终止但尚未完全终止的过程中,则返回 true。
boolean isTerminating()
// 启动所有核心线程,使其处于等待工作的空闲状态。
int prestartAllCoreThreads()
// 启动一个核心线程,使其处于等待工作的空闲状态。
boolean prestartCoreThread()
// 尝试从工作队列移除所有已取消的 Future 任务。
void purge()
// 从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。
boolean remove(Runnable task)

// GETTER / SETTER
// 主动执行任务的近似线程数 : ActiveCount
// 已完成执行的近似任务总数 : CompletedTaskCount
// 核心线程数 : CorePoolSize
// 线程保持活动的时间 : KeepAliveTime
// 曾经同时位于池中的最大线程数 : LargestPoolSize
// 允许的最大线程数 : MaximumPoolSize
// 池中的当前线程数 : PoolSize
// 执行程序使用的任务队列 : Queue
// 对未执行任务的处理程序 : RejectedExecutionHandler
// 用于创建新线程的线程工厂 : ThreadFactory

关键参数

线程池线程数量

// 核心池大小
private volatile int corePoolSize;
// 最大池大小
private volatile int maximumPoolSize;
// 阻塞队列。
private final BlockingQueue<Runnable> workQueue;
// 拒绝策略的处理句柄。
private volatile RejectedExecutionHandler handler;

在大多数情况下,核心池大小和最大池大小的值是在创建线程池设置的;但是,也可以使用setCorePoolSize(int)setMaximumPoolSize(int)进行动态更改。需要明晰的是,核心池的线程是不会被销毁的,而多余的线程会在 keeptime 结束后销毁。

ThreadFactory

在不同的业务最好不要混用线程池,并且在使用时通过传递定制的 ThreadFactory 来为线程赋予实际的业务命名,方便排查问题等。

ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("test-pool-%d")
        .setDaemon(false) 
        .setPriority(Thread.NORM_PRIORITY)
        .setUncaughtExceptionHandler((t, e) -> {
            logger.error("Thread {} is failed to handle task.", t.getName(), e);
        })
        .build();

生命周期

ThreadPoolExecutor 使用 ctl 来存储线程池状态和 WorkerCount。WorkerCount 指示的是有效线程数,表示已经被允许启动但不允许停止的工作线程数量。WorkerCount 的值与实际活动线程的数量不同。

runState 具有如下 5 种状态

private static final int RUNNING    = -1 << COUNT_BITS; 
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

下图是它们之间的状态转换

![[/images/JDK/ThreadPoolExecutor状态转换.png]]

  • RUNNING: 如果线程池处于 RUNNING 状态下的话,能够接收新任务,也能处理正在运行的任务。可以从 ctl 的初始化得知,线程池一旦创建出来就会处于 RUNNING 状态,并且线程池中的有效线程数为 0。

  • SHUTDOWN: 在调用 shutdown() 方法后,线程池的状态会由 RUNNING -> SHUTDOWN 状态,位于 SHUTDOWN 状态的线程池能够处理正在运行的任务,但是不能接受新的任务。

  • STOP: 和 shutdown() 方法类似,在调用 shutdownNow() 方法时,程序会从 RUNNING/SHUTDOWN -> STOP 状态,处于 STOP 状态的线程池,不接收新任务,不处理已添加的任务,并且会尝试中断正在处理的任务。
  • TIDYING:TIDYING 状态有个前置条件,分为两种:一种是是当线程池位于 SHUTDOWN 状态下,阻塞队列和线程池中的线程数量为空时,会由 SHUTDOWN -> TIDYING;另一种是当线程池位于 STOP 状态下时,线程池中的数量为空时,会由 STOP -> TIDYING 状态。转换为 TIDYING 的线程池会调用 terminated()这个钩子方法,terminated() 在 ThreadPoolExecutor 类中是空实现,若用户想在线程池变为 TIDYING 时,进行相应的处理,可以通过重载 terminated() 函数来实现。
  • TERMINATED:TERMINATED 状态是线程池的最后一个状态,线程池处在 TIDYING 状态时,执行完 terminated() 方法之后,就会由 TIDYING -> TERMINATED 状态。此时表示线程池的彻底终止。

运行过程

任务添加过程

当新任务提交给线程池执行时:

  1. 如果当前运行的工作线程少于 corePoolSize 的话,那么会创建新的 Worker 线程来执行任务 ,这一步需要获取 mainLock 全局锁。
  2. 如果运行线程不小于 corePoolSize,则将任务加入 BlockingQueue 阻塞队列。
  3. 如果无法将任务加入 BlockingQueue 中,此时队列已满,需要创建新的 Worker 线程来处理任务,这一步同样需要获取 mainLock 全局锁。
  4. 如果创建新线程会使当前运行的线程超过 maximumPoolSize 的话,任务将被拒绝,并且使用 RejectedExecutionHandler.rejectEExecution() 方法拒绝新的任务。

ThreadPoolExecutor 采取上面的整体设计思路,是为了在执行 execute 方法时,避免获取全局锁,因为频繁获取全局锁会是一个严重的可伸缩瓶颈。

/**
 * 在将来的某个时候执行给定的任务。任务可以在新线程中执行,也可以在现有的池线程中执行。
 * 如果由于此执行器已关闭或已达到其容量而无法提交任务以供执行,则由当前的{@code RejectedExecutionHandler}处理该任务。
 * 
 * @param command the task to execute  待执行的任务命令
 */
public void execute(Runnable command) {
    // NPE检查,线程池不允许提交NULL任务
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     * 1. 如果运行的线程少于corePoolSize,将尝试以给定的命令作为第一个任务启动新线程。
     * 2. 如果一个任务可以成功排队,那么我们仍然需要仔细检查两点,其一,我们是否应该添加一个线程
     * (因为自从上次检查至今,一些存在的线程已经死亡),其二,线程池状态此时已改变成非运行态。因此,我们重新检查状态,如果检查不通过,则移除已经入列的任务,如果检查通过且线程池线程数为0,则启动新线程。
     * 3. 如果无法将任务加入任务队列,则将线程池扩容到极限容量并尝试创建一个新线程,如果失败则拒绝任务。
     */
    int c = ctl.get(); // 获取当前的clt,AtomicInteger类型保证线程安全
    
    // 步骤1:判断线程池当前线程数是否小于线程池大小
    if (workerCountOf(c) < corePoolSize) {
        // 增加一个工作线程并添加任务,成功则返回,否则进行步骤2
        // true代表使用coreSize作为边界约束,否则使用maximumPoolSize
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    
    // 步骤2:不满足workerCountOf(c) < corePoolSize或addWorker失败,进入步骤2
    // 如果线程池处于Running状态,则将当前提交的任务提交到内部的阻塞队列进行排队等待worker处理
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        /**
          * 二次检查线程池是否仍在运行中
          * 如果线程池不在 running 状态则将刚才进行排队的任务移除,并拒绝此次提交的任务
          * 如果此时在线程池中运行的 worker 数量减少到 0(corePoolSize 为 0 的线程池在
          * 并发的情况下会出现此场景)则添加一个不携带任何任务的非核心态的worker去处理
          * 刚才排队成功的任务。
          */
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 步骤3:如果线程池不是Running状态或任务入列失败,尝试添加一个非核心态的worker,失败则拒绝任务
    else if (!addWorker(command, false))
        //添加失败:当前运行的worker数量超过maximumPoolSize或者本身最大的限制;线程池状态在shutdown以上
        reject(command);
}

添加Worker过程

添加 Worker 的过程可以概括为三步:创建 Worker、将 Worker 加入到 workers、运行 Worker。

private boolean addWorker(Runnable firstTask, boolean core) {
    //自旋进行线程状态check
    retry:
    for (;;) {
        int c = ctl.get(); //读取最新的clt,其本身具有可见性
        int rs = runStateOf(c);
        /** 
         * 1.线程池为非Running状态(Running状态则既可以新增核心线程也可以接受任务)
         * 2.线程池为shutdown状态且firstTask为空且队列不为空
         * 3.满足条件1且条件2不满足,则返回false
         * 4.条件2解读:线程池为shutdown状态时且任务队列不为空时,可以新增空任务的线程来处理队列中的任务
         */
        if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;
        /**
          * 自旋进行worker数量自增
          * 如果当前新增的是核心态的worker则与corePoolSize进行比较
          * 如果当期新增的是非核心态的worker则与maximumPoolSize进行比较
          * 不满足数量限制则直接添加失败,进入后续的排队 or 拒绝流程
          */
        for (;;) {
            int wc = workerCountOf(c);
            // 如果 worker 数量 > 线程池最大上限 CAPACITY(即使用int低29位可以容纳的最大值)
            // 或 worker数量 > corePoolSize 或 worker数量 > maximumPoolSize,即已经超过了给定的边界
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //通过CAS进行worker数量+1
            if (compareAndIncrementWorkerCount(c))
                break retry; //如果CAS成功则跳出自旋
            c = ctl.get();  // 线程+1失败,重新读clt
            if (runStateOf(c) != rs)// 如果线程池状态发生变化(只有running状态才接受新任务),则跳到外层循环执行拒绝
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    /**
     * 核心线程数量+1成功的后续操作:添加到工作线程集合,并启动工作线程
     */
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建Worker,将本次提交的任务封装到其内部
        w = new Worker(firstTask);
        final Thread t = w.thread; // worker内部真正用来执行任务的线程
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 下面代码需要加锁:线程池主锁
            mainLock.lock();
            try {
                // 持锁期间重新检查,线程工厂创建线程失败或获取锁之前关闭的情况发生时,退出
                int c = ctl.get();
                int rs = runStateOf(c);

                // 再次检验线程池是否是running状态或线程池shutdown但线程任务为空
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 线程已经启动,则抛出非法线程状态异常
                    // 为什么会存在这种状态呢?未解决
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w); //加入线程池
                    int s = workers.size();
                    // 如果当前工作线程数超过线程池曾经出现过的最大线程数,刷新后者值
                    if (s > largestPoolSize)
                        largestPoolSize = s; 
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock(); //释放锁
            }
            
            if (workerAdded) {
                // 启动worker内部的线程,其会调用worker内部的run方法
                t.start();
                // 添加成功
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker的工作流程

Worker 继承了AQS 框架来保证一个 Worker 同一时间只执行一个任务,实现 Runnable 接口包装传递进来的 Task。也可以理解为,参数 Task 的 Runnable 只是 Worker 众多执行任务中的其中一个,而 Worker 实现的 Runnable 以轮询的方式不断从 WorkQueue 中获取可执行任务。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable

功能结构

Runnable结构

/**
  * Class Worker mainly maintains interrupt control state for
  * threads running tasks, along with other minor bookkeeping.
  * This class opportunistically extends AbstractQueuedSynchronizer
  * to simplify acquiring and releasing a lock surrounding each
  * task execution.  This protects against interrupts that are
  * intended to wake up a worker thread waiting for a task from
  * instead interrupting a task being run.  We implement a simple
  * non-reentrant mutual exclusion lock rather than use
  * ReentrantLock because we do not want worker tasks to be able to
  * reacquire the lock when they invoke pool control methods like
  * setCorePoolSize.  Additionally, to suppress interrupts until
  * the thread actually starts running tasks, we initialize lock
  * state to a negative value, and clear it upon start (in
  * runWorker).
  */
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        //在addWorker方法中执行的是这里创建的线程,执行的是Worker这个Runnable任务
        //而firstTask只是Worker这个Runnable任务执行的第一个任务。
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
}

AQS结构

Worker 继承了 AQS 类,主要维护了线程运行过程中的中断控制状态。它提供了锁的获取和释放操作。在 Worker 的实现中,我们使用了非重入的互斥锁而不是重入锁,因为 Lea 觉得我们不应该在调用诸如 setCorePoolSize 之类的控制方法时能够重新获取锁。

/**
  * Class Worker mainly maintains interrupt control state for
  * threads running tasks, along with other minor bookkeeping.
  * This class opportunistically extends AbstractQueuedSynchronizer
  * to simplify acquiring and releasing a lock surrounding each
  * task execution.  This protects against interrupts that are
  * intended to wake up a worker thread waiting for a task from
  * instead interrupting a task being run.  We implement a simple
  * non-reentrant mutual exclusion lock rather than use
  * ReentrantLock because we do not want worker tasks to be able to
  * reacquire the lock when they invoke pool control methods like
  * setCorePoolSize.  Additionally, to suppress interrupts until
  * the thread actually starts running tasks, we initialize lock
  * state to a negative value, and clear it upon start (in
  * runWorker).
  */
private final class Worker
  extends AbstractQueuedSynchronizer
  implements Runnable
{

  // Lock methods
  //
  // The value 0 represents the unlocked state.
  // The value 1 represents the locked state.

  protected boolean isHeldExclusively() {
    return getState() != 0;
  }

  protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
      setExclusiveOwnerThread(Thread.currentThread());
      return true;
    }
    return false;
  }

  protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
  }

  public void lock()        { acquire(1); }
  public boolean tryLock()  { return tryAcquire(1); }
  public void unlock()      { release(1); }
  public boolean isLocked() { return isHeldExclusively(); }

  void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
        t.interrupt();
      } catch (SecurityException ignore) {
      }
    }
  }
}

运行过程

//Delegate by Worker.run
final void runWorker(Worker w) {
    //在添加worker的流程中执行thread.start()之后真实执行的方法
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask; // 获取当前worker携带的任务
    w.firstTask = null;
    // allow interrupts
    // new Worker()是state==-1,此处是调用Worker类的tryRelease()方法,将state置为0,而interruptIfStarted()中只有state>=0才允许调用中断
    w.unlock(); // 修改state为0,将占用锁的线程设为null(第一次执行之前没有线程占用)
    boolean completedAbruptly = true;
    try {
        // 自旋。先执行自己携带的任务,然后从阻塞队列中获取一个任务直到无法获取任务
        while (task != null || (task = getTask()) != null) {
            //上锁可以防止在shutdown()时终止正在运行的worker,而不是应对并发
            w.lock();
            /**
             * 判断1:确保只有在线程处于stop状态且wt未中断时,wt才会被设置中断标识
             * 条件1:线程池状态>=STOP,即STOP或TERMINATED
             * 条件2:一开始判断线程池状态<STOP,接下来检查发现Thread.interrupted()为true,即线程已经被中断,再次检查线程池状态是否>=STOP(以消除该瞬间shutdown方法生效,使线程池处于STOP或TERMINATED),
             * 条件1与条件2任意满意一个,且wt不是中断状态,则中断wt,否则进入下一步
             */
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);// 空方法,留给子类实现
                Throwable thrown = null;
                try {
                    task.run(); //执行外部提交的任务,通过try-catch来保证异常不会影响线程池本身的功能
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);// 空方法,留给子类实现
                }
            } finally {
                task = null;
                w.completedTasks++; //已完成任务数量统计
                w.unlock();
            }
        }
        // 如果执行到这里代表非核心线程在keepAliveTime内无法获取任务而退出
        completedAbruptly = false;
    } finally {
       /**
        * 从上面可以看出如果实际业务(外部提交的Runnable)出现异常会导致当前worker终止
        * completedAbruptly 此时为true意味着worker是突然完成,不是正常退出
        */
        processWorkerExit(w, completedAbruptly);// 执行worker退出收尾工作
    }
}

获取任务

private Runnable getTask() {
    // 最新一次poll是否超时
    boolean timedOut = false; 
    // 自旋获取任务(因为是多线程环境)
    for (;;) {
        int c = ctl.get();// 读取最新的clt
        int rs = runStateOf(c);
        //见状态转换图中Shutdown和Stop向Terminal转换的过程
        //在Shutdown状态下必须处理完queue中的任务才能减少Worker
        //在Stop状态下可直接减少Worker数量
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //CAS减少WorkerCount
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        //当处于以下两种情况时,获取任务的机制为poll(keepAliveTime)
        //1. 允许核心线程退出
        //2. 当前的线程数量超过核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //线程超过了允许的数量则进行Worker的删除操作
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // CAS进行worker数量-1,成功则返回null进行worker退出流程,失败则继续自旋
            if (compareAndDecrementWorkerCount(c)) 
                return null;
            continue;
        }
        try {
            // 如果允许超时退出,则调用poll(keepAliveTime)获取任务,否则则通过tack()一直阻塞等待直到有任务提交到队列
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null)
                return r;
            timedOut = true;// 当等待超过keepAliveTime时间未获取到任务时,标记为true。在下次自旋时会进入销毁流程
        } catch (InterruptedException retry) {
            // 什么时候会抛出异常?当调用shutdown或者shutdownNow方法触发worker内的Thread调用interrupt方法时会执行到此处
            timedOut = false;
        }
    }
}

工作线程退出

工作线程退出是 runWorker 的最后一步,这一步会判断工作线程是否突然终止,并且会尝试终止线程,以及是否需要增加线程来替换原工作线程。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
  // worker数量 -1
  // 突然终止(completedAbruptly=true), 说明是 task 执行时异常情况导致,那么正在工作的 worker 线程数量需要 -1.
  // 正常终止(completedAbruptly=false), 说明是 worker 线程没有 task 可执行了,不用-1,因为已经在 getTask() 方法中 -1 了
  if (completedAbruptly)
    decrementWorkerCount();

  // 从 Workers Set 中移除 worker
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);
  } finally {
    mainLock.unlock();
  }

  // 尝试终止线程,
  tryTerminate();

  // 是否需要增加 worker 线程
  // 线程池状态是 running 或 shutdown
  // 如果当前线程是突然终止的,addWorker()
  // 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
  // 故如果调用线程池 shutdown(),直到workQueue为空前,线程池都会维持 corePoolSize 个线程,
  // 然后再逐渐销毁这 corePoolSize 个线程
  int c = ctl.get();
  if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
      if (min == 0 && ! workQueue.isEmpty())
        min = 1;
      if (workerCountOf(c) >= min)
        return; // replacement not needed
    }
    addWorker(null, false);
  }
}

释放资源

优雅下线

线程池拒接收新提交的任务,同时等待线程池里的任务执行完毕后关闭线程池。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    // 利用排它锁进行上锁,保证只有一个线程执行关闭流程
    mainLock.lock();
    try {
        // 安全检查
        checkShutdownAccess();
        // 内部通过自旋+CAS修改线程池状态为shutdown
        advanceRunState(SHUTDOWN);
        // 遍历空闲的worker(能拿到Worker锁就算空闲),进行线程中断通知
        interruptIdleWorkers();
        // 钩子函数
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 进行最后的整理工作
    tryTerminate();
}

优雅下线示例

public class Main {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            executor.execute(setupTask(i));
        }

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("优雅下线处理剩余任务开始");
            // 优雅下线
            executor.shutdown();
            try {
                // 阻塞等待线程池执行完成
                executor.awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("优雅下线处理剩余任务结束");
        }));

    }

    private static Runnable setupTask(Integer integer) {
        return () -> {
            try {
                Thread.sleep(1000L);
                System.out.println(integer);
            } catch (InterruptedException e) {
                System.out.println("不处理");
            }

        };
    }
}

强制下线

线程池拒接收新提交的任务,同时立马关闭线程池,线程池里的任务不再执行。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    // 利用排它锁进行上锁,保证只有一个线程执行关闭流程
    mainLock.lock();
    try {
      // 安全检查
      checkShutdownAccess();
      // 内部通过自旋+CAS修改线程池状态为shutdown
      advanceRunState(STOP);
      // 遍历所有的worker,进行线程中断通知
      interruptWorkers();
      // 将未执行的任务返回给调用方
      tasks = drainQueue();
    } finally {
      mainLock.unlock();
    }
    // 进行最后的整理工作
    tryTerminate();
    return tasks;
}

实践考量因素

线程池大小设置

一般需要根据任务类型来配置线程池大小

  • 如果是 CPU 密集型任务,那么就意味着 CPU 是稀缺资源,这个时候我们通常不能通过增加线程数来提高计算能力,因为线程数量太多,会导致频繁的上下文切换,一般这种情况下,建议合理的线程数值是 N(CPU)数 + 1。比CPU核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用CPU的空闲时间。
  • 如果是 I/O 密集型任务,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。这个时候可以参考 Brain Goetz 的推荐方法:线程数 = CPU核数 × (1 + 平均等待时间/平均工作时间)。参考值可以是 N(CPU) 核数 * 2。

当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整(也可以借助压测进行调试)。

任务队列的设置

使用 ThreadPoolExecutor 需要指定一个实现了 BlockingQueue 接口的任务等待队列。

在 ThreadPoolExecutor 线程池的 API 文档中,一共推荐了 3 种等待队列:

  1. SynchronousQueue:同步队列。这是一个内部没有任何容量的阻塞队列,任何一次插入操作的元素都要等待相对的删除/读取操作,否则进行插入操作的线程就要一直等待,反之亦然。
  2. LinkedBlockingQueue:无界队列(严格来说并非无界,上限是Integer.MAX_VALUE),基于链表结构。使用无界队列后,当核心线程都繁忙时,后续任务可以无限加入队列,因此线程池中线程数不会超过核心线程数。这种队列可以提高线程池吞吐量,但代价是牺牲内存空间,甚至会导致内存溢出。另外,使用它时可以指定容量,这样它也就是一种有界队列了。
  3. ArrayBlockingQueue:有界队列,基于数组实现。在线程池初始化时,指定队列的容量,后续无法再调整。这种有界队列有利于防止资源耗尽,但可能更难调整和控制。

另外,Java 还提供了另外 4 种队列:

  1. PriorityBlockingQueue:支持优先级排序的无界阻塞队列。存放在 PriorityBlockingQueue 中的元素必须实现 Comparable 接口,这样才能通过实现 compareTo() 方法进行排序。优先级最高的元素将始终排在队列的头部;PriorityBlockingQueue 不会保证优先级一样的元素的排序,也不保证当前队列中除了优先级最高的元素以外的元素,随时处于正确排序的位置。
  2. DelayQueue:延迟队列。基于二叉堆实现,同时具备:无界队列、阻塞队列、优先队列的特征。DelayQueue延迟队列中存放的对象,必须是实现Delayed接口的类对象。通过执行时延从队列中提取任务,时间没到任务取不出来。
  3. LinkedBlockingDeque:双端队列。基于链表实现,既可以从尾部插入/取出元素,还可以从头部插入元素/取出元素。
  4. LinkedTransferQueue:由链表结构组成的无界阻塞队列。这个队列比较特别的时,采用一种预占模式,意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素。

拒绝策略的设置

拒绝策略

线程池的拒绝策略,是指当任务添加到线程池中被拒绝,而采取的处理措施。当任务添加到线程池中之所以被拒绝,可能是由于:第一,线程池异常关闭。第二,任务数量超过线程池的最大限制。拒绝策略需要实现 RejectedExecutionHandler 接口,不过 Executors 框架已经为我们实现了 4 种拒绝策略:

  1. AbortPolicy(默认):丢弃任务并抛出 RejectedExecutionException 异常。
  2. CallerRunsPolicy:直接运行这个任务的 run 方法,但并非是由线程池的线程处理,而是交由任务的调用线程处理。
  3. DiscardPolicy:直接丢弃任务,不抛出任何异常。
  4. DiscardOldestPolicy:将当前处于等待队列列头的等待任务强行取出,然后再试图将当前被拒绝的任务提交到线程池执行。

启动前的预热

由于一些服务需要预热(预热很重要,不然刚重启的服务有时顶不住瞬时请求)。可以使用以下方法来实现

  • prestartCoreThread:启动一个核心线程
  • prestartAllCoreThreads :启动所有核心线程

需要注意的几个问题

  • 避免任务堆积:workQueue不要使用无界队列,尽量使用有界队列。工作线程太少,导致处理速度跟不上入队速度,这种情况下很可能会导致 OOM(有可能由于允许囤积的任务过多,导致资源耗尽而系统崩溃),诊断时可以使用 jmap 检查是否有大量任务入队。

  • 尽量避免线程泄漏和内存占用:生产实践中很可能由于逻辑不严谨或者工作线程不能及时释放导致线程泄漏,这个时候最好检查一下线程栈。

    需要注意的是以下几个内存占用问题:

    1. ThreadLocal:尽量避免在使用线程池时操作 ThreadLocal,因为工作线程的生命周期可能会超过任务的生命周期,被 ThreadLocal 引用会导致线程泄露。线程池保持空闲的核心线程是它的默认配置,一般来讲是没有问题的,因为它占用的内存一般不大。怕的就是业务代码中使用 ThreadLocal 缓存的数据过大又不清理。
    2. 局部变量:线程处于阻塞状态,肯定还有栈帧没有出栈,栈帧中有局部变量表,凡是被局部变量表引用的内存都不能回收。所以如果这个线程创建了比较大的局部变量,那么这一部分内存无法GC。
    3. TLAB 机制:如果你的应用线程数处于高位,那么新的线程初始化可能因为 Eden 没有足够的空间分配 TLAB 而触发 YoungGC。因此,如果你的应用线程数处于高位,那么需要观察一下 YoungGC 的情况,估算一下 Eden 大小是否足够。如果不够的话,可能要谨慎地创建新线程,并且让空闲的线程终止;必要的时候,可能需要对 JVM 进行调参。
  • 避免死锁等同步问题

  • 适当监控:对于资源紧张的应用,如果担心线程池资源使用不当,可以利用 ThreadPoolExecutor 的API实现简单的监控,然后进行分析和优化。

  • 优化资源利用率:如果是资源紧张的应用,使用 allowsCoreThreadTimeOut 可以提高资源利用率。

  • 简单场景下,Semaphore替换的方案

    ThreadPool tp = ThreadPool.of(core:1, max:1, bq: ArrayBlockingQueue);
    tp.getQueue().put();

    将 core + max 线程数与 BlockingQueue 长度之和视作信号量,实现 Semaphore 的替换方案。在该场景下,超出信号量的任务会被阻塞。

ScheduledThreadPool

函数列表

已略过父类或接口已定义的函数。

// 修改或替换用于执行 callable 的任务。
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task)
// 修改或替换用于执行 runnable 的任务。
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task)
// 获取有关在此执行程序已 shutdown 的情况下、是否继续执行现有定期任务的策略。
boolean getContinueExistingPeriodicTasksAfterShutdownPolicy()
// 获取有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略。
boolean getExecuteExistingDelayedTasksAfterShutdownPolicy()
// 返回此执行程序使用的任务队列。
BlockingQueue<Runnable> getQueue()
// 从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。
boolean remove(Runnable task)
// 设置有关在此执行程序已 shutdown 的情况下是否继续执行现有定期任务的策略。
void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)
// 设置有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略。
void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)

执行任务模式

单次执行任务

简单的延时单次执行任务。

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

周期执行任务

通过传入 period 参数构建 ScheduledFutureTask 使得任务得以周期执行。更详细的解释是,如果 period 为 0 则只执行一次,大于或小于 0 则周期执行。

这里需要关注的点是,当 period 大于 0 是以固定周期执行任务,小于 0 是以固定延时执行任务。原理下面详细介绍。

以固定周期执行任务

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

以固定延时执行任务

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

执行逻辑

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {

    //判断任务是否为周期性
    public boolean isPeriodic() {
        return period != 0;
    }

    //根据period的正负性,设置对应模式的周期延时
    private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;
        else
            time = triggerTime(-p);
    }

    public void run() {
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        //单任务执行模式
        else if (!periodic)
            ScheduledFutureTask.super.run();
        //周期任务执行模式
        else if (ScheduledFutureTask.super.runAndReset()) {
            //重新设置运行时间
            setNextRunTime();
            //重新添加进任务队列等待被执行
            reExecutePeriodic(outerTask);
        }
    }
}

任务触发机制

ScheduledThreadPoolExecutor 实现了自己的阻塞队列 DelayedWorkQueue,它是一个基于最小堆的数组结构,类似于 DelayQueue 和 PriorityQueue。DelayedWorkQueue 的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。

注意:这里的顺序并不是绝对的,堆中的排序只保证了子节点的下次执行时间要比父节点的下次执行时间要大,而叶子节点之间并不一定是顺序的。

ScheduledThreadPoolExecutor 的任务触发机制采用了 Leader-Follower 模式的变体,用于减少不必要的定时等待。其状态流转如下图:

所有线程会有三种身份中的一种:Leader 和 Follower,以及一个处理中的状态:Processor。它的基本原则就是,永远最多只有一个 Leader。而所有 Follower 都在等待成为 Leader。线程池启动时会自动产生一个 Leader 负责等待事件发生。当有一个事件产生时,Leader 线程首先通知一个 Follower 线程将其提拔为新的 Leader,然后自己转为 Processor 状态去处理这个事件,处理完毕后加入 Follower 线程等待队列,等待下次成为 Leader。这种方法可以增强 CPU 高速缓存相似性,及消除动态内存分配和线程间的数据交换。

具体的代码在 ScheduledThreadPoolExecutorDelayedWorkQueue.take() 中实现。

ForkJoinPool

work-stealing机制

Work stealing 翻译为工作窃取,是指工作线程本身的任务队列为空时,从其他工作线程的任务队列从窃取任务来执行。

在 fork/join 中,假如我们要线程池做一些比较大的任务,做的过程中会把这个人物分割为多个较小的任务(较小的任务也可能分割成更小的任务,为了减少工作线程对公共任务队列的竞争,我们让每个工作线程持有一个任务队列,自己做任务时分割出来的小任务就放到自己的工作队列中。

但是这样会存在一个问题,初始的任务有大有小,有的工作线程自己的任务做完了,其他线程还在忙碌,从而产生负载不均衡的问题。为了解决这个问题,人们发明了工作窃取算法,这个算法的核心很简单,就是当前工作线程的任务队列为空时,去其他还有任务的工作线程的任务队列取一个(或多个)任务回来。

外部任务提交进公共队列还是直接散列到工作线程的任务队列?

外部任务提交进公共队列还是直接散列到工作线程的任务队列主要看需求,从竞争激烈程度来看,散列的竞争应该比公共队列少。但是如果散列的话,窃取从队尾取任务,可能导致后进的任务反而先完成,不符合整个线程池先进先出的预期。ForkJoinPool 是有公共队列的,所以这里我们也使用公共队列缓存外部提交的任务。

为什么使用双端队列?

每个工作线程有独立的任务队列,为什么使用双端队列,我们需要从两个方面来分析。

一方面,我们两端都需要提交任务。如果用散列的话,我们就需要从外部提交到任务队列队尾(先进先出)。而 fork/join 提交子任务是提交到队首的(后进先出)。

另一方面,我们两端都需要取任务。队首不用说,工作线程是从队首取任务的,工作窃取一般是从队尾窃取任务的,因为双端队列两端可以分别被两个锁保护,减少竞争。而且 fork/join 情况下,队尾的任务更大,我们倾向于窃取大的任务。

从哪个任务队列窃取?

提交时就散列到各任务队列的话这个问题很好回答,那就是随机选一个,然后从这个开始遍历其他。

有公共队列的情况需要特别考虑,就是,我们先窃取其他队列的,还是先从公共队列取?

先从公共队列取很符合自觉,但实际上不符合整个线程池先进先出的预期,因为其他任务队列的任务必定的先进任务分割出来的。但是如果先窃取,那窃取的频率又会大幅上升,可能每次都需要遍历一遍其他工作队列以搜索可窃取的任务,这可能要加锁解锁很多次。Java 的 ForkJoinPool 是先窃取的,所以这里我们也采用先窃取的方案。

一次窃取多少个任务?

一次窃取多少个任务主要是考虑锁的竞争,每次窃取一个,窃取很多次就可能有很多次锁竞争,一次窃取多个又可能窃取者自己又做不完了要等别人窃取了,毕竟队尾的任务比较大。Java 的 ForkJoinPool 是一次窃取一个的,但笔者也用过一次窃取多个的实现,不过这个实现并不是用于 fork/join 的,而是大量提交任务,提交时散列到各个队列的,这时候我们可以假设每个任务差不多大,所以可以按一定比例窃取。我们这里是 fork/join 篇的续篇, 所以还是考虑 fork/join 的场景下任务大小比较不一的情况,每次窃取一个。

什么时候唤醒?

没有任务的时候工作线程需要进入阻塞等待,问题是什么时候唤醒呢? 主要考虑两点,窃取的时候和 fork 的时候。

很自然我们说唤醒是唤醒一个而不是多个。窃取的时候发现队列里面有好多任务,那肯定是要唤醒的,但如果任务队列就剩一个任务了,那还要唤醒吗? 从 java 的 ForkJoinPool 的实现看确实是要唤醒的,毕竟不能眼见着有任务却不去执行。

工作线程 fork 了子任务,考虑到 fork 之后通常是要 join 的,我们得留一个任务给 join 的时候 try_execute_one, 所以 fork 的时候应该是任务队列有多于 1 个任务的时候唤醒。