李磊的笔记本

纸上得来终觉浅,绝知此事要躬行。

0%

DelayQueue有趣的延迟队列

简介

原文介绍:

1
2
3
4
An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired. The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will return null. Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements. For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements.
This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces. The Iterator provided in method iterator() is not guaranteed to traverse the elements of the DelayQueue in any particular order.
This class is a member of the Java Collections Framework.

简单翻译

DelayQueue是一个无界阻塞队列,只有在延迟期满时,才能从中提取元素。
队列的头部,是延迟期满后保存时间最长的delay元素。

使用场景

订单定时取消,我们原来有这样一个需求:一个订单提交后在三十分钟内未支付就取消该订单。在订单少的时候采用的方法是一个定时调度任务每隔一分钟就扫描一次数据库(一分钟能接受),查找创建时间大于当前时间加三十分钟且未支付的订单,订单少任务还能处理,订单一旦多起来,有些订单的取消时间就不止三十分钟,这就不能接受了。尤其是在抢货时,延迟就更加严重。这时候DelayQueue就能排上用场

其他场景:定时调度,key失效(如缓存)

DelayQueue的使用条件

  • 存放DelayQueue的元素,必须继承Delay接口,Delay接口使对象成为延迟对象。
    1
    2
    3
    public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {

    }
  • 该接口强制实现两个方法:
    1.CompareTo(Delayed o):用于比较延时,队列里元素的排序依据,这个是Comparable接口的方法,因为Delay实现了Comparable接口,所以需要实现。
    2.getDelay(TimeUnit unit):这个接口返回到激活日期的–剩余时间,时间单位由单位参数指定。
    1
    2
    3
    4
    5
    6
    public interface Delayed extends Comparable<Delayed> {
    /**
    * 剩余时间,小于等于0就表示该元素已过时
    */
    long getDelay(TimeUnit unit);
    }
  • 队列中不允许使用null元素。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package io.github.mylyed.h1.base;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 延时队列测试
*/
@Slf4j
public class DelayedQueneTest {

public static void main(String[] args) {

DelayQueue<Order> queue = new DelayQueue<>();
log.debug("start time:{}", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));

AtomicInteger atomicInteger = new AtomicInteger(0);
//添加订单
Runnable producers = () -> {
while (true) {
try {
String orderNum = String.valueOf(atomicInteger.incrementAndGet());
Integer time = RandomUtils.nextInt(1, 7);
log.debug("add order => orderNum:{},time:{} SECONDS", orderNum, time);
queue.put(new Order(orderNum, time, TimeUnit.SECONDS));
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};

new Thread(producers, "producers-1").start();
new Thread(producers, "producers-2").start();
//消费
Runnable consumers = () -> {
while (true) {
try {
Order take = queue.take();
log.debug("orderNum:{}, time:{}", take.getOrderNum(), LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
} catch (InterruptedException e) {
e.printStackTrace();
}

}
};
new Thread(consumers, "consumers-1").start();
new Thread(consumers, "consumers-2").start();
}

}

@Slf4j
@Data
class Order implements Delayed {
/**
* 触发时间 单位纳秒
*/
private long triggerTime;
/**
* 订单号
*/
private String orderNum;

public Order(String orderNum, long time, TimeUnit timeUnit) {
this.triggerTime = System.currentTimeMillis() + (time > 0 ? timeUnit.toMillis(time) : 0);
this.orderNum = orderNum;
}

@Override
public long getDelay(TimeUnit unit) {
//剩余延迟时间
return triggerTime - System.currentTimeMillis();
}

@Override
public int compareTo(Delayed o) {
//根据触发事件排序
Order order = (Order) o;
long diff = this.triggerTime - order.triggerTime;
return (int) diff;
}

}

部分运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
23:52:08.795 [main] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - start time:2020-04-08T23:52:08.784
23:52:08.801 [producers-2] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - add order => orderNum:2,time:2 SECONDS
23:52:08.801 [producers-1] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - add order => orderNum:1,time:1 SECONDS
23:52:09.806 [producers-2] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - add order => orderNum:4,time:3 SECONDS
23:52:09.806 [producers-1] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - add order => orderNum:3,time:4 SECONDS
23:52:09.806 [consumers-1] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - orderNum:1, time:2020-04-08T23:52:09.806
23:52:10.806 [consumers-2] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - orderNum:2, time:2020-04-08T23:52:10.806
23:52:10.807 [producers-2] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - add order => orderNum:6,time:6 SECONDS
23:52:10.807 [producers-1] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - add order => orderNum:5,time:4 SECONDS
23:52:11.807 [producers-1] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - add order => orderNum:8,time:1 SECONDS
23:52:11.807 [producers-2] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - add order => orderNum:7,time:4 SECONDS
23:52:12.806 [consumers-1] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - orderNum:4, time:2020-04-08T23:52:12.806
23:52:12.807 [producers-2] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - add order => orderNum:10,time:6 SECONDS
23:52:12.807 [consumers-2] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - orderNum:8, time:2020-04-08T23:52:12.807
23:52:12.807 [producers-1] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - add order => orderNum:9,time:4 SECONDS
23:52:13.806 [consumers-1] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - orderNum:3, time:2020-04-08T23:52:13.806
23:52:13.807 [producers-2] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - add order => orderNum:11,time:2 SECONDS
23:52:13.807 [producers-1] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - add order => orderNum:12,time:2 SECONDS
23:52:14.807 [consumers-2] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - orderNum:5, time:2020-04-08T23:52:14.807
23:52:14.808 [producers-1] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - add order => orderNum:14,time:3 SECONDS
23:52:14.808 [producers-2] DEBUG i.g.mylyed.h1.base.DelayedQueneTest - add order => orderNum:13,time:5 SECONDS

源码详解

TODO

题外话

说来惭愧,许久没有写点东西了。