时间轮,是一种高效的、批量管理定时任务的调度模型 。我在《透彻理解Java网络编程》和《透彻理解Kafka》两个专栏中,分别介绍过Netty和Kafka中的时间轮算法实现。
在Dubbo中,对时间轮的应用主要体现在如下两个方面:
- 失败重试: 例如,Provider 向注册中心进行注册失败时的重试操作,或是 Consumer 向注册中心订阅时的失败重试等;
- 周期性定时任务: 例如,定期发送心跳请求,请求超时的处理,或是网络连接断开后的重连机制。
本章,我将对Apache Dubbo中的时间轮算法进行讲解。在Dubbo中,实现时间轮算法的思路和Netty几乎是完全一样的:时间轮是一种环形结构,分为很多槽,一个槽代表一个时间间隔,每个槽使用双向链表存储定时任务,指针周期性地跳动,跳动到一个槽位,就执行该槽位的定时任务。通过时间轮算法,可以将定时任务的存取操作以及取消操作的时间复杂度降为 O(1)
,非常适合海量定时任务的调度管理。
一、核心接口
关于时间轮算法的介绍我就不赘述了,本章我主要讲解Dubbo中的时间轮算法实现。Dubbo 的时间轮实现位于 dubbo-common
模块的 org.apache.dubbo.common.timer
包中:
一共四个包含四个核心类:
- Timer:时间轮调度器,该接口提供了两个核心方法:创建任务
newTimeout()
、停止所有未执行任务stop()
; - TimerTask:时间轮任务,所有的定时任务都要继承该接口;
- Timeout:与 TimerTask 对象是一对一的关系,两者的关系类似于线程池返回的 Future 对象与提交到线程池中的任务对象。通过 Timeout 对象,不仅可以查看定时任务的状态,还可以取消定时任务;
- HashedWheelTimer:Timer接口的时间轮算法实现类。
1.1 Timer
Timer 接口定义了定时器的基本行为,如下所示:
public interface Timer {
/**
* 在指定的delay时间后,调度一个定时任务.
*/
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
/**
* 停止所有尚未执行的任务
*/
Set<Timeout> stop();
/**
* 判断当前Timer是否已经停止
*/
boolean isStop();
}
上述方法的核心是 newTimeout()
:提交一个定时任务(TimerTask)并返回关联的 Timeout 对象,类似于向线程池提交任务返回一个Future对象。
1.2 TimerTask
TimerTask代表了一个定时任务,该接口非常简单,只定义了一个 run() 方法:
public interface TimerTask {
/**
* 在指定延时后,执行任务
*/
void run(Timeout timeout) throws Exception;
}
1.3 Timeout
Timeout 对象与 TimerTask 对象一一对应,两者的关系类似于线程池返回的 Future 对象与提交到线程池中的任务对象之间的关系。通过 Timeout 对象,我们不仅可以查看定时任务的状态,还可以操作定时任务(例如取消关联的定时任务)。Timeout 接口中的方法如下:
public interface Timeout {
/**
* 返回关联的Timer
*/
Timer timer();
/**
* 返回关联的TimerTask
*/
TimerTask task();
/**
* 判断关联的TimerTask是否已过期
*/
boolean isExpired();
/**
* 判断关联的TimerTask是否已取消
*/
boolean isCancelled();
/**
* 取消关联的TimerTask
*/
boolean cancel();
}
二、算法实现
HashedWheelTimer是Timer接口的时间轮算法实现,我们通过一个示例来看看该如何使用HashedWheelTimer:
public class HashedWheelTimerTest {
public static void main(String[] args) {
Timer timer = new HashedWheelTimer();
// 创建一个任务timeout1,10秒后执行
Timeout timeout1 = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
System.out.println("timeout1: " + new Date());
}
}, 10, TimeUnit.SECONDS);
// 取消任务timeout1
if (!timeout1.isExpired()) {
timeout1.cancel();
}
// 创建一个任务,1秒后执行
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws InterruptedException {
System.out.println("timeout2: " + new Date());
Thread.sleep(5000);
}
}, 1, TimeUnit.SECONDS);
// 创建一个任务,3秒后执行
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
System.out.println("timeout3: " + new Date());
}
}, 3, TimeUnit.SECONDS);
}
}
HashedWheelTimer的内部结构如下图:
2.1 HashedWheelTimeout
HashedWheelTimeout 是 Timeout 接口的唯一实现,是 HashedWheelTimer 的内部类,它扮演了两个角色:
- 时间轮中双向链表的节点,即定时任务 TimerTask 在 HashedWheelTimer 中的容器;
- 定时任务 TimerTask 提交到 HashedWheelTimer 之后返回的句柄(Handle),用于查看和控制定时任务。
HashedWheelTimeout 中的核心字段和方法说明如下:
// HashedWheelTimeout.java
private static final class HashedWheelTimeout implements Timeout {
// 定时任务状态:新建
private static final int ST_INIT = 0;
// 定时任务状态:已取消
private static final int ST_CANCELLED = 1;
// 定时任务状态:已过期
private static final int ST_EXPIRED = 2;
// 定时任务当前所处状态
private volatile int state = ST_INIT;
// 原子更新器,用于更新当前定时任务的状态
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
// 所属时间轮
private final HashedWheelTimer timer;
// 实际被调度的任务
private final TimerTask task;
// 定时任务执行的时间,在创建HashedWheelTimeout时指定
// 计算公式:currentTime + delay(任务延迟时间) - startTime(HashedWheelTimer的启动时间),单位为纳秒。
private final long deadline;
// 当前任务剩余的时钟周期数
// 当任务到期时间与当前时刻的时间差,超过时间轮单圈能表示的时长时,就出现了套圈的情况,需要该字段值表示剩余的时钟周期。
long remainingRounds;
// 当前定时任务在链表中的前驱节点
HashedWheelTimeout next;
// 后继节点
HashedWheelTimeout prev;
// 时间轮中的Bucket
HashedWheelBucket bucket;
HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
this.timer = timer;
this.task = task;
this.deadline = deadline;
}
@Override
public boolean cancel() {
// 设置任务状态为ST_CANCELLED
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}
// 将当前 HashedWheelTimeout 添加到 cancelledTimeouts 队列中等待销毁
timer.cancelledTimeouts.add(this);
return true;
}
void remove() {
// 获取所在的时间轮Bucket
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
// 从时间轮删除该任务
bucket.remove(this);
} else {
// 待执行的任务数减去1
timer.pendingTimeouts.decrementAndGet();
}
}
public void expire() {
// 设置任务状态为ST_EXPIRED
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
try {
// 立即同步执行当前任务
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
}
2.2 HashedWheelBucket
HashedWheelBucket 是时间轮中的一个槽,时间轮中的槽实际上就是一个用于缓存和管理双向链表的容器,双向链表中的每一个节点就是一个 HashedWheelTimeout 对象,也就关联了一个 TimerTask 定时任务。
HashedWheelBucket 持有双向链表的首尾两个节点,分别是 head 和 tail 两个字段,再加上每个 HashedWheelTimeout 节点均持有前驱和后继的引用,这样就可以正向或是逆向遍历整个双向链表了。
// HashedWheelBucket.java
private static final class HashedWheelBucket {
// 链表头节点
private HashedWheelTimeout head;
// 链表尾节点
private HashedWheelTimeout tail;
/**
* 新增 HashedWheelTimeout 到链表尾节点
*/
void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
timeout.bucket = this;
// 链表尾插法
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}
/**
* 遍历链表中所有HashedWheelTimeout节点:
* 1.如果任务到期,则remove()取出,然后调用expire()方法执行;
* 2.如果任务已取消,则remove()取出后丢弃;
* 3.如果任务未到期,则将remainingRounds字段值减去1
*/
void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// 遍历链接表所有节点
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
// 任务到期
if (timeout.remainingRounds <= 0) {
// 取出任务
next = remove(timeout);
// 任务已到期
if (timeout.deadline <= deadline) {
// 立即执行
timeout.expire();
} else {
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
}
// 任务已取消
else if (timeout.isCancelled()) {
// 直接抛弃任务
next = remove(timeout);
}
// 任务未到期
else {
timeout.remainingRounds--;
}
timeout = next;
}
}
/**
* 从双向链表中移除指定的 HashedWheelTimeout 节点
*/
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
// 双向链表操作,从链表移除节点
HashedWheelTimeout next = timeout.next;
if (timeout.prev != null) {
timeout.prev.next = next;
}
if (timeout.next != null) {
timeout.next.prev = timeout.prev;
}
if (timeout == head) {
if (timeout == tail) {
tail = null;
head = null;
} else {
head = next;
}
} else if (timeout == tail) {
tail = timeout.prev;
}
timeout.prev = null;
timeout.next = null;
timeout.bucket = null;
// 待执行的任务数减去1
timeout.timer.pendingTimeouts.decrementAndGet();
return next;
}
/**
* 移除双向链表中所有已过期或已取消的任务,并添加到set集合中
*/
void clearTimeouts(Set<Timeout> set) {
for (;;) {
HashedWheelTimeout timeout = pollTimeout();
if (timeout == null) {
return;
}
if (timeout.isExpired() || timeout.isCancelled()) {
continue;
}
set.add(timeout);
}
}
/**
* 移除双向链表的头节点任务,并返回该任务
*/
private HashedWheelTimeout pollTimeout() {
HashedWheelTimeout head = this.head;
if (head == null) {
return null;
}
HashedWheelTimeout next = head.next;
if (next == null) {
tail = this.head = null;
} else {
this.head = next;
next.prev = null;
}
head.next = null;
head.prev = null;
head.bucket = null;
return head;
}
}
2.3 HashedWheelTimer
HashedWheelTimer 是 Timer 接口的实现,它通过时间轮算法实现了一个定时器。HashedWheelTimer 会根据当前时间轮指针选定对应的槽(HashedWheelBucket),从双向链表的头部开始迭代,对每个定时任务(HashedWheelTimeout)进行计算,属于当前时钟周期则取出运行,不属于则将其剩余的时钟周期数减1。
核心字段
我们先来看下HashedWheelTimer的核心字段:
// HashedWheelTimer.java
public class HashedWheelTimer implements Timer {
//...
// 时间轮状态的原子修改器
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
// 后台工作线程
private final Worker worker = new Worker(); // Runnable对象
private final Thread workerThread; // 工作线程
// 时间轮当前所处的状态:init、started、shutdown
private volatile int workerState;
private static final int WORKER_STATE_INIT = 0;
private static final int WORKER_STATE_STARTED = 1;
private static final int WORKER_STATE_SHUTDOWN = 2;
// 时间指针每次加1所代表的实际时间间隔,单位纳秒
private final long tickDuration;
// 时间轮的环形队列,每个元素是一个槽,当指定时间轮槽数为n时,实际上会取大于且最靠近n的2的幂次方值
private final HashedWheelBucket[] wheel;
// 掩码,mask = wheel.length - 1,执行 ticks & mask 便能定位到对应的时钟槽
private final int mask;
// 暂存外部提交到时间轮中的定时任务
private final Queue<HashedWheelTimeout> timeouts = new LinkedBlockingQueue<>();
// 暂存已取消的定时任务
private final Queue<HashedWheelTimeout> cancelledTimeouts = new LinkedBlockingQueue<>();
// 当前时间轮中剩余的定时任务总数
private final AtomicLong pendingTimeouts = new AtomicLong(0);
// 最大定时任务数
private final long maxPendingTimeouts;
// 当前时间轮的启动时间,提交到该时间轮的定时任务的deadline字段值以该时间戳为起点进行计算
private volatile long startTime;
}
构造函数
然后来看时间轮的构造:
// HashedWheelTimer.java
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit,
int ticksPerWheel, long maxPendingTimeouts) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
// 初始化时间轮
wheel = createWheel(ticksPerWheel);
// 掩码,用于后续计算每个任务的槽位
mask = wheel.length - 1;
// 间隔时长,纳秒
this.tickDuration = unit.toNanos(tickDuration);
// 防止溢出
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
// 工作线程
workerThread = threadFactory.newThread(worker);
// 最大能容纳的定数任务数
this.maxPendingTimeouts = maxPendingTimeouts;
// 时间轮对象数加1
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
createWheel方法用于创建HashedWheelTimer对象内部的时间槽:
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}
// 时间槽数量调整为2的幂次
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
// 创建槽数组
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
提交任务
HashedWheelTimer对外提供了一个 newTimeout()
接口用于提交定时任务,在定时任务进入到 timeouts
队列之前会先调用 start()
方法启动时间轮,内部会完成以下关键步骤:
- 确定时间轮的 startTime 字段;
- 启动 workerThread 线程,开始执行 worker 任务。
// HashedWheelTimer.java
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
// 任务数加1
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
// 判断任务数是否超过限制的最大值
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 启动工作线程
start();
// 计算任务的deadline:当前时间 + delay时间 - 时间轮创建时间
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// 防止deadline溢出
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// 将任务封装成HashedWheelTimeout
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
// 加入队列
timeouts.add(timeout);
return timeout;
}
我们来看start方法,它的内部会启动工作线程,并且主线程会等待工作线程完成startTime
的设置:
// HashedWheelTimer.java
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
public void start() {
// 判断当前时间轮的状态
switch (WORKER_STATE_UPDATER.get(this)) {
// INIT初始化
case WORKER_STATE_INIT:
// 更新时间轮状态
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
// 启动工作线程
workerThread.start();
}
break;
// STARTED已启动
case WORKER_STATE_STARTED:
break;
// SHUTDOWN已关闭
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
while (startTime == 0) {
try {
// 主线程在这里等待,直到工作线程启动
startTimeInitialized.await();
} catch (InterruptedException ignore) {
}
}
}
工作线程
HashedWheelTimer的构造过程比较简单,主要就是创建了一个工作线程workerThread
,并传入一个Woker对象,Woker本质是一个Runnabdle任务类,它的run
方法由workerThread工作线程执行,会一直循环执行以下逻辑:
- 时间轮指针转动,工作线程等待当前tick结束;
- 清理已取消的定时任务,这些定时任务在用户取消时,会记录到
cancelledTimeouts
队列中,每次指针转动时,都会清理该队列; - 将缓存在
timeouts
队列中的定时任务转移到时间轮中对应的槽中; - 根据指针指向的当前时间槽,处理该槽位的双向链表中的所有定时任务。
// Worker.java
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
// 时间轮指针,一个步长为1的单调递增计数器
private long tick;
@Override
public void run() {
// 初始化时间轮的启动时间
startTime = System.nanoTime();
if (startTime == 0) {
startTime = 1;
}
// 通知主线程放行,startTimeInitialized是一个CountdownLatch对象
startTimeInitialized.countDown();
do {
// 1.等待tick结束
final long deadline = waitForNextTick();
if (deadline > 0) { // 大于0说明没有被中断
// 计算时间槽位
int idx = (int) (tick & mask);
// 2.清理已取消的任务
processCancelledTasks();
// 3.将timeouts中的任务转移到各自对应的槽位
transferTimeoutsToBuckets();
// 4.处理当前槽位的双向链表中的所有定时任务
HashedWheelBucket bucket = wheel[idx];
bucket.expireTimeouts(deadline);
// 5.指向下一个时间槽
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// 执行到这里,说明时间轮已经关闭
// 遍历所有时间槽
for (HashedWheelBucket bucket : wheel) {
// 移除该槽的双向链表中的所有已过期或已取消的任务,把它们添加到unprocessedTimeouts集合中
bucket.clearTimeouts(unprocessedTimeouts);
}
// 将timeouts队列中任务全部取出
for (; ; ) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
// 如果任务还没取消
if (!timeout.isCancelled()) {
// 添加任务到队列unprocessedTimeouts
unprocessedTimeouts.add(timeout);
}
}
// 清理 cancelledTimeouts 队列中用户主动取消的定时任务
processCancelledTasks();
}
private void transferTimeoutsToBuckets() {
// 每次最多转移100000个任务
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
continue;
}
// 计算任务的剩余轮数
long calculated = timeout.deadline / tickDuration;
timeout.remainingRounds = (calculated - tick) / wheel.length;
// 计算任务所属的槽位
final long ticks = Math.max(calculated, tick);
int stopIndex = (int) (ticks & mask);
// 向指定槽位添加任务
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
private void processCancelledTasks() {
// 遍历已取消的任务
for (; ; ) {
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
break;
}
try {
// 移除任务
timeout.remove();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown while process a cancellation task", t);
}
}
}
}
/**
* 等待当前tick的时间间隔结束,并返回结束时的当前时间
*/
private long waitForNextTick() {
// 计算当前时间槽的deadline
long deadline = tickDuration * (tick + 1);
for (; ; ) {
// 计算线程需要等待多久(sleepTimeMs),才会到达deadline
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
// 已到达deadline
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
// 返回当前时间
return currentTime;
}
}
// Windows平台特殊处理
if (isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
// Sleep一段时间,等待deadline到来
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
Set<Timeout> unprocessedTimeouts() {
return Collections.unmodifiableSet(unprocessedTimeouts);
}
}
从上述算法实现可以看出,Dubbo中的时间轮算法实现,基本就是copy了Netty的代码。
三、总结
本章,我再次对时间轮算法的思想及其实现进行了讲解。事实上在Dubbo中,时间轮并不直接用于周期性操作,而是只向时间轮提交执行单次的定时任务,在上一次任务执行完成时,再向时间轮提交一次当前任务,这样就会在下个周期执行该任务。这样的话,即使当前任务执行出现了 GC、I/O 阻塞等情况,导致任务延迟或卡住,也不会有同样的任务源源不断地提交进来,导致任务堆积。
当然,Netty和Dubbo实现的时间轮算法都有一定的局限,特别是不能适应 海量定时任务,且任务的开始时间跨度非常长的场景,比如有的是 1 分钟之后执行,有的是 1 小时之后执行,有的是 1 年之后执行 ,这种情况下就需要对时间轮算法进行优化,Kafka采用的解决方案就是 多层时间轮+DelayQueue结构时间槽 ,具体我不再赘述了,读者可以自己去看下Kafka的实现,或者参考我的专栏《透彻理解Java网络编程》中的内容。