线程池使用详解

概述

使用线程时, 我们一般使用new Thread的方式, 如下:

1
2
3
4
5
6
7
new Thread(new Runnable() {

@Override
public void run() {
// TODO Auto-generated method stub
}
}).start();

这种方式有一些弊端:

  • 每次new Thread新建对象性能差。
  • 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom
  • 缺乏更多功能,如定时执行、定期执行、线程中断

java提供了四种线程池,相较之下有如下好处:

  • 重用存在的线程,减少对象创建、消亡的开销,性能佳
  • 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞
  • 提供定时执行、定期执行、单线程、并发数控制等功能

Callable和Future

创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。这2种方式的缺陷就是:在执行完任务之后无法获取执行结果。

Callable

Callable也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call():

1
2
3
public interface Callable<V> {
V call() throws Exception;
}

Callable一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本:

1
2
3
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

Future

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

Future也是一个接口,定义如下

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

方法说明:

  • cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true

  • isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true

  • isDone方法表示任务是否已经完成,若任务完成,则返回true

  • get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回

  • get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null

FutureTask

因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了FutureTask

1
2
3
4
5
public class FutureTask<V> implements RunnableFuture<V>

public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值

FutureTask提供了2个构造器:

1
2
3
4
public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}

线程池实现架构

ThreadPoolExecutor实现了一般的线程池,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor的实现,然后增加了调度功能。

Executor

Executor,任务的执行者,线程池框架中几乎所有类都直接或者间接实现Executor接口,它是线程池框架的基础。Executor提供了一种将“任务提交”与“任务执行”分离开来的机制,它仅提供了一个Execute()方法用来执行已经提交的Runnable任务。

1
2
3
public interface Executor {
void execute(Runnable command);
}

ExecutorService

ExecutorService提供了将任务提交给执行者的接口(submit方法),让执行者执行任务(invokeAll, invokeAny方法)的接口等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public interface ExecutorService extends Executor {

/**
* 启动一次顺序关闭,执行以前提交的任务,但不接受新任务
*/
void shutdown();

/**
* 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表
*/
List<Runnable> shutdownNow();

/**
* 如果此执行程序已关闭,则返回 true。
*/
boolean isShutdown();

/**
* 如果关闭后所有任务都已完成,则返回 true
*/
boolean isTerminated();

/**
* 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

/**
* 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future
*/
<T> Future<T> submit(Callable<T> task);

/**
* 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future
*/
<T> Future<T> submit(Runnable task, T result);

/**
* 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future
*/
Future<?> submit(Runnable task);

/**
* 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

/**
* 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

/**
* 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

/**
* 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService

为ExecutorService的默认实现,AbstractExecutorService除了实现ExecutorService接口外,还提供了newTaskFor()方法返回一个RunnableFuture,在运行的时候,它将调用底层可调用任务,作为 Future 任务,它将生成可调用的结果作为其结果,并为底层任务提供取消操作

ScheduledExecutorService

继承ExecutorService,为一个“延迟”和“定期执行”的ExecutorService。他提供了一些如下几个方法安排任务在给定的延时执行或者周期性执行:

1
2
3
4
5
6
7
8
9
10
11
12
// 创建并执行在给定延迟后启用的 ScheduledFuture。
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)

// 创建并执行在给定延迟后启用的一次性操作。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

// 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;
//也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

// 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

ThreadPoolExecutor

ThreadPoolExecutor继承于抽象类AbstractExecutorService, 是线程池的主要实现类

线程池实现原理

线程池处理流程

excute流程:

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

  2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

  3. 如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

  4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用
    RejectedExecutionHandler.rejectedExecution()方法。

线程池饱和拒绝策略
  • AbortPolicy: 为java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出

  • DiscardPolicy: 不做任务处理,直接忽略这个任务

  • DiscardOldestPolicy:如果线程池没有被关闭的话,把队列队头的任务(也就是等待了最长时间的)直接扔掉,然后提交这个任务到等待队列中

  • CallerRunsPolicy: 只要线程池没有被关闭,那么由提交任务的线程自己来执行这个任务

  • 用户自定义拒绝策略(最常用):实现RejectedExecutionHandler,并自己定义策略模式

源码分析

ctl变量

ThreadPoolExcuter是将两个内部值打包成一个值,即将workerCount和runState(运行状态)这两个值打包在一个ctl中,因为runState有5个值,需要3位,所以有3位表示runState,而其他29位表示为workerCount。而运行时要获取其他数据时,只需要对ctl进行拆包即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 // 与0做 |运算, 求出前三位状态位
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 2^29-1

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000
private static final int STOP = 1 << COUNT_BITS; // 001
private static final int TIDYING = 2 << COUNT_BITS; // 010
private static final int TERMINATED = 3 << COUNT_BITS; // 100

// Packing and unpacking ctl   //拆包ctl,分别获取runState和WorkerCount
private static int runStateOf(int c) { return c & ~CAPACITY; } // 取前三位状态位, ~CAPATITY为29个1
private static int workerCountOf(int c) { return c & CAPACITY; }  // 取线程数
private static int ctlOf(int rs, int wc) { return rs | wc; } // 拼出状态和线程数

关键的类成员

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final BlockingQueue<Runnable> workQueue;  // 任务队列,我们的任务会添加到该队列里面,线程将从该队列获取任务来执行

private final HashSet<Worker> workers = new HashSet<Worker>();//任务的执行值集合,来消费workQueue里面的任务

private volatile ThreadFactory threadFactory;//线程工厂

private volatile RejectedExecutionHandler handler;//拒绝策略,默认会抛出异异常,还要其他几种拒绝策略如下:

1、CallerRunsPolicy:在调用者线程里面运行该任务
2、DiscardPolicy:丢弃任务
3、DiscardOldestPolicy:丢弃workQueue的头部任务

private volatile int corePoolSize;//最下保活work数量

private volatile int maximumPoolSize;//work上限

execute流程

execute源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
 public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//表示 “线程池状态” 和 “线程数” 的整数
int c = ctl.get();
// 如果当前线程数少于核心线程数,直接添加一个 worker 执行任务,
// 创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务(firstTask)
if (workerCountOf(c) < corePoolSize) {
// 添加任务成功,即结束
// 执行的结果,会包装到 FutureTask
// 返回 false 代表线程池不允许提交任务
if (addWorker(command, true))
return;
c = ctl.get();
}

// 到这说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败

// 如果线程池处于 RUNNING ,把这个任务添加到任务队列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
/* 若任务进入 workQueue,我们是否需要开启新的线程
* 线程数在 [0, corePoolSize) 是无条件开启新线程的
* 若线程数已经大于等于 corePoolSize,则将任务添加到队列中,然后进到这里
*/
int recheck = ctl.get();
// 若线程池不处于 RUNNING ,则移除已经入队的这个任务,并且执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 若线程池还是 RUNNING ,且线程数为 0,则开启新的线程
// 这块代码的真正意图:担心任务提交到队列中了,但是线程都关闭了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 若 workQueue 满,到该分支
// addWorker第二个参数为false, 以 maximumPoolSize 为界创建新 worker,
// 若失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}

addWorker流程

addWorker源码流程:

  1. 检查是否可以根据当前池状态和给定的边界(核心或最大)
  2. 添加新工作线程。如果是这样,工作线程数量会相应调整,如果可能的话,一个新的工作线程创建并启动
  3. 将firstTask作为其运行的第一项任务。
  4. 如果池已停止此方法返回false
  5. 如果线程工厂在被访问时未能创建线程,也返回false
  6. 如果线程创建失败,或者是由于线程工厂返回null,或者由于异常(通常是在调用Thread.start()后的OOM)),我们干净地回滚
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
private boolean addWorker(Runnable firstTask, boolean core) {
// java标签
retry:
// 死循环
for (;;) {
int c = ctl.get();
// 获取当前线程状态和线程数
int rs = runStateOf(c);

// 线程池状态大于 SHUTDOWN, 其实就是 STOP, TIDYING, 或 TERMINATED, 代表线程池状态为关闭
// 对下面的条件的理解:如果线程池已关闭,并满足以下条件之一,那么不创建新的 worker:
// 1. 当状态大于 SHUTDOWN 时,不允许提交任务,且中断正在执行的任务(rs >= SHUTDOWN)
// 2. 如果线程池处于 SHUTDOWN,但是 firstTask 为 null,且 workQueue 非空,
// 那么是允许创建 worker 的(rs==SHUTDOWN && firstTask==null & !workQueue.isEmpty;该条件下整个表达式为false)
// 原因:
// 这是因为 SHUTDOWN 的语义:不允许提交新的任务,但是要把已经进入到 workQueue 的任务执行完
// 所以在满足条件的基础上,是允许创建新的 Worker 的
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;


// 如果线程池状态为RUNNING并且队列中还有需要执行的任务
for (;;) {
// 获取线程池中线程数量
int wc = workerCountOf(c);
// 检测是否满足创建线程的条件
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 如果成功,那么就是所有创建线程前的条件校验都满足了,准备创建线程执行任务了
// 这里失败的话,说明有其他线程也在尝试往线程池中创建线程
if (compareAndIncrementWorkerCount(c))
// 跳出retry
break retry;
// 由于有并发,重新再读取一下 ctl
c = ctl.get();
// 正常如果是 CAS 失败的话,进到下一个里层的for循环就可以了
// 可是如果是因为其他线程的操作,导致线程池的状态发生了变更,如有其他线程关闭了这个线程池
// 那么需要回到外层的for循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

// 走到这里说明工作线程数增加成功,所有的校验都通过了

// worker 是否启动
boolean workerStarted = false;
// 是否将这个 worker 添加到 workers 这个 HashSet 中
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
// 把 firstTask 传给 worker 的构造方法
w = new Worker(firstTask);
// 取 worker 中的线程对象
final Thread t = w.thread;
if (t != null) {
// 这个是整个线程池的全局锁
// 因为关闭一个线程池需要这个锁,至少持有锁的期间,线程池不会被关闭
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
// 小于 SHUTTDOWN 那就是 RUNNING,这个自不必说,是最正常的情况
// 如果等于 SHUTDOWN,前面说了,不接受新的任务,但是会继续执行等待队列中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 检查线程状态,不能是已经启动的
if (t.isAlive())
throw new IllegalThreadStateException();
// 将新启动的线程添加到线程池中
workers.add(w);
// 更新线程池线程数且不超过最大值
int s = workers.size();
// largestPoolSize 用于记录 workers 中的个数的最大值
// 因为 workers 是不断增加减少的,通过这个值可以知道线程池的大小曾经达到的最大值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果添加成功,启动线程
if (workerAdded) {
// 启动 worker 线程, 调用 run()
// 里面的方法是: runWorker()
t.start();
workerStarted = true;
}
}
} finally {
// // 如果线程没有启动,需要做一些清理工作,如前面 workCount 加了 1,将其减掉
if (! workerStarted)
// workers 删除对应的 worker
// workerCount 减1
addWorkerFailed(w);
}
// 返回线程是否启动成功
return workerStarted;
}

工作线程-runWorker流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 此方法由 worker 线程启动后调用,这里用一个 while 循环来不断地从等待队列中获取任务并执行
// 前面说了,worker 在初始化的时候,可以指定 firstTask,那么第一个任务也就可以不需要从队列中获取
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 该线程的第一个任务(若有)
Runnable task = w.firstTask;
w.firstTask = null;
// 允许中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环调用 getTask 获取任务
while (task != null || (task = getTask()) != null) {
// 先将 worker 锁起来
w.lock();
// 若线程池状态大于等于 STOP,那么意味着该线程也要中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();


try {
// 这是一个钩子方法,留给需要的子类实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
// 将 Throwable 转换成 Error
thrown = x; throw new Error(x);
} finally {
// 同样是个钩子方法
afterExecute(task, thrown);
}
} finally {
// 置空 task,准备 getTask 获取下一个任务
task = null;
// 累加完成的任务数
w.completedTasks++;
// 释放掉 worker 的独占锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 到这里,需要执行线程关闭:
// 1. 说明 getTask 返回 null,也就是说,队列中已经没有任务需要执行了,执行关闭
// 2. 任务执行过程中发生了异常
processWorkerExit(w, completedAbruptly);
}
}
getTask获取任务流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 此方法有三种可能,返回 null 代表线程会被关闭
// 1. 阻塞直到获取到任务返回。默认 corePoolSize 之内的线程是不会被回收的,它们会一直等待任务
// 2. 超时退出。keepAliveTime 起作用的时候,也就是如果这么多时间内都没有任务,那么应该执行关闭
// 3. 如果发生了以下条件,须返回 null
// - 线程池中有大于 maximumPoolSize 个 workers 存在(通过调用 setMaximumPoolSize 进行设置)
// - 线程池处于 SHUTDOWN,而且 workQueue 是空的,前面说了,这种不再接受新的任务
// - 线程池处于 STOP,不仅不接受新的线程,连 workQueue 中的线程也不再执行
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 返回null的情况
// 1. rs == stop 2.rs == shutdown && workQueue = null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// 当前线程数目大于corePoolSize 或者 allowCoreThreadTimeOut(允许核心线程回收)
// allowCoreThreadTimeOut 可以通过 executor.setallowCoreThreadTimeOut(true) 来允许回收核心线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 1. 大于最大线程数返回 null,回收线程
// 2. 超时并且任务队列为空返回null, 线程回收
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 到任务队列获取任务
// 如果允许核心线程回收,如果任务队列获取不到任务会返回超时
// 结合上面,可以看出核心线程会被回收
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit

执行线程回收处理, 走到这里说明:

  1. 说明 getTask 返回 null,也就是说,队列中已经没有任务需要执行了,执行关闭
  2. 任务执行过程中发生了异常
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //如果是异常原因中断,那么需要将运行线程数减一
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    //设置完成任务数
    completedTaskCount += w.completedTasks;
    //将worker从集合里移除
    workers.remove(w);
    } finally {
    mainLock.unlock();
    }
    //判断当前的线程池是否处于SHUTDOWN状态,判断是否要终止线程
    tryTerminate();

    int c = ctl.get();
    //如果是RUNNING或SHUTDOWN则会进入这个方法
    if (runStateLessThan(c, STOP)) {
    //如不是以外中断则会往下走
    if (!completedAbruptly) {
    //判断是否保留最少核心线程数
    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    if (min == 0 && ! workQueue.isEmpty())
    min = 1;
    if (workerCountOf(c) >= min)
    return; // replacement not needed
    }
    //如果当前运行的Worker数比当前所需要的Worker数少的话,那么就会调用addWorker,添加新的Worker
    addWorker(null, false);
    }
    }

Worker执行流程:

线程池的使用

创建线程池的参数

我们可以通过ThreadPoolExecutor来创建一个线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// 这几个参数都是必须要有的
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();

this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

各个参数含义:

  • corePoolSize: 线程池基本大小,核心线程池大小,活动线程小于corePoolSize则直接创建,大于等于则先加到workQueue中,队列满了才创建新的线程。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。

  • maximumPoolSize:最大线程数,超过就reject;线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。

  • keepAliveTime:线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率(创建线程的开销比较大)

  • unit:线程活动保持时间的单位)

  • workQueue:工作队列,线程池中的工作线程都是从这个工作队列源源不断的获取任务进行执行

  • threadFactory:用于生成线程,一般我们可以用默认的就可以了。通常,我们可以通过它将我们的线程的名字设置得比较可读一些,如 Message-Thread-1, Message-Thread-2 类似这样

  • handler: 线程池饱和策略。当线程池已经满了,但是又有新的任务提交的时候,该采取什么策略由这个来指定

线程池拒绝策略

也就是上面的 handler , 当线程池饱和后,对新任务采取的拒绝策略。

  • AbortPolicy: 默认拒绝策略, 直接抛出 RejectedExecutionException, 源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
    " rejected from " +
    e.toString());
    }
    }
  • CallerRunsPolicy: 只要线程池没有被关闭,那么由提交任务的线程自己来执行这个任务:

    1
    2
    3
    4
    5
    6
    7
    8
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
    r.run();
    }
    }
    }
  • DiscardPolicy: 什么也不做,忽略这个任务 :

    1
    2
    3
    4
    5
    public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
    }
  • DiscardOldestPolicy: 这个策略相对霸道一些,如果线程池没有被关闭的话, 把队列队头的任务(也就是等待了最长时间的)直接扔掉,然后提交这个任务到等待队列中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
    e.getQueue().poll();
    e.execute(r);
    }
    }
    }

固定线程池 FixedThreadPool

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

最大线程数设置为与核心线程数相等,此时 keepAliveTime 设置为 0(因为这里它是没用的,即使不为 0,线程池默认也不会回收 corePoolSize 内的线程),任务队列采用 LinkedBlockingQueue,无界队列.

刚开始,每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中,而且之后线程数始终为nThreads。如果任务过多,很容易导致内存飙升甚至oom

复用线程池 CachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue。

这种线程池对于任务可以比较快速地完成的情况有比较好的性能。如果线程空闲了 60 秒都没有任务,那么将关闭此线程并从线程池中移除。所以如果线程池空闲了很长时间也不会有问题,因为随着所有的线程都会被关闭,整个线程池不会占用任何的系统资源。

SynchronousQueue 是一个比较特殊的 BlockingQueue,其本身不储存任何元素,它有一个虚拟队列(或虚拟栈),不管读操作还是写操作,如果当前队列中存储的是与当前操作相同模式的线程,那么当前操作也进入队列中等待;如果是相反模式,则配对成功,从当前队列中取队头节点

合理的配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析:

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务

性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

  • 服务器IO性能优化公式: 最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目, 可以优化成:
    最佳线程数目 = (线程等待时间/线程CPU时间之比 + 1)* CPU数目

比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((1.5/0.5 + 1)*8=32

关闭线程池

  • RUNNING:线程池创建之后的初始状态,这种状态下可以执行任务。
  • SHUTDOWN:该状态下线程池不再接受新任务,但是会将工作队列中的任务执行结束。
  • STOP: 该状态下线程池不再接受新任务,但是不会处理工作队列中的任务,并且将会中断线程。
  • TIDYING:该状态下所有任务都已终止,将会执行 terminated() 钩子方法。
  • TERMINATED:执行完 terminated() 钩子方法之后。

shutdown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查权限
checkShutdownAccess();
// 设置线程池状态
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
// 钩子函数,主要用于清理一些资源
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}

这里先加锁,然后检测状态,紧接着线程池状态就会变成SHUTDOWN, 线程池将不在接收任何任务。此时如果还继续往线程池提交任务,将会使用线程池拒绝策略响应,默认情况下将会使用 ThreadPoolExecutor.AbortPolicy,抛出 RejectedExecutionException 异常

interruptIdleWorkers 方法只会中断空闲的线程,不会中断正在执行任务的的线程。空闲的线程将会阻塞在线程池的阻塞队列上

shutdownNow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查状态
checkShutdownAccess();
// 将线程池状态变为 STOP
advanceRunState(STOP);
// 中断所有线程,包括工作线程以及空闲线程
interruptWorkers();
// 丢弃工作队列中存量任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

相较于 shutdown, shutdownNow 会直接中断工作线程。但是中断线程并不代表线程立刻结束。这里需要线程主动配合线程中断响应(线程 interrupt 机制)。

awaitTermination

线程池 shutdown 与 shutdownNow 方法都不会主动等待执行任务的结束,如果需要等到线程池任务执行结束,需要调用 awaitTermination 主动等待任务调用结束。

使用方法如下:

1
2
3
4
5
6
7
8
9
threadPool.shutdown();
try {
// 等待任务执行结束
while (!threadPool.awaitTermination(60,TimeUnit.SECONDS)){
System.out.println("线程池任务还未执行结束");
}
} catch (InterruptedException e) {
e.printStackTrace();
}

如果线程池任务执行结束,awaitTermination 方法将会返回 true,否则当等待时间超过指定时间后将会返回 false

更加优雅的关闭线程池

回顾上面线程池状态关系图,我们可以知道处于 SHUTDOWN 的状态下的线程池依旧可以调用 shutdownNow。所以我们可以结合 shutdown , shutdownNow,awaitTermination ,更加优雅关闭线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
threadPool.shutdown(); // Disable new tasks from being submitted
// 设定最大重试次数
try {
// 等待 60 s
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
// 调用 shutdownNow 取消正在执行的任务
threadPool.shutdownNow();
// 再次等待 60 s,如果还未结束,可以再次尝试,或则直接放弃
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("线程池任务未正常执行结束");
}
} catch (InterruptedException ie) {
// 重新调用 shutdownNow
threadPool.shutdownNow();
}

参考资料


线程池使用详解
https://haobin.work/2018/09/15/并发/线程池使用总结/
作者
Leo Hao
发布于
2018年9月15日
许可协议