新聞中心
上一篇我們講了定時器的幾種實現(xiàn),分析了在大數(shù)據(jù)量高并發(fā)的場景下這幾種實現(xiàn)方式就有點力不從心了,從而引出時間輪這種數(shù)據(jù)結(jié)構(gòu)。在netty 和kafka 這兩種優(yōu)秀的中間件中,都有時間輪的實現(xiàn)。文章最后,我們模擬kafka 中scala 的代碼實現(xiàn)java版的時間輪。

公司主營業(yè)務(wù):網(wǎng)站制作、成都網(wǎng)站制作、移動網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。創(chuàng)新互聯(lián)建站是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團(tuán)隊有機(jī)會用頭腦與智慧不斷的給客戶帶來驚喜。創(chuàng)新互聯(lián)建站推出三山免費(fèi)做網(wǎng)站回饋大家。
Netty 的時間輪實現(xiàn)
接口定義
Netty 的實現(xiàn)自定義了一個超時器的接口io.netty.util.Timer,其方法如下:
- public interface Timer
- {
- //新增一個延時任務(wù),入?yún)槎〞r任務(wù)TimerTask,和對應(yīng)的延遲時間
- Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
- //停止時間輪的運(yùn)行,并且返回所有未被觸發(fā)的延時任務(wù)
- Set < Timeout > stop();
- }
- public interface Timeout
- {
- Timer timer();
- TimerTask task();
- boolean isExpired();
- boolean isCancelled();
- boolean cancel();
- }
Timeout接口是對延遲任務(wù)的一個封裝,其接口方法說明其實現(xiàn)內(nèi)部需要維持該延遲任務(wù)的狀態(tài)。后續(xù)我們分析其實現(xiàn)內(nèi)部代碼時可以更容易的看到。
Timer接口有唯一實現(xiàn)HashedWheelTimer。首先來看其構(gòu)造方法,如下:
- public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts)
- {
- //省略代碼,省略參數(shù)非空檢查內(nèi)容。
- wheel = createWheel(ticksPerWheel);
- mask = wheel.length - 1;
- //省略代碼,省略槽位時間范圍檢查,避免溢出以及小于 1 毫秒。
- workerThread = threadFactory.newThread(worker);
- //省略代碼,省略資源泄漏追蹤設(shè)置以及時間輪實例個數(shù)檢查
- }
mask 的設(shè)計和HashMap一樣,通過限制數(shù)組的大小為2的次方,利用位運(yùn)算來替代取模運(yùn)算,提高性能。
構(gòu)建循環(huán)數(shù)組
首先是方法createWheel,用于創(chuàng)建時間輪的核心數(shù)據(jù)結(jié)構(gòu),循環(huán)數(shù)組。來看下其方法內(nèi)容
- private static HashedWheelBucket[] createWheel(int ticksPerWheel)
- {
- //省略代碼,確認(rèn) ticksPerWheel 處于正確的區(qū)間
- //將 ticksPerWheel 規(guī)范化為 2 的次方冪大小。
- ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
- HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
- for(int i = 0; i < wheel.length; i++)
- {
- wheel[i] = new HashedWheelBucket();
- }
- return wheel;
- }
數(shù)組的長度為 2 的次方冪方便進(jìn)行求商和取余計算。
HashedWheelBucket內(nèi)部存儲著由HashedWheelTimeout節(jié)點構(gòu)成的雙向鏈表,并且存儲著鏈表的頭節(jié)點和尾結(jié)點,方便于任務(wù)的提取和插入。
新增延遲任務(wù)
方法HashedWheelTimer#newTimeout用于新增延遲任務(wù),下面來看下代碼:
- public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit)
- {
- //省略代碼,用于參數(shù)檢查
- start();
- long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
- if(delay > 0 && deadline < 0)
- {
- deadline = Long.MAX_VALUE;
- }
- HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
- timeouts.add(timeout);
- return timeout;
- }
可以看到任務(wù)并沒有直接添加到時間輪中,而是先入了一個 mpsc 隊列,我簡單說下 mpsc【多生產(chǎn)者單一消費(fèi)者隊列】 是 JCTools 中的并發(fā)隊列,用在多個生產(chǎn)者可同時訪問隊列,但只有一個消費(fèi)者會訪問隊列的情況。,采用這個模式主要出于提升并發(fā)性能考慮,因為這個隊列只有線程workerThread會進(jìn)行任務(wù)提取操作。
工作線程如何執(zhí)行
- public void run()
- {
- {//代碼塊①
- startTime = System.nanoTime();
- if(startTime == 0)
- {
- //使用startTime==0 作為線程進(jìn)入工作狀態(tài)模式標(biāo)識,因此這里重新賦值為1
- startTime = 1;
- }
- //通知外部初始化工作線程的線程,工作線程已經(jīng)啟動完畢
- startTimeInitialized.countDown();
- }
- {//代碼塊②
- do {
- final long deadline = waitForNextTick();
- if(deadline > 0)
- {
- int idx = (int)(tick & mask);
- processCancelledTasks();
- HashedWheelBucket bucket = wheel[idx];
- transferTimeoutsToBuckets();
- bucket.expireTimeouts(deadline);
- tick++;
- }
- } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
- }
- {//代碼塊③
- for(HashedWheelBucket bucket: wheel)
- {
- bucket.clearTimeouts(unprocessedTimeouts);
- }
- for(;;)
- {
- HashedWheelTimeout timeout = timeouts.poll();
- if(timeout == null)
- {
- break;
- }
- if(!timeout.isCancelled())
- {
- unprocessedTimeouts.add(timeout);
- }
- }
- processCancelledTasks();
- }
- }
看 waitForNextTick,是如何得到下一次執(zhí)行時間的。
- private long waitForNextTick()
- {
- long deadline = tickDuration * (tick + 1);//計算下一次需要檢查的時間
- for(;;)
- {
- final long currentTime = System.nanoTime() - startTime;
- long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
- if(sleepTimeMs <= 0)//說明時間已經(jīng)到了
- {
- if(currentTime == Long.MIN_VALUE)
- {
- return -Long.MAX_VALUE;
- }
- else
- {
- return currentTime;
- }
- }
- //windows 下有bug sleep 必須是10 的倍數(shù)
- if(PlatformDependent.isWindows())
- {
- sleepTimeMs = sleepTimeMs / 10 * 10;
- }
- try
- {
- Thread.sleep(sleepTimeMs);// 等待時間到來
- }
- catch(InterruptedException ignored)
- {
- if(WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN)
- {
- return Long.MIN_VALUE;
- }
- }
- }
- }
簡單的說就是通過 tickDuration 和此時已經(jīng)滴答的次數(shù)算出下一次需要檢查的時間,時候未到就sleep等著。
任務(wù)如何入槽的。
- private void transferTimeoutsToBuckets() {
- //最多處理100000 怕任務(wù)延遲
- for(int i = 0; i < 100000; ++i) {
- //從隊列里面拿出任務(wù)呢
- HashedWheelTimer.HashedWheelTimeout timeout = (HashedWheelTimer.HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
- if (timeout == null) {
- break;
- }
- if (timeout.state() != 1) {
- long calculated = timeout.deadline / HashedWheelTimer.this.tickDuration;
- //計算排在第幾輪
- timeout.remainingRounds = (calculated - this.tick) / (long)HashedWheelTimer.this.wheel.length;
- long ticks = Math.max(calculated, this.tick);
- //計算放在哪個槽中
- int stopIndex = (int)(ticks & (long)HashedWheelTimer.this.mask);
- HashedWheelTimer.HashedWheelBucket bucket = HashedWheelTimer.this.wheel[stopIndex];
- //入槽,就是鏈表入隊列
- bucket.addTimeout(timeout);
- }
- }
- }
如何執(zhí)行的
- public void expireTimeouts(long deadline) {
- HashedWheelTimer.HashedWheelTimeout next;
- //拿到槽的鏈表頭部
- for(HashedWheelTimer.HashedWheelTimeout timeout = this.head; timeout != null; timeout = next) {
- boolean remove = false;
- if (timeout.remainingRounds <= 0L) {//如果到這輪l
- if (timeout.deadline > deadline) {
- throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
- }
- timeout.expire();//執(zhí)行
- remove = true;
- } else if (timeout.isCancelled()) {
- remove = true;
- } else {
- --timeout.remainingRounds;//輪數(shù)-1
- }
- next = timeout.next;//繼續(xù)下一任務(wù)
- if (remove) {
- this.remove(timeout);//移除完成的任務(wù)
- }
- }
- }
就是通過輪數(shù)和時間雙重判斷,執(zhí)行完了移除任務(wù)。
小結(jié)一下
總體上看 Netty 的實現(xiàn)就是上文說的時間輪通過輪數(shù)的實現(xiàn),完全一致??梢钥闯鰰r間精度由 TickDuration 把控,并且工作線程的除了處理執(zhí)行到時的任務(wù)還做了其他操作,因此任務(wù)不一定會被精準(zhǔn)的執(zhí)行。
而且任務(wù)的執(zhí)行如果不是新起一個線程,或者將任務(wù)扔到線程池執(zhí)行,那么耗時的任務(wù)會阻塞下個任務(wù)的執(zhí)行。
并且會有很多無用的 tick 推進(jìn),例如 TickDuration 為1秒,此時就一個延遲350秒的任務(wù),那就是有349次無用的操作。出現(xiàn)空推。
但是從另一面來看,如果任務(wù)都執(zhí)行很快(當(dāng)然你也可以異步執(zhí)行),并且任務(wù)數(shù)很多,通過分批執(zhí)行,并且增刪任務(wù)的時間復(fù)雜度都是O(1)來說。時間輪還是比通過優(yōu)先隊列實現(xiàn)的延時任務(wù)來的合適些。
Kafka 中的時間輪
上面我們說到 Kafka 中的時間輪是多層次時間輪實現(xiàn),總的而言實現(xiàn)和上述說的思路一致。不過細(xì)節(jié)有些不同,并且做了點優(yōu)化。
先看看添加任務(wù)的方法。在添加的時候就設(shè)置任務(wù)執(zhí)行的絕對時間。
Kafka 中的時間輪
上面我們說到 Kafka 中的時間輪是多層次時間輪實現(xiàn),總的而言實現(xiàn)和上述說的思路一致。不過細(xì)節(jié)有些不同,并且做了點優(yōu)化。
先看看添加任務(wù)的方法。在添加的時候就設(shè)置任務(wù)執(zhí)行的絕對時間。
- def add(timerTaskEntry: TimerTaskEntry): Boolean = {
- val expiration = timerTaskEntry.expirationMs
- if (timerTaskEntry.cancelled) {
- // Cancelled
- false
- } else if (expiration < currentTime + tickMs) {
- // 如果已經(jīng)到期 返回false
- // Already expired
- false
- } else if (expiration < currentTime + interval) {//如果在本層范圍內(nèi)
- // Put in its own bucket
- val virtualId = expiration / tickMs
- val bucket = buckets((virtualId % wheelSize.toLong).toInt)//計算槽位
- bucket.add(timerTaskEntry)//添加到槽內(nèi)雙向鏈表中
- // Set the bucket expiration time
- if (bucket.setExpiration(virtualId * tickMs)) {//更新槽時間
- // The bucket needs to be enqueued because it was an expired bucket
- // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced
- // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle
- // will pass in the same value and hence return false, thus the bucket with the same expiration will not
- // be enqueued multiple times.
- queue.offer(bucket)//將槽加入DelayQueue,由DelayQueue來推進(jìn)執(zhí)行
- }
- true
- } else {
- //如果超過本層能表示的延遲時間,則將任務(wù)添加到上層。這里看到上層是按需創(chuàng)建的。
- // Out of the interval. Put it into the parent timer
- if (overflowWheel == null) addOverflowWheel()
- overflowWheel.add(timerTaskEntry)
- }
- }
那么時間輪是如何推動的呢?Netty 中是通過固定的時間間隔掃描,時候未到就等待來進(jìn)行時間輪的推動。上面我們分析到這樣會有空推進(jìn)的情況。
而 Kafka 就利用了空間換時間的思想,通過 DelayQueue,來保存每個槽,通過每個槽的過期時間排序。這樣擁有最早需要執(zhí)行任務(wù)的槽會有優(yōu)先獲取。如果時候未到,那么 delayQueue.poll 就會阻塞著,這樣就不會有空推進(jìn)的情況發(fā)送。
我們來看下推進(jìn)的方法。
- def advanceClock(timeoutMs: Long): Boolean = {
- //從延遲隊列中獲取槽
- var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
- if (bucket != null) {
- writeLock.lock()
- try {
- while (bucket != null) {
- // 更新每層時間輪的currentTime
- timingWheel.advanceClock(bucket.getExpiration())
- //因為更新了currentTime,進(jìn)行一波任務(wù)的重新插入,來實現(xiàn)任務(wù)時間輪的降級
- bucket.flush(reinsert)
- //獲取下一個槽
- bucket = delayQueue.poll()
- }
- } finally {
- writeLock.unlock()
- }
- true
- } else {
- false
- }
- }
- // Try to advance the clock
- def advanceClock(timeMs: Long): Unit = {
- if (timeMs >= currentTime + tickMs) {
- // 必須是tickMs 整數(shù)倍
- currentTime = timeMs - (timeMs % tickMs)
- //推動上層時間輪也更新currentTime
- // Try to advance the clock of the overflow wheel if present
- if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
- }
- }
從上面的 add 方法我們知道每次對比都是根據(jù)expiration < currentTime + interval 來進(jìn)行對比的,而advanceClock 就是用來推進(jìn)更新 currentTime 的。
小結(jié)一下
Kafka 用了多層次時間輪來實現(xiàn),并且是按需創(chuàng)建時間輪,采用任務(wù)的絕對時間來判斷延期,并且對于每個槽(槽內(nèi)存放的也是任務(wù)的雙向鏈表)都會維護(hù)一個過期時間,利用 DelayQueue 來對每個槽的過期時間排序,來進(jìn)行時間的推進(jìn),防止空推進(jìn)的存在。
每次推進(jìn)都會更新 currentTime 為當(dāng)前時間戳,當(dāng)然做了點微調(diào)使得 currentTime 是 tickMs 的整數(shù)倍。并且每次推進(jìn)都會把能降級的任務(wù)重新插入降級。
可以看到這里的 DelayQueue 的元素是每個槽,而不是任務(wù),因此數(shù)量就少很多了,這應(yīng)該是權(quán)衡了對于槽操作的延時隊列的時間復(fù)雜度與空推進(jìn)的影響。
模擬kafka的時間輪實現(xiàn)java版
定時器
- public class Timer {
- /**
- * 底層時間輪
- */
- private TimeWheel timeWheel;
- /**
- * 一個Timer只有一個delayQueue
- */
- private DelayQueue
delayQueue = new DelayQueue<>(); - /**
- * 過期任務(wù)執(zhí)行線程
- */
- private ExecutorService workerThreadPool;
- /**
- * 輪詢delayQueue獲取過期任務(wù)線程
- */
- private ExecutorService bossThreadPool;
- /**
- * 構(gòu)造函數(shù)
- */
- public Timer() {
- timeWheel = new TimeWheel(1000, 2, System.currentTimeMillis(), delayQueue);
- workerThreadPool = Executors.newFixedThreadPool(100);
- bossThreadPool = Executors.newFixedThreadPool(1);
- //20ms獲取一次過期任務(wù)
- bossThreadPool.submit(() -> {
- while (true) {
- this.advanceClock(1000);
- }
- });
- }
- /**
- * 添加任務(wù)
- */
- public void addTask(TimerTask timerTask) {
- //添加失敗任務(wù)直接執(zhí)行
- if (!timeWheel.addTask(timerTask)) {
- workerThreadPool.submit(timerTask.getTask());
- }
- }
- /**
- * 獲取過期任務(wù)
- */
- private void advanceClock(long timeout) {
- try {
- TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
- if (timerTaskList != null) {
- //推進(jìn)時間
- timeWheel.advanceClock(timerTaskList.getExpiration());
- //執(zhí)行過期任務(wù)(包含降級操作)
- timerTaskList.flush(this::addTask);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
任務(wù)
- public class TimerTask {
- /**
- * 延遲時間
- */
- private long delayMs;
- /**
- * 任務(wù)
- */
- private MyThread task;
- /**
- * 時間槽
- */
- protected TimerTaskList timerTaskList;
- /**
- * 下一個節(jié)點
- */
- protected TimerTask next;
- /**
- * 上一個節(jié)點
- */
- protected TimerTask pre;
- /**
- * 描述
- */
- public String desc;
- public TimerTask(long delayMs, MyThread task) {
- this.delayMs = System.currentTimeMillis() + delayMs;
- this.task = task;
- this.timerTaskList = null;
- this.next = null;
- this.pre = null;
- }
- public MyThread getTask() {
- return task;
- }
- public long getDelayMs() {
- return delayMs;
- }
- @Override
- public String toString() {
- return desc;
- }
- }
時間槽
- public class TimerTaskList implements Delayed {
- /**
- * 過期時間
- */
- private AtomicLong expiration = new AtomicLong(-1L);
- /**
- * 根節(jié)點
- */
- private TimerTask root = new TimerTask(-1L, null);
- {
- root.pre = root;
- root.next = root;
- }
- /**
- * 設(shè)置過期時間
- */
- public boolean setExpiration(long expire) {
- return expiration.getAndSet(expire) != expire;
- }
- /**
- * 獲取過期時間
- */
- public long getExpiration() {
- return expiration.get();
- }
- /**
- * 新增任務(wù)
- */
- public void addTask(TimerTask timerTask) {
- synchronized (this) {
- if (timerTask.timerTaskList == null) {
- timerTask.timerTaskList = this;
- TimerTask tail = root.pre;
- timerTask.next = root;
- timerTask.pre = tail;
- tail.next = timerTask;
- root.pre = timerTask;
- }
- }
- }
- /**
- * 移除任務(wù)
- */
- public void removeTask(TimerTask timerTask) {
- synchronized (this) {
- if (timerTask.timerTaskList.equals(this)) {
- timerTask.next.pre = timerTask.pre;
- timerTask.pre.next = timerTask.next;
- timerTask.timerTaskList = null;
- timerTask.next = null;
- timerTask.pre = null;
- }
- }
- }
- /**
- * 重新分配
- */
- public synchronized void flush(Consumer
flush) { - TimerTask timerTask = root.next;
- while (!timerTask.equals(root)) {
- this.removeTask(timerTask);
- flush.accept(timerTask);
- timerTask = root.next;
- }
- expiration.set(-1L);
- }
- @Override
- public long getDelay(TimeUnit unit) {
- return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
- }
- @Override
- public int compareTo(Delayed o) {
- if (o instanceof TimerTaskList) {
- return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
- }
- return 0;
- }
- }
時間輪
- public class TimeWheel {
- /**
- * 一個時間槽的范圍
- */
- private long tickMs;
- /**
- * 時間輪大小
- */
- private int wheelSize;
- /**
- * 時間跨度
- */
- private long interval;
- /**
- * 時間槽
- */
- private TimerTaskList[] timerTaskLists;
- /**
- * 當(dāng)前時間
- */
- private long currentTime;
- /**
- * 上層時間輪
- */
- private volatile TimeWheel overflowWheel;
- /**
- * 一個Timer只有一個delayQueue
- */
- private DelayQueue
delayQueue; - public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue
delayQueue) { - this.currentTime = currentTime;
- this.tickMs = tickMs;
- this.wheelSize = wheelSize;
- this.interval = tickMs * wheelSize;
- this.timerTaskLists = new TimerTaskList[wheelSize];
- //currentTime為tickMs的整數(shù)倍 這里做取整操作
- this.currentTime = currentTime - (currentTime % tickMs);
- this.delayQueue = delayQueue;
- for (int i = 0; i < wheelSize; i++) {
- timerTaskLists[i] = new TimerTaskList();
- }
- }
- /**
- * 創(chuàng)建或者獲取上層時間輪
- */
- private TimeWheel getOverflowWheel() {
- if (overflowWheel == null) {
當(dāng)前題目:聊一聊時間輪的實現(xiàn)
網(wǎng)址分享:http://fisionsoft.com.cn/article/dhhpjdi.html


咨詢
建站咨詢
