定时器算法

概述

在日常开发中, 定时任务是一个比较关键的功能。 Java 中一般使用 JDK 中 TimerScheduledExecutorService 和调度框架 Quartz等。 通常用于实现延时任务, 周期性任务等, 一般会有两种需求:

  1. 轮询定时任务:给定周期内扫描所有记录,查看是否有满足要求的数据。
  2. 延时消息:如常见的订单业务, 订单创建的时候发送一条 N 分钟到期的信息,一旦消息消费后便可判断订单是否可以取消

轮询的方式在数据量大的时候性能会比较差, 通常我们会选择第二种方式。

Timer

Timer 调度任务有一次性调度和循环调度,循环调度有分为:

  • 固定速率调度(fixRate)
  • 固定时延调度(fixDelay):
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Timer timer = new Timer();
    TimerTask timerTask = new TimerTask() {
    @Override
    public void run() {
    System.out.println(LocalDateTime.now());
    }
    };
    // 延迟 1s 打印(只打印一次)
    timer.schedule(timerTask, 1000);
    // 周期性每隔 1s 打印一次
    timer.schedule(timerTask, 1000, 1000);

内部原理

Timer 类里包含一个任务队列和一个异步轮训线程。

  1. 任务队列是一个以下次执行时间比较的最小堆
  2. 任务队列里容纳了所有待执行的任务,所有的任务将会在这一个异步线程里执行,任务执行中代码不能抛出异常,否则会导致 Timer 线程挂掉,所有的任务都无法执行了。
  3. 单个任务也不易执行时间太长,否则会影响任务调度在时间上的精准性。比如你一个任务跑了太久,其它等着调度的任务就一直处于饥饿状态得不到调度。所有任务的执行都是这单一的 TimerThread 线程。

Timer 类:

1
2
3
4
5
6
7
8
9
10
11
12
class Timer {
// 任务队列, 任务通过 Timer.schedule 方法将任务加入 TaskQueue
// taskQueue 是数组实现的一个最小堆
TaskQueue queue = new TaskQueue();
// 执行任务的 timer 线程
TimerThread thread = new TimerThread(queue);
}

class TaskQueue {
TimerTask[] queue = new TimerTask[128];
int size;
}

任务调度过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 调度任务
private void sched(TimerTask task, long time, long period) {
synchronized(task.lock) {
if (task.state != TimerTask.VIRGIN)
throw new IllegalStateException(
"Task already scheduled or cancelled");
task.nextExecutionTime = time;
task.period = period;
task.state = TimerTask.SCHEDULED;
}
queue.add(task);
if (queue.getMin() == task)
queue.notify();
}


// new Timer 的时候 内部的 TimerThread 已经开始 run 调用 mainLoop 了
// 运行任务
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
// 队列是空并且还有其他任务, 则 wait
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
// 如果队列没有任务, 直接结束
if (queue.isEmpty())
break;
// taskQueue 按照 nextExecutionTime 进行堆排序, 取出最小的(应该执行的)任务
task = queue.getMin();
synchronized(task.lock) {
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue;
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
if (taskFired = (executionTime<=currentTime)) {
// 如果没有设置执行间隔, 就移除该任务
if (task.period == 0) {
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else {
// 否则加入队列等待下一次执行(这里是修改下一次任务的执行时间)
// 计算下一次调度时间
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}

注意:

  1. Timer 只能单线程调度
  2. TimerTask 中出现的异常会影响到 Timer 的执行

ScheduledThreadPoolExecutor

  • schedule 提交一个一次性触发的任务, 在给定延时后执行
  • ScheduleAtFixedRate 是基于固定时间间隔进行任务调度
  • ScheduleWithFixedDelay 取决于每次任务执行的时间长短,是基于不固定时间间隔的任务调度

使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);

//提交一个一次性触发的Runnable任务,在给定时延后执行。
public ScheduledFuture<?> schedule(
Runnable command,
long delay,
TimeUnit unit
);

//提交一个定期重复的Runnable任务,在initialDelay时间后第一次执行,此后每隔period时间执行一次。如果某次执行抛出异常,之后所有任务取消执行。
// 如果一次执行时间过长,完成时已经超过下次执行开始时间,下一次执行会等待上一次执行结束后马上执行
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command,
long initialDelay,
long period,
TimeUnit unit
);


// 跟 scheduleAtFixRate 类似
// 区别是在上一次执行结束之后,再等待delay时间,然后再执行下一次任务。
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command,
long initialDelay,
long delay,
TimeUnit unit
);

ScheduledThreadPoolExecutor 执行原理

无论是 scheduleAtFixedRate 还是 scheduleWithFixedDelay 都会把任务包装成 ScheduledFutureTask, 然后调用delayedExecute(RunnableScheduledFuture<?> task 处理,
这里以scheduleAtFixedRate为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 包装成 ScheduledFutureTask
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 处理任务
delayedExecute(t);
return t;
}

delayedExecute 逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
//如果线程池已经关闭,拒绝该任务
reject(task);
else {
//将任务加入队列并再次检查线程池状态,不可运行则取消任务
super.getQueue().add(task);
//如果任务刚入队后线程池关闭了且不允许执行任务,
//从任务队列移除该任务并取消之
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 保证核心线程运行任务
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
//如果worker数量小于corePoolSize,新建Worker
//但是不设置firstTask而是让Worker从延迟队列获取任务
addWorker(null, true);
else if (wc == 0)
//保证至少有一个Worker在运行
addWorker(null, false);
}

可以看到以上代码并没有包含任何控制或响应延时的代码,因此这些逻辑应该是由延迟队列本身来控制的,这样就可以直接使用继承自ThreadPoolExecutor的方法完成其他相同的部分, 构造函数显示队列类型是DelayedWorkQueue

那我们回到加入队列的任务 ScheduledFutureTask 的 run 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public void run() {
//根据periodic是否为0判断是否是周期性任务
boolean periodic = isPeriodic();
//判断为非可执行任务状态时,取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
//不是周期性任务,直接执行
else if (!periodic)
ScheduledFutureTask.super.run();
//是周期性任务runAndReset方法会执行在执行结束时将任务的状态重置为NEW,便于下次再次执行
else if (ScheduledFutureTask.super.runAndReset()) {
//设置下周执行的时间
setNextRunTime();
//再次执行周期行任务
reExecutePeriodic(outerTask);
}
}

private void setNextRunTime() {
long p = period;
// 这里就是 fixRate 和 fixDelay 的区别, fixRate preiod > 0, 以固定频率执行(上次任务时间+执行周期)
if (p > 0)
time += p;
// 固定延迟, 则是当前系统时间+执行周期
else
time = triggerTime(-p);
}

long triggerTime(long delay) {
// 返回当前系统时间加执行周期
return now() +
// 这里是为了防止溢出
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}


void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
//将任务重新入队
super.getQueue().add(task);
//如果线程池已关闭,将任务取消
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();//再次执行任务
}
}

TimeWheel 时间轮算法

TimeWheel时间轮算法,是一种实现延迟队列的巧妙且高效的算法,被应用在Netty,Zookeeper,Kafka等各种框架中, 应用场景广泛。

比如在Dubbo中,为增强系统的容错能力,会有相应的监听判断处理机制。比如RPC调用的超时机制的实现,消费者判断RPC调用是否超时,如果超时会将超时结果返回给应用层。

在Dubbo最开始的实现中,是将所有的返回结果(DefaultFuture)都放入一个集合中,并且通过一个定时任务,每隔一定时间间隔就扫描所有的future,逐个判断是否超时。

这样的实现方式虽然比较简单,但是存在一个问题就是会有很多无意义的遍历操作开销。比如一个RPC调用的超时时间是10秒,而设置的超时判定的定时任务是2秒执行一次,那么可能会有4次左右无意义的循环检测判断操作。

对于以上问题, 目的就是要减少额外扫描的次数,这样能减少 CPU 的开销, 时间轮可以很好的解决这个问题

时间轮介绍

单时间轮

单时间轮只有一个由bucket串起来的轮子,每个bucket下链接着未来对应时刻到期的节点。

假设相邻bucket到期时间的间隔为slot=1s,从当前时刻0s开始计时,1s时到期的定时器节点挂在bucket[1]下,2s时到期的定时器节点挂在bucket[2]下……

当tick检查到时间过去了1s时,bucket[1]下所有节点执行超时动作,当时间到了2s时,bucket[2]下所有节点执行超时动作……

上图只有 8 个 bucket, 如果按照 slot=expire 来算, 只能挂 8s 的定时任务, 超过 8s 可以使用 slot = expire % N, 这里需要引入 rotation 的概念,定时器中expire表示到期时间,rotation表示节点在时间轮转了几圈后才到期

多时间轮

)

Linux的多时间轮算法,借鉴了日常生活中水表的度量方法,通过低刻度走得快的轮子带动高一级刻度轮子走动的方法,达到了仅使用较少刻度即可表示很大范围度量值的效果

netty HashedWheelTimer 源码分析

HashedWheelTimer 是接口 io.netty.util.Timer 的实现:

1
2
3
4
5
6
7
8
public interface Timer {

// 创建一个定时任务
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

// 停止所有的还没有被执行的定时任务
Set<Timeout> stop();
}

Timeout 是一个接口类, TimerTask 非常简单,就一个 run() 方法:

1
2
3
4
5
6
7
8
9
10
11
public interface TimerTask {
void run(Timeout timeout) throws Exception;
}

public interface Timeout {
Timer timer();
TimerTask task();
boolean isExpired();
boolean isCancelled();
boolean cancel();
}

构造函数太多,摘要一下几个重要的参数:

  • tickDuration 和 timeUnit 定义了一格的时间长度,默认的就是 100ms。
  • ticksPerWheel 定义了一圈有多少格,默认的就是 512;
  • leakDetection:用于追踪内存泄漏
  • maxPendingTimeouts:最大允许等待的 Timeout 实例数,也就是我们可以设置不允许太多的任务等待。如果未执行任务数达到阈值,那么再次提交任务会抛出 RejectedExecutionException 异常。默认不限制。

HashedWheelTimer 提交任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}

// 校验等待任务数是否达到阈值 maxPendingTimeouts
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}

// 如果工作线程没有启动,这里负责启动(如果是第一个任务, 还会 await 在这里)
start();

/** 下面的代码,构建 Timeout 实例,将其放到任务队列中。 **/

// deadline 是一个相对时间,相对于 HashedWheelTimer 的启动时间
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

// 防止 deadline 溢出
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// timeout 实例,一个上层依赖 timer,一个下层依赖 task,另一个是任务到期时间
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
// 放到 timeouts 队列中
timeouts.add(timeout);
return timeout;
}

这里的操作很简单:实例化 Timeout,然后放到任务队列中。需要注意的是,任务队列是 MPSC(Multiple Producer Single Consumer)队列, 刚好适用于这里的多生产线程,单消费线程的场景(这个队列是 JCTools 提供的一个并发数据结构)

Worker 线程工作原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
private final class Worker implements Runnable {

private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

// tick 过的次数,时针每 100ms tick 一次
private long tick;

@Override
public void run() {
// 在 HashedWheelTimer 中,用的都是相对时间,所以需要启动时间作为基准,并且要用 volatile 修饰
// 这里跟 newTimeout 中 start 的部分结合理解一下, start 希望工作线程真正启动之后, 并赋值完 startTime 才返回, 因为后面的逻辑需要一个正确的 startTime
// 所以在 start 方法中使用 while(startTime ==0 ) startTimeInitialized.awit 这样做保证
startTime = System.nanoTime();
if (startTime == 0) {
startTime = 1;
}

// 唤醒工作线程启动的时候提交的任务(在提交任务的start里面 await 的)
startTimeInitialized.countDown();

// 执行逻辑的地方
do {
// 等待并获取下个 tick 相对于开始时间的纳秒数
final long deadline = waitForNextTick();

if (deadline > 0) {
// 该次 tick,bucket 数组对应的 index
int idx = (int) (tick & mask);

// 处理一下已经取消的任务
processCancelledTasks();

// 取得当前 bucket
HashedWheelBucket bucket = wheel[idx];

// 将队列中所有的任务转移到相应的 buckets 中
transferTimeoutsToBuckets();

// 执行进入到这个 bucket 中的任务
bucket.expireTimeouts(deadline);

tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

/* 到这里,说明这个 timer 要关闭了,做一些清理工作 */

// 将所有 bucket 中没有执行的任务,添加到 unprocessedTimeouts 这个 HashSet 中,
// 主要目的是用于 stop() 方法返回
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
// 将任务队列中的任务也添加到 unprocessedTimeouts 中
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}

在细看一下 run 流程中几个重要的方法, waitForNextTick 在每个 tick 到期返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

/**
* 前面说过,这里用的都是相对时间,所以:
* 第一次进来的时候,工作线程会在 100ms 的时候返回,返回值是 100*10^6 (纳秒)
* 第二次进来的时候,工作线程会在 200ms 的时候返回,依次类推
* 另外就是注意极端情况,比如第二次进来的时候,由于被前面的任务阻塞,导致进来的时候就已经是 250ms,
* 那么,一进入这个方法就要立即返回,返回值是 250ms,而不是 200ms
*/
private long waitForNextTick() {
// 下一个 tick 的毫秒数
long deadline = tickDuration * (tick + 1);

// 嵌套在一个死循环里面
for (;;) {
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

if (sleepTimeMs <= 0) {
// 这里可能是 HashedWheelTimer 启动到现在经过了太久, currentTime 已经溢出了
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
// 这里是出口,所以返回值是当前时间(相对时间)
return currentTime;
}
}

// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}

try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
// 如果 timer 已经 shutdown,那么返回 Long.MIN_VALUE
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}

接着看看转移队列到 bucket 中的方法 transferTimeoutsToBuckets:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void transferTimeoutsToBuckets() {
// 这里一个 for 循环,还特地限制了 10 万次, 怕一直往队列里面仍任务, netty 的源码还是很细节的
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// 没有任务了
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// 该任务刚刚被取消了(在transfer之前其实已经做过一次了)
continue;
}


// 计算还需要等待几个 tick
long calculated = timeout.deadline / tickDuration;
// 计算出轮次
timeout.remainingRounds = (calculated - tick) / wheel.length;

final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask);

HashedWheelBucket bucket = wheel[stopIndex];
// 单个 bucket,是由 HashedWheelTimeout 实例组成的一个链表,
// 放入 bucket 链表
bucket.addTimeout(timeout);
}
}

bucket 中任务的执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 这里会执行这个 bucket 中,轮次为 0 的任务,也就是到期的任务。
* 这个方法的入参 deadline 其实没什么用,因为轮次为 0 的都是应该被执行的。
*/
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;

// 处理链表上的所有 timeout 实例
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
next = remove(timeout);
if (timeout.deadline <= deadline) {
// 这行代码负责执行具体的任务
timeout.expire();
} else {
// 这里的代码注释也说,不可能进入到这个分支
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
// 轮次减 1
timeout.remainingRounds --;
}
timeout = next;
}
}

总的来说 HashedWheelTimer 工作原理如下:

参考资料


定时器算法
https://haobin.work/2021/06/22/算法/定时器算法/
作者
Leo Hao
发布于
2021年6月22日
许可协议