引言
线程是程序执行的最小单元,合理的使用线程可以充分利用系统资源、提高吞吐率以及加快响应时间。然而在实际应用中,很多线程都是朝生夕死的。而创建和销毁线程又极大的耗费系统资源,因此从jdk1.5开始引入了线程池的概念,用户可以使用Executors静态工厂类来创建各种各样的满足自己的需求的线程池。一般来说最常用的线程池主要有以下三种:
- ThreadPoolExecutor:基础多任务线程池框架。
- ScheduledThreadPoolExecutor:继承自ThreadPoolExecutor,通过实现延时队列实现定时任务
- ForkJoinPool(>=jdk1.7):并行任务框架,利用多核处理器并行执行任务
ThreadPoolExecutor
ThreadPoolExecutor在初始化之前可以根据用户的配置来维护一个线程池来执行用户提交的任务。其核心方法是execute(),用来提交并执行任务(实现了Runnable接口),同时通过shutDown和shutDownNow来实现对线程池的生命周期的管理。在介绍核心方法之前,我们先介绍一些ThreadPoolExecutor的重要组成变量或类
ctl
|
|
ctl是ThreadPoolExecutor类的核心,其包含了线程池运行状态以及线程池中的线程数,利用AtomicInteger来保证对这个变量的修改是原子性的。如下图所示为ctl结构图
ctl的低29位用来表示线程池中线程的数量,高3位用来表示线程池的运行状态。
线程池的运行状态:
|
|
- RUNNING:线程池正常运行,可接受新的任务或者消费队列中的任务
- SHUTDOWN:线程池关闭(主线程调用shutdown方法),不再接受新的任务,但会继续执行队列中的任务
- STOP:线程池终止(主线程调用shudownNow方法),既不接受新的任务也不再执行队列中的任务
- TIDYING:所有的任务都被终止,线程数为0,所有线程都被终止。当线程池转换到TIDYING状态时会执行terminated(默认什么都不执行,由子类复写)方法
- TERMINATED:TIDYING状态下执行完terminated方法,线程池转化为此状态
线程池状态转换图:
后两种状态相对来说比较复杂,我们这里着重关注前三种状态即可。
构造函数
|
|
先来看一下构造函数的入参:
- corePoolSize:线程池核心线程数
- maximumPoolSize:线程池最大线程数
- keepAliveTime:线程空闲时存活时长,当allowCoreThreadTimeOut=true时,核心线程也会被回收
- unit:keepAliveTime的单位
- workQueue:任务存储队列,主要有以下几种队列
- LinkedBlockingQueue:无界的FIFO队列(maximumPoolSize失效)
- ArrayBlockingQueue:有界FIFO队列
- SynchronousQueue:不存储元素的阻塞队列,插入操作必须等待移除操作完成后才能执行
- ProorityBlockingQueue:具有优先级的无界阻塞队列
- threadFactory:创建线程的工厂类(默认为DefaultThreadFactory)
- handler:线程池不能接受新线程时拒绝策略,默认有以下几种策略
- AbortPolicy:直接抛异常(默认)
- CallerRunsPolicy:由创建线程池的线程执行当前提交的任务
- DiscardOldestPolicy:抛弃队列头的任务
- DiscardPolicy:直接抛弃
线程池的上面的几个参数决定了线程创建策略以及任务执行过程,如下图所示为一个任务提交执行的流程:
Worker
Worker是线程池中线程的基本单位,其对线程进行了一个简单的包装,本身实现了Runnable接口。同时Worker继承自AbstractQueuedSynchronizer(AQS),实现了一份简单的非重入互斥锁。
|
|
ThreadPoolExecutor重要方法
execute(Runnable command)提交任务
execute方法用于用户提交任务,线程池对任务有三种处理方式:新建一个线程执行任务、放入queue队列或者拒绝任务执行。由于execute方法执行的时候并未加锁,因此会在多个地方进行double check线程池的状态。
- 尝试增加核心线程(step1)
- 尝试将任务放入任务队列(step2)
- 尝试增加非核心线程(step3)
|
|
addWorker(Runnable firstTask, boolean core) 新增线程,并处理当前任务
- firstTask:创建线程时当前需要执行的任务
- core:创建的线程是否是核心线程(true:核心线程,false:非核心线程)
新增Worker主要有以下三个步骤:
- 线程池状态检查:非RUNNING和SHUTDOWN状态下线程池拒绝创建新线程并拒绝提交任务,SHUTDOWN状态下不允许提交新任务,但在线程池中线程数为0并且任务队列不为空时才允许创建一个线程来执行任务队列中剩余的任务。
- 线程池中线程数量检查:线程池线程数量 >= CAPACITY || 核心线程数 >= corePoolSize || 总线程数 >= maximumPoolSize 不允许创建线程;
- 创建新线程并将其加到线程池中,同时调用start()启动线程。
|
|
Worker.run()
由于Worker本身实现了Runable接口,因此主线程在addWorker中调用t.start()方法后就启动了线程。而Worker的run中又调用了ThreadPoolExecutor的runWorker方法。
|
|
ThreadPoolExecutor.runWorker(Worker worker)
runWorker是线程池中线程运行的核心,通过一个while的loop循环来保证线程运行状态,首先会处理初始化Worker的任务,如果初始化任务为空则会从任务队列中获取任务进行执行。
- 获取需要执行的任务,初始化任务或者从任务队列中获取的任务
- 检查线程池状态,保证线程池能够及时中断
- beforeExecute—>task.run()—>afterExecute
- 退出while循环后,执行processWorkerExit方法,中断一个空闲线程
|
|
getTask()
getTask主要有两个个功能,获取任务队列中的任务并判断是否需要回收线程(返回null则说明需要回收线程)。
- 例行检查线程池状态
- 根据timed(是否需要回收线程)和timeOut(线程空闲时间是否超过keepAlive)等来判断是否回收线程
- 如果需要回收线程,则利用任务队列的poll机制来设定线程空闲时间
|
|
processWorkerExit
processWorkerExit主要是在线程结束后做一些处理工作
- 如果线程异常结束则原子递减线程池中的线程数,同时移除线程池中的线程
- 尝试中断线程池,主要是进行例行检查
- 检查线程池状态,并根据线程池配置或者当前线程是否是正常结束来判断是否回补线程
|
|