1.简介
DelayQueue同样也是适用于并发环境下的容器之一,该容器属于阻塞队列的一种,其底层数据结构是PriorityQueue,主要应用于执行定时任务和缓存过期删除的场景。
DelayQueue也是线程安全的,它通过内部的ReentrantLock实现了线程间的互斥访问。
DelayQueue要求其内部元素必须实现Delayed接口,并重写getDelay方法。默认情况下,DelayQueue会按照元素的到期时间进行升序排列,且仅当元素到期(getDelay() <= 0)时,才能取出该元素。
2.实现原理
- 为了实现延时的语义,其内部采用
PriorityQueue进行延时任务的存储与管理,通过强制每个元素实现Delayed接口以实现getDelay和compareTo方法,从而实现了延时低的任务优先被执行的目标。 - 内部设置了一把
ReentrantLock,从而实现了进程间的互斥访问,线程安全性得到保证。 - 内部还配有
Condition,通过await和signal方法来完成多线程间的唤醒与等待。
3.源码分析
3.1 类定义
")
DelayQueue继承自AbstractQueue,因此具有一些基本的队列的增删改查的模板操作;
同时,它还实现了BlockQueue接口,因此该队列将具备阻塞队列的一些性质。
此外,我们还注意到,DelayQueue的泛型参数必须为Delayed接口的子类,这也印证了我们上面所说的一点:DelayQueue的元素必须强制实现Delayed接口。
同时,Delayed接口又继承了Comparable接口,因此,DelayQueue中的元素天然具有可排序的特性。
Q:为什么要强制队列元素实现
Delayed接口?
A:
实现Delayed接口,具体来讲是实现其要求的getDelay和compareTo这两个方法。
- 实现
getDelay方法:用于指示延时任务的剩余延迟时间,作为执行线程衡量该延时任务目前能不能够被执行的标准。- 实现
compareTo方法:用于作为内部PriorityQueue的排序标准,用于比较任务的优先级,以编排好当前延时任务队列的正确的执行顺序。
")
3.2 初始化
")
构造方法逻辑很简单,对于有参构造函数,它将调用addAll方法,而在addAll中,待要添加的集合校验通过后,遍历整个集合并挨个儿调用add方法,而在add方法中又调用了offer方法,这个我们待会细说,最终如果一切没问题的话,那么modified将被置为true并返回,否则,只要有一个元素添加失败就会抛异常(往往是由于队列满导致的)。
调用链:有参构造 --> addAll --> 遍历调用add --> offer。
3.3 关键字段说明
")
- q:
DelayQueue的核心,也是其底层数据结构,用于存放延时任务,后续任务的添加与执行都是要靠它来完成的。同时它作为一个优先级队列,会将任务按照其延时时间(通过重写Delayed接口的父接口Comparable中的compareTo方法)进行升序排序。 - leader:执行延时任务的唯一线程,以防止多个线程争抢而使任务执行效率降低。(相当于领导者-追随者模式中的领导者)
- available:唤醒因到来的时候队列为空而等待或者到来时已经有其他线程在处理任务而等待的线程或者满足这些条件而阻塞,实现了工作线程间的阻塞与唤醒。
3.4 添加
元素的方法(JDK17)")
你会发现,不论是add方法还是put方法,都是直接调用的offer方法,因此,这里我们只分析offer的执行流程:
- 加锁。
- 调用内部
PriorityQueue上的offer方法添加元素。 - 判断添加了该元素后的
PriorityQueue中的头部元素是不是当前添加进入的元素(即当前所添加的元素是不是延时最短的那个),若是,则置leader线程为null,同时唤醒阻塞在available条件上的线程;否则,不做任何处理。 - 释放锁。
3.5 删除(执行延时任务)
")
**poll**方法的执行流程:
- 加锁。
- 调用
PriorityQueue的peek方法,试探性的取一下队头元素。 - 若发现队头元素(延时任务)为
null或者还没到达预定的延时时间,则返回null;否则,调用poll方法将该元素出队并返回。 - 释放锁。
**take**方法的执行流程:
- 加锁。
- 试探性的取到队列的头部元素:
- 如果头部元素为
null,则说明队列目前没有延时任务可供消费,因此阻塞在available条件上。 - 若头部元素不为
null:- 如果该元素已到达或者超过延时时间,则调用
poll方法出队并返回。 - 若还未到达延时时间,则查看一下当前领导者线程是否为空:
- 若不为空,则说明目前已经有线程正在处理延时任务,因此我们需要阻塞等待,因此在
available条件上阻塞。 - 若为空,则令当前线程作为领导者线程,然后阻塞等待至预定的延时执行时间,等待一段时间后,再次将主线程置为
null,然后进入到for循环的下一次循环,进入到b -> i分支,处理延时任务(这也是为什么设置for循环的原因)。
- 若不为空,则说明目前已经有线程正在处理延时任务,因此我们需要阻塞等待,因此在
- 如果该元素已到达或者超过延时时间,则调用
- 如果头部元素为
- 最后的最后,如果领导者线程为
null并且队列中还有延时任务,则随机唤醒阻塞在available条件上的线程以进行下一个延时任务的等待&处理。 - 释放锁。
上述过程其实用到了一个设计模式:领导者-追随者模式。
通过过程分析,不难发现,poll是非阻塞式删除,take是阻塞式删除。
3.6 获取
")
执行流程:
- 加锁。
- 调用
PriorityQueue的peek方法,然后直接返回该元素。 - 释放锁。
3.7 获取元素数量
")
执行流程:
- 加锁。
- 调用
PriorityQueue的size方法并返回。 - 释放锁。
4.评估
如果线程数量过少,且处理的任务耗时较长,而后续延时任务的到期时间相对集中,那么可能会使得后面的延时任务出现延期处理的情况。
线程数量过多也不见得是一件好事,因为这将在线程调度、同步方面花费更多的时间。
(当然,DelayQueue的工作线程的数量固定为1,上面是针对[领导者-追随者模式](https://www.yuque.com/jujingyi-mzjzr/ybr4gh/ix3ifmtyb3rfllxf) + DelayQueue而言的)
参考文档
DelayQueue 源码分析
Java 延迟队列 DelayQueue 的原理



















