- 通信底層介紹
- 通信整體流程
- 驚艷的設(shè)計(jì)
通信底層介紹
xxl-job 使用 netty http 的方式進(jìn)行通信,雖然也支持 Mina,jetty,netty tcp 等方式,但是代碼里面固定寫死的是 netty http。
基于 Spring Boot + MyBatis Plus + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
- 項(xiàng)目地址:https://github.com/YunaiV/ruoyi-vue-pro
- 視頻教程:https://doc.iocoder.cn/video/
通信整體流程
我以調(diào)度器通知執(zhí)行器執(zhí)行任務(wù)為例,繪制的活動(dòng)圖:
活動(dòng)圖基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
驚艷的設(shè)計(jì)
看完了整個(gè)處理流程代碼,設(shè)計(jì)上可以說獨(dú)具匠心,將 netty,多線程的知識(shí)運(yùn)用得行云流水。
我現(xiàn)在就將這些設(shè)計(jì)上出彩的點(diǎn)總結(jié)如下:
使用動(dòng)態(tài)代理模式,隱藏通信細(xì)節(jié)
xxl-job 定義了兩個(gè)接口 ExecutorBiz,AdminBiz,ExecutorBiz 接口中封裝了向心跳,暫停,觸發(fā)執(zhí)行等操作,AdminBiz 封裝了回調(diào),注冊,取消注冊操作,接口的實(shí)現(xiàn)類中,并沒有通信相關(guān)的處理。
XxlRpcReferenceBean 類的 getObject() 方法會(huì)生成一個(gè)代理類,這個(gè)代理類會(huì)進(jìn)行遠(yuǎn)程通信。
全異步處理
執(zhí)行器收到消息進(jìn)行反序列化,并沒有同步執(zhí)行任務(wù)代碼,而是將任務(wù)信息存儲(chǔ)在 LinkedBlockingQueue 中,異步線程從這個(gè)隊(duì)列中獲取任務(wù)信息,然后執(zhí)行。
而任務(wù)的處理結(jié)果,也不是說處理完之后,同步返回的,也是放到回調(diào)線程的阻塞隊(duì)列中,異步的將處理結(jié)果返回回去。
這樣處理的好處就是減少了 netty 工作線程的處理時(shí)間,提升了吞吐量。
對異步處理的包裝
對異步處理進(jìn)行了包裝,代碼看起來是同步調(diào)用的。
我們看下調(diào)度器,XxlJobTrigger 類觸發(fā)任務(wù)執(zhí)行的代碼:
publicstaticReturnTrunExecutor(TriggerParamtriggerParam,Stringaddress) {
ReturnTrunResult=null;
try{
ExecutorBizexecutorBiz=XxlJobScheduler.getExecutorBiz(address);
//這里面做了很多異步處理,最終同步得到處理結(jié)果
runResult=executorBiz.run(triggerParam);
}catch(Exceptione){
logger.error(">>>>>>>>>>>xxl-jobtriggererror,pleasecheckiftheexecutor[{}]isrunning.",address,e);
runResult=newReturnT(ReturnT.FAIL_CODE,ThrowableUtil.toString(e));
}
StringBufferrunResultSB=newStringBuffer(I18nUtil.getString("jobconf_trigger_run")+":");
runResultSB.append("
address:").append(address);
runResultSB.append("
code:").append(runResult.getCode());
runResultSB.append("
msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
returnrunResult;
}
ExecutorBiz.run 方法我們說過了,是走的動(dòng)態(tài)代理,和執(zhí)行器進(jìn)行通信,執(zhí)行器執(zhí)行結(jié)果也是異步處理完,才返回的,而這里看到的 run 方法是同步等待處理結(jié)果返回。
我們看下xxl-job是如何同步獲取處理結(jié)果的:調(diào)度器向執(zhí)行器發(fā)出消息后,該線程阻塞。等到執(zhí)行器處理完畢后,將處理結(jié)果返回,喚醒被阻塞的線程,調(diào)用處拿到返回值。
動(dòng)態(tài)代理代碼如下:
//代理類中的觸發(fā)調(diào)用
if(CallType.SYNC==callType){
//future-responseset
XxlRpcFutureResponsefutureResponse=newXxlRpcFutureResponse(invokerFactory,xxlRpcRequest,null);
try{
//doinvoke
client.asyncSend(finalAddress,xxlRpcRequest);
//futureget
XxlRpcResponsexxlRpcResponse=futureResponse.get(timeout,TimeUnit.MILLISECONDS);
if(xxlRpcResponse.getErrorMsg()!=null){
thrownewXxlRpcException(xxlRpcResponse.getErrorMsg());
}
returnxxlRpcResponse.getResult();
}catch(Exceptione){
logger.info(">>>>>>>>>>>xxl-rpc,invokeerror,address:{},XxlRpcRequest{}",finalAddress,xxlRpcRequest);
throw(einstanceofXxlRpcException)?e:newXxlRpcException(e);
}finally{
//future-responseremove
futureResponse.removeInvokerFuture();
}
}
XxlRpcFutureResponse 類中實(shí)現(xiàn)了線程的等待,和線程喚醒的處理:
//返回結(jié)果,喚醒線程
publicvoidsetResponse(XxlRpcResponseresponse){
this.response=response;
synchronized(lock){
done=true;
lock.notifyAll();
}
}
@Override
publicXxlRpcResponseget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException{
if(!done){
synchronized(lock){
try{
if(timeout0){
//線程阻塞
lock.wait();
}else{
longtimeoutMillis=(TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout,unit);
lock.wait(timeoutMillis);
}
}catch(InterruptedExceptione){
throwe;
}
}
}
if(!done){
thrownewXxlRpcException("xxl-rpc,requesttimeoutat:"+System.currentTimeMillis()+",request:"+request.toString());
}
returnresponse;
}
有的同學(xué)可能會(huì)問了,調(diào)度器接收到返回結(jié)果,怎么確定喚醒哪個(gè)線程呢?
每一次遠(yuǎn)程調(diào)用,都會(huì)生成 uuid 的請求 id,這個(gè) id 是在整個(gè)調(diào)用過程中一直傳遞的,就像一把鑰匙,在你回家的的時(shí)候,拿著它就帶開門。
這里拿著請求 id 這把鑰匙,就能找到對應(yīng)的 XxlRpcFutureResponse,然后調(diào)用 setResponse 方法,設(shè)置返回值,喚醒線程。
publicvoidnotifyInvokerFuture(StringrequestId,finalXxlRpcResponsexxlRpcResponse){
//通過requestId找到XxlRpcFutureResponse,
finalXxlRpcFutureResponsefutureResponse=futureResponsePool.get(requestId);
if(futureResponse==null){
return;
}
if(futureResponse.getInvokeCallback()!=null){
//callbacktype
try{
executeResponseCallback(newRunnable(){
@Override
publicvoidrun(){
if(xxlRpcResponse.getErrorMsg()!=null){
futureResponse.getInvokeCallback().onFailure(newXxlRpcException(xxlRpcResponse.getErrorMsg()));
}else{
futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
}
}
});
}catch(Exceptione){
logger.error(e.getMessage(),e);
}
}else{
//里面調(diào)用lock的notify方法
futureResponse.setResponse(xxlRpcResponse);
}
//doremove
futureResponsePool.remove(requestId);
}
審核編輯 :李倩
-
通信
+關(guān)注
關(guān)注
18文章
6036瀏覽量
136080 -
代碼
+關(guān)注
關(guān)注
30文章
4791瀏覽量
68694
原文標(biāo)題:xxl-job驚艷的設(shè)計(jì),怎能叫人不愛
文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論