浅谈Java线程池

引言

  Java中线程池是运用场景最多的并发框架,几乎所有的多任务或者异步并发任务都可以使用Java的线程池。在实际应用中,Java线程池的应用场景主要有以下几个:

  • 单任务执行时间短,并发量高
  • 定时循环任务或者延迟任务
  • 处理可以分解的大任务(ForkJoinPool)

  在开发过程中合理地使用线程池可以带来的好处有以下几种:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

Executor框架

  Executor 框架是从 Java 5 中引入的,其内部使用了线程池机制,它在 java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。因此,在 Java 5之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,更易管理,效率更高。Executor框架提供了Executors工厂方法,通过Executors可以创建我们需要的线程池。

Executor架构

  Executor只提供了一个execute()方法提交任务,而其继承接口ExecutorService则丰富了其接口,不但提供了不同的任务提交方法,还提供了对线程池的管理的方法接口。AbstractExecutorService和ScheduledExecutorService是对ExecutorService更进一步的实现。而最终可以使用的实体类只有ThreadPoolExecutor和ScheduledThreadPoolExecutor。ThreadPoolExecutor会维护一个线程池,用户可以向线程池提交任务同时可以调用shunDown函数来关闭线程池,而ScheduledThreadPoolExecutor则专门用来执行定时或者循环任务,用户可以通过设置任务的执行时间来定时执行任务或者设置任务执行间隔时间来循环地执行任务。

ThreadPoolExecutor

  ThreadPoolExecutor通过execute来提交任务,通过corePoolSize和maximumPoolSize来控制线程池中线程的数量,通过设置keepAliveTime来控制线程空闲时长,通过BlockingQueue来存储用户提交的任务,通过RejectExecutionHandler来定义任务拒绝策略。

ThreadPoolExecutor组成

  在详细介绍ThreadPoolExecutor之前,我们先了解下ThreadPoolExecutor的一些重要组成变量及内部类。

重要成员变量

1
2
3
4
5
6
7
8
9
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//非核心线程空闲时长
private volatile long keepAliveTime;
//是否允许回收核心线程
private volatile boolean allowCoreThreadTimeOut;
//线程池核心线程数
private volatile int corePoolSize;
//线程池最大线程数
private volatile int maximumPoolSize;
  • ctl是ThreadPoolExecutor类的核心,由两部分组成,高3位表示线程池的状态,低29位表示线程池中的线程数。如下图所示为ctl结构图

    ctl结构图

  • keepAliveTime:非核心线程空闲时长,如果非核心线程在keepAliveTime内没有获得可执行的任务,则会被回收

  • allowCoreThreadTimeOut:是否回收核心线程,如果为true表示可以回收核心线程,核心线程回收逻辑同非核心线程相同
  • corePoolSize:线程池允许的核心线程数
  • maximumPoolSize:线程池中允许的最大线程数量

Worker线程内部类

  Worker实现了Runnable接口,是对线程池中线程的一个简单封装。当一个worker线程被创建并启用时,会调用ThreadPoolExecutor的runWorker方法来开始执行任务。

1
2
3
4
5
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
public void run() {
runWorker(this);
}
}

ThreadPoolExecutor剖析

  我们会从以下几个方面着手分析ThreadPoolExecutor

  • 线程池状态(线程池的生命周期)
  • execute任务提交过程(任务的流向)
  • Worker线程创建逻辑(线程的生命周期)
  • Worker线程销毁逻辑(线程的生命周期)
线程池状态(线程池的生命周期)

  ThreadPoolExecutor通过ctl的高三位存储线程池的状态,线程池共有五中状态:

1
2
3
4
5
6
7
8
9
10
11
12
private static final int COUNT_BITS = Integer.SIZE - 3; //29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //00011111111111111111111111111111 = 2^29 - 1
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; //11100000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS; //00000000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS; //00100000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS; //01000000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS; //01100000000000000000000000000000
private static int runStateOf(int c) { return c & ~CAPACITY; } //获取线程池状态
private static int workerCountOf(int c) { return c & CAPACITY; } //获取线程池中线程的个数
private static int ctlOf(int rs, int wc) { return rs | wc; } //根据线程池状态和线程个数组合称ctrl
  • RUNNING:线程池正常运行,可接受新的任务或者消费队列中的任务
  • SHUTDOWN:线程池关闭(主线程调用shutdown方法),不再接受新的任务,但会继续执行队列中的任务
  • STOP:线程池终止(主线程调用shudownNow方法),既不接受新的任务也不再执行队列中的任务
  • TIDYING:所有的任务都被终止,线程数为0,所有线程都被终止。当线程池转换到TIDYING状态时会执行terminated(默认什么都不执行,由子类复写)方法
  • TERMINATED:TIDYING状态下执行完terminated方法,线程池转化为此状态

线程池状态转换图:

状态转换图

execute任务提交过程(任务的流向)

  execute方法用于用户提交任务,线程池对任务有三种处理方式:新建一个线程执行任务、放入queue队列或者拒绝任务执行。由于execute方法执行的时候并未加锁,因此会在多个地方进行double check线程池的状态。

任务提交流程图

  1. 工作线程数(workCount)小于核心线程数时新建一个核心线程执行提交的任务
  2. 工作线程数大于等于核心线程数并且任务队列(WorkQueue)未满时,将用户新提交的任务放入任务队列
  3. 任务队列满并且工作线程数小于最大线程数时,创建一个新的非核心线程执行提交的任务
  4. 工作线程数等于最大线程数且任务队列满时,则拒绝提交的任务

BlockingQueue:任务存储队列,主要有以下几种队列

  • LinkedBlockingQueue:无界的FIFO队列(maximumPoolSize失效)
  • ArrayBlockingQueue:有界FIFO队列
  • SynchronousQueue:不存储元素的阻塞队列,插入操作必须等待移除操作完成后才能执行,任务吞吐量比较高
  • ProorityBlockingQueue:具有优先级的无界阻塞队列

RejectExecutionHandler:线程池不能接受新线程时拒绝策略,默认有以下几种策略

  • AbortPolicy:直接抛异常(默认)
  • CallerRunsPolicy:由创建线程池的线程执行当前提交的任务
  • DiscardOldestPolicy:抛弃队列头的任务
  • DiscardPolicy:直接抛弃
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 线程池中线程数小于核心线程数(step1)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get(); //如果增加线程失败,说明线程池状态发生变化,需要重新获取线程池状态
}
//线程池正常,将任务放进队列中(step2)
if (isRunning(c) && workQueue.offer(command)) {
//需要进行double check,防止在执行该方法时线程状态变化
int recheck = ctl.get();
//如果线程处于非运行状态则需要移除该任务,并调用拒绝策略拒绝任务
if (! isRunning(recheck) && remove(command))
reject(command);
//线程池中线程数为0说明在执行该方法时主线程执行了shutdown操作,需要重新启动一个线程执行队列中的任务(由于workQueue.offer(command)执行成功,因此队列中至少有一个任务)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}else if (!addWorker(command, false)) //增加非核心线程(step3)
reject(command); //拒绝任务(step4)
}

上述为execute的源码,其中step1-step4对应上述的任务提交流程图的四个步骤

Worker线程创建(线程生命周期)

  通过上边的execue的提交流程可知,ThreadPoolExecutor会在两种情况下新增一个Worker线程。

  • 线程池中线程数小于核心线程数
  • 任务队列满并且线程池中线程数小于最大线程数

  新增Worker主要有以下三个步骤:

  • 线程池状态检查:非RUNNING和SHUTDOWN状态下线程池拒绝创建新线程并拒绝提交任务,SHUTDOWN状态下不允许提交新任务,但是需要注意的是即使线程池处于SHUTDOWN状态,但如果任务队列中还有任务未执行完成,并且此时线程池中线程数量为0,线程池允许新建一个线程来消费任务队列中的任务
  • 线程池中线程数量检查:线程池线程数量 >= CAPACITY || 核心线程数 >= corePoolSize || 总线程数 >= maximumPoolSize 不允许创建线程;
  • 创建新线程并将其加到线程池中,同时调用start()启动线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/******************* 1. 状态检查begin *****************************/
//等同于 rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
//等同于
// 1. rs > SHUTDOWN:不允许新增线程,不接受新任务
// 2. rs = SHUTDOWN && firstTask != null:不允许新增线程,并拒绝新任务
// 3. rs = SHUTDOWN && firstTask == null && workQueue.isEmpty(): 线程池关闭,任务队列中的任务为空,不再允许提交新的任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
/******************* 2. 线程池数量检查begin *****************************/
for (;;) {
int wc = workerCountOf(c);
//1. 线程数 >= CAPACITY
//2. 线程数 >= corePoolSize时不允许创建核心线程
//3. 线程数 >= maximumPoolSize 不允许创建非核心线程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//增加线程数成功,跳出retry循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//re-check线程池状态,不一致说明触发了线程池状态变更,需要重新验证是否需要创建新的线程
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
/******************* 3.创建并运行线程begin *****************************/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 1. 线程池处于正常运行时,新增线程ok
// 2. rs = SHUTDOWN状态下,只有firstTask为null时才允许新增线程,见execute(),线程池关闭并且线程数为0
//但任务队列中还有未完成的任务.
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); //启动线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker线程销毁逻辑(线程生命周期)

  ThreadPoolExecutor通过addWorker函数新增并启动一个工作线程,通过上边的Worker源码可知,Worker线程执行的是ThreadPoolExecutor.runWorker(Worker)里面的逻辑。runWorker主要做了以下几个工作:

  • 获取需要执行的任务,初始化任务或者从任务队列中获取的任务
  • 检查线程池状态,保证线程池能够及时中断
  • beforeExecute—>task.run()—>afterExecute
  • 退出while循环后,执行processWorkerExit方法,中断一个空闲线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; //先执行初始化时的任务
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true; //主要用来判断线程是正常结束还是异常结束,true为异常结束,在processWorkerExit中用来标志是否将线程池中的线程数减一
try {
// task是控制线程周期的重要因素
while (task != null || (task = getTask()) != null) {
w.lock();
//判断线程池状态,中断线程
// 线程池状态>=STOP并且线程未被中断,需要中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行自定义的任务执行前操作,默认什么也不执行
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任务执行后执行的操作,默认什么也不执行
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//线程退出后执行清理工作
processWorkerExit(w, completedAbruptly);
}
}

  由上述源码可知线程池中的线程通过while循环来保证线程正常进行,任务来源是初始化任务firstTask或者通过getTask(从任务队列中获取任务),当线程获取到的任务为空时则退出while循环,结束线程生命周期。所以task是否为空控制着线程的生命周期,而task的来源是getTask函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 返回null则退出线程
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 1. rs > SHUTDOWN:线程池关闭,回收该线程
// 2. rs = SHUTDOWN && workQueue.isEmpty():任务队列中任务执行完毕,回收该线程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// timed表示是否需要回收该线程,如果allowCoreThreadTimeOut设置为true则无论当前线程数
// 设置多少都需要回收,否则只有当线程池中线程数大于corePoolSize时才需要回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//需要回收线程时,则将线程数减一,并返回null,在runWorker中回收线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//1:如果timed为true,该线程需要回收,通过将workQueue.poll的超时时间设置为
// keepAliveTime来保证返回的task是否为空,从而来判断该线程是否需要回收
// 2:timed为false,则阻塞获取workQueue,直到线程中断或者获取到任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

  由上述源码可知getTask是通过workQueue的poll来hold当前工作线程keepAlive时长,从而实现工作线程空闲keepAliveTime能够及时回收。

getTask

ScheduleThreadPoolExecutor

  ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以其新增和回收线程逻辑,执行任务方式都沿用了ThreadPoolExecutor的逻辑。ScheduledThreadPoolExecutor之所以能够执行定时任务和延迟任务,主要是其自定义实现了一个DelayQueue并封装了一个ScheduledFutureTask(extend FutureTask),其中DelayWorkQueue维护着一个最小堆,最先需要执行的任务在堆顶。

构造函数

  ScheduledThreadPoolExecutor本质是一个ThreadPoolExecutor,其构造函数直接t通过super来完成对象的初始化。默认ScheduledThreadPoolExecutor的maximumPoolSize为Integer.MAX_VALUE,keepAliveTime=0,任务队列为DelayedWorkQueue。但是由于DelayedWorkQueue无限增长(最大值Integer.MAX_VALUE),其相当于一个无界队列,所以设置maximumPoolSize基本上是无效的。

1
2
3
4
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

ScheduledFutureTask

  ScheduledFutureTask是ScheduledThreadPoolExecutor对任务的封装,其中包含了该任务的类型(period)、下次需要执行的时间(time)以及在任务队列中的位置(heapInex)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
//入队列的序号
private final long sequenceNumber;
//任务执行的时间
private long time;
//任务类型
//正数:按固定频率执行
//0:非重复执行的任务
// 负数:按固定延迟执行
private final long period;
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;
// 在任务队列数组中的索引
int heapIndex;

DelayedWorkQueue

  DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。

排序规则:
  • 执行时间距离当前时间越近,越靠前
  • 如果执行时间相同,则先执行插入时间靠前的任务。

获取任务(take)

  take函数主要是获取任务队列最小堆中的第一个任务,其使用了leader-follower模式,关于leader-follower模式可以参考这篇博客

leader-follower模式中,所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。

  • 获取任务队列堆顶元素,如果为null则进入wail状态,等待offer的signal唤醒
  • 如果堆顶任务执行时间小于当前时间,则返回堆顶任务
  • 如果leader为空,则将当前线程设置为leader,并等待至堆顶任务执行时间
  • 如果leader已存在,则进入wait状态,等待被唤醒。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) //队列中没有任务,需要等待offer函数唤醒
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) //到任务的执行时间,执行该任务
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null) //当leader线程不为null时说明有leader线程在等待第一个任务,其他线程进入wait状态
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread; //设置为leader线程
try {
//等待delay时间,
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null; //将leader设置为null并在下一个循环中获取任务
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal(); //唤醒其他线程
lock.unlock();
}
}

新增任务

  ScheduledThreadPoolExecutor支持三种新增任务的方式,新增普通延迟任务,新增固定频率执行任务,新增固定频率执行的延迟任务。

  1. 通过schedule函数直接新增一条延迟执行的任务(ScheduleThreadPoolExecutor.schedule)
  2. 通过scheduleAtFixedRate新增一条按固定频率执行的任务(ScheduleThreadPoolExecutor.scheduleAtFixedRate)
  3. 通过scheduleWithFixedDelay新增一条固定频率执行的延迟任务(ScheduleThreadPoolExecutor.scheduleWithFixedDelay)

  ScheduledThreadPoolExecutor是如何实现定时任务和延迟任务的呢?由上面可知ScheduledThreadPoolExecutor重新封装了task也就是ScheduledFutureTask,而定时和延迟任务的执行就在ScheduledFutureTask的run中完成的。任务下次执行时间:

  • 非周期循环任务,无下次执行时间

  • 定时周期任务:上次执行时间+延迟时间

  • 延迟周期任务:当前时间+延迟时间

    周期任务执行时间序列图

任务执行步骤
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void run() {
//判断是否是定时任务
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
//非循环任务,直接执行
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
//循环任务任务,执行完后设置下次执行的时间
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period;
//循环任务直接在本次执行时间上加上时间间隔
if (p > 0)
time += p;
else
//延迟定时任务则将当前时间+延迟时间作为下次执行的时间
time = triggerTime(-p);
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

Executors工厂类

  Executors是一个线程池的工厂类,用户可以通过调用其静态函数来创建不同的线程池,常用的线程池有以下几个:

  1. newFixedThreadPool

    1
    2
    3
    4
    5
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }

    newFixedThreadPool返回特定线程数的线程池,但需要注意的是newFixedThreadPool的任务队列是LinkedBlockingQueue无界队列,如果生产者速度大于消费者会造成jvm频繁full gc

  2. newSingleThreadExecutor

    1
    2
    3
    4
    5
    6
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }

    newSingleThreadExecutor返回只有一个线程的线程池,其并没有返回ThreadPoolExecutor对象,而是返回FinalizableDelegatedExecutorService的一个代理类,主要是为了屏蔽一些不必要的操作,例如allowCoreThreadTimeOut()。通newFixedThreadPool一样,newSingleThreadExecutor也是使用LinkedBlockingQueue无界队列来存储任务的。

  3. newCachedThreadPool

    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }

    newCachedThreadPool利用SynchronousQueue作为任务队列,核心线程数设置为0,线程最大空闲时间为60s。newCachedThreadPool实现了一个线程缓存池,当提交任务比较频繁时可以快速创建新的线程处理任务,任务提交不频繁时又可以优雅地回收线程。其适用于处理吞吐量比较高的场景。

  4. newScheduledThreadPool

    1
    2
    3
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    newScheduledThreadPool返回一个定时执行任务的线程池,类似于Timer。

线程池分配

  合理地配置线程池,需要对任务的特性进行分析,可以从以下几个角度进行分析

  1. 任务的性质:CPU密集型、IO密集型还是混合密集型。一般来说CPU密集型可以将线程池数设置为CPU+1,IO密集型可以将线程数设置为2*CPU,混合型的最好能够拆分成CPU密集型和IO密集型两个线程池,这种计算方式适用于一台机器上只跑一个服务的情况。

  2. 多应用,多任务线程计算公式:最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目或者进一步转化后:最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目

  3. 任务优先级:高、中、低任务优先级,可以使用PriorityBlockQueue来存储任务

  4. 任务执行的时长:长、中、短等,不同时长的任务可以分发给不同的线程池来执行,或者使用PriorityBlockQueue来保证执行时间短的任务先执行。

  5. 任务的依赖:是否依赖其他资源,可以根据所依赖的资源的类型来判断执行任务的类型。