一、CountDownLatch:
1、什么是 CountDownLatch:
CountDownLatch,閉鎖,就是一個基于 AQS 共享模式的同步計數(shù)器,它內(nèi)部的方法都是圍繞 AQS 實現(xiàn)的。主要作用是使一個或一組線程在其他線程執(zhí)行完畢之前,一直處于等待狀態(tài),直到其他線程執(zhí)行完成后再繼續(xù)執(zhí)行。
CountDownLatch 利用 AQS 的 state 變量充當計數(shù)器(由 volatile 修飾并使用 CAS 進行更新的),計數(shù)器的初始值就是線程的數(shù)量,每當一個線程執(zhí)行完成,計數(shù)器的值就會減一,當計數(shù)器的值為 0 時,表示所有的線程都已經(jīng)完成任務了,那么接下來就喚醒在 CountDownLatch 上等待的線程執(zhí)行后面的任務。
那么當計數(shù)器的值為 0 時,主線程是如何被喚醒的呢?這就要從 CountDownLatch 的工作流程來說明了,CountDownLatch 的工作流程可以看成在一開始只在 CLH 隊列中放入一個主線程,然后不停的喚醒,喚醒之后如果發(fā)現(xiàn) state 還是不為0,則繼續(xù)等待。而主線程什么時候會被喚醒呢?
當每個子線程執(zhí)行完畢的時候,會調(diào)用 countDown() 并基于 CAS 將計數(shù)器 state 的值減一,減一成功釋放資源后,就會調(diào)用 unparkSuccessor() 喚醒主線程,當所有的子線程都執(zhí)行完了,也就是 state 為 0 時,這時候主線程被喚醒之后就可以繼續(xù)執(zhí)行了。
state 被減成了 0 之后,就無法繼續(xù)使用這個 CountDownLatch 了,需要重新 new 一個,因為 state 的數(shù)量只有在初始化 CountDownLatch 的時候才可以設置,這也是 CountDownLatch 不可重用的原因。
2、CountDownLatch 的源碼簡單說明:
從代碼層面上來看,CountDownLatch 基于內(nèi)部類 Sync 實現(xiàn),而 Sync 繼承自 AQS。CountDownLatch 最主要有兩個方法:await() 和 countDown()
await(): 調(diào)用該方法的線程會被掛起,直到 CountDownLatch 計數(shù)器的值為 0 才繼續(xù)執(zhí)行,底層使用的是 AQS 的 tryAcquireShared()
countDown(): 用于減少計數(shù)器的數(shù)量,如果計數(shù)減為 0 的話,就會喚醒主線程,底層使用的是 AQS 的 releaseShared()
countDown() 方法詳細流程:
二、CyclicBarrier:
1、什么是CyclicBarrier:
CyclicBarrier,循環(huán)柵欄,通過 CyclicBarrier 可以實現(xiàn)一組線程之間的相互等待,當所有線程都到達屏障點之后再執(zhí)行后續(xù)的操作。通過 await() 方法可以實現(xiàn)等待,當最后一個線程執(zhí)行完,會使得所有在相應 CyclicBarrier 實例上的等待的線程被喚醒,而最后一個線程自身不會被暫停。
CyclicBarrier 沒有像 CountDownLatch 和 ReentrantLock 使用 AQS 的 state 變量,它是直接借助 ReentrantLock 加上 Condition 等待喚醒的功能進而實現(xiàn)的。在構(gòu)建 CyclicBarrier 的時候,傳入的值會賦值給 CyclicBarrier 內(nèi)部維護的變量 count,同時也會賦值給 parties 變量(這是可以復用的關鍵)。
線程調(diào)用 await() 表示線程已經(jīng)到達柵欄,每次調(diào)用 await() 時,會將 count 減一,操作 count 值是直接使用 ReentrantLock 來保證線程安全性的,如果 count 不為 0,則添加到 condition 隊列中,如果 count 等于 0,則把節(jié)點從 condition 隊列中移除并添加到 AQS 隊列中進行全部喚醒,并且將 parties 的值重新賦值給 count 從而實現(xiàn)復用。
2、CyclicBarrier 的源碼分析:
(1)成員變量:
//同步操作鎖 private?final?ReentrantLock?lock?=?new?ReentrantLock(); //線程攔截器 private?final?Condition?trip?=?lock.newCondition(); //每次攔截的線程數(shù) private?final?int?parties; //換代前執(zhí)行的任務 private?final?Runnable?barrierCommand; //表示柵欄的當前代 private?Generation?generation?=?new?Generation(); //計數(shù)器 private?int?count; ? //靜態(tài)內(nèi)部類Generation private?static?class?Generation?{ ??boolean?broken?=?false; }
CyclicBarrier 是通過獨占鎖實現(xiàn)的,底層包含了 “ReentrantLock 對象 lock” 和 “Condition 對象 trip”,通過條件隊列 trip 來對線程進行阻塞的,并且其內(nèi)部維護了兩個 int 型的變量 parties 和 count:
parties 表示每次攔截的線程數(shù),該值在構(gòu)造時進行賦值,用于實現(xiàn) CyclicBarrier 的復用;
count 是內(nèi)部計數(shù)器,它的初始值和 parties 相同,以后隨著每次 await 方法的調(diào)用而減 1,直到減為 0 就將所有線程喚醒。
CyclicBarrier 有一個靜態(tài)內(nèi)部類 Generation,該類的對象代表柵欄的當前代,利用它可以實現(xiàn)循環(huán)等待,當 count 減為 0 會將所有阻塞的線程喚醒,并設置成下一代。
barrierCommand 表示換代前執(zhí)行的任務,在喚醒所有線程前可以通過 barrierCommand 來執(zhí)行指定的任務
(2)await() 方法:
CyclicBarrier 類最主要的功能就是使先到達屏障點的線程阻塞并等待后面的線程,其中它提供了兩種等待的方法,分別是定時等待和非定時等待。
//非定時等待 public?int?await()?throws?InterruptedException,?BrokenBarrierException?{ ??try?{ ????return?dowait(false,?0L); ??}?catch?(TimeoutException?toe)?{ ????throw?new?Error(toe); ??} } ? //定時等待 public?int?await(long?timeout,?TimeUnit?unit)?throws?InterruptedException,?BrokenBarrierException,?TimeoutException?{ ??return?dowait(true,?unit.toNanos(timeout)); }
BrokenBarrierException 表示柵欄已經(jīng)被破壞,破壞的原因可能是其中一個線程 await() 時被中斷或者超時。
可以看到不管是定時等待還是非定時等待,它們都調(diào)用了 dowait() 方法,只不過是傳入的參數(shù)不同而已,下面我們就來看看 dowait() 方法都做了些什么。
//核心等待方法 private?int?dowait(boolean?timed,?long?nanos)?throws?InterruptedException,?BrokenBarrierException,?TimeoutException?{ ??//?顯示鎖 ??final?ReentrantLock?lock?=?this.lock; ??lock.lock(); ??try?{ ????final?Generation?g?=?generation; ????//檢查當前柵欄是否被打翻 ????if?(g.broken)?{ ??????throw?new?BrokenBarrierException(); ????} ????//檢查當前線程是否被中斷 ????if?(Thread.interrupted())?{ ??????//如果當前線程被中斷會做以下三件事 ??????//1.打翻當前柵欄 ??????//2.喚醒攔截的所有線程 ??????//3.拋出中斷異常 ??????breakBarrier(); ??????throw?new?InterruptedException(); ????} ????//每次都將計數(shù)器的值減1 ????int?index?=?--count; ????//計數(shù)器的值減為0則需喚醒所有線程并轉(zhuǎn)換到下一代 ????if?(index?==?0)?{ ??????boolean?ranAction?=?false; ??????try?{ ????????//喚醒所有線程前先執(zhí)行指定的任務 ????????final?Runnable?command?=?barrierCommand; ????????if?(command?!=?null)?{ ??????????command.run(); ????????} ????????ranAction?=?true; ????????//喚醒所有線程并轉(zhuǎn)到下一代 ????????nextGeneration(); ????????return?0; ??????}?finally?{ ????????//確保在任務未成功執(zhí)行時能將所有線程喚醒 ????????if?(!ranAction)?{ ??????????breakBarrier(); ????????} ??????} ????} ? ????//如果計數(shù)器不為0則執(zhí)行此循環(huán) ????for?(;;)?{ ??????try?{ ????????//根據(jù)傳入的參數(shù)來決定是定時等待還是非定時等待 ????????if?(!timed)?{ ??????????trip.await(); ????????}else?if?(nanos?>?0L)?{ ??????????nanos?=?trip.awaitNanos(nanos); ????????} ??????}?catch?(InterruptedException?ie)?{ ????????//若當前線程在等待期間被中斷則打翻柵欄喚醒其他線程 ????????if?(g?==?generation?&&?!?g.broken)?{ ??????????breakBarrier(); ??????????throw?ie; ????????}?else?{ ??????????//若在捕獲中斷異常前已經(jīng)完成在柵欄上的等待,?則直接調(diào)用中斷操作 ??????????Thread.currentThread().interrupt(); ????????} ??????} ??????//如果線程因為打翻柵欄操作而被喚醒則拋出異常 ??????if?(g.broken)?{ ????????throw?new?BrokenBarrierException(); ??????} ??????//如果線程因為換代操作而被喚醒則返回計數(shù)器的值 ??????if?(g?!=?generation)?{ ????????return?index; ??????} ??????//如果線程因為時間到了而被喚醒則打翻柵欄并拋出異常 ??????if?(timed?&&?nanos?<=?0L)?{ ????????breakBarrier(); ????????throw?new?TimeoutException(); ??????} ????} ??}?finally?{ ????lock.unlock(); ??} }
上面執(zhí)行的代碼相對比較容易看懂,我們再來看一下執(zhí)行流程:
執(zhí)行 dowait() 方法時,先獲得顯示鎖,判斷當前線程狀態(tài)是否被中斷,如果是,則執(zhí)行 breakBarrier() 方法,喚醒之前阻塞的所有線程,并將計數(shù)器重置,否則,往下執(zhí)行;
計數(shù)器 count 減 1,如果 count == 0,表示最后一個線程達到柵欄,接著執(zhí)行之前指定的 Runnable 接口,同時執(zhí)行 nextGeneration() 方法進入下一代;
否則,進入自旋,判斷當前線程是進入定時等待還是非定時等待,如果在等待過程中被中斷,執(zhí)行 breakBarrier() 方法,喚醒之前阻塞的所有線程;
判斷是否是因為執(zhí)行 breakBarrier() 方法而被喚醒,如果是,則拋出異常;
判斷是否是正常的換代操作而被喚醒,如果是,則返回計數(shù)器的值;
判斷是否是超時而被喚醒,如果是,則喚醒之前阻塞的所有線程,并拋出異常;
釋放鎖。
(3)breakBarrier() 方法:
private?void?breakBarrier()?{ ?generation.broken?=?true;//柵欄被打破 ?count?=?parties;//重置count ?trip.signalAll();//喚醒之前阻塞的線程 }
(4)nextGeneration() 方法:
private?void?nextGeneration()?{ ?//喚醒所以的線程 ?trip.signalAll(); ?//重置計數(shù)器 ?count?=?parties; ?//重新開始 ?generation?=?new?Generation(); }
(5)reset()方法:
//?重置barrier到初始狀態(tài),所有還在等待中的線程最終會拋出BrokenBarrierException。 public?void?reset()?{ ?final?ReentrantLock?lock?=?this.lock; ????lock.lock(); ????try?{ ?????breakBarrier();???//?break?the?current?generation ????????nextGeneration();?//?start?a?new?generation ????}?finally?{ ?????lock.unlock(); ????} }
三、Semaphore:
1、什么是 Semaphore:
Semaphore 信號量,主要用于控制并發(fā)訪問共享資源的線程數(shù)量,底層基于 AQS 共享模式,并依賴 AQS 的變量 state 作為許可證 permit,通過控制許可證的數(shù)量,來保證線程之間的配合。線程使用 acquire() 獲取訪問許可,只有拿到 “許可證” 后才能繼續(xù)運行,當 Semaphore 的 permit 不為 0 的時候,對請求資源的線程放行,同時 permit 的值減1,當 permit 的值為 0 時,那么請求資源的線程會被阻塞直到其他線程釋放訪問許可,當線程對共享資源操作完成后,使用 release() 歸還訪問許可。
不同于 CyclicBarrier 和 ReentrantLock,Semaphore 不會使用到 AQS 的 Condition 條件隊列,都是在 CLH 同步隊列中操作,只是當前線程會被 park。另外 Semaphore 是不可重入的。
2、Semaphore 的公平和非公平兩種模式:
Semaphore 通過自定義兩種不同的同步器(FairSync 和 NonfairSync)提供了公平和非公平兩種工作模式,兩種模式下分別提供了限時/不限時、響應中斷/不響應中斷的獲取資源的方法(限時獲取總是及時響應中斷的),而所有的釋放資源的 release() 操作是統(tǒng)一的。
公平模式: 遵循 FIFO,調(diào)用 acquire() 方法獲取許可證的順序時,先判斷同步隊列中是不是存在其他的等待線程,如果存在就將請求線程封裝成 Node 結(jié)點加入同步隊列,從而保證每個線程獲取同步狀態(tài)都是按照先到先得的順序執(zhí)行的,否則對 state 值進行減操作并返回剩下的信號量
非公平模式: 是搶占式的,通過競爭的方式獲取,不管同步隊列中是否存在等待線程,有可能一個新的獲取線程恰好在一個許可證釋放時得到了這個許可證,而前面還有等待的線程。
框架流程圖如下:
3、嘗試獲取資源 acquire()方法的執(zhí)行流程圖:
編輯:黃飛
?
評論
查看更多