引言
Java中线程池是运用场景最多的并发框架,几乎所有的多任务或者异步并发任务都可以使用Java的线程池。在实际应用中,Java线程池的应用场景主要有以下几个:
- 单任务执行时间短,并发量高
- 定时循环任务或者延迟任务
- 处理可以分解的大任务(ForkJoinPool)
在开发过程中合理地使用线程池可以带来的好处有以下几种:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
Executor框架
Executor 框架是从 Java 5 中引入的,其内部使用了线程池机制,它在 java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。因此,在 Java 5之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,更易管理,效率更高。Executor框架提供了Executors工厂方法,通过Executors可以创建我们需要的线程池。
Executor只提供了一个execute()方法提交任务,而其继承接口ExecutorService则丰富了其接口,不但提供了不同的任务提交方法,还提供了对线程池的管理的方法接口。AbstractExecutorService和ScheduledExecutorService是对ExecutorService更进一步的实现。而最终可以使用的实体类只有ThreadPoolExecutor和ScheduledThreadPoolExecutor。ThreadPoolExecutor会维护一个线程池,用户可以向线程池提交任务同时可以调用shunDown函数来关闭线程池,而ScheduledThreadPoolExecutor则专门用来执行定时或者循环任务,用户可以通过设置任务的执行时间来定时执行任务或者设置任务执行间隔时间来循环地执行任务。
ThreadPoolExecutor
ThreadPoolExecutor通过execute来提交任务,通过corePoolSize和maximumPoolSize来控制线程池中线程的数量,通过设置keepAliveTime来控制线程空闲时长,通过BlockingQueue来存储用户提交的任务,通过RejectExecutionHandler来定义任务拒绝策略。
ThreadPoolExecutor组成
在详细介绍ThreadPoolExecutor之前,我们先了解下ThreadPoolExecutor的一些重要组成变量及内部类。
重要成员变量
|
|
ctl是ThreadPoolExecutor类的核心,由两部分组成,高3位表示线程池的状态,低29位表示线程池中的线程数。如下图所示为ctl结构图
keepAliveTime:非核心线程空闲时长,如果非核心线程在keepAliveTime内没有获得可执行的任务,则会被回收
- allowCoreThreadTimeOut:是否回收核心线程,如果为true表示可以回收核心线程,核心线程回收逻辑同非核心线程相同
- corePoolSize:线程池允许的核心线程数
- maximumPoolSize:线程池中允许的最大线程数量
Worker线程内部类
Worker实现了Runnable接口,是对线程池中线程的一个简单封装。当一个worker线程被创建并启用时,会调用ThreadPoolExecutor的runWorker方法来开始执行任务。
|
|
ThreadPoolExecutor剖析
我们会从以下几个方面着手分析ThreadPoolExecutor
- 线程池状态(线程池的生命周期)
- execute任务提交过程(任务的流向)
- Worker线程创建逻辑(线程的生命周期)
- Worker线程销毁逻辑(线程的生命周期)
线程池状态(线程池的生命周期)
ThreadPoolExecutor通过ctl的高三位存储线程池的状态,线程池共有五中状态:
|
|
- RUNNING:线程池正常运行,可接受新的任务或者消费队列中的任务
- SHUTDOWN:线程池关闭(主线程调用shutdown方法),不再接受新的任务,但会继续执行队列中的任务
- STOP:线程池终止(主线程调用shudownNow方法),既不接受新的任务也不再执行队列中的任务
- TIDYING:所有的任务都被终止,线程数为0,所有线程都被终止。当线程池转换到TIDYING状态时会执行terminated(默认什么都不执行,由子类复写)方法
- TERMINATED:TIDYING状态下执行完terminated方法,线程池转化为此状态
线程池状态转换图:
execute任务提交过程(任务的流向)
execute方法用于用户提交任务,线程池对任务有三种处理方式:新建一个线程执行任务、放入queue队列或者拒绝任务执行。由于execute方法执行的时候并未加锁,因此会在多个地方进行double check线程池的状态。
- 工作线程数(workCount)小于核心线程数时新建一个核心线程执行提交的任务
- 工作线程数大于等于核心线程数并且任务队列(WorkQueue)未满时,将用户新提交的任务放入任务队列
- 任务队列满并且工作线程数小于最大线程数时,创建一个新的非核心线程执行提交的任务
- 工作线程数等于最大线程数且任务队列满时,则拒绝提交的任务
BlockingQueue:任务存储队列,主要有以下几种队列
- LinkedBlockingQueue:无界的FIFO队列(maximumPoolSize失效)
- ArrayBlockingQueue:有界FIFO队列
- SynchronousQueue:不存储元素的阻塞队列,插入操作必须等待移除操作完成后才能执行,任务吞吐量比较高
- ProorityBlockingQueue:具有优先级的无界阻塞队列
RejectExecutionHandler:线程池不能接受新线程时拒绝策略,默认有以下几种策略
- AbortPolicy:直接抛异常(默认)
- CallerRunsPolicy:由创建线程池的线程执行当前提交的任务
- DiscardOldestPolicy:抛弃队列头的任务
- DiscardPolicy:直接抛弃
|
|
上述为execute的源码,其中step1-step4对应上述的任务提交流程图的四个步骤
Worker线程创建(线程生命周期)
通过上边的execue的提交流程可知,ThreadPoolExecutor会在两种情况下新增一个Worker线程。
- 线程池中线程数小于核心线程数
- 任务队列满并且线程池中线程数小于最大线程数
新增Worker主要有以下三个步骤:
- 线程池状态检查:非RUNNING和SHUTDOWN状态下线程池拒绝创建新线程并拒绝提交任务,SHUTDOWN状态下不允许提交新任务,但是需要注意的是即使线程池处于SHUTDOWN状态,但如果任务队列中还有任务未执行完成,并且此时线程池中线程数量为0,线程池允许新建一个线程来消费任务队列中的任务
- 线程池中线程数量检查:线程池线程数量 >= CAPACITY || 核心线程数 >= corePoolSize || 总线程数 >= maximumPoolSize 不允许创建线程;
- 创建新线程并将其加到线程池中,同时调用start()启动线程。
|
|
Worker线程销毁逻辑(线程生命周期)
ThreadPoolExecutor通过addWorker函数新增并启动一个工作线程,通过上边的Worker源码可知,Worker线程执行的是ThreadPoolExecutor.runWorker(Worker)里面的逻辑。runWorker主要做了以下几个工作:
- 获取需要执行的任务,初始化任务或者从任务队列中获取的任务
- 检查线程池状态,保证线程池能够及时中断
- beforeExecute—>task.run()—>afterExecute
- 退出while循环后,执行processWorkerExit方法,中断一个空闲线程
|
|
由上述源码可知线程池中的线程通过while循环来保证线程正常进行,任务来源是初始化任务firstTask或者通过getTask(从任务队列中获取任务),当线程获取到的任务为空时则退出while循环,结束线程生命周期。所以task是否为空控制着线程的生命周期,而task的来源是getTask函数。
|
|
由上述源码可知getTask是通过workQueue的poll来hold当前工作线程keepAlive时长,从而实现工作线程空闲keepAliveTime能够及时回收。
ScheduleThreadPoolExecutor
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以其新增和回收线程逻辑,执行任务方式都沿用了ThreadPoolExecutor的逻辑。ScheduledThreadPoolExecutor之所以能够执行定时任务和延迟任务,主要是其自定义实现了一个DelayQueue并封装了一个ScheduledFutureTask(extend FutureTask),其中DelayWorkQueue维护着一个最小堆,最先需要执行的任务在堆顶。
构造函数
ScheduledThreadPoolExecutor本质是一个ThreadPoolExecutor,其构造函数直接t通过super来完成对象的初始化。默认ScheduledThreadPoolExecutor的maximumPoolSize为Integer.MAX_VALUE,keepAliveTime=0,任务队列为DelayedWorkQueue。但是由于DelayedWorkQueue无限增长(最大值Integer.MAX_VALUE),其相当于一个无界队列,所以设置maximumPoolSize基本上是无效的。
|
|
ScheduledFutureTask
ScheduledFutureTask是ScheduledThreadPoolExecutor对任务的封装,其中包含了该任务的类型(period)、下次需要执行的时间(time)以及在任务队列中的位置(heapInex)。
|
|
DelayedWorkQueue
DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。
排序规则:
- 执行时间距离当前时间越近,越靠前
- 如果执行时间相同,则先执行插入时间靠前的任务。
获取任务(take)
take函数主要是获取任务队列最小堆中的第一个任务,其使用了leader-follower模式,关于leader-follower模式可以参考这篇博客。
leader-follower模式中,所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。
- 获取任务队列堆顶元素,如果为null则进入wail状态,等待offer的signal唤醒
- 如果堆顶任务执行时间小于当前时间,则返回堆顶任务
- 如果leader为空,则将当前线程设置为leader,并等待至堆顶任务执行时间
- 如果leader已存在,则进入wait状态,等待被唤醒。
|
|
新增任务
ScheduledThreadPoolExecutor支持三种新增任务的方式,新增普通延迟任务,新增固定频率执行任务,新增固定频率执行的延迟任务。
- 通过schedule函数直接新增一条延迟执行的任务(ScheduleThreadPoolExecutor.schedule)
- 通过scheduleAtFixedRate新增一条按固定频率执行的任务(ScheduleThreadPoolExecutor.scheduleAtFixedRate)
- 通过scheduleWithFixedDelay新增一条固定频率执行的延迟任务(ScheduleThreadPoolExecutor.scheduleWithFixedDelay)
ScheduledThreadPoolExecutor是如何实现定时任务和延迟任务的呢?由上面可知ScheduledThreadPoolExecutor重新封装了task也就是ScheduledFutureTask,而定时和延迟任务的执行就在ScheduledFutureTask的run中完成的。任务下次执行时间:
非周期循环任务,无下次执行时间
定时周期任务:上次执行时间+延迟时间
延迟周期任务:当前时间+延迟时间
|
|
Executors工厂类
Executors是一个线程池的工厂类,用户可以通过调用其静态函数来创建不同的线程池,常用的线程池有以下几个:
newFixedThreadPool
12345public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}newFixedThreadPool返回特定线程数的线程池,但需要注意的是newFixedThreadPool的任务队列是LinkedBlockingQueue无界队列,如果生产者速度大于消费者会造成jvm频繁full gc
newSingleThreadExecutor
123456public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}newSingleThreadExecutor返回只有一个线程的线程池,其并没有返回ThreadPoolExecutor对象,而是返回FinalizableDelegatedExecutorService的一个代理类,主要是为了屏蔽一些不必要的操作,例如allowCoreThreadTimeOut()。通newFixedThreadPool一样,newSingleThreadExecutor也是使用LinkedBlockingQueue无界队列来存储任务的。
newCachedThreadPool
12345public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}newCachedThreadPool利用SynchronousQueue作为任务队列,核心线程数设置为0,线程最大空闲时间为60s。newCachedThreadPool实现了一个线程缓存池,当提交任务比较频繁时可以快速创建新的线程处理任务,任务提交不频繁时又可以优雅地回收线程。其适用于处理吞吐量比较高的场景。
newScheduledThreadPool
123public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}newScheduledThreadPool返回一个定时执行任务的线程池,类似于Timer。
线程池分配
合理地配置线程池,需要对任务的特性进行分析,可以从以下几个角度进行分析
任务的性质:CPU密集型、IO密集型还是混合密集型。一般来说CPU密集型可以将线程池数设置为CPU+1,IO密集型可以将线程数设置为2*CPU,混合型的最好能够拆分成CPU密集型和IO密集型两个线程池,这种计算方式适用于一台机器上只跑一个服务的情况。
多应用,多任务线程计算公式:最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目或者进一步转化后:最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目
任务优先级:高、中、低任务优先级,可以使用PriorityBlockQueue来存储任务
任务执行的时长:长、中、短等,不同时长的任务可以分发给不同的线程池来执行,或者使用PriorityBlockQueue来保证执行时间短的任务先执行。
任务的依赖:是否依赖其他资源,可以根据所依赖的资源的类型来判断执行任务的类型。