DelayQueue延迟队列

实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.delayqueue.demo.entity;
import lombok.Data;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author luoyangjie
* @version 1.0
* @date 2021/2/3 15:17
*/
@Data
public class Order implements Delayed {
/**
* 订单号
*/
private String orderNo;
/**
* 用户id
*/
private String userId;
/**
* 订单状态(0待领取,1已领取,2已勾销)
*/
private Integer status;
/**
* 订单创立时间
*/
private Date createTime;
/**
* 订单失效时间
*/
private Date cancelTime;
public Order(String orderNo, String userId, Integer status, Date createTime, Date cancelTime) {
this.orderNo = orderNo;
this.userId = userId;
this.status = status;
this.createTime = createTime;
this.cancelTime = cancelTime;
}
@Override
public String toString() {
return "Order{" +
"orderNo='" + orderNo + '\'' +
", userId='" + userId + '\'' +
", status=" + status +
", createTime=" + createTime +
", cancelTime=" + cancelTime +
'}';
}
/**
* 取得延迟时间,用失效时间-当前时间,时间单位须要对立
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
//上面用到unit.convert()办法,其实在这个小场景不须要用到,只是学习一下如何应用罢了
return unit.convert(cancelTime.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 用于提早队列外部比拟排序,以后工夫的延迟时间 - 比拟对象的延迟时间
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
//这里依据勾销工夫来比拟,如果勾销所剩时间小的,就会优先被队列提取进去
//注意延迟时间 的绑定就是这绑定的属性
return this.getCancelTime().compareTo(((Order) o).getCancelTime());
}
}

SERVICE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 提早队列,用来寄存订单对象
*/
DelayQueue<Order> queue = new DelayQueue<>();

@Resource
private ThreadPoolTaskExecutor executorService;

@Override
public void cancelOrder() {
//新建一个线程,用来模仿定时排查取消过期订单job
executorService.submit(()->{
try {
System.out.println("***********开启主动取消订单job***********,时间:" + DateUtil.date());
while (isOpen == 1) {
try {
Order order = queue.take();
order.setStatus(2);
System.out.println("订单:" + order.getOrderNo() + "付款超时,主动勾销,当前时间:" + DateUtil.date());
System.out.println("当前订单延迟队列数量:"+queue.size());
System.out.println(queue.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
});
}

ScheduledExecutorService

1
2
3
4
5
6
7
8
9
10
11
12
private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);

@GetMapping("/testDeferredResult")
@ResponseBody
public DeferredResult<String> test1() {
DeferredResult<String> deferredResult = new DeferredResult<>();
scheduledExecutor.schedule(() -> {
deferredResult.setResult("deferred result");
}, 10, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + "执行结束");
return deferredResult;
}

其它

1
2
3
4
5
6
7
8
9
10
11
12
13
@Scheduled(fixedDelayString = "${time.fixedDelay}")

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

@Scheduled(fixedDelay = 5000)
public void scheduleTaskWithFixedDelay() {
executorService.schedule(new Runnable() {
@Override
public void run() {
// 任务内容
}
}, 10, TimeUnit.SECONDS);
}