时间轮机制在Redisson分布式锁中的实际应用以及时间轮源码分析
本篇文章主要基于Redisson中实现的分布式锁机制继续进行展开,分析Redisson中的时间轮机制。
在前面分析的Redisson的分布式锁实现中,有一个Watch Dog机制来对锁键进行续约,代码如下:
private void renewExpiration() { |
实际上是构建了一个TimerTask,通过timer.newTimeout(task, delay, unit);
添加到时间轮中。
|
private HashedWheelTimer timer; |
先来了解一下什么是时间轮
时间轮这个技术其实出来很久了,在kafka、zookeeper等技术中都有时间轮使用的方式。我第一次听这个概念,是当时我一个朋友在拼多多,负责整体架构设计时需要考虑到超时订单的自动关单,而订单交易量又特别多,直接去轮询数据的效率有点低,所以当时沟通下来聊到了时间轮这个东西。什么是时间轮呢?
简单来说: 时间轮是一种高效利用线程资源进行批量化调度的一种调度模型。把大批量的调度任务全部绑定到同一个调度器上,使用这一个调度器来进行所有任务的管理、触发、以及运行。
所以时间轮的模型能够高效管理各种延时任务、周期任务、通知任务。 以后大家在工作中遇到类似的功能,可以采用时间轮机制。
如图3-11,时间轮,从图片上来看,就和手表的表圈是一样,所以称为时间轮,是因为它是以时间作为刻度组成的一个环形队列,这个环形队列采用数组来实现,数组的每个元素称为槽,每个槽可以放一个定时任务列表,叫HashedWheelBucket,它是一个双向链表,量表的每一项表示一个定时任务项(HashedWhellTimeout),其中封装了真正的定时任务TimerTask。
时间轮是由多个时间格组成,下图中有8个时间格,每个时间格代表当前时间轮的基本时间跨度(tickDuration),其中时间轮的时间格的个数是固定的。
在下图中,有8个时间格(槽),假设每个时间格的单位为1s,那么整个时间轮走完一圈需要8s钟。每秒钟指针会沿着顺时针方向移动一个,这个单位可以设置,比如以秒为单位,可以以一小时为单位,这个单位可以代表时间精度。通过指针移动,来获得每个时间格中的任务列表,然后遍历这一个时间格中的双向链表来执行任务,以此循环。
时间轮的使用
这里使用的时间轮是Netty这个包中提供的,使用方法比较简单。
- 先构建一个HashedWheelTimer时间轮。
- tickDuration: 100 ,表示每个时间格代表当前时间轮的基本时间跨度,这里是100ms,也就是指针100ms跳动一次,每次跳动一个窗格
- ticksPerWheel:1024,表示时间轮上一共有多少个窗格,分配的窗格越多,占用内存空间就越大
- leakDetection:是否开启内存泄漏检测。
- maxPendingTimeouts[可选参数],最大允许等待的任务数,默认没有限制。
- 通过newTimeout()把需要延迟执行的任务添加到时间轮中
|
时间轮的原理解析
时间轮的整体原理,分为几个部分。
创建时间轮
时间轮本质上是一个环状数组,比如我们初始化时间轮时:ticksPerWheel=8,那么意味着这个环状数组的长度是8,如图3-12所示。
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
图3-12 添加任务,如图3-13所示
当通过newTimeout()方法添加一个延迟任务时,该任务首先会加入到一个阻塞队列中中。
然后会有一个定时任务从该队列获取任务,添加到时间轮的指定位置,计算方法如下。
//当前任务的开始执行时间除以每个窗口的时间间隔,得到一个calculated值(表示需要经过多少tick,指针没跳动一个窗格,tick会递增),单位为nanos(微毫秒)
long calculated = timeout.deadline / tickDuration;
//计算当前任务需要在时间轮中经历的圈数,因为当前任务执行时间有可能大于完整一圈的时间,所以需要计算经过几圈之后才能执行该任务。
timeout.remainingRounds = (calculated - tick) / wheel.length;
//取最大的一个tick,有可能当前任务在队列中已经过了执行时间,这种情况下直接用calculated这个值就没意义了。
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask); //通过ticks取模mask,得到一个下标
HashedWheelBucket bucket = wheel[stopIndex]; //把任务添加到指定数组下标位置图3-13 任务执行
Worker线程按照每次间隔时间转动后,得到该时间窗格中的任务链表,然后从链表的head开始逐个取出任务,有两个判断条件
- 当前任务需要转动的圈数为0,表示任务是当前圈开始执行
- 当前任务达到了delay时间,也就是
timeout.deadline <= deadline
- 最终调用timeout.expire()方法执行任务。
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
next = remove(timeout);
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
timeout.remainingRounds --;
}
timeout = next;
}
}
时间轮的源码分析
HashedWheelTimer的构造
- 调用createWheel创建一个时间轮,时间轮数组一定是2的幂次方,比如传入的ticksPerWheel=6,那么初始化的wheel长度一定是8,这样是便于时间格的计算。
- tickDuration,表示时间轮的跨度,代表每个时间格的时间精度,以纳秒的方式来表现。
- 把工作线程Worker封装成WorkerThread,从名字可以知道,它就是最终那个负责干活的线程。
public HashedWheelTimer( |
- 对传入的ticksPerWheel进行整形
- 初始化固定长度的HashedWheelBucket
private static HashedWheelBucket[] createWheel(int ticksPerWheel) { |
添加任务到时间轮
完成时间轮的初始化之后,并没有去启动时间轮,继续看FailbackClusterInvoker中的代码。
构建了一个RetryTimerTask,也就是一个重试的定时任务,接着把这个任务通过newTimeout加入到时间轮中,其中
- retryTimerTask,表示具体的重试任务
- RETRY_FAILED_PERIOD , 表示重试间隔时间,默认为5s
RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD); |
调用newTimeout方法,把任务添加进来。
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { |
start
任务添加到阻塞队列之后,我们再来看启动方法
start方法会根据当前的workerState状态来启动时间轮。并且用了startTimeInitialized来控制线程的运行,如果workerThread没有启动起来,那么newTimeout方法会一直阻塞在运行start方法中。如果不阻塞,newTimeout方法会获取不到startTime。
public void start() { |
启动时间轮
调用start()方法, 会调用workerThread.start();
来启动一个工作线程,这个工作线程是在构造方法中初始化的,包装的是一个Worker内部线程类。
所以直接进入到Worker这个类的run方法,了解下它的设计逻辑
public void run() { |
时间轮指针跳动
这个方法的主要作用就是返回下一个指针指向的时间间隔,然后进行sleep操作。
大家可以想象一下,一个钟表上秒与秒之间是有时间间隔的,那么waitForNextTick就是根据当前时间计算出跳动到下个时间的时间间隔,然后进行sleep,然后再返回当前时间距离时间轮启动时间的时间间隔。
说得再直白一点:,假设当前的tickDuration的间隔是1s,tick默认=0, 此时第一次进来,得到的deadline=1,也就是下一次跳动的时间间隔是1s。假设当前处于
private long waitForNextTick() { |
transferTimeoutsToBuckets
转移任务到时间轮中,前面我们讲过,任务添加进来时,是先放入到阻塞队列。
而在现在这个方法中,就是把阻塞队列中的数据转移到时间轮的指定位置。
在这个转移方法中,写死了一个循环,每次都只转移10万个任务。
然后根据HashedWheelTimeout的deadline延迟时间计算出时间轮需要运行多少次才能运行当前的任务,如果当前的任务延迟时间大于时间轮跑一圈所需要的时间,那么就计算需要跑几圈才能到这个任务运行。
最后计算出该任务在时间轮中的槽位,添加到时间轮的链表中。
private void transferTimeoutsToBuckets() { |
运行时间轮中的任务
当指针跳动到某一个时间槽中时,会就触发这个槽中的任务的执行。该功能是通过expireTimeouts来实现
这个方法的主要作用是: 过期并执行格子中到期的任务。也就是当tick进入到指定格子时,worker线程会调用这个方法
HashedWheelBucket是一个链表,所以我们需要从head节点往下进行遍历。如果链表没有遍历到链表尾部那么就继续往下遍历。
获取的timeout节点节点,如果剩余轮数remainingRounds大于0,那么就说明要到下一圈才能运行,所以将剩余轮数减一;
如果当前剩余轮数小于等于零了,那么就将当前节点从bucket链表中移除,并判断一下当前的时间是否大于timeout的延迟时间,如果是则调用timeout的expire执行任务。
void expireTimeouts(long deadline) { |