新聞中心
一、時(shí)間輪簡介
1.1 為什么要使用時(shí)間輪
在平時(shí)開發(fā)中,經(jīng)常會(huì)與定時(shí)任務(wù)打交道。下面舉幾個(gè)定時(shí)任務(wù)處理的例子。

成都創(chuàng)新互聯(lián)公司是一家專業(yè)從事網(wǎng)站設(shè)計(jì)制作、成都做網(wǎng)站、網(wǎng)頁設(shè)計(jì)的品牌網(wǎng)絡(luò)公司。如今是成都地區(qū)具影響力的網(wǎng)站設(shè)計(jì)公司,作為專業(yè)的成都網(wǎng)站建設(shè)公司,成都創(chuàng)新互聯(lián)公司依托強(qiáng)大的技術(shù)實(shí)力、以及多年的網(wǎng)站運(yùn)營經(jīng)驗(yàn),為您提供專業(yè)的成都網(wǎng)站建設(shè)、營銷型網(wǎng)站建設(shè)及網(wǎng)站設(shè)計(jì)開發(fā)服務(wù)!
1)心跳檢測(cè)。在Dubbo中,需要有心跳機(jī)制來維持Consumer與Provider的長連接,默認(rèn)的心跳間隔是60s。當(dāng)Provider在3次心跳時(shí)間內(nèi)沒有收到心跳響應(yīng),會(huì)關(guān)閉連接通道。當(dāng)Consumer在3次心跳時(shí)間內(nèi)沒有收到心跳響應(yīng),會(huì)進(jìn)行重連。Provider側(cè)和Consumer側(cè)的心跳檢測(cè)機(jī)制都是通過定時(shí)任務(wù)實(shí)現(xiàn)的,而且是本篇文章要分析的時(shí)間輪HashedWheelTimer處理的。
2)超時(shí)處理。在Dubbo中發(fā)起RPC調(diào)用時(shí),通常會(huì)配置超時(shí)時(shí)間,當(dāng)消費(fèi)者調(diào)用服務(wù)提供者出現(xiàn)超時(shí)進(jìn)行一定的邏輯處理。那么怎么檢測(cè)任務(wù)調(diào)用超時(shí)了呢?我們可以利用定時(shí)任務(wù),每次創(chuàng)建一個(gè)Future,記錄這個(gè)Future的創(chuàng)建時(shí)間與超時(shí)時(shí)間,后臺(tái)有一個(gè)定時(shí)任務(wù)進(jìn)行檢測(cè),當(dāng)Future到達(dá)超時(shí)時(shí)間并且沒有被處理時(shí),就需要對(duì)這個(gè)Future執(zhí)行超時(shí)邏輯處理。
3)Redisson分布式鎖續(xù)期。在分布式鎖處理中,通常會(huì)指定分布式鎖的超時(shí)時(shí)間,同樣會(huì)在finally塊里釋放分布式鎖。但是有一個(gè)問題時(shí),通常分布式鎖的超時(shí)時(shí)間不好判斷,如果設(shè)置短了業(yè)務(wù)卻沒執(zhí)行完成就把鎖釋放掉了,或者超時(shí)時(shí)間設(shè)置很長,同樣也會(huì)存在一些問題。Redisson提供了一種看門狗機(jī)制,通過時(shí)間輪定時(shí)給分布式鎖續(xù)期,也就是延長分布式鎖的超時(shí)時(shí)間。
可以看到,上述幾個(gè)例子都與定時(shí)任務(wù)有關(guān),那么傳統(tǒng)的定時(shí)任務(wù)有什么缺點(diǎn)呢?為什么要使用時(shí)間輪來實(shí)現(xiàn)?
假如使用普通的定時(shí)任務(wù)處理機(jī)制來處理例2)中的超時(shí)情況:
1)簡單地,可以針對(duì)每一次請(qǐng)求創(chuàng)建一個(gè)線程,然后Sleep到超時(shí)時(shí)間,之后若判斷超時(shí)則進(jìn)行超時(shí)邏輯處理。存在的問題是如果面臨是高并發(fā)請(qǐng)求,針對(duì)每個(gè)請(qǐng)求都要去創(chuàng)建線程,這樣太耗費(fèi)資源了。
2)針對(duì)方案1的不足,可以改成一個(gè)線程來處理所有的定時(shí)任務(wù),比如這個(gè)線程可以每隔50ms掃描所有需要處理的超時(shí)任務(wù),如果發(fā)現(xiàn)有超時(shí)任務(wù),則進(jìn)行處理。但是,這樣也存在一個(gè)問題,可能一段時(shí)間內(nèi)都沒有任務(wù)達(dá)到超時(shí)時(shí)間,那么就讓CPU多了很多無用的輪詢遍歷操作。
針對(duì)上述方案的不足,可以采用時(shí)間輪來進(jìn)行處理。下面先來簡單介紹下時(shí)間輪的概念。
1.2 單層時(shí)間輪
我們先以單層時(shí)間輪為例,假設(shè)時(shí)間輪的周期是1秒,時(shí)間輪中有10個(gè)槽位,則每個(gè)槽位代表100ms。假設(shè)我們現(xiàn)在有3個(gè)任務(wù),分別是任務(wù)A(220ms后執(zhí)行)、B(410ms之后運(yùn)行)、C(1930ms之后運(yùn)行)。則這三個(gè)任務(wù)在時(shí)間輪所處的槽位如下圖,可以看到任務(wù)A被放到了槽位2,任務(wù)B被放到了槽位4,任務(wù)C被放到了槽位9。
當(dāng)時(shí)間輪轉(zhuǎn)動(dòng)到對(duì)應(yīng)的槽時(shí),就會(huì)從槽中取出任務(wù)判斷是否需要執(zhí)行。同時(shí)可以發(fā)現(xiàn)有一個(gè)剩余周期的概念,這是因?yàn)槿蝿?wù)C的執(zhí)行時(shí)間為1930ms,超過了時(shí)間輪的周期1秒,所以可以標(biāo)記它的剩余周期為1,當(dāng)時(shí)間輪第一次轉(zhuǎn)動(dòng)到它的位置時(shí),發(fā)現(xiàn)它的剩余周期為1,表示還沒有到要處理的時(shí)間,將剩余周期減1,時(shí)間輪繼續(xù)轉(zhuǎn)動(dòng),當(dāng)下一次轉(zhuǎn)動(dòng)到C任務(wù)位置時(shí),發(fā)現(xiàn)剩余周期為0,表示時(shí)間到了需要處理該定時(shí)任務(wù)了。Dubbo中采用的就是這種單層時(shí)間輪機(jī)制。
1.3 多層時(shí)間輪
既然有單層時(shí)間輪,那么自然而然可以想到利用多層時(shí)間輪來解決上述任務(wù)執(zhí)行時(shí)間超出時(shí)間輪周期的情況。下面以兩層時(shí)間輪為例,第一層時(shí)間輪周期為1秒,第二層時(shí)間輪周期為10秒。
還是以上述3個(gè)任務(wù)為例,可以看到任務(wù)A和B分布在第一層時(shí)間輪上,而任務(wù)C分布在第二層時(shí)間輪的槽1處。當(dāng)?shù)谝粚訒r(shí)間輪轉(zhuǎn)動(dòng)時(shí),任務(wù)A和任務(wù)B會(huì)被先后執(zhí)行。1秒鐘之后,第一層時(shí)間輪完成了一個(gè)周期轉(zhuǎn)動(dòng)。從新開始第0跳,這時(shí)第二層時(shí)間輪從槽0跳到了槽1處,將槽1處的任務(wù),也就是任務(wù)C取出放入到第一層時(shí)間輪的槽位9處,當(dāng)?shù)谝粚訒r(shí)間輪轉(zhuǎn)動(dòng)到槽位9處,任務(wù)C就會(huì)被執(zhí)行。這種將第二層的任務(wù)取出放入第一層中稱為降級(jí),它是為了保證任務(wù)被處理的時(shí)間精度。Kafka內(nèi)部就是采用的這種多層時(shí)間輪機(jī)制。
二、時(shí)間輪原理
下面先來看一下Dubbo中的時(shí)間輪的結(jié)構(gòu),可以看到,它和時(shí)鐘很像,它被劃分成了一個(gè)個(gè)Bucket,每個(gè)Bucket有一個(gè)頭指針和尾指針,分別指向雙向鏈表的頭節(jié)點(diǎn)和尾節(jié)點(diǎn),雙向鏈表中存儲(chǔ)的就是要處理的任務(wù)。時(shí)間輪不停轉(zhuǎn)動(dòng),當(dāng)指向Bucket0所負(fù)責(zé)維護(hù)的雙向鏈表時(shí),就將它所存儲(chǔ)的任務(wù)遍歷取出來處理。
下面我們先來介紹下Dubbo中時(shí)間輪HashedWheelTimer所涉及到的一些核心概念,在講解完這些核心概念之后,再來對(duì)時(shí)間輪的源碼進(jìn)行分析。
2.1 TimerTask
在Dubbo中,TimerTask封裝了要執(zhí)行的任務(wù),它就是上圖雙向鏈表中節(jié)點(diǎn)所封裝的任務(wù)。所有的定時(shí)任務(wù)都需要繼承TimerTask接口。如下圖,可以看到Dubbo中的心跳任務(wù)HeartBeatTask、注冊(cè)失敗重試任務(wù)FailRegisteredTask等都實(shí)現(xiàn)了TimerTask接口。
public interface TimerTask {
void run(Timeout timeout) throws Exception;
}2.2 Timeout
TimerTask中run方法的入?yún)⑹荰imeout,Timeout與TimerTask一一對(duì)應(yīng),Timeout的唯一實(shí)現(xiàn)類HashedWheelTimeout中就封裝了TimerTask屬性,可以理解為HashedWheelTimeout就是上述雙向鏈表的一個(gè)節(jié)點(diǎn),因此它也包含兩個(gè)HashedWheelTimeout類型的指針,分別指向當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)和下一個(gè)節(jié)點(diǎn)。
public interface Timeout {
// Timer就是定時(shí)器, 也就是Dubbo中的時(shí)間輪
Timer timer();
// 獲取該節(jié)點(diǎn)要執(zhí)行的任務(wù)
TimerTask task();
// 判斷該節(jié)點(diǎn)封裝的任務(wù)有沒有過期、被取消
boolean isExpired();
boolean isCancelled();
// 取消該節(jié)點(diǎn)的任務(wù)
boolean cancel();
}HashedWheelTimeout是Timeout的唯一實(shí)現(xiàn),它的作用有兩個(gè):
- 它是時(shí)間輪槽所維護(hù)的雙向鏈表的節(jié)點(diǎn),其中封裝了實(shí)際要執(zhí)行的任務(wù)TimerTask。
- 通過它可以查看定時(shí)任務(wù)的狀態(tài)、對(duì)定時(shí)任務(wù)進(jìn)行取消、從雙向鏈表中移除等操作。
下面來看一下Timeout的實(shí)現(xiàn)類HashedWheelTimeout的核心字段與實(shí)現(xiàn)。
1) int ST_INIT = 0、int ST_CANCELLED = 1、int ST_EXPIRED = 2
HashedWheelTimeout里定義了三種狀態(tài),分別表示任務(wù)的初始化狀態(tài)、被取消狀態(tài)、已過期狀態(tài)
2) STATE_UPDATER
用于更新定時(shí)任務(wù)的狀態(tài)
3) HashedWheelTimer timer
指向時(shí)間輪對(duì)象
4) TimerTask task
實(shí)際要執(zhí)行的任務(wù)
5) long deadline
指定時(shí)任務(wù)執(zhí)行的時(shí)間,這個(gè)時(shí)間是在創(chuàng)建 HashedWheelTimeout 時(shí)指定的
計(jì)算公式是: currentTime(創(chuàng)建 HashedWheelTimeout 的時(shí)間) + delay(任務(wù)延遲時(shí)間)
- startTime(HashedWheelTimer 的啟動(dòng)時(shí)間),時(shí)間單位為納秒
6) int state = ST_INIT
任務(wù)初始狀態(tài)
7) long remainingRounds
指當(dāng)前任務(wù)剩余的時(shí)鐘周期數(shù). 時(shí)間輪所能表示的時(shí)間長度是有限的, 在任務(wù)到期時(shí)間與當(dāng)前時(shí)刻
的時(shí)間差超過時(shí)間輪單圈能表示的時(shí)長,就出現(xiàn)了套圈的情況,需要該字段值表示剩余的時(shí)鐘周期
8) HashedWheelTimeout next、HashedWheelTimeout prev
分別對(duì)應(yīng)當(dāng)前定時(shí)任務(wù)在鏈表中的前驅(qū)節(jié)點(diǎn)和后繼節(jié)點(diǎn),這也驗(yàn)證了時(shí)間輪中每個(gè)槽所對(duì)應(yīng)的任務(wù)鏈表是
一個(gè)雙鏈表
9) HashedWheelBucket bucket
時(shí)間輪中的一個(gè)槽,對(duì)應(yīng)時(shí)間輪圓圈的一個(gè)個(gè)小格子,每個(gè)槽維護(hù)一個(gè)雙向鏈表,當(dāng)時(shí)間輪指針轉(zhuǎn)到當(dāng)前
槽時(shí),就會(huì)從槽所負(fù)責(zé)的雙向鏈表中取出任務(wù)進(jìn)行處理
HashedWheelTimeout提供了remove操作,可以從雙向鏈表中移除當(dāng)前自身節(jié)點(diǎn),并將當(dāng)前時(shí)間輪所維護(hù)的定時(shí)任務(wù)數(shù)量減一。
void remove() {
// 獲取當(dāng)前任務(wù)屬于哪個(gè)槽
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
// 從槽中移除自己,也就是從雙向鏈表中移除節(jié)點(diǎn),
// 分析bucket的方法時(shí)會(huì)分析
bucket.remove(this);
} else {
// pendingTimeouts表示當(dāng)前時(shí)間輪所維護(hù)的定時(shí)任務(wù)的數(shù)量
timer.pendingTimeouts.decrementAndGet();
}
}HashedWheelTimeout提供了cancel操作,可以取消時(shí)間輪中的定時(shí)任務(wù)。當(dāng)定時(shí)任務(wù)被取消時(shí),它會(huì)首先被暫存到canceledTimeouts隊(duì)列中。在時(shí)間輪轉(zhuǎn)動(dòng)到槽進(jìn)行任務(wù)處理之前和時(shí)間輪退出運(yùn)行時(shí)都會(huì)調(diào)用cancel,而cancel會(huì)調(diào)用remove,從而清理該隊(duì)列中被取消的定時(shí)任務(wù)。
@Override
public boolean cancel() {
// 通過CAS進(jìn)行狀態(tài)變更
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}
// 任務(wù)被取消時(shí),時(shí)間輪會(huì)將它暫存到時(shí)間輪所維護(hù)的canceledTimeouts隊(duì)列中.
// 在時(shí)間輪轉(zhuǎn)動(dòng)到槽進(jìn)行任務(wù)處理之前和時(shí)間輪退出運(yùn)行時(shí)都會(huì)調(diào)用cancel,而
// cancel會(huì)調(diào)用remove,從而清理該隊(duì)列中被取消的定時(shí)任務(wù)
timer.cancelledTimeouts.add(this);
return true;
}
HashedWheelTimeout提供了expire操作,當(dāng)時(shí)間輪指針轉(zhuǎn)動(dòng)到某個(gè)槽時(shí),會(huì)遍歷該槽所維護(hù)的雙向鏈表,判斷節(jié)點(diǎn)的狀態(tài),如果發(fā)現(xiàn)任務(wù)已到期,會(huì)通過remove方法移除,然后調(diào)用expire方法執(zhí)行該定時(shí)任務(wù)。
public void expire() {
// 修改定時(shí)任務(wù)狀態(tài)為已過期
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
try {
// 真正的執(zhí)行定時(shí)任務(wù)所要代表的邏輯
task.run(this);
} catch (Throwable t) {
// 打印日志,可以看到當(dāng)時(shí)間輪中定時(shí)任務(wù)執(zhí)行異常時(shí),
// 不會(huì)拋出異常,影響到時(shí)間輪中其他定時(shí)任務(wù)執(zhí)行
}
}
2.3 HashedWheelBucket
前面也介紹過了,它是時(shí)間輪中的槽,它內(nèi)部維護(hù)了雙向鏈表的首尾指針。下面我們來看一下它內(nèi)部的核心資源和實(shí)現(xiàn)。
1) HashedWheelTimeout head、HashedWheelTimeout tail
指向該槽所維護(hù)的雙向鏈表的首節(jié)點(diǎn)和尾節(jié)點(diǎn)
HashedWheelBucket提供了addTimeout方法,用于添加任務(wù)到雙向鏈表的尾節(jié)點(diǎn)。
void addTimeout(HashedWheelTimeout timeout) {
// 添加之前判斷一下該任務(wù)當(dāng)前沒有被被關(guān)聯(lián)到一個(gè)槽上
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}
HashedWheelBucket提供了remove方法,用于從雙向鏈表中刪除指定節(jié)點(diǎn)。核心邏輯如下圖所示,根據(jù)要?jiǎng)h除的節(jié)點(diǎn)找到其前置節(jié)點(diǎn)和后置節(jié)點(diǎn),然后分別調(diào)整前置節(jié)點(diǎn)的next指針和后置節(jié)點(diǎn)的prev指針。刪除過程中需要考慮一些邊界情況。刪除之后將pendingTimeouts,也就是當(dāng)前時(shí)間輪的待處理任務(wù)數(shù)減一。remove代碼邏輯較簡單,這邊就不貼代碼了。
HashedWheelBucket提供了expireTimeouts方法,當(dāng)時(shí)間輪指針轉(zhuǎn)動(dòng)到某個(gè)槽時(shí),通過該方法處理該槽上雙向鏈表的定時(shí)任務(wù),分為3種情況:
- 定時(shí)任務(wù)已到期,則會(huì)通過remove方法取出,并調(diào)用其expire方法執(zhí)行任務(wù)邏輯。
- 定時(shí)任務(wù)已被取消,則通過remove方法取出直接丟棄。
- 定時(shí)任務(wù)還未到期,則會(huì)將remainingRounds(剩余時(shí)鐘周期)減一。
void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// 時(shí)間輪指針轉(zhuǎn)到某個(gè)槽時(shí)從雙向鏈表頭節(jié)點(diǎn)開始遍歷
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
// remainingRounds <= 0表示到期了
if (timeout.remainingRounds <= 0) {
// 從鏈表中移除該節(jié)點(diǎn)
next = remove(timeout);
// 判斷該定時(shí)任務(wù)確實(shí)是到期了
if (timeout.deadline <= deadline) {
// 執(zhí)行該任務(wù)
timeout.expire();
} else {
// 拋異常
}
} else if (timeout.isCancelled()) {
// 任務(wù)被取消,移除后直接丟棄
next = remove(timeout);
} else {
// 剩余時(shí)鐘周期減一
timeout.remainingRounds--;
}
// 繼續(xù)判斷下一個(gè)任務(wù)節(jié)點(diǎn)
timeout = next;
}
}
HashedWheelBucket也提供了clearTimeouts方法,該方法會(huì)在時(shí)間輪停止的時(shí)候被使用,它會(huì)遍歷并移除所有雙向鏈表中的節(jié)點(diǎn),并返回所有未超時(shí)和未被取消的任務(wù)。
2.4 Worker
Worker實(shí)現(xiàn)了Runnable接口,時(shí)間輪內(nèi)部通過Worker線程來處理放入時(shí)間輪中的定時(shí)任務(wù)。下面先來看一下它的核心字段和run方法邏輯。
1) SetunprocessedTimeouts
當(dāng)時(shí)間輪停止時(shí),用于存放時(shí)間輪中未過期的和未被取消的任務(wù)
2) long tick
時(shí)間輪指針,指向時(shí)間輪中某個(gè)槽,當(dāng)時(shí)間輪轉(zhuǎn)動(dòng)時(shí)該tick會(huì)自增
public void run() {
// 初始化startTime, 所有任務(wù)的的deadline都是相對(duì)于這個(gè)時(shí)間點(diǎn)
startTime = System.nanoTime();
// 喚醒阻塞在start()的線程
startTimeInitialized.countDown();
// 只要時(shí)間輪的狀態(tài)為WORKER_STATE_STARTED, 就循環(huán)的轉(zhuǎn)動(dòng)tick,
// 處理槽中的定時(shí)任務(wù)
do {
// 判斷是否到了處理槽的時(shí)間了,還沒到則sleep一會(huì)
final long deadline = waitForNextTick();
if (deadline > 0) {
// 獲取tick對(duì)應(yīng)的槽索引
int idx = (int) (tick & mask);
// 清理用戶主動(dòng)取消的定時(shí)任務(wù), 這些定時(shí)任務(wù)在用戶取消時(shí),
// 會(huì)記錄到 cancelledTimeouts 隊(duì)列中. 在每次指針轉(zhuǎn)動(dòng)
// 的時(shí)候,時(shí)間輪都會(huì)清理該隊(duì)列
processCancelledTasks();
// 根據(jù)當(dāng)前指針定位對(duì)應(yīng)槽
HashedWheelBucket bucket = wheel[idx];
// 將緩存在 timeouts 隊(duì)列中的定時(shí)任務(wù)轉(zhuǎn)移到時(shí)間輪中對(duì)應(yīng)的槽中
transferTimeoutsToBuckets();
// 處理該槽位的雙向鏈表中的定時(shí)任務(wù)
bucket.expireTimeouts(deadline);
tick++;
}
// 檢測(cè)時(shí)間輪的狀態(tài), 如果時(shí)間輪處于運(yùn)行狀態(tài), 則循環(huán)執(zhí)行上述步驟,
// 不斷執(zhí)行定時(shí)任務(wù)
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this)
== WORKER_STATE_STARTED);
// 這里應(yīng)該是時(shí)間輪停止了, 清除所有槽中的任務(wù), 并加入到未處理任務(wù)列表,
// 以供stop()方法返回
for (HashedWheelBucket bucket : wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
// 將還沒有加入到槽中的待處理定時(shí)任務(wù)隊(duì)列中的任務(wù)取出, 如果是未取消
// 的任務(wù), 則加入到未處理任務(wù)隊(duì)列中, 以供stop()方法返回
for (; ; ) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
// 最后再次清理 cancelledTimeouts 隊(duì)列中用戶主動(dòng)取消的定時(shí)任務(wù)
processCancelledTasks();
}
下面對(duì)run方法中涉及到的一些方法進(jìn)行介紹:
1)waitForNextTick
邏輯比較簡單,它會(huì)判斷有沒有到達(dá)處理下一個(gè)槽任務(wù)的時(shí)間了,如果還沒有到達(dá)則sleep一會(huì)。
2)processCancelledTasks
遍歷cancelledTimeouts,獲取被取消的任務(wù)并從雙向鏈表中移除。
private void processCancelledTasks() {
for (; ; ) {
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
// all processed
break;
}
timeout.remove();
}
}3)transferTimeoutsToBuckets
當(dāng)調(diào)用newTimeout方法時(shí),會(huì)先將要處理的任務(wù)緩存到timeouts隊(duì)列中,等時(shí)間輪指針轉(zhuǎn)動(dòng)時(shí)統(tǒng)一調(diào)用transferTimeoutsToBuckets方法處理,將任務(wù)轉(zhuǎn)移到指定的槽對(duì)應(yīng)的雙向鏈表中,每次轉(zhuǎn)移10萬個(gè),以免阻塞時(shí)間輪線程。
private void transferTimeoutsToBuckets() {
// 每次tick只處理10w個(gè)任務(wù), 以免阻塞worker線程
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
// 沒有任務(wù)了直接跳出循環(huán)
if (timeout == null) {
// all processed
break;
}
// 還沒有放入到槽中就取消了, 直接略過
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
continue;
}
// 計(jì)算任務(wù)需要經(jīng)過多少個(gè)tick
long calculated = timeout.deadline / tickDuration;
// 計(jì)算任務(wù)的輪數(shù)
timeout.remainingRounds = (calculated - tick) / wheel.length;
// 如果任務(wù)在timeouts隊(duì)列里面放久了, 以至于已經(jīng)過了執(zhí)行時(shí)間, 這個(gè)時(shí)候
// 就使用當(dāng)前tick, 也就是放到當(dāng)前bucket, 此方法調(diào)用完后就會(huì)被執(zhí)行.
final long ticks = Math.max(calculated, tick);
int stopIndex = (int) (ticks & mask);
// 將任務(wù)加入到相應(yīng)的槽中
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
2.5 HashedWheelTimer
最后,我們來分析時(shí)間輪HashedWheelTimer,它實(shí)現(xiàn)了Timer接口,提供了newTimeout方法可以向時(shí)間輪中添加定時(shí)任務(wù),該任務(wù)會(huì)先被暫存到timeouts隊(duì)列中,等時(shí)間輪轉(zhuǎn)動(dòng)到某個(gè)槽時(shí),會(huì)將該timeouts隊(duì)列中的任務(wù)轉(zhuǎn)移到某個(gè)槽所負(fù)責(zé)的雙向鏈表中。它還提供了stop方法用于終止時(shí)間輪,該方法會(huì)返回時(shí)間輪中未處理的任務(wù)。它也提供了isStop方法用于判斷時(shí)間輪是否終止了。
先來看一下HashedWheelTimer的核心字段。
1) HashedWheelBucket[] wheel
該數(shù)組就是時(shí)間輪的環(huán)形隊(duì)列,數(shù)組每個(gè)元素都是一個(gè)槽,一個(gè)槽負(fù)責(zé)維護(hù)一個(gè)雙向鏈表,用于存儲(chǔ)定時(shí)
任務(wù)。它會(huì)被在構(gòu)造函數(shù)中初始化,當(dāng)指定為n時(shí),它實(shí)際上會(huì)取最靠近n的且為2的冪次方值。
2) Queuetimeouts
timeouts用于緩存外部向時(shí)間輪提交的定時(shí)任務(wù)
3) QueuecancelledTimeouts
cancelledTimeouts用于暫存被取消的定時(shí)任務(wù),時(shí)間輪會(huì)在處理槽負(fù)責(zé)的雙向鏈表之前,先處理這兩
個(gè)隊(duì)列中的數(shù)據(jù)。
4) Worker worker
時(shí)間輪處理定時(shí)任務(wù)的邏輯
5) Thread workerThread
時(shí)間輪處理定時(shí)任務(wù)的線程
6) AtomicLong pendingTimeouts
時(shí)間輪剩余的待處理的定時(shí)任務(wù)數(shù)量
7) long tickDuration
時(shí)間輪每個(gè)槽所代表的時(shí)間長度
8) int workerState
時(shí)間輪狀態(tài),可選值有init、started、shut down
下面來看一下時(shí)間輪的構(gòu)造函數(shù),用于初始化一個(gè)時(shí)間輪。首先它會(huì)對(duì)傳入?yún)?shù)ticksPerWheel進(jìn)行轉(zhuǎn)換處理,返回大于該值的2的冪次方,它表示時(shí)間輪上有多少個(gè)槽,默認(rèn)是512個(gè)。然后創(chuàng)建大小為該值的HashedWheelBucket[]數(shù)組。接著通過傳入的tickDuration對(duì)時(shí)間輪的tickDuration賦值,默認(rèn)是100ms。節(jié)通過threadFactory創(chuàng)建workerThread工作線程,該線程就是負(fù)責(zé)處理時(shí)間輪中的定時(shí)任務(wù)的線程。
public HashedWheelTimer(ThreadFactory threadFactory,
long tickDuration, TimeUnit unit,
int ticksPerWheel,
long maxPendingTimeouts) {
// 圓環(huán)上一共有多少個(gè)時(shí)間間隔, HashedWheelTimer對(duì)其正則化
// 將其換算為大于等于該值的2^n
wheel = createWheel(ticksPerWheel);
// 這用來快速計(jì)算任務(wù)應(yīng)該呆的槽
mask = wheel.length - 1;
// 時(shí)間輪每個(gè)槽的時(shí)間間隔
this.tickDuration = unit.toNanos(tickDuration);
// threadFactory是創(chuàng)建線程的線程工廠對(duì)象
workerThread = threadFactory.newThread(worker);
// 最多允許多少個(gè)任務(wù)等待執(zhí)行
this.maxPendingTimeouts = maxPendingTimeouts;
}
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
// 計(jì)算真正應(yīng)當(dāng)創(chuàng)建多少個(gè)槽
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
// 初始化時(shí)間輪數(shù)組
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
初始化時(shí)間輪之后,就可以向其中提交定時(shí)任務(wù)了,可以通過時(shí)間輪提供的newTimeout方法來完成。首先將待處理的任務(wù)數(shù)量加1,然后啟動(dòng)時(shí)間輪線程,這時(shí)worker的run方法就會(huì)被系統(tǒng)調(diào)度運(yùn)行。然后將該定時(shí)任務(wù)封裝成HashedWheelTimeout加入到timeouts隊(duì)列中。start之后,時(shí)間輪就開始運(yùn)行起來了,直到外界調(diào)用stop方法終止退出。
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 待處理的任務(wù)數(shù)量加1
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
// 啟動(dòng)時(shí)間輪
start();
// 計(jì)算該定時(shí)任務(wù)的deadline
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// 創(chuàng)建一個(gè)HashedWheelTimeout對(duì)象,它首先會(huì)被暫存到timeouts隊(duì)列中
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
public void start() {
/**
* 判斷當(dāng)前時(shí)間輪的狀態(tài)
* 1) 如果是初始化, 則啟動(dòng)worker線程, 啟動(dòng)整個(gè)時(shí)間輪
* 2) 如果已經(jīng)啟動(dòng)則略過
* 3) 如果是已經(jīng)停止,則報(bào)錯(cuò)
*/
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
// 使用cas來判斷啟動(dòng)時(shí)間輪
if (WORKER_STATE_UPDATER.compareAndSet(this,
WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
// 拋異常
default:
throw new Error("Invalid WorkerState");
}
// 等待worker線程初始化時(shí)間輪的啟動(dòng)時(shí)間
while (startTime == 0) {
try {
// 這里使用countDownLatch來確保調(diào)度的線程已經(jīng)被啟動(dòng)
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
三、時(shí)間輪應(yīng)用
到這里,Dubbo中的時(shí)間輪原理就分析完了。接下來呼應(yīng)本文開頭的三個(gè)例子,結(jié)合它們來分析下時(shí)間輪在Dubbo或Redisson中是如何使用的。
1)HeartbeatTimerTask
在Dubbo的HeaderExchangeClient類中會(huì)向時(shí)間輪中提交該心跳任務(wù)。
private void startHeartBeatTask(URL url) {
// Client的具體實(shí)現(xiàn)決定是否啟動(dòng)該心跳任務(wù)
if (!client.canHandleIdle()) {
AbstractTimerTask.ChannelProvider cp =
() -> Collections.singletonList(HeaderExchangeClient.this);
// 計(jì)算心跳間隔, 最小間隔不能低于1s
int heartbeat = getHeartbeat(url);
long heartbeatTick = calculateLeastDuration(heartbeat);
// 創(chuàng)建心跳任務(wù)
this.heartBeatTimerTask =
new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
// 提交到IDLE_CHECK_TIMER這個(gè)時(shí)間輪中等待執(zhí)行, 等時(shí)間到了時(shí)間輪就會(huì)去取出該任務(wù)進(jìn)行調(diào)度執(zhí)行
IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
}
}
// 上面用到的IDLE_CHECK_TIMER就是我們本文的分析的時(shí)間輪
private static final HashedWheelTimer IDLE_CHECK_TIMER =
new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL);
// 上述創(chuàng)建心跳任務(wù)時(shí), 創(chuàng)建了一個(gè)HeartbeatTimerTask對(duì)象, 可以看下該任務(wù)具體要做什么
@Override
protected void doTask(Channel channel) {
try {
// 獲取最后一次讀寫時(shí)間
Long lastRead = lastRead(channel);
Long lastWrite = lastWrite(channel);
if ((lastRead != null && now() - lastRead > heartbeat)
|| (lastWrite != null && now() - lastWrite > heartbeat)) {
// 最后一次讀寫時(shí)間超過心跳時(shí)間, 就會(huì)發(fā)送心跳請(qǐng)求
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
// 表明它是一個(gè)心跳請(qǐng)求
req.setEvent(HEARTBEAT_EVENT);
channel.send(req);
分享名稱:時(shí)間輪原理及其在框架中的應(yīng)用
本文地址:http://fisionsoft.com.cn/article/codhgih.html


咨詢
建站咨詢
