本篇內(nèi)容主要講解“數(shù)據(jù)庫周期性線程池與主要源碼分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“數(shù)據(jù)庫周期性線程池與主要源碼分析”吧!
網(wǎng)站建設(shè)哪家好,找成都創(chuàng)新互聯(lián)!專注于網(wǎng)頁設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、成都微信小程序、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了銅川免費(fèi)建站歡迎大家使用!
ScheduledThreadPoolExecutor:用來處理延時(shí)任務(wù)或定時(shí)任務(wù) 定時(shí)線程池類的類結(jié)構(gòu)圖
ScheduledThreadPoolExecutor接收ScheduleFutureTask類型的任務(wù),是線程池調(diào)度任務(wù)的最小單位。 它采用DelayQueue存儲(chǔ)等待的任務(wù): 1、DelayQueue內(nèi)部封裝成一個(gè)PriorityQueue,它會(huì)根據(jù)time的先后時(shí)間順序,如果time相同則根絕sequenceNumber排序; 2、DelayQueue是無界隊(duì)列;
接收的參數(shù):
private final long sequenceNumber;//任務(wù)的序號(hào) private long time;//任務(wù)開始的時(shí)間 private final long period;//任務(wù)執(zhí)行的時(shí)間間隔
工作線程的的執(zhí)行過程: 工作線程會(huì)從DelayQueue取出已經(jīng)到期的任務(wù)去執(zhí)行; 執(zhí)行結(jié)束后重新設(shè)置任務(wù)的到期時(shí)間,再次放回DelayQueue;
ScheduledThreadPoolExecutor會(huì)把待執(zhí)行的任務(wù)放到工作隊(duì)列DelayQueue中,DelayQueue封裝了一個(gè)PriorityQueue,PriorityQueue會(huì)對隊(duì)列中的ScheduledFutureTask進(jìn)行排序,具體的排序算法實(shí)現(xiàn)如下:
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask> x = (ScheduledFutureTask>)other; //首先按照time排序,time小的排到前面,time大的排到后面 long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; //time相同,按照sequenceNumber排序; //sequenceNumber小的排在前面,sequenceNumber大的排在后面 else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
接下來看看ScheduledFutureTask的run方法實(shí)現(xiàn), run方法是調(diào)度task的核心,task的執(zhí)行實(shí)際上是run方法的執(zhí)行。
public void run() { //是否是周期性的 boolean periodic = isPeriodic(); //線程池是shundown狀態(tài)不支持處理新任務(wù),直接取消任務(wù) if (!canRunInCurrentRunState(periodic)) cancel(false); //如果不需要執(zhí)行執(zhí)行周期性任務(wù),直接執(zhí)行run方法結(jié)束 else if (!periodic) ScheduledFutureTask.super.run(); //如果需要周期性執(zhí)行,則在執(zhí)行任務(wù)完成后,設(shè)置下一次執(zhí)行時(shí)間 else if (ScheduledFutureTask.super.runAndReset()) { //設(shè)置下一次執(zhí)行該任務(wù)的時(shí)間 setNextRunTime(); //重復(fù)執(zhí)行該任務(wù) reExecutePeriodic(outerTask); } }
run方法的執(zhí)行步驟:
1、如果線程池是shundown狀態(tài)不支持處理新任務(wù),直接取消任務(wù),否則步驟2;
2、如果不是周期性任務(wù),直接調(diào)用ScheduledFutureTask的run方法執(zhí)行,會(huì)設(shè)置執(zhí)行結(jié)果,然后直接返回,否則步驟3;
3、如果是周期性任務(wù),調(diào)用ScheduledFutureTask的runAndset方法執(zhí)行,不會(huì)設(shè)置執(zhí)行結(jié)果,然后直接返回,否則執(zhí)行步驟4和步驟5;
4、計(jì)算下一次執(zhí)行該任務(wù)的時(shí)間;
5、重復(fù)執(zhí)行該任務(wù);
接下來看下reExecutePeriodic方法的執(zhí)行步驟:
void reExecutePeriodic(RunnableScheduledFuture> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }
由于已經(jīng)執(zhí)行過一次周期性任務(wù),所以不會(huì)reject當(dāng)前任務(wù),同時(shí)傳入的任務(wù)一定是周期性任務(wù)。
周期性有三種提交的方式:schedule、sceduleAtFixedRate、schedlueWithFixedDelay。下面從使用和源碼兩個(gè)方面進(jìn)行說明,首先是如果提交任務(wù):
pool.schedule(new Runnable() { @Override public void run() { System.out.println("延遲執(zhí)行"); } },1, TimeUnit.SECONDS); /** * 這個(gè)執(zhí)行周期是固定,不管任務(wù)執(zhí)行多長時(shí)間,每過3秒中就會(huì)產(chǎn)生一個(gè)新的任務(wù) */ pool.scheduleAtFixedRate(new Runnable() { @Override public void run() { //這個(gè)業(yè)務(wù)邏輯需要很長的時(shí)間,超過了3秒 System.out.println("重復(fù)執(zhí)行"); } },1,3,TimeUnit.SECONDS); pool.shutdown(); /** * 假如run方法30min后執(zhí)行完成,然后間隔3秒,再周期性執(zhí)行下一個(gè)任務(wù) */ pool.scheduleWithFixedDelay(new Runnable() { @Override public void run() { //30min System.out.println("重復(fù)執(zhí)行"); } },1,3,TimeUnit.SECONDS);
知道了如何提交周期性任務(wù),接下來源碼是如何執(zhí)行的,首先是schedule方法,該方法是指任務(wù)在指定延遲時(shí)間到達(dá)后觸發(fā),只會(huì)執(zhí)行一次。
public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); //把任務(wù)封裝成ScheduledFutureTask,之后調(diào)用decorateTask進(jìn)行包裝; //decorateTask方法是空方法,留給用戶去實(shí)現(xiàn)的; RunnableScheduledFuture> t = decorateTask(command, new ScheduledFutureTask(command, null, triggerTime(delay, unit))); //包裝好任務(wù)之后,進(jìn)行任務(wù)的提交 delayedExecute(t); return t; }
任務(wù)提交方法:
private void delayedExecute(RunnableScheduledFuture> task) { //如果線程池不是RUNNING狀態(tài),則使用拒絕策略把提交任務(wù)拒絕掉 if (isShutdown()) reject(task); else { //與ThreadPoolExecutor不同,這里直接把任務(wù)加入延遲隊(duì)列 super.getQueue().add(task); //如果當(dāng)前狀態(tài)無法執(zhí)行任務(wù),則取消 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else //和ThreadPoolExecutor不一樣,corePoolSize沒有達(dá)到會(huì)增加Worker; //增加Worker,確保提交的任務(wù)能夠被執(zhí)行 ensurePrestart(); } }
到此,相信大家對“數(shù)據(jù)庫周期性線程池與主要源碼分析”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!