0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

Java多線程永動(dòng)任務(wù) 多線程異步任務(wù)項(xiàng)目解讀

jf_ro2CN3Fa ? 來源:樓仔 ? 作者:樓仔 ? 2022-10-19 11:46 ? 次閱讀

1. 功能說明

2. 多線程任務(wù)示例

2.1 線程池

2.2 單個(gè)任務(wù)

2.3 任務(wù)入口

2.4 結(jié)果分析

2.5 源碼地址

3. 寫在最后

大家好,今天教大家擼一個(gè) Java 的多線程永動(dòng)任務(wù),這個(gè)示例的原型是公司自研的多線程異步任務(wù)項(xiàng)目 ,我把里面涉及到多線程的代碼抽離出來,然后進(jìn)行一定的改造。

里面涉及的知識(shí)點(diǎn)非常多,特別適合有一定工作經(jīng)驗(yàn) 的同學(xué)學(xué)習(xí),或者可以直接拿到項(xiàng)目中使用。

文章結(jié)構(gòu)非常簡(jiǎn)單:

ae8ef34e-4cfe-11ed-a3b6-dac502259ad0.png

1. 功能說明

做這個(gè)多線程異步任務(wù),主要是因?yàn)槲覀冇泻芏嘤绖?dòng)的異步任務(wù),什么是永動(dòng)呢?就是任務(wù)跑起來后,需要一直跑下去。

比如消息 Push 任務(wù),因?yàn)橐恢庇邢⑦^來,所以需要一直去消費(fèi) DB 中的未推送消息,就需要整一個(gè) Push 的永動(dòng)異步任務(wù)。

我們的需求其實(shí)不難,簡(jiǎn)單總結(jié)一下:

能同時(shí)執(zhí)行多個(gè)永動(dòng)的異步任務(wù) ;

每個(gè)異步任務(wù),支持開多個(gè)線程 去消費(fèi)這個(gè)任務(wù)的數(shù)據(jù);

支持永動(dòng)異步任務(wù)的優(yōu)雅關(guān)閉 ,即關(guān)閉后,需要把所有的數(shù)據(jù)消費(fèi)完畢后,再關(guān)閉。

完成上面的需求,需要注意幾個(gè)點(diǎn):

每個(gè)永動(dòng)任務(wù) ,可以開一個(gè)線程去執(zhí)行;

每個(gè)子任務(wù) ,因?yàn)樾枰С植l(fā),需要用線程池控制;

永動(dòng)任務(wù)的關(guān)閉,需要通知子任務(wù)的并發(fā)線程,并支持永動(dòng)任務(wù)和并發(fā)子任務(wù)的優(yōu)雅關(guān)閉 。

2. 多線程任務(wù)示例

2.1 線程池

對(duì)于子任務(wù),需要支持并發(fā),如果每個(gè)并發(fā)都開一個(gè)線程,用完就關(guān)閉,對(duì)資源消耗太大,所以引入線程池:

publicclassTaskProcessUtil{
//每個(gè)任務(wù),都有自己?jiǎn)为?dú)的線程池
privatestaticMapexecutors=newConcurrentHashMap<>();

//初始化一個(gè)線程池
privatestaticExecutorServiceinit(StringpoolName,intpoolSize){
returnnewThreadPoolExecutor(poolSize,poolSize,
0L,TimeUnit.MILLISECONDS,
newLinkedBlockingQueue(),
newThreadFactoryBuilder().setNameFormat("Pool-"+poolName).setDaemon(false).build(),
newThreadPoolExecutor.CallerRunsPolicy());
}

//獲取線程池
publicstaticExecutorServicegetOrInitExecutors(StringpoolName,intpoolSize){
ExecutorServiceexecutorService=executors.get(poolName);
if(null==executorService){
synchronized(TaskProcessUtil.class){
executorService=executors.get(poolName);
if(null==executorService){
executorService=init(poolName,poolSize);
executors.put(poolName,executorService);
}
}
}
returnexecutorService;
}

//回收線程資源
publicstaticvoidreleaseExecutors(StringpoolName){
ExecutorServiceexecutorService=executors.remove(poolName);
if(executorService!=null){
executorService.shutdown();
}
}
}

這是一個(gè)線程池的工具類,這里初始化線程池和回收線程資源很簡(jiǎn)單,我們主要討論獲取線程池。

獲取線程池可能會(huì)存在并發(fā)情況,所以需要加一個(gè) synchronized 鎖,然后鎖住后,需要對(duì) executorService 進(jìn)行二次判空校驗(yàn)。

2.2 單個(gè)任務(wù)

為了更好講解單個(gè)任務(wù)的實(shí)現(xiàn)方式,我們的任務(wù)主要就是把 Cat 的數(shù)據(jù)打印出來,Cat 定義如下:

@Data
@Service
publicclassCat{
privateStringcatName;
publicCatsetCatName(Stringname){
this.catName=name;
returnthis;
}
}

單個(gè)任務(wù)主要包括以下功能:

獲取永動(dòng)任務(wù)數(shù)據(jù) :這里一般都是掃描 DB,我直接就簡(jiǎn)單用 queryData() 代替。

多線程執(zhí)行任務(wù) :需要把數(shù)據(jù)拆分成 4 份,然后分別由多線程并發(fā)執(zhí)行,這里可以通過線程池支持;

永動(dòng)任務(wù)優(yōu)雅停機(jī) :當(dāng)外面通知任務(wù)需要停機(jī),需要執(zhí)行完剩余任務(wù)數(shù)據(jù),并回收線程資源,退出任務(wù);

永動(dòng)執(zhí)行 :如果未收到停機(jī)命令,任務(wù)需要一直執(zhí)行下去。

直接看代碼:

publicclassChildTask{

privatefinalintPOOL_SIZE=3;//線程池大小
privatefinalintSPLIT_SIZE=4;//數(shù)據(jù)拆分大小
privateStringtaskName;

//接收jvm關(guān)閉信號(hào),實(shí)現(xiàn)優(yōu)雅停機(jī)
protectedvolatilebooleanterminal=false;

publicChildTask(StringtaskName){
this.taskName=taskName;
}

//程序執(zhí)行入口
publicvoiddoExecute(){
inti=0;
while(true){
System.out.println(taskName+":Cycle-"+i+"-Begin");
//獲取數(shù)據(jù)
Listdatas=queryData();
//處理數(shù)據(jù)
taskExecute(datas);
System.out.println(taskName+":Cycle-"+i+"-End");
if(terminal){
//只有應(yīng)用關(guān)閉,才會(huì)走到這里,用于實(shí)現(xiàn)優(yōu)雅的下線
break;
}
i++;
}
//回收線程池資源
TaskProcessUtil.releaseExecutors(taskName);
}

//優(yōu)雅停機(jī)
publicvoidterminal(){
//關(guān)機(jī)
terminal=true;
System.out.println(taskName+"shutdown");
}

//處理數(shù)據(jù)
privatevoiddoProcessData(Listdatas,CountDownLatchlatch){
try{
for(Catcat:datas){
System.out.println(taskName+":"+cat.toString()+",ThreadName:"+Thread.currentThread().getName());
Thread.sleep(1000L);
}
}catch(Exceptione){
System.out.println(e.getStackTrace());
}finally{
if(latch!=null){
latch.countDown();
}
}
}

//處理單個(gè)任務(wù)數(shù)據(jù)
privatevoidtaskExecute(ListsourceDatas){
if(CollectionUtils.isEmpty(sourceDatas)){
return;
}
//將數(shù)據(jù)拆成4份
List>splitDatas=Lists.partition(sourceDatas,SPLIT_SIZE);
finalCountDownLatchlatch=newCountDownLatch(splitDatas.size());

//并發(fā)處理拆分的數(shù)據(jù),共用一個(gè)線程池
for(finalListdatas:splitDatas){
ExecutorServiceexecutorService=TaskProcessUtil.getOrInitExecutors(taskName,POOL_SIZE);
executorService.submit(newRunnable(){
@Override
publicvoidrun(){
doProcessData(datas,latch);
}
});
}

try{
latch.await();
}catch(Exceptione){
System.out.println(e.getStackTrace());
}
}

//獲取永動(dòng)任務(wù)數(shù)據(jù)
privateListqueryData(){
Listdatas=newArrayList<>();
for(inti=0;i

簡(jiǎn)單解釋一下:

queryData :用于獲取數(shù)據(jù),實(shí)際應(yīng)用中其實(shí)是需要把 queryData 定為抽象方法,然后由各個(gè)任務(wù)實(shí)現(xiàn)自己的方法。

doProcessData :數(shù)據(jù)處理邏輯,實(shí)際應(yīng)用中其實(shí)是需要把 doProcessData 定為抽象方法,然后由各個(gè)任務(wù)實(shí)現(xiàn)自己的方法。

taskExecute :將數(shù)據(jù)拆分成 4 份,獲取該任務(wù)的線程池,并交給線程池并發(fā)執(zhí)行,然后通過 latch.await() 阻塞。當(dāng)這 4 份數(shù)據(jù)都執(zhí)行成功后,阻塞結(jié)束,該方法才返回。

terminal :僅用于接受停機(jī)命令,這里該變量定義為 volatile,所以多線程內(nèi)存可見;

doExecute :程序執(zhí)行入口,封裝了每個(gè)任務(wù)執(zhí)行的流程,當(dāng) terminal=true 時(shí),先執(zhí)行完任務(wù)數(shù)據(jù),然后回收線程池,最后退出。

2.3 任務(wù)入口

直接上代碼:

publicclassLoopTask{
privateListchildTasks;
publicvoidinitLoopTask(){
childTasks=newArrayList();
childTasks.add(newChildTask("childTask1"));
childTasks.add(newChildTask("childTask2"));
for(finalChildTaskchildTask:childTasks){
newThread(newRunnable(){
@Override
publicvoidrun(){
childTask.doExecute();
}
}).start();
}
}
publicvoidshutdownLoopTask(){
if(!CollectionUtils.isEmpty(childTasks)){
for(ChildTaskchildTask:childTasks){
childTask.terminal();
}
}
}
publicstaticvoidmain(Stringargs[])throwsException{
LoopTaskloopTask=newLoopTask();
loopTask.initLoopTask();
Thread.sleep(5000L);
loopTask.shutdownLoopTask();
}
}

每個(gè)任務(wù)都開一個(gè)單獨(dú)的 Thread,這里我初始化了 2 個(gè)永動(dòng)任務(wù),分別為 childTask1 和 childTask2,然后分別執(zhí)行,后面 Sleep 了 5 秒后,再關(guān)閉任務(wù),我們可以看看是否可以按照我們的預(yù)期優(yōu)雅退出。

2.4 結(jié)果分析

執(zhí)行結(jié)果如下:

childTask1:Cycle-0-Begin
childTask2:Cycle-0-Begin
childTask1:Cat(catName=羅小黑0),ThreadName:Pool-childTask1
childTask1:Cat(catName=羅小黑4),ThreadName:Pool-childTask1
childTask2:Cat(catName=羅小黑4),ThreadName:Pool-childTask2
childTask2:Cat(catName=羅小黑0),ThreadName:Pool-childTask2
childTask1:Cat(catName=羅小黑1),ThreadName:Pool-childTask1
childTask2:Cat(catName=羅小黑1),ThreadName:Pool-childTask2
childTask2:Cat(catName=羅小黑2),ThreadName:Pool-childTask2
childTask1:Cat(catName=羅小黑2),ThreadName:Pool-childTask1
childTask2:Cat(catName=羅小黑3),ThreadName:Pool-childTask2
childTask1:Cat(catName=羅小黑3),ThreadName:Pool-childTask1
childTask2:Cycle-0-End
childTask2:Cycle-1-Begin
childTask1:Cycle-0-End
childTask1:Cycle-1-Begin
childTask2:Cat(catName=羅小黑0),ThreadName:Pool-childTask2
childTask2:Cat(catName=羅小黑4),ThreadName:Pool-childTask2
childTask1:Cat(catName=羅小黑4),ThreadName:Pool-childTask1
childTask1:Cat(catName=羅小黑0),ThreadName:Pool-childTask1
childTask1shutdown
childTask2shutdown
childTask2:Cat(catName=羅小黑1),ThreadName:Pool-childTask2
childTask1:Cat(catName=羅小黑1),ThreadName:Pool-childTask1
childTask1:Cat(catName=羅小黑2),ThreadName:Pool-childTask1
childTask2:Cat(catName=羅小黑2),ThreadName:Pool-childTask2
childTask1:Cat(catName=羅小黑3),ThreadName:Pool-childTask1
childTask2:Cat(catName=羅小黑3),ThreadName:Pool-childTask2
childTask1:Cycle-1-End
childTask2:Cycle-1-End

輸出數(shù)據(jù):

“Pool-childTask” 是線程池名稱;

“childTask” 是任務(wù)名稱;

“Cat(catName=羅小黑)” 是執(zhí)行的結(jié)果;

“childTask shut down” 是關(guān)閉標(biāo)記;

“childTask:Cycle-X-Begin” 和“childTask:Cycle-X-End” 是每一輪循環(huán)的開始和結(jié)束標(biāo)記。

我們分析一下執(zhí)行結(jié)果:

childTask1 和 childTask2 分別執(zhí)行,在第一輪循環(huán)中都正常輸出了 5 條羅小黑數(shù)據(jù);

第二輪執(zhí)行過程中,我啟動(dòng)了關(guān)閉指令,這次第二輪執(zhí)行沒有直接停止,而是先執(zhí)行完任務(wù)中的數(shù)據(jù),再執(zhí)行退出,所以完全符合我們的優(yōu)雅退出結(jié)論。

2.5 源碼地址

GitHub 地址:

https://github.com/lml200701158/java-study/tree/master/src/main/java/com/java/parallel/pool/ofc

3. 寫在最后

對(duì)于這個(gè)經(jīng)典的線程池使用示例,原項(xiàng)目是我好友一灰 寫的,技術(shù)水平阿里 P7級(jí)別,實(shí)現(xiàn)得也非常優(yōu)雅,涉及的知識(shí)點(diǎn)非常多 ,非常值得大家學(xué)習(xí)。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • JAVA
    +關(guān)注

    關(guān)注

    19

    文章

    2967

    瀏覽量

    104758
  • 編程
    +關(guān)注

    關(guān)注

    88

    文章

    3616

    瀏覽量

    93738
  • 多線程
    +關(guān)注

    關(guān)注

    0

    文章

    278

    瀏覽量

    19961
  • 代碼
    +關(guān)注

    關(guān)注

    30

    文章

    4788

    瀏覽量

    68616
  • Thread
    +關(guān)注

    關(guān)注

    2

    文章

    83

    瀏覽量

    25927

原文標(biāo)題:新來個(gè)阿里 P7,僅花 2 小時(shí),擼出一個(gè)多線程永動(dòng)任務(wù),看完直接跪了,真牛逼!

文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    Java多線程的用法

    本文將介紹一下Java多線程的用法。 基礎(chǔ)介紹 什么是多線程 指的是在一個(gè)進(jìn)程中同時(shí)運(yùn)行多個(gè)線程,每個(gè)線程都可以獨(dú)立執(zhí)行不同的
    的頭像 發(fā)表于 09-30 17:07 ?953次閱讀

    Java基礎(chǔ)學(xué)習(xí)多線程使用指南

    黑馬程序員-----Java基礎(chǔ)學(xué)習(xí)多線程
    發(fā)表于 10-08 14:10

    介紹Arduino借助多線程SCoop庫(kù)如何輕松實(shí)現(xiàn)并發(fā)處理任務(wù)

    我們?cè)谟肁rduino開發(fā)復(fù)雜項(xiàng)目時(shí),或多或少會(huì)面臨多任務(wù)同時(shí)處理的工作場(chǎng)景,本篇簡(jiǎn)單介紹Arduino借助多線程SCoop庫(kù)如何輕松實(shí)現(xiàn)并發(fā)處理任務(wù)。
    發(fā)表于 08-24 06:57

    什么時(shí)候要使用多線程

    什么時(shí)候要使用多線程:cpu密集型:(比如一個(gè)while( true ){ i++;})IO密集型:(比如一個(gè)從磁盤拷貝數(shù)據(jù)到另一個(gè)磁盤的拷貝進(jìn)程)1)計(jì)算密集型任務(wù)。此時(shí)要盡量使用多線程,可以提高
    發(fā)表于 09-06 07:25

    請(qǐng)問CubeMX如何配置FreeRTOS跑多線程任務(wù)

    請(qǐng)問CubeMX如何配置FreeRTOS跑多線程任務(wù)?
    發(fā)表于 02-14 06:39

    如何使用多線程異步操作等并發(fā)設(shè)計(jì)方法來最大化程序的性能

    任務(wù)。  不過有個(gè)問題,異步有時(shí)優(yōu)先級(jí)比主線程還高。這個(gè)特點(diǎn)和多線程不同?! ≡髡撸簀ingjin221
    發(fā)表于 08-23 16:31

    java多線程編程實(shí)例 (源程序)

    java多線程編程實(shí)例 import java.awt.*;import javax.swing.*; public class CompMover extends Object { 
    發(fā)表于 10-22 11:48 ?0次下載

    java多線程設(shè)計(jì)模式_結(jié)城浩

    JAVA多線程設(shè)計(jì)模式》通過淺顯易懂的文字與實(shí)例來介紹JAVA線程相關(guān)的設(shè)計(jì)模式概念,并且通過實(shí)際的JAVA程序范例和UML圖示來一一解說
    發(fā)表于 01-05 16:15 ?0次下載
    <b class='flag-5'>java</b><b class='flag-5'>多線程</b>設(shè)計(jì)模式_結(jié)城浩

    多線程好還是單線程好?單線程多線程的區(qū)別 優(yōu)缺點(diǎn)分析

    摘要:如今單線程多線程已經(jīng)得到普遍運(yùn)用,那么到底多線程好還是單線程好呢?單線程多線程的區(qū)別又
    發(fā)表于 12-08 09:33 ?8.1w次閱讀

    什么是多線程編程?多線程編程基礎(chǔ)知識(shí)

    摘要:多線程編程是現(xiàn)代軟件技術(shù)中很重要的一個(gè)環(huán)節(jié)。要弄懂多線程,這就要牽涉到多進(jìn)程。本文主要以多線程編程以及多線程編程相關(guān)知識(shí)而做出的一些結(jié)論。
    發(fā)表于 12-08 16:30 ?1.3w次閱讀

    java學(xué)習(xí)——java面試【事務(wù)、鎖、多線程】資料整理

    本文檔內(nèi)容介紹了基于java學(xué)習(xí)java面試【事務(wù)、鎖、多線程】資料整理,供參考
    發(fā)表于 03-13 13:53 ?0次下載

    Linux下的多線程編程

    線程呢?使用多線程到底有哪些好處?什么的系統(tǒng)應(yīng)該選用多線程?我們首先必須回答這些問題?! ∈褂?b class='flag-5'>多線程的理由之一是和進(jìn)程相比,它是一種非常"節(jié)儉"的多
    發(fā)表于 04-02 14:43 ?606次閱讀

    多線程如何保證數(shù)據(jù)的同步

    多線程編程是一種并發(fā)編程的方法,意味著程序中同時(shí)運(yùn)行多個(gè)線程,每個(gè)線程可獨(dú)立執(zhí)行不同的任務(wù),共享同一份數(shù)據(jù)。由于多線程并發(fā)執(zhí)行的特點(diǎn),會(huì)引發(fā)
    的頭像 發(fā)表于 11-17 14:22 ?1237次閱讀

    java實(shí)現(xiàn)多線程的幾種方式

    Java實(shí)現(xiàn)多線程的幾種方式 多線程是指程序中包含了兩個(gè)或以上的線程,每個(gè)線程都可以并行執(zhí)行不同的任務(wù)
    的頭像 發(fā)表于 03-14 16:55 ?709次閱讀

    socket 多線程編程實(shí)現(xiàn)方法

    是指在同一個(gè)進(jìn)程中運(yùn)行多個(gè)線程,每個(gè)線程可以獨(dú)立執(zhí)行任務(wù)線程共享進(jìn)程的資源,如內(nèi)存空間和文件句柄,但每個(gè)線程有自己的程序計(jì)數(shù)器、寄存器集合
    的頭像 發(fā)表于 11-12 14:16 ?358次閱讀