前言
ScheduledThreadPoolExecutor
可以用來很方便實現(xiàn)我們的調(diào)度任務(wù),具體使用可以參考調(diào)度線程池ScheduledThreadPoolExecutor的正確使用姿勢這篇文章,那大家知道它是怎么實現(xiàn)的嗎,本文就帶大家來揭曉謎底。
實現(xiàn)機制分析
我們先思考下,如果讓大家去實現(xiàn)ScheduledThreadPoolExecutor
可以周期性執(zhí)行任務(wù)的功能,需要考慮哪些方面呢?
ScheduledThreadPoolExecutor
的整體實現(xiàn)思路是什么呢?
答:我們是不是可以繼承線程池類,按照線程池的思路,將任務(wù)先丟到阻塞隊列中,等到時間到了,工作線程就從阻塞隊列獲取任務(wù)執(zhí)行。
- 如何實現(xiàn)等到了未來的時間點就開始執(zhí)行呢?
答:我們可以根據(jù)參數(shù)獲取這個任務(wù)還要多少時間執(zhí)行,那么我們是不是可以從阻塞隊列中獲取任務(wù)的時候,通過條件隊列的的awaitNanos(delay)
方法,阻塞一定時間。
- 如何實現(xiàn) 任務(wù)的重復(fù)性執(zhí)行呢?
答:這就更加簡單了,任務(wù)執(zhí)行完成后,把它再次加入到隊列不就行了嗎。
源碼解析
類結(jié)構(gòu)圖
ScheduledThreadPoolExecutor
的類結(jié)構(gòu)圖如上圖所示,很明顯它是在我們的線程池ThreadPoolExecutor
框架基礎(chǔ)上擴展的。
ScheduledExecutorService
:實現(xiàn)了該接口,封裝了調(diào)度相關(guān)的APIThreadPoolExecutor
:繼承了該類,保留了線程池的能力和整個實現(xiàn)的框架DelayedWorkQueue
:內(nèi)部類,延遲阻塞隊列。ScheduledFutureTask
:延遲任務(wù)對象,包含了任務(wù)、任務(wù)狀態(tài)、剩余的時間、結(jié)果等信息。
重要屬性
通過ScheduledThreadPoolExecutor
類的成員屬性,我們可以了解它的數(shù)據(jù)結(jié)構(gòu)。
shutdown
后是否繼續(xù)執(zhí)行周期任務(wù)(重復(fù)執(zhí)行)
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
shutdown
后是否繼續(xù)執(zhí)行延遲任務(wù)(只執(zhí)行一次)
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
- 調(diào)用
cancel()
方法后,是否將該任務(wù)從隊列中移除,默認(rèn)false
private volatile boolean removeOnCancel = false;
- 任務(wù)的序列號,保證FIFO隊列的順序,用來比較優(yōu)先級
private static final AtomicLong sequencer = new AtomicLong()
ScheduledFutureTask
延遲任務(wù)類
ScheduledFutureTask
繼承FutureTask
,實現(xiàn)RunnableScheduledFuture
接口,無論是runnable
還是callable
,無論是否需要延遲和定時,所有的任務(wù)都會被封裝成ScheduledFutureTask
。- 該類具有延遲執(zhí)行的特點, 覆蓋
FutureTask
的run
方法來實現(xiàn)對延時執(zhí)行、周期執(zhí)行的支持。 - 對于延時任務(wù)調(diào)用
FutureTask#run
,而對于周期性任務(wù)則調(diào)用FutureTask#runAndReset
并且在成功之后根據(jù)fixed-delay/fixed-rate
模式來設(shè)置下次執(zhí)行時間并重新將任務(wù)塞到工作隊列。 - 成員屬性如下:
// 任務(wù)序列號
private final long sequenceNumber;
// 任務(wù)可以被執(zhí)行的時間,交付時間,以納秒表示
private long time;
// 0 表示非周期任務(wù)
// 正數(shù)表示 fixed-rate(兩次開始啟動的間隔)模式的周期,
// 負數(shù)表示 fixed-delay(一次執(zhí)行結(jié)束到下一次開始啟動) 模式
private final long period;
// 執(zhí)行的任務(wù)對象
RunnableScheduledFuture
DelayedWorkQueue
延遲隊列
DelayedWorkQueue
是支持延時獲取元素的阻塞隊列, 內(nèi)部采用優(yōu)先隊列 PriorityQueue(小根堆、滿二叉樹)存儲元素。- 內(nèi)部數(shù)據(jù)結(jié)構(gòu)是數(shù)組,所以延遲隊列出隊頭元素后需要讓其他元素(尾)替換到頭節(jié)點,防止空指針異常。
- 成員屬性如下:
// 初始容量
private static final int INITIAL_CAPACITY = 16;
// 節(jié)點數(shù)量
private int size = 0;
// 存放任務(wù)的數(shù)組
private RunnableScheduledFuture?[] queue =
new RunnableScheduledFuture?[INITIAL_CAPACITY];
// 控制并發(fā)用的鎖
private final ReentrantLock lock = new ReentrantLock();
// 條件隊列
private final Condition available = lock.newCondition();
//指定用于等待隊列頭節(jié)點任務(wù)的線程
private Thread leader = null;
提交延遲任務(wù)schedule()
原理
延遲執(zhí)行方法,并指定延遲執(zhí)行的時間,只會執(zhí)行一次。
schedule()
方法是延遲任務(wù)方法的入口。
public ScheduledFuture? schedule(Runnable command,
long delay,
TimeUnit unit) {
// 判空處理
if (command == null || unit == null)
throw new NullPointerException();
// 將外部傳入的任務(wù)封裝成延遲任務(wù)對象ScheduledFutureTask
RunnableScheduledFuture? t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 執(zhí)行延遲任務(wù)
delayedExecute(t);
return t;
}
decorateTask(...)
該方法是封裝延遲任務(wù)
- 調(diào)用
triggerTime(delay, unit)
方法計算延遲的時間。
// 返回【當(dāng)前時間 + 延遲時間】,就是觸發(fā)當(dāng)前任務(wù)執(zhí)行的時間
private long triggerTime(long delay, TimeUnit unit) {
// 設(shè)置觸發(fā)的時間
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
// 如果 delay < Long.Max_VALUE/2,則下次執(zhí)行時間為當(dāng)前時間 +delay
// 否則為了避免隊列中出現(xiàn)由于溢出導(dǎo)致的排序紊亂,需要調(diào)用overflowFree來修正一下delay
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
// 下面這種情況很少,大家看不懂可以不用強行理解
// 如果某個任務(wù)的 delay 為負數(shù),說明當(dāng)前可以執(zhí)行(其實早該執(zhí)行了)。
// 阻塞隊列中維護任務(wù)順序是基于 compareTo 比較的,比較兩個任務(wù)的順序會用 time 相減。
// 那么可能出現(xiàn)一個 delay 為正數(shù)減去另一個為負數(shù)的 delay,結(jié)果上溢為負數(shù),則會導(dǎo)致 compareTo 產(chǎn)生錯誤的結(jié)果
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
// 判斷一下隊首的delay是不是負數(shù),如果是正數(shù)就不用管,怎么減都不會溢出
// 否則拿當(dāng)前 delay 減去隊首的 delay 來比較看,如果不出現(xiàn)上溢,排序不會亂
// 不然就把當(dāng)前 delay 值給調(diào)整為 Long.MAX_VALUE + 隊首 delay
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
- 調(diào)用
RunnableScheduledFuture
的構(gòu)造方法封裝為延遲任務(wù)
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
// 任務(wù)的觸發(fā)時間
this.time = ns;
// 任務(wù)的周期, 延遲任務(wù)的為0,因為不需要重復(fù)執(zhí)行
this.period = 0;
// 任務(wù)的序號 + 1
this.sequenceNumber = sequencer.getAndIncrement();
}
- 調(diào)用
decorateTask()
方法裝飾延遲任務(wù)
// 沒有做任何操作,直接將 task 返回,該方法主要目的是用于子類擴展
protected
提交周期任務(wù)scheduleAtFixedRate()
原理
按照固定的頻率周期性的執(zhí)行任務(wù),捕手renwu,一次任務(wù)的啟動到下一次任務(wù)的啟動的間隔
public ScheduledFuture? scheduleAtFixedRate(Runnable command, long initialDelay, long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 任務(wù)封裝,【指定初始的延遲時間和周期時間】
ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command, null,
triggerTime(initialDelay, unit), unit.toNanos(period));
// 默認(rèn)返回本身
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 開始執(zhí)行這個任務(wù)
delayedExecute(t);
return t;
}
提交周期任務(wù)scheduleWithFixedDelay()
原理
按照指定的延時周期性執(zhí)行任務(wù),上一個任務(wù)執(zhí)行完畢后,延時一定時間,再次執(zhí)行任務(wù)。
public ScheduledFuture? scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// 任務(wù)封裝,【指定初始的延遲時間和周期時間】,周期時間為 - 表示是 fixed-delay 模式
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,
triggerTime(initialDelay, unit), unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 開始執(zhí)行這個任務(wù)
delayedExecute(t);
return t;
}
執(zhí)行任務(wù)delayedExecute(t)
原理
上面多種提交任務(wù)的方式,殊途同歸,最終都會調(diào)用delayedExecute()
方法執(zhí)行延遲或者周期任務(wù)。
delayedExecute()
方法是執(zhí)行延遲任務(wù)的入口
private void delayedExecute(RunnableScheduledFuture? task) {
// 線程池是 SHUTDOWN 狀態(tài),執(zhí)行拒絕策略
if (isShutdown())
// 調(diào)用拒絕策略的方法
reject(task);
else {
// 把當(dāng)前任務(wù)放入阻塞隊列
super.getQueue().add(task);
// 線程池狀態(tài)為 SHUTDOWN 并且不允許執(zhí)行任務(wù)了,就從隊列刪除該任務(wù),并設(shè)置任務(wù)的狀態(tài)為取消狀態(tài)
// 非主流程,可以跳過,不重點看了
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
task.cancel(false);
else
// 開始執(zhí)行了哈
ensurePrestart();
}
}
ensurePrestart()
方法開啟線程執(zhí)行
// ThreadPoolExecutor#ensurePrestart
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// worker數(shù)目小于corePoolSize,則添加一個worker。
if (wc < corePoolSize)
// 第二個參數(shù) true 表示采用核心線程數(shù)量限制,false 表示采用 maximumPoolSize
addWorker(null, true);
// corePoolSize = 0的情況,至少開啟一個線程,【擔(dān)保機制】
else if (wc == 0)
addWorker(null, false);
}
addWorker()
方法實際上父類ThreadPoolExecutor
的方法,這個方法在該文章 Java線程池源碼深度解析中詳細介紹過,這邊做個總結(jié):
- 如果線程池中工作線程數(shù)量小于最大線程數(shù),創(chuàng)建工作線程,執(zhí)行任務(wù)。
- 如果線程池中工作線程數(shù)量大于最大線程數(shù),直接返回。
獲取延遲任務(wù)take()原理
目前工作線程已經(jīng)創(chuàng)建好了,工作線程開始工作了,它會從阻塞隊列中獲取延遲任務(wù)執(zhí)行,這部分也是線程池里面的原理,不做展開,那我們看下它是如何實現(xiàn)延遲執(zhí)行的? 主要關(guān)注如何從阻塞隊列中獲取任務(wù)。
DelayedWorkQueue#take()
方法獲取延遲任務(wù)
- 該方法會在上面的
addWoker()
方法創(chuàng)建工作線程后,工作線程中循環(huán)持續(xù)調(diào)用workQueue.take()
方法獲取延遲任務(wù)。 - 該方法主要獲取延遲隊列中任務(wù)延遲時間小于等于0 的任務(wù)。
- 如果延遲時間不小于0,那么調(diào)用條件隊列的
awaitNanos(delay)
阻塞方法等待一段時間,等時間到了,延遲時間自然小于等于0了。 - 獲取到任務(wù)后,工作線程就可以開始執(zhí)行調(diào)度任務(wù)了。
// DelayedWorkQueue#take()
public RunnableScheduledFuture? take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加可中斷鎖
lock.lockInterruptibly();
try {
// 自旋
for (;;) {
// 獲取阻塞隊列中的頭結(jié)點
RunnableScheduledFuture? first = queue[0];
// 如果阻塞隊列沒有數(shù)據(jù),為空
if (first == null)
// 等待隊列不空,直至有任務(wù)通過 offer 入隊并喚醒
available.await();
else {
// 獲取頭節(jié)點的的任務(wù)還剩余多少時間才執(zhí)行
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 到達觸發(fā)時間,獲取頭節(jié)點并調(diào)整堆,重新選擇延遲時間最小的節(jié)點放入頭部
return finishPoll(first);
// 邏輯到這說明頭節(jié)點的延遲時間還沒到
first = null;
// 說明有 leader 線程在等待獲取頭節(jié)點,當(dāng)前線程直接去阻塞等待
if (leader != null)
// 當(dāng)前線程阻塞
available.await();
else {
// 沒有 leader 線程,【當(dāng)前線程作為leader線程,并設(shè)置頭結(jié)點的延遲時間作為阻塞時間】
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 當(dāng)前線程通過awaitNanos方法等待delay時間后,會自動喚醒,往后面繼續(xù)執(zhí)行
available.awaitNanos(delay);
// 到達阻塞時間時,當(dāng)前線程會從這里醒來,進入下一輪循環(huán),就有可能執(zhí)行了
} finally {
// t堆頂更新,leader 置為 null,offer 方法釋放鎖后,
// 有其它線程通過 take/poll 拿到鎖,讀到 leader == null,然后將自身更新為leader。
if (leader == thisThread)
// leader 置為 null 用以接下來判斷是否需要喚醒后繼線程
leader = null;
}
}
}
}
} finally {
// 沒有 leader 線程并且頭結(jié)點不為 null,喚醒阻塞獲取頭節(jié)點的線程,
// 【如果沒有這一步,就會出現(xiàn)有了需要執(zhí)行的任務(wù),但是沒有線程去執(zhí)行】
if (leader == null && queue[0] != null)
available.signal();
// 解鎖
lock.unlock();
}
}
finishPoll()
方法獲取到任務(wù)后執(zhí)行
該方法主要做兩個事情, 獲取頭節(jié)點并調(diào)整堆,重新選擇延遲時間最小的節(jié)點放入頭部。
private RunnableScheduledFuture?</span> finishPoll(RunnableScheduledFuture?span> f) {
// 獲取尾索引
int s = --size;
// 獲取尾節(jié)點
RunnableScheduledFuture? x = queue[s];
// 將堆結(jié)構(gòu)最后一個節(jié)點占用的 slot 設(shè)置為 null,因為該節(jié)點要嘗試升級成堆頂,會根據(jù)特性下調(diào)
queue[s] = null;
// s == 0 說明 當(dāng)前堆結(jié)構(gòu)只有堆頂一個節(jié)點,此時不需要做任何的事情
if (s != 0)
// 從索引處 0 開始向下調(diào)整
siftDown(0, x);
// 出隊的元素索引設(shè)置為 -1
setIndex(f, -1);
return f;
}
延遲任務(wù)運行的原理
從延遲隊列中獲取任務(wù)后,工作線程會調(diào)用延遲任務(wù)的run()方法執(zhí)行任務(wù)。
ScheduledFutureTask#run()
方法運行任務(wù)
- 調(diào)用
isPeriodic()
方法判斷任務(wù)是否是周期性任務(wù)還是非周期性任務(wù) - 如果任務(wù)是非周期任務(wù),就調(diào)用父類的
FutureTask#run()
執(zhí)行一次 - 如果任務(wù)是非周期任務(wù),就調(diào)用父類的
FutureTask#runAndReset()
, 返回true會設(shè)置下一次的執(zhí)行時間,重新放入線程池的阻塞隊列中,等待下次獲取執(zhí)行
public void run() {
// 是否周期性,就是判斷 period 是否為 0
boolean periodic = isPeriodic();
// 根據(jù)是否是周期任務(wù)檢查當(dāng)前狀態(tài)能否執(zhí)行任務(wù),不能執(zhí)行就取消任務(wù)
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 非周期任務(wù),直接調(diào)用 FutureTask#run 執(zhí)行一次
else if (!periodic)
ScheduledFutureTask.super.run();
// 周期任務(wù)的執(zhí)行,返回 true 表示執(zhí)行成功
else if (ScheduledFutureTask.super.runAndReset()) {
// 設(shè)置周期任務(wù)的下一次執(zhí)行時間
setNextRunTime();
// 任務(wù)的下一次執(zhí)行安排,如果當(dāng)前線程池狀態(tài)可以執(zhí)行周期任務(wù),加入隊列,并開啟新線程
reExecutePeriodic(outerTask);
}
}
FutureTask#runAndReset()
執(zhí)行周期性任務(wù)
- 周期任務(wù)正常完成后任務(wù)的狀態(tài)不會變化,依舊是 NEW,不會設(shè)置 outcome 屬性。
- 但是如果本次任務(wù)執(zhí)行出現(xiàn)異常,會進入 setException 方法將任務(wù)狀態(tài)置為異常,把異常保存在 outcome 中。
- 方法返回 false,后續(xù)的該任務(wù)將不會再周期的執(zhí)行
protected boolean runAndReset() {
// 任務(wù)不是新建的狀態(tài)了,或者被別的線程執(zhí)行了,直接返回 false
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable
ScheduledFutureTask#setNextRunTime()
設(shè)置下次執(zhí)行時間
- 如果屬性period大于0,表示
fixed-rate
模式,直接加上period時間即可。 - 如果屬性period小于等于0, 表示是
fixed-delay
模式, 調(diào)用triggerTime重新計算下次時間。
// 任務(wù)下一次的觸發(fā)時間
private void setNextRunTime() {
long p = period;
if (p > 0)
// fixed-rate 模式,【時間設(shè)置為上一次執(zhí)行任務(wù)的時間 + p】,兩次任務(wù)執(zhí)行的時間差
time += p;
else
// fixed-delay 模式,下一次執(zhí)行時間是【當(dāng)前這次任務(wù)結(jié)束的時間(就是現(xiàn)在) + delay 值】
time = triggerTime(-p);
}
ScheduledFutureTask#reExecutePeriodic()
,重新放入阻塞任務(wù)隊列,等待獲取,進行下一輪執(zhí)行
// ScheduledThreadPoolExecutor#reExecutePeriodic
void reExecutePeriodic(RunnableScheduledFuture? task) {
if (canRunInCurrentRunState(true)) {
// 【放入任務(wù)隊列】
super.getQueue().add(task);
// 如果提交完任務(wù)之后,線程池狀態(tài)變?yōu)榱?shutdown 狀態(tài),需要再次檢查是否可以執(zhí)行,
// 如果不能執(zhí)行且任務(wù)還在隊列中未被取走,則取消任務(wù)
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 當(dāng)前線程池狀態(tài)可以執(zhí)行周期任務(wù),加入隊列,并【根據(jù)線程數(shù)量是否大于核心線程數(shù)確定是否開啟新線程】
ensurePrestart();
}
}
-
線程池
+關(guān)注
關(guān)注
0文章
57瀏覽量
6868
發(fā)布評論請先 登錄
相關(guān)推薦
評論