java并發(fā)編程實戰(zhàn)之輔助類用法
Java并發(fā)編程:CountDownLatch、CyclicBarrier和Semaphore2017-09-18 13:07程序設計/58
在java 1.5中,提供了一些非常有用的輔助類來幫助我們進行并發(fā)編程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我們就來學習一下這三個輔助類的用法。
以下是本文目錄大綱:
一.CountDownLatch用法
二.CyclicBarrier用法
三.Semaphore用法
若有不正之處請多多諒解,并歡迎批評指正。
一.CountDownLatch用法
CountDownLatch類位于java.util.concurrent包下,利用它可以實現類似計數器的功能。比如有一個任務A,它要等待其他4個任務執(zhí)行完畢之后才能執(zhí)行,此時就可以利用CountDownLatch來實現這種功能了。
CountDownLatch類只提供了一個構造器:
publicCountDownLatch( intcount) { }; //參數count為計數值
然后下面這3個方法是CountDownLatch類中最重要的方法:
publicvoidawait() throwsInterruptedException { }; //調用await()方法的線程會被掛起,它會等待直到count值為0才繼續(xù)執(zhí)行publicbooleanawait( longtimeout, TimeUnit unit)throwsInterruptedException { }; //和await()類似,只不過等待一定的時間后count值還沒變?yōu)?的話就會繼續(xù)執(zhí)行publicvoidcountDown() { }; //將count值減1
下面看一個例子大家就清楚CountDownLatch的用法了:
publicclassTest { publicstaticvoidmain(String[] args) { final CountDownLatch latch =newCountDownLatch( 2); newThread(){ publicvoidrun() { try{ System. out.println( “子線程”+Thread.currentThread().getName()+ “正在執(zhí)行”); Thread.sleep( 3000); System.out.println( “子線程”+Thread.currentThread().getName()+ “執(zhí)行完畢”); latch.countDown(); } catch(InterruptedException e) { e.printStackTrace(); } }; }.start(); newThread(){publicvoidrun() { try{ System. out.println( “子線程”+Thread.currentThread().getName()+ “正在執(zhí)行”); Thread.sleep( 3000); System. out.println( “子線程”+Thread.currentThread().getName()+ “執(zhí)行完畢”); latch.countDown(); }catch(InterruptedException e) { e.printStackTrace(); } }; }.start(); try{ System. out.println(“等待2個子線程執(zhí)行完畢。。.”); latch. await(); System. out.println( “2個子線程已經執(zhí)行完畢”); System. out.println( “繼續(xù)執(zhí)行主線程”); } catch(InterruptedException e) { e.printStackTrace(); } } }
執(zhí)行結果:
線程Thread- 0正在執(zhí)行 線程Thread- 1正在執(zhí)行 等待 2個子線程執(zhí)行完畢 。。.線程Thread- 0執(zhí)行完畢 線程Thread- 1執(zhí)行完畢 2個子線程已經執(zhí)行完畢 繼續(xù)執(zhí)行主線程二.CyclicBarrier用法
字面意思回環(huán)柵欄,通過它可以實現讓一組線程等待至某個狀態(tài)之后再全部同時執(zhí)行。叫做回環(huán)是因為當所有等待線程都被釋放以后,CyclicBarrier可以被重用。我們暫且把這個狀態(tài)就叫做barrier,當調用await()方法之后,線程就處于barrier了。
CyclicBarrier類位于java.util.concurrent包下,CyclicBarrier提供2個構造器:
publicCyclicBarrier( intparties, Runnable barrierAction) { } publicCyclicBarrier( intparties) { }
參數parties指讓多少個線程或者任務等待至barrier狀態(tài);參數barrierAction為當這些線程都達到barrier狀態(tài)時會執(zhí)行的內容。
然后CyclicBarrier中最重要的方法就是await方法,它有2個重載版本:
publicintawait() throwsInterruptedException, BrokenBarrierException { }; publicintawait(longtimeout, TimeUnit unit)throwsInterruptedException,BrokenBarrierException,TimeoutException { };
第一個版本比較常用,用來掛起當前線程,直至所有線程都到達barrier狀態(tài)再同時執(zhí)行后續(xù)任務;
第二個版本是讓這些線程等待至一定的時間,如果還有線程沒有到達barrier狀態(tài)就直接讓到達barrier的線程執(zhí)行后續(xù)任務。
下面舉幾個例子就明白了:
假若有若干個線程都要進行寫數據操作,并且只有所有線程都完成寫數據操作之后,這些線程才能繼續(xù)做后面的事情,此時就可以利用CyclicBarrier了:
publicclassTest { publicstaticvoidmain(String[] args) { intN = 4; CyclicBarrier barrier =newCyclicBarrier(N); for( inti= 0;i《N;i++) newWriter(barrier).start(); } staticclass Writer extends Thread{ privateCyclicBarrier cyclicBarrier; publicWriter(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override publicvoidrun() { System. out.println( “線程”+Thread.currentThread().getName()+ “正在寫入數據。。.”); try{ Thread.sleep( 5000); //以睡眠來模擬寫入數據操作System. out.println( “線程”+Thread.currentThread().getName()+“寫入數據完畢,等待其他線程寫入完畢”); cyclicBarrier. await(); }catch(InterruptedException e) { e.printStackTrace(); } catch(BrokenBarrierException e){ e.printStackTrace(); } System. out.println( “所有線程寫入完畢,繼續(xù)處理其他任務。。.”); } } }
執(zhí)行結果:
線程Thread- 0正在寫入數據 。。.線程Thread- 3正在寫入數據 。。.線程Thread- 2正在寫入數據。。.線程Thread- 1正在寫入數據 。。.線程Thread- 2寫入數據完畢,等待其他線程寫入完畢 線程Thread- 0寫入數據完畢,等待其他線程寫入完畢 線程Thread- 3寫入數據完畢,等待其他線程寫入完畢 線程Thread- 1寫入數據完畢,等待其他線程寫入完畢 所有線程寫入完畢,繼續(xù)處理其他任務 。。.所有線程寫入完畢,繼續(xù)處理其他任務 。。.所有線程寫入完畢,繼續(xù)處理其他任務 。。.所有線程寫入完畢,繼續(xù)處理其他任務 。。.
從上面輸出結果可以看出,每個寫入線程執(zhí)行完寫數據操作之后,就在等待其他線程寫入操作完畢。
當所有線程線程寫入操作完畢之后,所有線程就繼續(xù)進行后續(xù)的操作了。
如果說想在所有線程寫入操作完之后,進行額外的其他操作可以為CyclicBarrier提供Runnable參數:
publicclassTest { publicstaticvoidmain(String[] args) { intN = 4; CyclicBarrier barrier =newCyclicBarrier(N, newRunnable() { @Override publicvoidrun() { System. out.println( “當前線程”+Thread.currentThread().getName()); } }); for( inti= 0;i《N;i++)newWriter(barrier).start(); } staticclass Writer extends Thread{ privateCyclicBarrier cyclicBarrier; publicWriter(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override publicvoidrun() { System. out.println( “線程”+Thread.currentThread().getName()+ “正在寫入數據。。.”); try{ Thread.sleep( 5000); //以睡眠來模擬寫入數據操作System. out.println( “線程”+Thread.currentThread().getName()+“寫入數據完畢,等待其他線程寫入完畢”); cyclicBarrier. await(); }catch(InterruptedException e) { e.printStackTrace(); } catch(BrokenBarrierException e){ e.printStackTrace(); } System. out.println( “所有線程寫入完畢,繼續(xù)處理其他任務。。.”); } } }
運行結果:
線程Thread- 0正在寫入數據 。。.線程Thread- 1正在寫入數據 。。.線程Thread- 2正在寫入數據。。.線程Thread- 3正在寫入數據 。。.線程Thread- 0寫入數據完畢,等待其他線程寫入完畢 線程Thread- 1寫入數據完畢,等待其他線程寫入完畢 線程Thread- 2寫入數據完畢,等待其他線程寫入完畢 線程Thread- 3寫入數據完畢,等待其他線程寫入完畢 當前線程Thread- 3所有線程寫入完畢,繼續(xù)處理其他任務 。。.所有線程寫入完畢,繼續(xù)處理其他任務 。。.所有線程寫入完畢,繼續(xù)處理其他任務 。。.所有線程寫入完畢,繼續(xù)處理其他任務 。。.
從結果可以看出,當四個線程都到達barrier狀態(tài)后,會從四個線程中選擇一個線程去執(zhí)行Runnable。
下面看一下為await指定時間的效果:
publicclassTest { publicstaticvoidmain(String[] args) { intN = 4; CyclicBarrier barrier =newCyclicBarrier(N); for( inti= 0;i《N;i++) { if(i《N- 1) newWriter(barrier).start(); else{ try{ Thread.sleep( 5000); } catch(InterruptedException e) { e.printStackTrace(); }newWriter(barrier).start(); } } } staticclass Writer extends Thread{ privateCyclicBarrier cyclicBarrier; publicWriter(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override publicvoidrun() { System. out.println( “線程”+Thread.currentThread().getName()+ “正在寫入數據。。.”); try{ Thread.sleep( 5000); //以睡眠來模擬寫入數據操作System. out.println( “線程”+Thread.currentThread().getName()+“寫入數據完畢,等待其他線程寫入完畢”); try{ cyclicBarrier. await( 2000, TimeUnit.MILLISECONDS); } catch(TimeoutException e) { // TODO Auto-generated catch blocke.printStackTrace(); } } catch(InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+ “所有線程寫入完畢,繼續(xù)處理其他任務。。.”); } } }
執(zhí)行結果:
線程Thread- 0正在寫入數據 。。.線程Thread- 2正在寫入數據 。。.線程Thread- 1正在寫入數據。。.線程Thread- 2寫入數據完畢,等待其他線程寫入完畢 線程Thread- 0寫入數據完畢,等待其他線程寫入完畢 線程Thread- 1寫入數據完畢,等待其他線程寫入完畢 線程Thread- 3正在寫入數據 。。.java.util.concurrent.TimeoutException Thread- 1所有線程寫入完畢,繼續(xù)處理其他任務 。。.Thread- 0所有線程寫入完畢,繼續(xù)處理其他任務 。。.at java.util.concurrent.CyclicBarrier.dowait(Unknown Source) at java.util.concurrent.CyclicBarrier.await(Unknown Source) at com.cxh.test1.Test$Writer.run(Test.java: 58) java.util.concurrent.BrokenBarrierException at java.util.concurrent.CyclicBarrier.dowait(Unknown Source) at java.util.concurrent.CyclicBarrier.await(Unknown Source) at com.cxh.test1.Test$Writer.run(Test.java: 58) java.util.concurrent.BrokenBarrierException at java.util.concurrent.CyclicBarrier.dowait(Unknown Source) at java.util.concurrent.CyclicBarrier.await(Unknown Source) at com.cxh.test1.Test$Writer.run(Test.java: 58) Thread- 2所有線程寫入完畢,繼續(xù)處理其他任務 。。.java.util.concurrent.BrokenBarrierException 線程Thread- 3寫入數據完畢,等待其他線程寫入完畢 at java.util.concurrent.CyclicBarrier.dowait(Unknown Source) at java.util.concurrent.CyclicBarrier.await(Unknown Source) at com.cxh.test1.Test$Writer.run(Test.java: 58) Thread- 3所有線程寫入完畢,繼續(xù)處理其他任務 。。.
上面的代碼在main方法的for循環(huán)中,故意讓最后一個線程啟動延遲,因為在前面三個線程都達到barrier之后,等待了指定的時間發(fā)現第四個線程還沒有達到barrier,就拋出異常并繼續(xù)執(zhí)行后面的任務。
另外CyclicBarrier是可以重用的,看下面這個例子:
/** * Java學習交流QQ群:589809992 我們一起學Java! */publicclassTest{publicstaticvoidmain(String[] args) { intN = 4; CyclicBarrier barrier =newCyclicBarrier(N); for( inti= 0;i《N;i++) { newWriter(barrier).start(); } try{ Thread.sleep(25000); } catch(InterruptedException e) { e.printStackTrace(); } System.out.println(“CyclicBarrier重用”); for( inti= 0;i《N;i++) { newWriter(barrier).start(); } } staticclass Writer extends Thread{ privateCyclicBarrier cyclicBarrier; publicWriter(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Overridepublicvoidrun() { System.out.println( “線程”+Thread.currentThread().getName()+ “正在寫入數據。。.”); try{ Thread.sleep( 5000); //以睡眠來模擬寫入數據操作System.out.println( “線程”+Thread.currentThread().getName()+“寫入數據完畢,等待其他線程寫入完畢”); cyclicBarrier.await(); }catch(InterruptedException e) { e.printStackTrace(); } catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+ “所有線程寫入完畢,繼續(xù)處理其他任務。。.”); } } }
執(zhí)行結果:
線程Thread- 0正在寫入數據 。。.線程Thread- 1正在寫入數據 。。.線程Thread- 3正在寫入數據。。.線程Thread- 2正在寫入數據 。。.線程Thread- 1寫入數據完畢,等待其他線程寫入完畢 線程Thread- 3寫入數據完畢,等待其他線程寫入完畢 線程Thread- 2寫入數據完畢,等待其他線程寫入完畢 線程Thread- 0寫入數據完畢,等待其他線程寫入完畢 Thread- 0所有線程寫入完畢,繼續(xù)處理其他任務 。。.Thread- 3所有線程寫入完畢,繼續(xù)處理其他任務 。。.Thread- 1所有線程寫入完畢,繼續(xù)處理其他任務 。。.Thread- 2所有線程寫入完畢,繼續(xù)處理其他任務。。.CyclicBarrier重用 線程Thread- 4正在寫入數據 。。.線程Thread- 5正在寫入數據 。。.線程Thread- 6正在寫入數據 。。.線程Thread- 7正在寫入數據 。。.線程Thread- 7寫入數據完畢,等待其他線程寫入完畢 線程Thread- 5寫入數據完畢,等待其他線程寫入完畢 線程Thread- 6寫入數據完畢,等待其他線程寫入完畢 線程Thread- 4寫入數據完畢,等待其他線程寫入完畢 Thread- 4所有線程寫入完畢,繼續(xù)處理其他任務 。。.Thread- 5所有線程寫入完畢,繼續(xù)處理其他任務 。。.Thread- 6所有線程寫入完畢,繼續(xù)處理其他任務 。。.Thread- 7所有線程寫入完畢,繼續(xù)處理其他任務 。。.
從執(zhí)行結果可以看出,在初次的4個線程越過barrier狀態(tài)后,又可以用來進行新一輪的使用。而CountDownLatch無法進行重復使用。
三.Semaphore用法
Semaphore翻譯成字面意思為 信號量,Semaphore可以控同時訪問的線程個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。
Semaphore類位于java.util.concurrent包下,它提供了2個構造器:
publicSemaphore( intpermits) { //參數permits表示許可數目,即同時可以允許多少線程進行訪問sync = newNonfairSync(permits); } publicSemaphore( intpermits, booleanfair) { //這個多了一個參數fair表示是否是公平的,即等待時間越久的越先獲取許可sync = (fair)?newFairSync(permits) : newNonfairSync(permits); }
下面說一下Semaphore類中比較重要的幾個方法,首先是acquire()、release()方法:
publicvoidacquire() throwsInterruptedException { } //獲取一個許可publicvoidacquire(intpermits) throwsInterruptedException { } //獲取permits個許可publicvoidrelease() { } //釋放一個許可publicvoidrelease( intpermits) { } //釋放permits個許可
acquire()用來獲取一個許可,若無許可能夠獲得,則會一直等待,直到獲得許可。
release()用來釋放許可。注意,在釋放許可之前,必須先獲獲得許可。
這4個方法都會被阻塞,如果想立即得到執(zhí)行結果,可以使用下面幾個方法:
publicbooleantryAcquire() { }; //嘗試獲取一個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回falsepublicbooleantryAcquire( longtimeout, TimeUnit unit)throwsInterruptedException { }; //嘗試獲取一個許可,若在指定的時間內獲取成功,則立即返回true,否則則立即返回falsepublicbooleantryAcquire( intpermits) { }; //嘗試獲取permits個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回falsepublicbooleantryAcquire( intpermits, longtimeout, TimeUnit unit)throwsInterruptedException { }; //嘗試獲取permits個許可,若在指定的時間內獲取成功,則立即返回true,否則則立即返回false
另外還可以通過availablePermits()方法得到可用的許可數目。
下面通過一個例子來看一下Semaphore的具體使用:
假若一個工廠有5臺機器,但是有8個工人,一臺機器同時只能被一個工人使用,只有使用完了,其他工人才能繼續(xù)使用。那么我們就可以通過Semaphore來實現:
/** * Java學習交流QQ群:589809992 我們一起學Java! */publicclassTest{publicstaticvoidmain(String[] args) { intN = 8; //工人數Semaphore semaphore = newSemaphore( 5); //機器數目for( inti= 0;i《N;i++)newWorker(i,semaphore).start(); } staticclass Worker extends Thread{ privateintnum;privateSemaphore semaphore; publicWorker( intnum,Semaphore semaphore){ this.num = num; this.semaphore = semaphore; } @Overridepublicvoidrun() { try{ semaphore.acquire(); System.out.println( “工人”+ this.num+ “占用一個機器在生產。。.”); Thread.sleep( 2000); System.out.println( “工人”+ this.num+ “釋放出機器”); semaphore.release(); } catch(InterruptedException e) { e.printStackTrace(); } } } }
執(zhí)行結果:
工人 0占用一個機器在生產 。。.工人 1占用一個機器在生產 。。.工人 2占用一個機器在生產 。。.工人 4占用一個機器在生產 。。.工人 5占用一個機器在生產 。。.工人 0釋放出機器 工人 2釋放出機器 工人 3占用一個機器在生產 。。.工人 7占用一個機器在生產 。。.工人 4釋放出機器 工人 5釋放出機器 工人 1釋放出機器 工人 6占用一個機器在生產 。。.工人 3釋放出機器 工人 7釋放出機器 工人 6釋放出機器
下面對上面說的三個輔助類進行一個總結:
1)CountDownLatch和CyclicBarrier都能夠實現線程之間的等待,只不過它們側重點不同:
CountDownLatch一般用于某個線程A等待若干個其他線程執(zhí)行完任務之后,它才執(zhí)行;
而CyclicBarrier一般用于一組線程互相等待至某個狀態(tài),然后這一組線程再同時執(zhí)行;
另外,CountDownLatch是不能夠重用的,而CyclicBarrier是可以重用的。
2)Semaphore其實和鎖有點類似,它一般用于控制對某組資源的訪問權限。
非常好我支持^.^
(0) 0%
不好我反對
(0) 0%