引言
对于一般的多任务执行,ThreadPoolExecutor可以满足大部分需求。但是有时候我们需要定时或者延迟地去执行一个任务,这个时候ThreadPoolExecutor已经不能满足我们的需求了,所以Java提供了ScheduledThreadPoolExecutor来执行定时或延迟任务。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以其新增和回收线程逻辑,执行任务方式都沿用了ThreadPoolExecutor的逻辑。ScheduledThreadPoolExecutor之所以能够执行定时任务和延迟任务,主要是其自定义实现了一个DelayQueue并封装了一个ScheduledFutureTask(extend FutureTask)。
构造函数
ScheduledThreadPoolExecutor本质是一个ThreadPoolExecutor,其构造函数直接t通过super来完成对象的初始化。默认ScheduledThreadPoolExecutor的maximumPoolSize为Integer.MAX_VALUE,keepAliveTime=0,任务队列为DelayedWorkQueue。但是由于DelayedWorkQueue是无界队列,所以设置maximumPoolSize是无效的。
|
|
ScheduledFutureTask
ScheduledFutureTask是ScheduledThreadPoolExecutor对任务的封装,其中包含了该任务的类型(period)、下次需要执行的时间(time)以及在任务队列中的位置(heapInex)
|
|
DelayedWorkQueue
DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。
排序规则:
- 执行时间距离当前时间越近,越靠前
- 如果执行时间相同,则先执行插入时间靠前的任务。
新增/获取任务
DelayedWorkQueue通过put或者add来新增一条任务,但其底层都是调用offer来新增任务的。对于获取任务,我们知道在ThreadPoolExecutor中线程根据getTask来获取任务队列中的任务,而在getTask中任务队列通过poll或者take函数来获取任务队列中的任务,由于ScheduleThreadPoolExecutor继承自ThreadPoolExecutor,因此其底层获取任务方式相同,只需要DelayedWorkQueue提供take及pool方法即可。
在分析offer、take及poll之前,我们先看下siftUp及siftDown函数。
DelayWorkQueue底层是用最小堆数据结构实现的,需要最先执行的任务在堆的顶部,因此在每次插入或者删除任务时需要调整二叉树节点的顺序,但不同于最小堆的地方在于DelayWorkQueue不关心兄弟节点之间的顺序,只要父节点的任务先于子节点执行即可。
在一个最小堆的队列中,假如索引从0开始,子节点索引值为k,父节点索引值为p,则存在如下规律:
- 一个节点的左子节点的索引为:k = p * 2 + 1;
- 一个节点的右子节点的索引为:k = (p + 1) * 2;
- 一个节点的父节点的索引为:p = (k - 1) / 2。
siftUp函数在新增一个任务时调用,通过循环对比父子节点任务执行的先后顺序来调整新任务在堆中的位置。
|
|
siftDown函数是将一个任务从k节点一层一层地最小堆的底层沉淀,能够保证执行完后最小堆中的父节点任务先于子节点执行。
|
|
take
take函数主要是获取任务队列最小堆中的第一个任务,其使用了leader-follower模式,关于leader-follower模式可以参考这篇博客。
leader-follower模式中,所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。
- 获取任务队列堆顶元素,如果为null则进入wail状态,等待offer的signal唤醒
- 如果堆顶任务执行时间小于当前时间,则返回堆顶任务
- 如果leader为空,则将当前线程设置为leader,并等待至堆顶任务执行时间
- 如果leader已存在,则进入wait状态,等待被唤醒。
|
|
poll
poll的功能和take相似,入参多了一个timeout,如果在timeout时间内获取不到任务则直接返回null
|
|
offer
offer是DelayQueue底层往任务列表中新增一个任务的函数
|
|
新增任务
ScheduledThreadPoolExecutor支持三种新增任务的方式,新增普通延迟任务,新增固定频率执行任务,新增固定频率执行的延迟任务。
- 通过schedule函数直接新增一条延迟任务
- 通过scheduleAtFixedRate新增一条按固定频率执行的任务
- 通过scheduleWithFixedDelay新增一条固定频率执行的延迟任务
ScheduledThreadPoolExecutor是如何实现定时任务和延迟任务的呢?由上面可知ScheduledThreadPoolExecutor重新封装了task也就是ScheduledFutureTask,而定时和延迟任务的执行就在ScheduledFutureTask的run中完成的。任务下次执行时间:
非周期循环任务,无下次执行时间
定时周期任务:上次执行时间+延迟时间
延迟周期任务:当前时间+延迟时间
|
|