并发编程-线程池源码详解

阿里巴巴Java手册有一条:

【强制】线程资源必须通过线程池提供,禁止在应用程序中显示创建线程。

说明:使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程导致消耗完内存或者过度切换的问题。

简单来说使用线程池有以下几个目的:

  • 避免频繁的创建。线程是稀缺资源。
  • 解耦。线程的创建与执行分开,方便维护。
  • 线程资源复用。

1. 线程池原理

本文从线程池的创建开始说起,跟着源码分析一下线程池的工作原理,本文源码基于JDK1.8

1.1 Executors

Executors有一个私有的默认构造函数,不能实例化,是一个工具类,主要用于提供各种类型线程池创建的静态方法。 提供的静态创建方法有:

  • newSingleThreadExecutor 创建一个执行器,该执行器使用单个工作线程操作一个无界队列。(但是请注意,如果这个单线程在关闭之前的执行过程中由于失败而终止,如果需要执行后续任务,一个新的线程将取代它。)保证任务按顺序执行,并且在任何给定时间不会有超过一个任务处于活动状态。与等效的{@code newFixedThreadPool(1)}不同,返回的执行器保证不会被重新配置以使用其他线程。
  • newFixedThreadPool 创建一个线程池,该线程池重用固定数量的线程,在需要时使用提供的ThreadFactory创建新线程。在任何时候,最多有 nThreads个活动线程执行任务,如果在所有线程都处于活动状态时提交了额外的任务,那么它们将在队列中等待,直到有线程可用。如果任何线程在关闭之前的执行过程中由于失败而终止,那么如果需要执行后续任务,一个新的线程将取代它。线程池中的线程将一直存在,直到显式地shutdown。
  • newWorkStealingPool 创建一个线程池,该线程池维护足够的线程来支持给定的并行级别,并可以使用多个队列来减少争用。并行级别对应于积极参与或可参与任务处理的最大线程数。线程的实际数量可以动态地增长和收缩。工作窃取池不保证提交任务的执行顺序。
  • newCachedThreadPool 创建一个线程池,该线程池根据需要创建新线程,但在可用时将重用以前构造的线程。这些池通常会提高执行许多短期异步任务的程序的性能。如果可用,对execute的调用将重用以前构造的线程。如果没有可用的现有线程,将创建一个新线程并将其添加到池中。未使用60秒的线程将被终止并从缓存中删除。因此,长时间空闲的池不会消耗任何资源。注意,可以使用ThreadPoolExecutor构造函数创建具有相似属性但不同细节(例如超时参数)的池。
  • newSingleThreadScheduledExecutor 创建一个单线程执行器,该执行器可以安排命令在给定的延迟之后运行,或者定期执行。(但是请注意,如果这个线程在关闭之前的执行过程中由于失败而终止,那么如果需要执行后续任务,将会有一个新的线程代替它。),与 newFixedThreadPool(1)不同,返回的executor不能被其他线程重新配置。
  • newScheduledThreadPool 创建一个线程池,该线程池可以在给定延迟之后调度命令运行,或者定期执行命令。

Executors 返回的线程池对象的弊端如下:

  1. FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
  2. CachedThreadPool 和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

所以线程池的创建,一般会根据需求场景,使用自选参数,由用户主动构造,而不是使用静态方法构造。

1.2 ThreadPoolExecutor

首先看一下newFixedThreadPool创建方法的源码:

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

事实上,大多数类型的线程池创建都是调用new ThreadPoolExecutor(…)创建一个ThreadPoolExecutor对象,只不过初始化参数不同而已。newWorkStealingPool创建时构造的是ForkJoinPool对象,本文不述。

下面是ThreadPoolExecutor的其中一个构造方法:

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}

初始化参数的如下:

  • corePoolSize :表示线程池的核心数,线程池保持alive状态的线程数,即使线程是空闲的。
  • maximumPoolSize: 表示线程池支持的最大的线程个数。
  • keepAliveTime :表示池中线程空闲后的生存时间
  • unit: 表示上一个时间参数的单位
  • workQueue: 用于存放任务的阻塞队列
  • threadFactory: 表示创建线程的工厂,一般使用默认的线程创建工厂Excutors.DefaultThreadFactory()
  • handler: 当队列和最大线程池都满了之后的饱和策略,一般使用默认的handler—AbortPolicy(内部类)
1
2
3
4
5
6
7
8
9
10
11
//代码摘自:java.util.concurrent.ThreadPoolExecutor
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +" rejected from " + e.toString());
}
}
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}

用户也可以自己实现RejectedExecutionHandler接口定义一个handler,当提交的任务因为各种原因被线程池拒绝,就会调用rejectedExecution方法。

1.2.1 提交任务excute()

使用线程池时,通常我们用

1
threadPool.execute(new Job());

这样的方式提交一个任务到线程池中,所以线程池ThreadPoolExecutor的核心逻辑就是execute()函数了,这个方法是在Excutor接口中声明。

在分析核心逻辑之前,先了解一下线程池中定义的状态,这些状态都和线程的执行密切相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//代码摘自:java.util.concurrent.ThreadPoolExecutor
private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int CAPCITY = (1 << COUNT_BITS) - 1;

private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

private static int runStateOf(int c){return c & ~CAPCITY;}
private static int workerCountOf(int c){return c & CAPCITY;}
private static int ctlOf(int rs, int wc){return rs | wc;}

分析上面的代码得到下表:

常量名 二进制
CAPCITY 0001 1111 1111 1111 1111 1111 1111 1111
RUNNING 1110 0000 0000 0000 0000 0000 0000 0000
SHUTDOWN 0000 0000 0000 0000 0000 0000 0000 0000
STOP 0010 0000 0000 0000 0000 0000 0000 0000
TIDYING 0100 0000 0000 0000 0000 0000 0000 0000
TERMINATED 0110 0000 0000 0000 0000 0000 0000 0000

由上表可以看出,原子对象ctl的前三位表示状态,后29位记录池中worker的个数,CAPCITY就像是一个掩码,通过掩码可以快速的从ctl中获得当前线程池的运行状态和池中的worker个数。

JDK1.8的并发包中不再通过设置阻塞队列的长度来限制任务的提交。阻塞队列的长度初始化之后就不能改变,因此如果担心阻塞队列太大导致内存占用太多,可以从两方面入手:1、初始化的时候选择合适的阻塞队列大小;2、调高corePoolSize或maxmumPoolSize加快任务的处理速度。参数的动态调整见下文。

线程池状态简述:

  • RUNNING 是运行状态,指可以接受任务,执行队列里的任务。
  • SHUTDOWN 是指调用了shutdown()函数,不再接受新任务,但是会把队列里的任务执行完毕。
  • STOP 是指调用了shutdownNow()函数,不再接受新任务,同时终端正在执行的任务并丢弃队列中的待执行任务。
  • TIDYING 指所用任务都执行完毕。
  • TERMINATED 终止状态,在调用shutdown()或shutdownNow()时都会尝试更新这个状态。

下面分析核心代码excute()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//代码摘自:java.util.concurrent.ThreadPoolExecutor
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//1、获取当前线程池的状态
int c = ctl.get();
//2、当线程数量小于corePoolSize,创建新线程运行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//线程池线程数到达核心线程数 或者 新增worker失败失败
//3、如果线程池处于运行状态,将任务写入阻塞队列,如果入队也顺利继续往下执行
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//3-1.再次检查线程状态,若线程池状态发生改变(变为非运行状态),则从阻塞队列移除该任务,如果出队顺利就执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//3-2.如果线程池仍然为运行态,检查当前池是否为空,为空就创建一个线程,但不指定任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//4、如果第一次检查就不通过(线程池非运行态或者任务入队失败),尝试新建线程,如果失败则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}

疑问:addWorker(null, false) 添加了一个没有具体任务的worker,作用是什么?

如果线程池中的线程数为0,但任务队列中有需要执行的任务,这时候新建一个没有任务的线程是为了去执行任务队列中的任务。

下图表示了当有任务提交到线程池后线程池的处理流程:
execute执行流程图

1.2.2 创建工人(线程)

addWorker(Runnable firstTask, boolean core)

参数:

firstTask: worker线程的初始任务,可以为空
core: true:将corePoolSize作为上限,false:将maximumPoolSize作为上限

addWorker函数是execute函数的核心逻辑,线程池持有一个HashSet对象存放池中的workers,每个worker对应一个线程,addWorker的作用就是创建worker执行任务。

addWorker方法有4种调用方式:

  • addWorker(command, true)

  • addWorker(command, false)

  • addWorker(null, false)

  • addWorker(null, true)

在execute方法中就使用了前3种,结合这个方法进行以下分析

  1. 线程数小于corePoolSize时,放一个需要处理的task进Workers Set。如果Workers Set长度超过corePoolSize,就返回false
  2. 当队列被放满时,就尝试将这个新来的task直接放入Workers Set,而此时Workers Set的长度限制是maximumPoolSize。如果线程池也满了的话就返回false
  3. 放入一个空的task进workers Set,长度限制是maximumPoolSize。这样一个task为空的worker在线程执行的时候会去任务队列里拿任务,这样就相当于创建了一个新的线程,只是没有马上分配任务
  4. 这个方法就是放一个null的task进Workers Set,而且是在小于corePoolSize时,如果此时Set中的数量已经达到corePoolSize那就返回false,什么也不干。实际使用中是在prestartAllCoreThreads()方法,这个方法用来为线程池预先启动corePoolSize个worker等待从workQueue中获取任务执行

下面将源代码分成两部分进行分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//代码摘自:java.util.concurrent.ThreadPoolExecutor
private final ReentrantLock mainLock = new ReentrantLock();

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//状态为 RUNNING 继续往下执行
//状态为不为RUNNING时,如果状态为SHUTDOWN并且firstTask为null并且阻塞队列空时,可继续向下运行
//否则返回false,添加worker失败
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//线程数大于CAPACITY
//线程数大于corePoolSize或maximumPoolSize(取决于core)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//线程数验证通过,使用CAS对c加1,执行成功则终止大循环继续向下运行
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS设置失败则重新获取运行状态,若线程池状态发生改变,从头开始大循环,否则继续小循环
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
...
}

这段代码为主要负责检查,主要判断线程池当前是否为可以添加worker线程的状态,可以则继续下一步,不可以则返回 false,具体分为三种情况:

  • 线程池状态>shutdown,可能为stop、tidying、terminated,不能添加worker线程
  • 线程池状态==shutdown,firstTask不为空,不能添加worker线程,因为shutdown状态的线程池不接收新任务
  • 线程池状态==shutdown,firstTask==null,workQueue为空,不能添加worker线程,因为firstTask为空是为了添加一个没有任务的线程再从workQueue获取task,而workQueue为空,说明添加无任务线程已经没有意义

当以上的情况都不符合,继续向下执行。在创建worker之前还需要验证一下线程池中的线程数量有没有达到极限,达到极限直接返回false;没达到极限,先CAS修改线程池状态(+1操作),若修改成功,直接退出检验模块循环,执行下面的运行模块。CAS设置状态失败则重新获取运行状态进行二重检验,若线程池状态发生改变,从头开始大循环检验,否则继续小循环执行cas。

第二部分为运行模块,直接进入主题,将提交的任务包装成worker对象,加入worker set 并启动该worker的线程,worker插入set需要加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private boolean addWorker(Runnable firstTask, boolean core) {
...
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 二重验证,获取池状态
int rs = runStateOf(ctl.get());
//状态为RUNNING 则通过继续执行
//状态为SHUTDOWN并且提交的任务为null 则通过继续执行
//否则直接执行finally解锁
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 如果worker中的线程t已经处于运行状态
throw new IllegalThreadStateException();//抛异常
workers.add(w);//将w加入HashSet
int s = workers.size();
//更新largestPoolSize,largestPoolSize只能在lock下修改
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

addWorker执行流程总结:

  1. 判断是否可以addworker
  2. 线程池当前线程数量是否超过上限(corePoolSize 或 maximumPoolSize),超过了return false,没超过则对workerCount+1,继续下一步
  3. 在线程池的ReentrantLock保证下,向Workers Set中添加新创建的worker实例,添加完成后解锁,并启动worker线程,只有在新建的线程成功启动的情况下才能返回 true。如果添加worker入Set失败或启动失败,调用addWorkerFailed()逻辑
1.2.3 worker创建失败的善后处理

addWorkerFailed()

当任务执行失败,程序需要进行善后处理,即恢复任务执行过程中对内存的改动,移除Worker set中的worker对象,修改池状态,最后尝试终止线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
代码摘自:java.util.concurrent.ThreadPoolExecutor
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
//CAS对ctl减1
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}

1.2.4 Worker类

我们发现 addWorker 方法只是构造了一个 Worker,并且把 firstTask 封装到 worker 中,它是做什么的呢?我们来看看

  1. 每个 worker,都是一条线程,同时里面包含了一个 firstTask,即初始化时要被首先执行的任务.

  2. 最终执行任务的,是 runWorker()方法

1
2
3
4
5
6
7
8
9
10
Worker(Runnable firstTask) {
setState(-1); //初始化信号量为-1
this.firstTask = firstTask;
//创建线程将worker对象传入,线程执行的是worker的run方法
this.thread = getThreadFactory().newThread(this);
}

public void run() {
runWorker(this);// 线程启动执行的是此方法>>
}

Worker 类实现了 Runnable 接口,注意其中的 firstTask 和 thread 属性:

firstTask 用它来保存传入的任务;

thread 是在调用构造方法时通过 ThreadFactory 来创建的线程,是用来处理任务的线程。

在调用构造方法时,需要传入任务,这里通过 getThreadFactory().newThread(this);来新建一个线程,newThread 方法传入的参数是 this,因为 Worker 本身继承了 Runnable 接口,也就是一个线程,所以一个 Worker 对象在启动的时候会调用 Worker 类中的 run 方法。

Worker 继承了 AQS,使用 AQS 来实现独占锁的功能。为什么不使用 ReentrantLock 来实现呢?可以看到 tryAcquire 方法,它是不允许重入的,而 ReentrantLock 是允许重入的。

lock 方法一旦获取了独占锁,表示当前线程正在执行任务中,lock方法将state置为1,unlock方法将state置为0,那么它会有以下几个作用

  1. 如果正在执行任务,state =1 则不应该中断线程;

  2. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;

  3. 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态

  4. 之所以设置为不可重入,是因为我们不希望任务在调用像 setCorePoolSize 这样的线程池

控制方法时重新获取锁,这样会中断正在运行的线程

1.2.5 runWorker方法

前面已经了解了 ThreadPoolExecutor 的核心方法 addWorker,主要作用是增加工作线程,而 Worker 简单理解其实就是一个线程,里面重新了 run 方法,这块是线程池中执行任务的真正处理逻辑,也就是 runWorker 方法,这个方法主要做几件事

  1. 如果 task 不为空,则开始执行 task

  2. 如果 task 为空,则通过 getTask()再去取任务,并赋值给 task,如果取到的 Runnable 不为空,则执行该任务

  3. 执行完毕后,通过 while 循环继续 getTask()取任务

  4. 如果 getTask()取到的任务依然是空,那么整个 runWorker()方法执行完毕

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//unlock,表示当前 worker 线程允许中断,因为 new Worker 默认的 state=-1,此处是调用Worker 类的 tryRelease()方法,将 state 置为 0, 而 interruptIfStarted()中只有 state>=0 才允许调用中断
w.unlock();
boolean completedAbruptly = true;
try {
// 如果 task 为空,则通过getTask 来获取任务
while (task != null || (task = getTask()) != null) {
// 上锁不是为了防止并发执行任务,为了在 shutdown()时不终止正在运行的任务
w.lock();
// worker线程池为 stop 状态时不接受新任务,不执行已经加入任务队列的任务,还中断正在执行的任务
//所以对于 stop 状态以上是要中断线程的
//(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)确保线程中断标志位为 true 且是 stop 状态以上,接着清除了 中断标志 !wt.isInterrupted()则再一次检查保证线程需要设置中断标志位
if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//这里默认是没有实现的,在一些特定的场景中我们可以自己继承 ThreadpoolExecutor 自己重写
Throwable thrown = null;
try {
task.run(); //runWorker最终执行的是task的 run 方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //这里默认默认而也是没有实现
}
} finally {
//置空任务(这样下次循环开始时,task 依然为 null,需要再通过 getTask()取) + 记录该 Worker 完成任务数量 + 解锁
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
//1.将入参 worker 从数组 workers 里删除掉;
//2.根据布尔值 allowCoreThreadTimeOut 来决定是否补充新的 Worker 进数组workers
}
}

2. 配置线程池

流程介绍完了先来总结以下上文提到了几个核心参数在流程中的具体作用,然后介绍应该如何配置。

2.1 参数详解

  1. corePoolSize:核心线程数
  • 核心线程会一直存活,即使没有任务需要执行
  • 当线程数小于核心线程数时,即使有线程空闲,线程池也会有限创建新的线程
  • 设置allowCoreThreadTimeout=true(默认是false)时,核心线程会超时关闭
  1. maximumPoolSize:最大线程数
  • 当线程数 >= corePoolSize,且队列已满。线程池会创建新线程来处理
  • 当线程数 = maxmumPoolSize,且队列任务已满是,线程会拒绝处理任务
  1. keepAliveTime:线程空闲时间
  • 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量 = corePoolSize
  • 如果allowCoreThreadTimeout = true,则会直到线程数量 = 0
  1. rejectedExecutionHandler:任务拒绝处理器,两种情况会拒绝处理任务:
  • 当线程数已经达到maxmumPoolSize,且队列已满,会拒绝新任务
  • 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务

线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常,
ThreadPoolExecutor类有几个内部实现类来处理这类情况:

AbortPolicy 丢弃任务,抛运行时异常

CallerRunsPolicy 执行任务,调用Runnable的run强制执行。

DiscardPolicy 忽视,什么都不会发生

DiscardOldestPolicy 如果是因为第一种情况被拒绝,则从阻塞队列中踢出最先进入队列的任务,然后再次提交当前任务。

实现RejectedExecutionHandler接口,可自定义处理器处理reject。

2.2 参数配置

默认值:

1
2
3
4
5
corePoolSize=1
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()

如何设置,需要根据几个值来决定:

  • tasks :系统每秒任务数,假设为500~1000
  • taskcost:单任务耗时,假设为0.1s
  • responsetime:系统允许容忍的最大响应时间,假设为1s

做几个计算:
corePoolSize = 系统每秒任务数/单线程每秒任务数 = 系统每秒任务数/(1/单任务耗时)
corePoolSize = tasks/(1/taskcost) =taskstaskcout = (500~1000)0.1 = 50~100 。 corePoolSize设置应该大于50,根据8020原则,如果80%的系统每秒任务数小于800,那么corePoolSize设置为80即可

maxPoolSize = (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数
计算可得 maxPoolSize = (1000-80)/10 = 92
队列容量在初始化池的时候指定,一旦指定不能修改

rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理

keepAliveTime和allowCoreThreadTimeout采用默认通常能满足
以上都是理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器cpu load已经满了,则需要通过升级硬件和优化代码,降低taskcost来处理。

2.3 参数动态调整

用户可以通过corePoolSize和maxmumPoolSize的getter/setter进行访问和设置,具体怎么设置需要根据当前池中一些状态变量进行判断,如:

  • getLargestPoolSize() 获取到目前为止达到过的最大线程数
  • getPoolSize() 获取当前线程数
  • getQueue().size() 获取当前阻塞队列任务数

3. 关闭线程池

关闭线程池无非就是两个方法 shutdown()/shutdownNow()。

但他们有着重要的区别:

  • shutdown() 执行后停止接受新任务,会把队列的任务执行完毕。
  • shutdownNow() 也是停止接受新任务,但会中断所有的任务,将线程池状态变为 stop。

两个方法都会中断线程,用户可自行判断是否需要响应中断。
shutdownNow() 要更简单粗暴,可以根据实际场景选择不同的方法。

通常是按照以下方式关闭线程池的:

1
2
3
4
5
6
7
8
9
10
long start = System.currentTimeMillis();
for (int i = 0; i <= 5; i++) {
pool.execute(new Job());
}
pool.shutdown();
while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
LOGGER.info("线程还在执行。。。");
}
long end = System.currentTimeMillis();
LOGGER.info("一共处理了【{}】", (end - start));

pool.awaitTermination(1, TimeUnit.SECONDS) 会每隔一秒钟检查一次是否执行完毕(状态为 TERMINATED),当从 while 循环退出时就表明线程池已经完全终止了。