文章目录
- 前言
- 一 理论基础
- 1.1 小顶堆(平衡二叉堆)
- 1.2 小顶堆的存取方式
- 1.2.1 插入顶堆元素
- 1.2.2 删除顶堆元素
- 1.3 任务与小顶堆
- 1.3 时间轮算法
- 二 Spring Boot集成JDK定时任务
- 2.1 TaskQueue源码分析
- 2.2 TimerThread源码分析
- 2.2.1 Timer构造器
- 2.2.2 Timer类中的执行方法
- 2.2.3 TimerTask源码分析
- 2.4 ScheduledExecutorService源码分析
- 2.4.1 ScheduledThreadPoolExecutor构造器
- 2.4.2 ScheduledExecutorService接口
- 2.4.3 DelayedWorkQueue【部分】
- 2.4.4 ScheduledFutureTask【部分】
- 2.4.5 getDelay&compareTo
- 2.6 案例
- 2.7 总结
前言
- 本来准备简单入门一下,但是看到网上的资源并不是很完整,所以,自己动手学习总结了一下!
一 理论基础
1.1 小顶堆(平衡二叉堆)
- 堆是一种特殊的树,满足下面的条件:
- 堆是一颗完全二叉树
- 堆中某个节点的值总是不大于(或不小于)其父节点的值
- 其中,根节点最大的堆叫做大顶堆,根节点最小的堆叫做小顶堆
- 满二叉树:所有层都达到最大结点数1、2、4、8
- 完全二叉树:除了最后一层外其他层都达到最大结点数,且最后一层结点都靠左排列1、2、4、2
- 完全二叉树最适合用数组做存储,因为它的节点都是紧凌的,且只有最后一层节点数不满
- 下标 0 0 0的位置不存在元素的解释:这是因为这样存储我们可以很方便地找到父节点: 下标 / 2 下标/2 下标/2,比如, 4 4 4的父节点即 4 / 2 = 2 4/2=2 4/2=2; 5 5 5的父节点即 5 / 2 = 2 5/2=2 5/2=2
1.2 小顶堆的存取方式
- 数组存储结构的优势:采用数组,堆的结点父子关系明确,可以简单快捷地找到指定结点的父节点或子节点【方便后面的堆化】
1.2.1 插入顶堆元素
- 插入尾部,然后上浮
- 往堆中插入一个元素后,需要继续满足堆的两个特性,即:
- 堆是一颗完全二叉树
- 堆中某个节点的值总是不大于(或不小于)其父节点的值
- 为了满足
条件
1
条件1
条件1,要把元素插入到最后一层最后一个节点往后一位的位置,但是插入之后可能不再满足
条件
2
条件2
条件2了,所以这时候需要堆化
1.2.2 删除顶堆元素
- 将尾部元素放到堆顶(最大的元素),然后下沉
- 小顶堆中堆顶存储的是最小的元素。
- 删除了堆顶元素后,要使得还满足堆的两个特性,首先,我们可以把最后一个元素移到根节点的位置,这时候就满足
条件
1
条件1
条件1,之后就是使它满足
条件
2
条件2
条件2,就需要堆化了
1.3 任务与小顶堆
- 在java中每个任务都是放在小顶堆的结点中的,结点的值即【任务的到期时间】,所以会首先取出堆顶的元素。
1.3 时间轮算法
- 顶堆结构存在的缺陷:
- 当数据量过大时,下沉或上浮操作降低性能,损耗时间
- 结点的值比对的过程,存在冗余比对操作
- 当时间维度较大时,堆的结构复杂
- 时间轮记录的缺陷:因为轮中通常设计12个刻度,导致无法区分 1 12 1~12 1 12与 13 24 13~24 13 24的时间段,所以使用round时间轮一个round变量初始化为1,当第一次遍历时间点时,时间段为 1 12 1~12 1 12,将round减1,当时间段为 12 24 12~24 12 24遍历到round值为0的时间点时,执行任务。
- round时间轮的缺陷:需要遍历所有任务,效率低下。经过改进之后,使用 分层时间轮 分层时间轮 分层时间轮
- 分层时间轮:使用多个不同时间维度的轮
二 Spring Boot集成JDK定时任务
- Timer是JDK提供的java.util包中的一个定时器工具,其内部维护着TimerThread和TaskQueue两个对象,当程序启动后,会在主线程之外另起一个线程来执行计划任务。
- 在Timer定时任务中,主要用两个类Timer和TimerTask,可以被看成是定时器和可以被定时器重复执行的任务
- 在开发时,只需要继承TimerTask类重写 run()方法,实现业务逻辑,然后通过Timer设置时间,执行TimerTask任务即可。
import java.time.LocalDateTime;
import java.util.Timer;
import java.util.TimerTask;
/**
* @author 缘友一世
* date 2023/3/7-13:39
*/
public class MyTriggerTask {
public static void main(String[] args) {
//TimerTask 定时器任务
MyTask myTask = new MyTask();
//定时器
Timer timer = new Timer();
//执行方法:延迟3秒后第一次执行,以后每1秒执行1次
timer.schedule(myTask,3000,1000);
}
static class MyTask extends TimerTask{
@Override
public void run() {
System.out.println("MyTask正在执行的时间"+LocalDateTime.now().toLocalTime());
}
}
}
- 进入Timer源码进行分析,我们首先可以看到两个属性
//定时器任务队列
private final TaskQueue queue = new TaskQueue();
//定时器线程
private final TimerThread thread = new TimerThread(queue);
- TaskQueue类表示计时任务队列,用来存放TimerTask和管理任务队列。核心是封装一个以小顶堆为数据结构的优先级队列,该队列按照nextExecutionTime的大小进行排序,其值越小,那么在堆中的位置就越靠近堆顶,越有可能被执行。
- nextExecutionTime:TimerTask中属性,表示下一次开始执行的时间。
2.1 TaskQueue源码分析
- 以下是TaskQueue的源码实现,深入探究列表的增、删、改、查和重新排列的逻辑。
class TaskQueue {
//TimerTask数组,默认为128个
private TimerTask[] queue = new TimerTask[128];
//表示优先级队列中的任务数
private int size = 0;
/**
* Returns the number of tasks currently on the queue.
*/
int size() {
return size;
}
//添加一个新的任务到优先队列中
void add(TimerTask task) {
// 如果队列已满,则进行扩容
if (size + 1 == queue.length)
queue = Arrays.copyOf(queue, 2*queue.length);
//然后将任务放入队列
queue[++size] = task;
//队列排序方法,向上提升新任务在队列中的位置
fixUp(size);
}
//返回优先队列中的根节点【头部任务】
TimerTask getMin() {
return queue[1];
}
//返回第i个任务
TimerTask get(int i) {
return queue[i];
}
//从优先队列中移除头部任务,然后从根节点向下"降级"
void removeMin() {
queue[1] = queue[size];
queue[size--] = null; // Drop extra reference to prevent memory leak
fixDown(1);
}
//快速删除第i个任务,不考虑保持堆不变,不用调整队列
void quickRemove(int i) {
assert i <= size;
queue[i] = queue[size];
queue[size--] = null; // Drop extra ref to prevent memory leak
}
/**
* Sets the nextExecutionTime associated with the head task to the
* specified value, and adjusts priority queue accordingly.
*/
//设置与头部任务关联的nextExecutionTime为指定值,并调整优先队列
void rescheduleMin(long newTime) {
queue[1].nextExecutionTime = newTime;
fixDown(1);
}
//判断当前队列是否为空
boolean isEmpty() {
return size==0;
}
//从优先队列中移除所有元素
void clear() {
// Null out task references to prevent memory leak
for (int i=1; i<=size; i++)
queue[i] = null;
size = 0;
}
//将queue[k]向上‘提升’,直到queue[k]的nextExecutionTime大于或等于父级的执行时间
private void fixUp(int k) {
while (k > 1) {
int j = k >> 1;
if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
break;
TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;
k = j;
}
}
//将queue[k]向下‘降级’,直到queue[k]的nextExecutionTime小于或等于子级的执行时间
private void fixDown(int k) {
int j;
while ((j = k << 1) <= size && j > 0) {
if (j < size &&
queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
j++; // j indexes smallest kid
if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
break;
TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;
k = j;
}
}
//将整个堆重新排列,并将最小元素排在堆顶
void heapify() {
for (int i = size/2; i >= 1; i--)
fixDown(i);
}
}
2.2 TimerThread源码分析
- TimerThread类是用于执行TimeTask的线程,不仅可以启动定时任务并重复执行,而且还可以从队列中删除取消的任务及只允许执行一次的非重复任务。
- TimerThread的实现逻辑就是通过一个死循环来判断当前队列中最小的nextExecutionTime任务是否可运行。
class TimerThread extends Thread {
//用于标识当前Timer实例中是否存在任务需要调度
boolean newTasksMayBeScheduled = true;
//定时任务队列,避免被循环依赖
private TaskQueue queue;
//带参数构造器
TimerThread(TaskQueue queue) {
this.queue = queue;
}
public void run() {
try {
//主循环调用
mainLoop();
} finally { //如果线程被杀死或者退出循环,则清空任务队列
// Someone killed this Thread, behave as if Timer cancelled
synchronized(queue) {
//设置是否存在任务需要调度标识newTasksMayBeScheduled为false
newTasksMayBeScheduled = false;
queue.clear(); // Eliminate obsolete references
}
}
}
//主定时器循环
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
//设置同步锁,锁定当前queue对象
synchronized(queue) {
// 判断当前队列是否为空,如果为空且没有新的调度任务,则线程进入等待状态
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break; // 如果队列为空,则跳出死循环
//当前时间 执行时间
long currentTime, executionTime;
//获取任务队列中的头部任务(小顶堆的根节点)
task = queue.getMin();
//设置同步锁锁定当前执行的任务
synchronized(task.lock) {
//他们当前任务,如果危险组状态,则从队列中移除
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue; // 有任何操作,作再次轮循队列
}
//获取当前时间戳,单位为ms
currentTime = System.currentTimeMillis();
//获取当前任务的执行时间,单位为ms
executionTime = task.nextExecutionTime;
//判断当前任务执行时间是否小于等于当前时间
if (taskFired = (executionTime<=currentTime)) {
/*
period:TimeTask类的属性
周期(以毫秒为单位)用于重复任务。
正值表示固定速率执行。
负值表示固定延迟执行。
0表示非重复任务。
*/
//如果当今任务只执行一次,则从队列中移除
if (task.period == 0) {
queue.removeMin();
//设置任务状态为已执行
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
//如果是重复任务则重新设置nextExecutionTime
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
//如果还会到执行时间则设置队列等待时间为executionTime - currentTime
if (!taskFired)
queue.wait(executionTime - currentTime);
}
//如果任务执行时间已到则在当前线程中运行该任务不需要持有锁
if (taskFired)
task.run();
} catch(InterruptedException e) {
}
}
}
}
- 由此,可以看出Timer方式的时间任务在调度设计方面的不足:
- 第一点,在线程中捕获的异常只是任务中断异常,如果在执行任务时内部抛出其他异常,当前线程就会被终止,而其他任务也无法执行,Timer定时器也会被终止并清空任务队列。
- 第二点,在Timer定时器中只有一个形成执行所有的任务,这样就存在一个任务正在执行或执行过长,其他任务执行时间即使到也不会立刻执行的情况。
- 第三点,绝对时间依赖于系统时间的设置,一旦系统时间调整了,则调度任务也会发生变化。
2.2.1 Timer构造器
//默认的构造器
public Timer() {
//以Timer+加序列号为该线程的名字
this("Timer-" + serialNumber());
}
//在构造器中指定是否为守护线程
public Timer(boolean isDaemon) {
this("Timer-" + serialNumber(), isDaemon);
}
//带有名字的构造器,可设置线程名
public Timer(String name) {
thread.setName(name);
thread.start();
}
//不仅可以设置线成名,还可以指定是否为守护线程
public Timer(String name, boolean isDaemon) {
thread.setName(name);
thread.setDaemon(isDaemon);
thread.start();
}
2.2.2 Timer类中的执行方法
//调度指定的任务在给定的延迟时间之后执行,只执行一次
public void schedule(TimerTask task, long delay) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
sched(task, System.currentTimeMillis()+delay, 0);
}
//调度指定的任务在指定时间内执行,如果指定的是当前时间,则调度任务立即执行
public void schedule(TimerTask task, Date time) {
sched(task, time.getTime(), 0);
}
/*
在指定的延迟时间之后执行,将重复延迟执行
在fixed-delay方式的执行中,每次执行都是相对于上一次执行的实际执行时间来计算的
如何执行过程中因任何原因(如垃圾回收或者其他后台活动而延迟),则后续任务的执行也将延迟
*/
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}
//指定第一次执行时间,将重复延迟执行
public void schedule(TimerTask task, Date firstTime, long period) {
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, firstTime.getTime(), -period);
}
/*
在指定的延迟时间之后执行,将重复地一固定速率执行
在fixed-delay方式的执行中,每次执行都是相对于充实执行的时间来计算的
如果任务执行过程中因任何原因(如垃圾声音或者其他后台活动)而延迟,则两个或多个执行任务将快速、连续地发生,以追赶延迟的计划
固定资源的方式适用于对时间绝对敏感的重复性活动。例如每10秒钟执行一次的提示任务
*/
public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, period);
}
//指定第一次执行时间,将以固定速率重复执行
public void scheduleAtFixedRate(TimerTask task, Date firstTime,long period) {
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, firstTime.getTime(), period);
}
/*
在指定时间段内的指定时间执行指定的定时任务以后,以毫秒为单位
如果period为非零,则重复调度任务;如果period为零,则只调度一次任务
*/
private void sched(TimerTask task, long time, long period) {
if (time < 0)
throw new IllegalArgumentException("Illegal execution time.");
if (Math.abs(period) > (Long.MAX_VALUE >> 1))
period >>= 1;
//设置同步锁,锁定当前的queue对象
synchronized(queue) {
if (!thread.newTasksMayBeScheduled)
throw new IllegalStateException("Timer already cancelled.");
//确实同步锁,锁定当前的任务
synchronized(task.lock) {
//判断任务是否被调用
if (task.state != TimerTask.VIRGIN)
throw new IllegalStateException("Task already scheduled or cancelled");
task.nextExecutionTime = time;
task.period = period;
task.state = TimerTask.SCHEDULED;
}
//将任务添加到队列中
queue.add(task);
if (queue.getMin() == task)
queue.notify();
}
}
2.2.3 TimerTask源码分析
- TimerTask是一个抽象类,实现了Runnable接口,在内部定义了几种任务状态,分别是:未执行、已调度(非重复任务还未调度)、已执行(或正在执行)和已删除等,并且还提供了运行、删除、获取执行时间、的方法。
public abstract class TimerTask implements Runnable {
//对象锁,此对象用于控制对TimerTask内部的访问
final Object lock = new Object();
//当前任务状态,默认为VIRGIN
int state = VIRGIN;
//当前任务还未被执行
static final int VIRGIN = 0;
//当前任务已被调度,如果是非重复任务,则还未被调度
static final int SCHEDULED = 1;
//当前非重复任务已经执行或者正在执行,并且未被删除。
static final int EXECUTED = 2;
//当前任务已被删除
static final int CANCELLED = 3;
/*
当前任务的下一次执行时间,格式为System.currentTimeMillis()方法返回的毫秒数
如果是重复任务,则会在任务执行之前更新
*/
long nextExecutionTime;
/*
指定重复任务的间隔毫秒数,重复任务的周期
正值表示固定速率执行。
负值表示固定延迟执行。
0表示非重复任务。
*/
long period = 0;
//构造器
protected TimerTask() {
}
//该任务要实现的操作逻辑 我们在继承TimerTask后需要重写的方法
public abstract void run();
/*
取消此计时器任务,可重复调用,取消场景如下:
当前任务为所以重复任务,而且尚未运行或者还未被调度,则它将永远不会再次运行。
当前任务为重复任务则他正在进行,浙江等到任务执行完成后,永远不会再次运行。
@return true 如果该方法阻止一个或多个计划的执行发生,则返回true
*/
public boolean cancel() {
synchronized(lock) {
boolean result = (state == SCHEDULED);
state = CANCELLED;
return result;
}
}
//返回此任务最近一次实际执行的调度执行时间
public long scheduledExecutionTime() {
synchronized(lock) {
return (period < 0 ? nextExecutionTime + period : nextExecutionTime - period);
}
}
}
2.4 ScheduledExecutorService源码分析
- 从Java SE5开始,
java.util.concurrent
包中新增了ScheduledExecutorService
接口,该接口继承于ExecutorService
,因此支持线程池的所有功能。其默认的实现类是java.util.concurrent.ScheduledThreadPoolExecutor
2.4.1 ScheduledThreadPoolExecutor构造器
//给定核心线程池之大小,创建ScheduledThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}
//给定初始参数,创建ScheduledThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
//给定初始参数,创建ScheduledThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
//给定初始参数,创建ScheduledThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
2.4.2 ScheduledExecutorService接口
- ScheduledExecutorService接口中提供了4个方法,分别是:延迟执行延迟执行并返回执行结果,以固定的频率循环执行和以固定的延迟时间循环执行。
public interface ScheduledExecutorService extends ExecutorService {
//设置延迟时间的调度,只执行一次,调度之后可通过Future.get()阻塞直至任务执行完毕
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
//设置延迟时间的调度,只执行一次,调度之后,阻塞直至任务执行完毕,并且可以返回执行结果
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
/*
设置延迟时间的调度,以固定频率循环执行。
即在initialDelay初识延迟后,initialDelay+period执行第一次,initialDelay+2*period,执行第二次。
如果执行时间大于延迟时间,则间隔时间为任务执行时间。否则使用上面的公式计算间隔时间
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
/*
设置延迟时间的调度,以固定的延迟时间循环执行任务。
不管执行多长时间,都是一个执行任务的终止时间和下一个执行任务的开始时间之间的间隔时间再加上固定的延迟时间为任务的执行时间
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
}
-
JDK提供的默认实现类
ScheduledThreadPoolExecutor
类同时继承了ThreadPoolExecutor
类,因此ScheduledThreadPoolExecutor
不仅可以延迟执行和周期性循环任务,还具有提交异步任务功能。 -
在
ScheduledThreadPoolExecuto
r类中有两个重要的内部类:ScheduledFutureTask
和DelayedWorkQueue
类。
2.4.3 DelayedWorkQueue【部分】
DelayedWorkQueue
类继承自AbstractQueue
类,并且实现了BlockingQueue
接口, 该类是基于最小堆结构的优先队列,而且是一个阻塞队列,在该类内部包含的成员变量中,有一个RunnableScheduledFuture
类型的队列,初始数量为16
。
//特有的延迟队列也是一个有序对列,通过每个任务按照距离下次执行时间的间隔长短来排序
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
//基于堆的数据结构,注意所有的对称作必须记录索引更改
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
//主从线程设计,避免了不必要的等待时间,当线程池中的一个线程变成主线程时,它只等待下一个延迟时间,但是其他线程将无限期等待
private Thread leader = null;
//当队列顶部的新任务变为可用时,发出信号状态
private final Condition available = lock.newCondition();
//调整堆数组的大小
private void grow() {
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}
//将某个元素从队列中移除
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
if (i < 0)
return false;
setIndex(queue[i], -1);
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
if (s != i) {
siftDown(i, replacement);
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
//返回堆顶的第一个元素
public RunnableScheduledFuture<?> peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return queue[0];
} finally {
lock.unlock();
}
}
//将第一个元素从队列中弹出,如果队列是空就返回null
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return finishPoll(first);
} finally {
lock.unlock();
}
}
2.4.4 ScheduledFutureTask【部分】
ScheduledFutureTask
类继承了FutureTask
类,并实现了RunnableScheduledFuture
接口,表示返回异步任务的结果。
/*
将任务封装为ScheduledFutureTask对象,基于相对时间,不因系统时间改变而受到影响
可以通过返回FutureTask对象来获取执行的结果
*/
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
//记录任务被添加到ScheduledThreadPoolExecutor中的序号
private final long sequenceNumber;
//以ns为单位指定下一次任务的执行时间
private long time;
//以ns为单位指定重复执行任务的执行周期
/*
正值:执行固定速率
负值:固定延迟执行
0:非重复任务及非周期性
*/
private final long period;
//由reExecutePeriodic重新排队的任务
RunnableScheduledFuture<V> outerTask = this;
//延迟队列中索引,支持快速取消任务
int heapIndex;
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
2.4.5 getDelay&compareTo
- 由上图可知,ScheduledThreadPoolExecutor要实现getDelay和compareTo方法,所以我们看一下相关源码
//返回距离下次任务执行时间的间隔
public long getDelay(TimeUnit unit) {
//计算距下次执行时间与当前系统时间的差值
return unit.convert(time - now(), NANOSECONDS);
}
//比较任务之间的优先级,如果距离下次执行的时间间隔较短,则表示优先级较高
public int compareTo(Delayed other) {
if (other == this) //同一个对象返回0
return 0;
//如果是ScheduledFutureTask类型,则强制转化后为变量time的差值
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
//如果不是ScheduledFutureTask类型,则比较getDelay的返回值
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
//覆盖FutureTask的run()方法
public void run() {
//判断是否为周期性任务
boolean periodic = isPeriodic();
//根据当前任务运行状态判断是否删除
if (!canRunInCurrentRunState(periodic))
cancel(false);
//如果是周期性任务,则调用父类的run方法
else if (!periodic)
ScheduledFutureTask.super.run();
//如果任务执行结束,则重置以备下次执行
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
//重新执行周期性任务
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
- 由于作者能力有限,有关ScheduledThreadPoolExecutor类包含的四个方法的实现逻辑
schedule
方法源码就不再进行详说。
2.6 案例
/**
* @author 缘友一世
* date 2023/3/7-13:39
*/
public class MyTriggerTask2 {
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
//立即执行,任务执行结束后每隔1秒执行一次,真正的执行时间是任务执行时间+间隔时间
executorService.scheduleWithFixedDelay(()->{
System.out.println("["+LocalDateTime.now()+"] 任务执行中...");},0,1, TimeUnit.SECONDS);
System.out.println("任务开始执行!");
}
}
2.7 总结
- 相较于Timer实现方式,ScheduledExecutorService实现方法弥补了Timer的几个缺陷:
- 通过多线程调用,单个任务抛出异常不会对其他任务产生影响
- 由于是多线程执行,某个任务执行过长不影响其他任务的执行时间
- 基于相对时间,不因系统时间的变化而受到影响
- Leader-Follower模式
- 假如说现在有一堆等待执行的任务(一般是存放在一个队列中排好序),而所有的工作线程中只会有一个是leader线程,其他的线程都是follower线程。只有leader线程能执行任务,而剩下的follower线程则不会执行任务,它们会处在休眠中的状态。当leader线程拿到任务、执行任务前,自己会变成follower线程,同时会选出一个新的leader线程,然后才去执行任务。如果此时有下一个任务,就是这个新的leader线程来执行了,并以此往复这个过程。
- 当之前那个执行任务的线程执行完毕再回来时,会判断如果此时已经没任务了,又或者有任务,但是有其他的线程作为leader线程,那么自己就休眠了;如果此时有任务但是没有leader线程,那么自己就会重新成为leader线程来执行任务。
- 避免没必要的唤醒和阻塞的操作,这样会更加有效,且节省资源