這篇文章主要講解了“HashedWheelTimer的作用是什么”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“HashedWheelTimer的作用是什么”吧!
成都創(chuàng)新互聯(lián)公司是一家專注于成都網(wǎng)站建設(shè)、網(wǎng)站建設(shè)與策劃設(shè)計(jì),湯旺網(wǎng)站建設(shè)哪家好?成都創(chuàng)新互聯(lián)公司做網(wǎng)站,專注于網(wǎng)站建設(shè)10多年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:湯旺等地區(qū)。湯旺做網(wǎng)站價(jià)格咨詢:18980820575
和同事討論一個(gè)定時(shí)審核的需求,運(yùn)營(yíng)設(shè)定審核通過的時(shí)間,到了這個(gè)時(shí)間之后,相關(guān)內(nèi)容自動(dòng)審核通過,本是個(gè)小的需求,但是考慮到如果需要定時(shí)審核的東西很多,這樣大量的定時(shí)任務(wù)帶來的一系列問題,然后逐步的討論到了netty的HashedWheelTimer這個(gè)的實(shí)現(xiàn)。
把所有需要定時(shí)審核的資源放到redis中,例如sorted set中,需要審核通過的時(shí)間作為score值。后臺(tái)啟動(dòng)一個(gè)定時(shí)器,定時(shí)輪詢sortedSet,當(dāng)score值小于當(dāng)前時(shí)間,則運(yùn)行任務(wù)審核通過。
這個(gè)方案在小批量數(shù)據(jù)的情況下沒有問題,但是在大批量任務(wù)的情況下就會(huì)出現(xiàn)問題了,因?yàn)槊看味家喸內(nèi)康臄?shù)據(jù),逐個(gè)判斷是否需要執(zhí)行,一旦輪詢?nèi)蝿?wù)執(zhí)行比較長(zhǎng),就會(huì)出現(xiàn)任務(wù)無法按照定時(shí)的時(shí)間執(zhí)行的問題。
每個(gè)需要定時(shí)完成的任務(wù)都啟動(dòng)一個(gè)定時(shí)任務(wù),然后等待完成之后銷毀
這個(gè)方案帶來的問題很明顯,定時(shí)任務(wù)比較多的情況下,會(huì)啟動(dòng)很多的線程,這樣服務(wù)器會(huì)承受不了之后崩潰。基本上不會(huì)采取這個(gè)方案。
和方案一類似,針對(duì)每一個(gè)需要定時(shí)審核的任務(wù),設(shè)定過期時(shí)間,過期時(shí)間也就是審核通過的時(shí)間,訂閱redis的過期事件,當(dāng)這個(gè)事件發(fā)生時(shí),執(zhí)行相應(yīng)的審核通過任務(wù)。
這個(gè)方案來說是借用了redis這種中間件來實(shí)現(xiàn)我們的功能,這中實(shí)際上屬于redis的發(fā)布訂閱功能中的一部分,針對(duì)redis發(fā)布訂閱功能是不推薦我們?cè)谏a(chǎn)環(huán)境中做業(yè)務(wù)操作的,通常redis內(nèi)部(例如redis集群節(jié)點(diǎn)上下線,選舉等等來使用),我們業(yè)務(wù)系統(tǒng)使用它的這個(gè)事件會(huì)產(chǎn)生如下兩個(gè)問題 1、redis發(fā)布訂閱的不穩(wěn)定問題 2、redid發(fā)布訂閱的可靠性問題 具體可以參考 https://my.oschina.net/u/2457218/blog/3065021 (redis的發(fā)布訂閱缺陷)
也許你和我一樣都是第一次聽說這個(gè)東西,這個(gè)東西就是專為大批量定時(shí)任務(wù)管理而生。具體論文詳見參考文獻(xiàn)[2]
簡(jiǎn)要的說這個(gè)是一個(gè)輪,里面有指針,指針會(huì)根據(jù)設(shè)定的時(shí)間單位旋轉(zhuǎn),任務(wù)根據(jù)一些算法會(huì)落在相應(yīng)的槽位上。如下圖
首先會(huì)有一個(gè)輪,這個(gè)輪在這里分成了8個(gè)槽位,任務(wù)任務(wù)添加的時(shí)候會(huì)根據(jù)相應(yīng)的算法對(duì)槽位個(gè)數(shù)取模,得到任務(wù)會(huì)存儲(chǔ)在具體哪個(gè)槽位,每個(gè)槽位是一個(gè)鏈表結(jié)構(gòu),任務(wù)存儲(chǔ)了任務(wù)的過期時(shí)間(任務(wù)執(zhí)行時(shí)間),任務(wù)需要執(zhí)行需要指針轉(zhuǎn)的輪數(shù),指針(tick)每間隔一個(gè)單位的時(shí)間會(huì)往下走一個(gè)槽位,然后會(huì)查詢這個(gè)槽位上的存儲(chǔ)的任務(wù),并且任務(wù)的存儲(chǔ)的剩余輪數(shù)會(huì)減一,當(dāng)剩余輪數(shù)小于等于零時(shí),就會(huì)開始執(zhí)行這個(gè)任務(wù),執(zhí)行之后會(huì)把任務(wù)從這個(gè)槽位上給刪除掉。
例如上圖: 槽位為8個(gè)槽位 Bucket 指針每個(gè)時(shí)間間隔(100ms)會(huì)往下走一個(gè)槽位,這個(gè)時(shí)間間隔叫做tickDuration 那相當(dāng)于每隔8*100ms=800ms,會(huì)輪詢一圈。
算法理解起來比較簡(jiǎn)單,并且也有成熟的實(shí)現(xiàn),那就是在netty中有一個(gè)HashedWheelTimer這個(gè)類,把這個(gè)算法實(shí)現(xiàn)了出來。接下來分析分析一下它的這個(gè)代碼。
在這個(gè)類上定義的有幾個(gè)比較重要的屬性
/** *這個(gè)work是一個(gè)內(nèi)部類,實(shí)現(xiàn)了Runable接口,是比較核心的一個(gè)類,包裝了具體任務(wù)的運(yùn)行,把任務(wù)放到具體如何放到某個(gè)槽位上,指針往下走的具體方法,任務(wù)取消等。 */ private final Worker worker = new Worker(); /** *工作線程,這個(gè)就是整個(gè)HashedWheelTimer啟動(dòng)的起點(diǎn) */ private final Thread workerThread; /** *當(dāng)前任務(wù)的狀態(tài),1代表任務(wù)已經(jīng)開始執(zhí)行,0任務(wù)初始化,2任務(wù)已關(guān)閉 */ public static final int WORKER_STATE_INIT = 0; public static final int WORKER_STATE_STARTED = 1; public static final int WORKER_STATE_SHUTDOWN = 2; /** *這個(gè)很核心的一個(gè)概念,就是指針往下走的單位,在HashedWheelTimer這個(gè)類中,默認(rèn)是100ms指針往下走一個(gè)單位 */ private final long tickDuration; /** * 這個(gè)就是指的時(shí)間輪,有多少個(gè)槽位,wheel的大小就是多大,HashedWheelTimer中默認(rèn)槽位有512個(gè) */ private final HashedWheelBucket[] wheel; /** * 主要輔助計(jì)算任務(wù)會(huì)存儲(chǔ)在哪個(gè)槽位上,mask =wheel.length-1 */ private final int mask; /** *所有要執(zhí)行的任務(wù)的任務(wù)隊(duì)列 */ private final Queuetimeouts = PlatformDependent.newMpscQueue(); /** *所有要取消的任務(wù)的任務(wù)隊(duì)列 */ private final Queue cancelledTimeouts = PlatformDependent.newMpscQueue(); /** *HashedWheelTimer實(shí)例開始運(yùn)行的時(shí)間,是納秒數(shù),開始時(shí)間是System.nanotime() */ private volatile long startTime;
這些屬性的定義和概念映射到上面時(shí)間輪算法上就是下圖的樣子了。
HashedWheelTimer初始化主要是在它的構(gòu)造函數(shù)中,提供了多種重載方式,只需要看最全的構(gòu)造函數(shù)即可。
/** * Creates a new timer. * @param threadFactory 執(zhí)行任務(wù)的工廠 * @param tickDuration 指針往下走一步的時(shí)間間隔 * @param unit 指針往下走一步的時(shí)間單位,秒,毫秒。納秒等 * @param ticksPerWheel 時(shí)間輪的大小,也就是槽位的個(gè)數(shù) */ public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) { /** * 先校驗(yàn)參數(shù)的合法性,對(duì)threadFactory,時(shí)間單位,時(shí)間間隔,時(shí)間輪大小做了限制 */ if (threadFactory == null) { throw new NullPointerException("threadFactory"); } if (unit == null) { throw new NullPointerException("unit"); } if (tickDuration <= 0) { throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } if (ticksPerWheel <= 0) { throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } // 創(chuàng)建槽位,實(shí)際上就是初始化HashedWheelBucket數(shù)組,直接new出來的 wheel = createWheel(ticksPerWheel); //用來計(jì)算槽位的輔助變量,一會(huì)兒會(huì)在Worker中尋找槽位時(shí)使用到 mask = wheel.length - 1; ... //初始化線程,是用threadFactory創(chuàng)建出來的一個(gè)worker線程 workerThread = threadFactory.newThread(worker); ... }
當(dāng)需要添加一個(gè)定時(shí)任務(wù)的時(shí)候,是通過newTimeout
方法添加的,添加的任務(wù)必須實(shí)現(xiàn)TimerTask
接口的run方法。任務(wù)添加之后,無需顯式的開啟任務(wù),添加之后任務(wù)會(huì)自動(dòng)開啟,等到了執(zhí)行的時(shí)間會(huì)被自動(dòng)執(zhí)行??蛻舳耸褂玫姆绞饺缦拢?/p>
@Test public void testRun() throws Exception{ final CountDownLatch latch = new CountDownLatch(1); HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(); hashedWheelTimer.newTimeout(timeout -> { System.out.println("hello world"); latch.countDown(); }, 5, TimeUnit.SECONDS); latch.await(); System.out.println("執(zhí)行結(jié)束"); }
5秒鐘之后會(huì)被輸出"hello world",然后任務(wù)執(zhí)行完畢。既然任務(wù)的添加和執(zhí)行入口都是通過newTimeout
這個(gè)方法搞定的,那就看一下這個(gè)方法里面有哪些秘密吧。
@Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { ... start(); ... /** * 可以看到任務(wù)存活的時(shí)間計(jì)算,當(dāng)前時(shí)間的毫秒數(shù)加上我們?cè)O(shè)定的時(shí)間,然后減去程序開始執(zhí)行的時(shí)間。這是一個(gè)時(shí)間段 */ long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout; }
進(jìn)去看了之后,這個(gè)方法很簡(jiǎn)單,有兩個(gè)關(guān)鍵的方法調(diào)用 1、start(),這個(gè)方法主要是看當(dāng)前HashedWheelTimer
的狀態(tài)是否已經(jīng)啟動(dòng),如果沒有啟動(dòng)則會(huì)調(diào)用workThread線程的啟動(dòng)方法。2、計(jì)算超時(shí)時(shí)間和任務(wù)添加。我們傳進(jìn)來的任務(wù)會(huì)被包裝成一個(gè)HashWheelTimeout這個(gè)類,包裝之后會(huì)把這個(gè)包裝類放到timeouts這個(gè)阻塞隊(duì)列中去,實(shí)際上這時(shí)候任務(wù)并沒有放到某個(gè)具體槽位中,只是先放到阻塞隊(duì)列中,等待work從這個(gè)隊(duì)列中取值然后放到具體的槽位上,HashedWheelTimer
是一個(gè)雙向鏈表,上面圖中已經(jīng)有這個(gè)類的類圖結(jié)構(gòu),再貼一次:
我們傳進(jìn)來的任務(wù)就是它的task屬性,然后會(huì)根據(jù)當(dāng)前時(shí)間、過期時(shí)間和任務(wù)開始時(shí)間計(jì)算出它的deadline,同事計(jì)算出它剩余的輪數(shù)(remainingRounds)。
任務(wù)執(zhí)行實(shí)際上是調(diào)用的它的expire方法。當(dāng)expire的時(shí)候會(huì)調(diào)用具體的業(yè)務(wù)任務(wù)的run方法。
那HashedWheelTimer
的expire方法是什么時(shí)候被執(zhí)行的呢。上面也也說到在HashedWheelTimer中有一個(gè)workThread,這里面會(huì)運(yùn)行work。能讀到這個(gè)地方來的人應(yīng)該很少了吧,不過能到這個(gè)地方你是幸運(yùn)的,因?yàn)閣ork這個(gè)類也就是實(shí)現(xiàn)這個(gè)算法中最核心的一個(gè)類了,先來概覽一下這個(gè)類
這個(gè)類實(shí)現(xiàn)了Runable接口,也就說是一個(gè)線程類,然后它會(huì)被workTread調(diào)用執(zhí)行啟動(dòng)。
transferTimeoutsToBuckets 把新加入的定時(shí)任務(wù)從阻塞隊(duì)列中取出然后放到相應(yīng)的bucket中
processCancelledTasks 把取消的定時(shí)任務(wù)從阻塞隊(duì)列中取出,然后從相應(yīng)的bucket中remove掉
waitForNextTick 指針往下走的方法,經(jīng)過一個(gè)時(shí)間單位,指針會(huì)往下走,指向下一個(gè)bucket
run方法會(huì)一直循環(huán)從阻塞隊(duì)列中取值,然后放到bucket中,指針循環(huán)往下走,對(duì)remainderRounds對(duì)于0的任務(wù)進(jìn)行執(zhí)行,不是0的減一
do { /** * 里面是一個(gè)Thread.sleep操作,模擬指針一步一步往下走的操作。 */ final long deadline = waitForNextTick(); if (deadline > 0) { /** * 計(jì)算任務(wù)將要落到槽位,這本應(yīng)該是個(gè)取模運(yùn)算,不過這里用了一個(gè)小技巧,就是把取模運(yùn)算換為了“按位與”,因?yàn)椤鞍次慌c”要比取模運(yùn)算快的多, * 這個(gè)技巧就是當(dāng)mast的值為2的n次方-1時(shí),能達(dá)到取模的效果。這里要感謝一下王洪濤的分享 */ int idx = (int) (tick & mask); processCancelledTasks(); //取到具體bucket,然后把任務(wù)放從阻塞隊(duì)列中拿到,放到bucket中 HashedWheelBucket bucket = wheel[idx]; transferTimeoutsToBuckets(); //這里面會(huì)調(diào)用所有HashedWheelTimeout的方法,就是看他的剩余的輪數(shù)是不是大于0,如果是的話則會(huì)被執(zhí)行,不是的話剩余輪數(shù)減1 bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
當(dāng)然HashedWheelTimer這個(gè)類屬于全內(nèi)存任務(wù)計(jì)算,通常在我們真正的業(yè)務(wù)中,是不會(huì)把這些任務(wù)直接放到j(luò)vm內(nèi)存中的,要不然重啟之后任務(wù)不都會(huì)消失了么,這樣我們需要重寫HashedWheelTimer
,只需要對(duì)它任務(wù)的添加和獲取進(jìn)行重寫到相應(yīng)的持久化中間件中即可(例如數(shù)據(jù)庫(kù)或者es等等)
[1][redis的發(fā)布訂閱缺陷]
[[2]][Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facil] [Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facil]: http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf "Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facil"
[[3]][Hashed and Hierarchical Timing Wheels] [Hashed and Hierarchical Timing Wheels]: http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt "Hashed and Hierarchical Timing Wheels"
感謝各位的閱讀,以上就是“HashedWheelTimer的作用是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)HashedWheelTimer的作用是什么這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!