基礎(chǔ)準備
ResponseMsg
TaskService
阻塞調(diào)用
Callable異步調(diào)用
DeferredResult異步調(diào)用
后記
大家都知道,Callable和DeferredResult可以用來進行異步請求處理。利用它們,我們可以異步生成返回值,在具體處理的過程中,我們直接在controller中返回相應(yīng)的Callable或者DeferredResult,在這之后,servlet線程將被釋放,可用于其他連接;DeferredResult另外會有線程來進行結(jié)果處理,并setResult。
基礎(chǔ)準備
在正式開始之前,我們先做一點準備工作,在項目中新建了一個base模塊。其中包含一些提供基礎(chǔ)支持的java類,在其他模塊中可能會用到。
ResponseMsg
我們定義了一個ResponseMsg的實體類來作為我們的返回值類型:
@Data @NoArgsConstructor @AllArgsConstructor publicclassResponseMsg{ privateintcode; privateStringmsg; privateTdata; }
非常簡單,里面包含了code、msg和data三個字段,其中data為泛型類型。另外類的注解Data、NoArgsConstructor和AllArgsConstructor都是lombok提供的簡化我們開發(fā)的,主要功能分別是,為我們的類生成set和get方法,生成無參構(gòu)造器和生成全參構(gòu)造器。
使用idea進行開發(fā)的童鞋可以裝一下lombok的支持插件。另外,lombok的依賴參見:
org.projectlombok lombok-maven 1.16.16.0 pom
TaskService
我們建立了一個TaskService,用來為阻塞調(diào)用和Callable調(diào)用提供實際結(jié)果處理的。代碼如下:
@Service publicclassTaskService{ privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskService.class); publicResponseMsggetResult(){ log.info("任務(wù)開始執(zhí)行,持續(xù)等待中..."); try{ Thread.sleep(30000L); }catch(InterruptedExceptione){ e.printStackTrace(); } log.info("任務(wù)處理完成"); returnnewResponseMsg (0,"操作成功","success"); } }
可以看到,里面實際提供服務(wù)的是getResult方法,這邊直接返回一個new ResponseMsg(0,“操作成功”,“success”)。但是其中又特意讓它sleep了30秒,模擬一個耗時較長的請求。
阻塞調(diào)用
平時我們用的最普遍的還是阻塞調(diào)用,通常請求的處理時間較短,在并發(fā)量較小的情況下,使用阻塞調(diào)用問題也不是很大。 阻塞調(diào)用實現(xiàn)非常簡單,我們首先新建一個模塊blockingtype,里面只包含一個controller類,用來接收請求并利用TaskService來獲取結(jié)果。
@RestController publicclassBlockController{ privatestaticfinalLoggerlog=LoggerFactory.getLogger(BlockController.class); @Autowired privateTaskServicetaskService; @RequestMapping(value="/get",method=RequestMethod.GET) publicResponseMsggetResult(){ log.info("接收請求,開始處理..."); ResponseMsg result=taskService.getResult(); log.info("接收任務(wù)線程完成并退出"); returnresult; } }
我們請求的是getResult方法,其中調(diào)用了taskService,這個taskService我們是注入得到的。關(guān)于怎么跨模塊注入的,其實也非常簡單,在本模塊,加入對其他模塊的依賴就可以了。比如這里我們在blockingtype的pom.xml文件中加入對base模塊的依賴:
com.sunny base 1.0-SNAPSHOT
然后我們看一下實際調(diào)用效果,這里我們設(shè)置端口號為8080,啟動日志如下:
2018-06-2419:02:48.514INFO11207---[main]com.sunny.BlockApplication:StartingBlockApplicationonxdeMacBook-Pro.localwithPID11207(/Users/zsunny/IdeaProjects/asynchronoustask/blockingtype/target/classesstartedbyzsunnyin/Users/zsunny/IdeaProjects/asynchronoustask) 2018-06-2419:02:48.519INFO11207---[main]com.sunny.BlockApplication:Noactiveprofileset,fallingbacktodefaultprofiles:default 2018-06-2419:02:48.762INFO11207---[main]ationConfigEmbeddedWebApplicationContext:Refreshingorg.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629:startupdate[SunJun2419:02:48CST2018];rootofcontexthierarchy 2018-06-2419:02:50.756INFO11207---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatinitializedwithport(s):8080(http) 2018-06-241950.778INFO11207---[main]o.apache.catalina.core.StandardService:Startingservice[Tomcat] 2018-06-241950.780INFO11207---[main]org.apache.catalina.core.StandardEngine:StartingServletEngine:ApacheTomcat/8.5.23 2018-06-241950.922INFO11207---[ost-startStop-1]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpringembeddedWebApplicationContext 2018-06-241950.922INFO11207---[ost-startStop-1]o.s.web.context.ContextLoader:RootWebApplicationContext:initializationcompletedin2200ms 2018-06-241951.156INFO11207---[ost-startStop-1]o.s.b.w.servlet.ServletRegistrationBean:Mappingservlet:'dispatcherServlet'to[/] 2018-06-241951.162INFO11207---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'characterEncodingFilter'to:[/*] 2018-06-241951.163INFO11207---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'hiddenHttpMethodFilter'to:[/*] 2018-06-241951.163INFO11207---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'httpPutFormContentFilter'to:[/*] 2018-06-241951.163INFO11207---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'requestContextFilter'to:[/*] 2018-06-241951.620INFO11207---[main]s.w.s.m.m.a.RequestMappingHandlerAdapter:Lookingfor@ControllerAdvice:org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629:startupdate[SunJun241948CST2018];rootofcontexthierarchy 2018-06-241951.724INFO11207---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/get],methods=[GET]}"ontopubliccom.sunny.entity.ResponseMsgcom.sunny.controller.BlockController.getResult() 2018-06-241951.730INFO11207---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error]}"ontopublicorg.springframework.http.ResponseEntity >org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest) 2018-06-241951.731INFO11207---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error],produces=[text/html]}"ontopublicorg.springframework.web.servlet.ModelAndVieworg.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse) 2018-06-241951.780INFO11207---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/webjars/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-241951.780INFO11207---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-241951.838INFO11207---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**/favicon.ico]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-241952.126INFO11207---[main]o.s.j.e.a.AnnotationMBeanExporter:RegisteringbeansforJMXexposureonstartup 2018-06-241952.205INFO11207---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatstartedonport(s):8080(http) 2018-06-241952.211INFO11207---[main]com.sunny.BlockApplication:StartedBlockApplicationin5.049seconds(JVMrunningfor6.118)
可以看到順利啟動了,那么我們就來訪問一下:
http://localhost:8080/get
等待了大概30秒左右,得到j(luò)son數(shù)據(jù):
{"code":0,"msg":"操作成功","data":"success"}
然后我們來看看控制臺的日志:
2018-06-2419:04:07.315INFO11207---[nio-8080-exec-1]com.sunny.controller.BlockController:接收請求,開始處理... 2018-06-2419:04:07.316INFO11207---[nio-8080-exec-1]com.sunny.service.TaskService:任務(wù)開始執(zhí)行,持續(xù)等待中... 2018-06-2419:04:37.322INFO11207---[nio-8080-exec-1]com.sunny.service.TaskService:任務(wù)處理完成 2018-06-2419:04:37.322INFO11207---[nio-8080-exec-1]com.sunny.controller.BlockController:接收任務(wù)線程完成并退出
可以看到在“ResponseMsg result = taskService.getResult();”的時候是阻塞了大約30秒鐘,隨后才執(zhí)行它后面的打印語句“l(fā)og.info(“接收任務(wù)線程完成并退出”);”。
Callable異步調(diào)用
涉及到較長時間的請求處理的話,比較好的方式是用異步調(diào)用,比如利用Callable返回結(jié)果。異步主要表現(xiàn)在,接收請求的servlet可以不用持續(xù)等待結(jié)果產(chǎn)生,而可以被釋放去處理其他事情。當然,在調(diào)用者來看的話,其實還是表現(xiàn)在持續(xù)等待30秒。這有利于服務(wù)端提供更大的并發(fā)處理量。
這里我們新建一個callabledemo模塊,在這個模塊中,我們一樣只包含一個TaskController,另外也是需要加入base模塊的依賴。只不過這里我們的返回值不是ResponseMsg類型了,而是一個Callable類型。
@RestController publicclassTaskController{ privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskController.class); @Autowired privateTaskServicetaskService; @RequestMapping(value="/get",method=RequestMethod.GET) publicCallable>getResult(){ log.info("接收請求,開始處理..."); Callable >result=(()->{ returntaskService.getResult(); }); log.info("接收任務(wù)線程完成并退出"); returnresult; } }
在里面,我們創(chuàng)建了一個Callable類型的變量result,并實現(xiàn)了它的call方法,在call方法中,我們也是調(diào)用taskService的getResult方法得到返回值并返回。
下一步我們就運行一下這個模塊,這里我們在模塊的application.yml中設(shè)置端口號為8081:
server: port:8081
啟動,可以看到控制臺的消息:
2018-06-2419:38:14.658INFO11226---[main]com.sunny.CallableApplication:StartingCallableApplicationonxdeMacBook-Pro.localwithPID11226(/Users/zsunny/IdeaProjects/asynchronoustask/callabledemo/target/classesstartedbyzsunnyin/Users/zsunny/IdeaProjects/asynchronoustask) 2018-06-2419:38:14.672INFO11226---[main]com.sunny.CallableApplication:Noactiveprofileset,fallingbacktodefaultprofiles:default 2018-06-2419:38:14.798INFO11226---[main]ationConfigEmbeddedWebApplicationContext:Refreshingorg.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629:startupdate[SunJun2419:38:14CST2018];rootofcontexthierarchy 2018-06-2419:38:16.741INFO11226---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatinitializedwithport(s):8081(http) 2018-06-241916.762INFO11226---[main]o.apache.catalina.core.StandardService:Startingservice[Tomcat] 2018-06-241916.764INFO11226---[main]org.apache.catalina.core.StandardEngine:StartingServletEngine:ApacheTomcat/8.5.23 2018-06-241916.918INFO11226---[ost-startStop-1]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpringembeddedWebApplicationContext 2018-06-241916.919INFO11226---[ost-startStop-1]o.s.web.context.ContextLoader:RootWebApplicationContext:initializationcompletedin2126ms 2018-06-241917.144INFO11226---[ost-startStop-1]o.s.b.w.servlet.ServletRegistrationBean:Mappingservlet:'dispatcherServlet'to[/] 2018-06-241917.149INFO11226---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'characterEncodingFilter'to:[/*] 2018-06-241917.150INFO11226---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'hiddenHttpMethodFilter'to:[/*] 2018-06-241917.150INFO11226---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'httpPutFormContentFilter'to:[/*] 2018-06-241917.150INFO11226---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'requestContextFilter'to:[/*] 2018-06-241917.632INFO11226---[main]s.w.s.m.m.a.RequestMappingHandlerAdapter:Lookingfor@ControllerAdvice:org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629:startupdate[SunJun241914CST2018];rootofcontexthierarchy 2018-06-241917.726INFO11226---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/get],methods=[GET]}"ontopublicjava.util.concurrent.Callable>com.sunny.controller.TaskController.getResult() 2018-06-241917.731INFO11226---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error]}"ontopublicorg.springframework.http.ResponseEntity >org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest) 2018-06-241917.733INFO11226---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error],produces=[text/html]}"ontopublicorg.springframework.web.servlet.ModelAndVieworg.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse) 2018-06-241917.777INFO11226---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/webjars/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-241917.777INFO11226---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-241917.825INFO11226---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**/favicon.ico]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-241918.084INFO11226---[main]o.s.j.e.a.AnnotationMBeanExporter:RegisteringbeansforJMXexposureonstartup 2018-06-241918.176INFO11226---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatstartedonport(s):8081(http) 2018-06-241918.183INFO11226---[main]com.sunny.CallableApplication:StartedCallableApplicationin4.538seconds(JVMrunningfor5.327)
完美啟動了,然后我們還是一樣,訪問一下:
http://localhost:8081/get
在大約等待了30秒左右,我們在瀏覽器上得到j(luò)son數(shù)據(jù):
{"code":0,"msg":"操作成功","data":"success"}
和阻塞調(diào)用的結(jié)果一樣——當然一樣啦,都是同taskService中得到的結(jié)果。
然后我們看看控制臺的消息:
2018-06-2419:39:07.738INFO11226---[nio-8081-exec-1]com.sunny.controller.TaskController:接收請求,開始處理... 2018-06-2419:39:07.740INFO11226---[nio-8081-exec-1]com.sunny.controller.TaskController:接收任務(wù)線程完成并退出 2018-06-2419:39:07.753INFO11226---[MvcAsync1]com.sunny.service.TaskService:任務(wù)開始執(zhí)行,持續(xù)等待中... 2018-06-2419:39:37.756INFO11226---[MvcAsync1]com.sunny.service.TaskService:任務(wù)處理完成
很顯然,這里的消息出現(xiàn)的順序和阻塞模式有所不同了,這里在“接收請求,開始處理…”之后直接打印了“接收任務(wù)線程完成并退出”。而不是先出現(xiàn)“任務(wù)處理完成”后再出現(xiàn)“接收任務(wù)線程完成并退出”。
這就說明,這里沒有阻塞在從taskService中獲得數(shù)據(jù)的地方,controller中直接執(zhí)行后面的部分(這里可以做其他很多事,不僅僅是打印日志)。
DeferredResult異步調(diào)用
前面鋪墊了那么多,還是主要來說DeferredResult的;和Callable一樣,DeferredResult也是為了支持異步調(diào)用。兩者的主要差異,Sunny覺得主要在DeferredResult需要自己用線程來處理結(jié)果setResult,而Callable的話不需要我們來維護一個結(jié)果處理線程。
總體來說,Callable的話更為簡單,同樣的也是因為簡單,靈活性不夠;相對地,DeferredResult更為復雜一些,但是又極大的靈活性。在可以用Callable的時候,直接用Callable;而遇到Callable沒法解決的場景的時候,可以嘗試使用DeferredResult。
這里Sunny將會設(shè)計兩個DeferredResult使用場景。
場景一:
創(chuàng)建一個持續(xù)在隨機間隔時間后從任務(wù)隊列中獲取任務(wù)的線程
訪問controller中的方法,創(chuàng)建一個DeferredResult,設(shè)定超時時間和超時返回對象
設(shè)定DeferredResult的超時回調(diào)方法和完成回調(diào)方法
將DeferredResult放入任務(wù)中,并將任務(wù)放入任務(wù)隊列
步驟1中的線程獲取到任務(wù)隊列中的任務(wù),并產(chǎn)生一個隨機結(jié)果返回
場景其實非常簡單,接下來我們來看看具體的實現(xiàn)。首先,我們還是來看任務(wù)實體類是怎么樣的。
/** *任務(wù)實體類 */ @Data @NoArgsConstructor @AllArgsConstructor publicclassTask{ privateinttaskId; privateDeferredResult>taskResult; @Override publicStringtoString(){ return"Task{"+ "taskId="+taskId+ ",taskResult"+"{responseMsg="+taskResult.getResult()+"}"+ '}'; } }
看起來非常簡單,成員變量又taskId和taskResult,前者是int類型,后者為我們的DeferredResult類型,它的泛型類型為ResponseMsg,注意這里用到ResponseMsg,所以也需要導入base模塊的依賴。
另外注解之前已經(jīng)說明了,不過這里再提一句,@Data注解也包含了toString的重寫,但是這里為了知道具體的ResponseMsg的內(nèi)容,Sunny特意手動重寫。
看完Task類型,我們再來看看任務(wù)隊列。
@Component publicclassTaskQueue{ privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskQueue.class); privatestaticfinalintQUEUE_LENGTH=10; privateBlockingQueuequeue=newLinkedBlockingDeque<>(QUEUE_LENGTH); privateinttaskId=0; /** *加入任務(wù) *@paramdeferredResult */ publicvoidput(DeferredResult >deferredResult){ taskId++; log.info("任務(wù)加入隊列,id為:{}",taskId); queue.offer(newTask(taskId,deferredResult)); } /** *獲取任務(wù) *@return *@throwsInterruptedException */ publicTasktake()throwsInterruptedException{ Tasktask=queue.poll(); log.info("獲得任務(wù):{}",task); returntask; } }
這里我們將它作為一個bean,之后會在其他bean中注入,這里實際的隊列為成員變量queue,它是LinkedBlockingDeque類型的。還有一個成員變量為taskId,是用于自動生成任務(wù)id的,并且在加入任務(wù)的方法中實現(xiàn)自增,以確保每個任務(wù)的id唯一性。方法的話又put和take方法,分別用于向隊列中添加任務(wù)和取出任務(wù);其中,對queue的操作,分別用了offer和poll,這樣是實現(xiàn)一個非阻塞的操作,并且在隊列為空和隊列已滿的情況下不會拋出異常。
另外,大家實現(xiàn)的時候,可以考慮使用ConcurrentLinkedQueue來高效處理并發(fā),因為它屬于無界非阻塞隊列,使用過程中需要考慮可能造成的OOM問題。Sunny這里選擇阻塞隊列LinkedBlockingDeque,它底層使用加鎖進行了同步;但是這里使用了TaskQueue進行封裝,處理過程中有一些額外操作,調(diào)用時需要加鎖以防發(fā)生某些意料之外的問題。
然后我們來看步驟1中的,啟動一個持續(xù)從任務(wù)隊列中獲取任務(wù)的線程的具體實現(xiàn)。
@Component publicclassTaskExecute{ privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskExecute.class); privatestaticfinalRandomrandom=newRandom(); //默認隨機結(jié)果的長度 privatestaticfinalintDEFAULT_STR_LEN=10; //用于生成隨機結(jié)果 privatestaticfinalStringstr="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; @Autowired privateTaskQueuetaskQueue; /** *初始化啟動 */ @PostConstruct publicvoidinit(){ log.info("開始持續(xù)處理任務(wù)"); newThread(this::execute).start(); } /** *持續(xù)處理 *返回執(zhí)行結(jié)果 */ privatevoidexecute(){ while(true){ try{ //取出任務(wù) Tasktask; synchronized(taskQueue){ task=taskQueue.take(); } if(task!=null){ //設(shè)置返回結(jié)果 StringrandomStr=getRandomStr(DEFAULT_STR_LEN); ResponseMsgresponseMsg=newResponseMsg (0,"success",randomStr); log.info("返回結(jié)果:{}",responseMsg); task.getTaskResult().setResult(responseMsg); } inttime=random.nextInt(10); log.info("處理間隔:{}秒",time); Thread.sleep(time*1000L); }catch(InterruptedExceptione){ e.printStackTrace(); } } } /** *獲取長度為len的隨機串 *@paramlen *@return */ privateStringgetRandomStr(intlen){ intmaxInd=str.length(); StringBuildersb=newStringBuilder(); intind; for(inti=0;i
這里,我們注入了TaskQueue,成員變量比較簡單并且有注釋,不再說明,主要來看方法。先看一下最后一個方法getRandomStr,很顯然,這是一個獲得長度為len的隨機串的方法,訪問限定為private,為類中其他方法服務(wù)的。然后我們看init方法,它執(zhí)行的其實就是開啟了一個線程并且執(zhí)行execute方法,注意一下它上面的@PostContruct注解,這個注解就是在這個bean初始化的時候就執(zhí)行這個方法。
所以我們需要關(guān)注的實際邏輯在execute方法中??梢钥吹?,在execute方法中,用了一個while(true)來保證線程持續(xù)運行。因為是并發(fā)環(huán)境下,考慮對taskQueue加鎖,從中取出任務(wù);如果任務(wù)不為空,獲取用getRandomStr生成一個隨機結(jié)果并用setResult方法進行返回。
最后可以看到,利用random生成來一個[0,10)的隨機數(shù),并讓線程sleep相應(yīng)的秒數(shù)。這里注意一下,需要設(shè)定一個時間間隔,否則,先線程持續(xù)跑會出現(xiàn)CPU負載過高的情況。
接下來我們就看看controller是如何處理的。
@RestController publicclassTaskController{ privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskController.class); //超時結(jié)果 privatestaticfinalResponseMsgOUT_OF_TIME_RESULT=newResponseMsg<>(-1,"超時","outoftime"); //超時時間 privatestaticfinallongOUT_OF_TIME=3000L; @Autowired privateTaskQueuetaskQueue; @RequestMapping(value="/get",method=RequestMethod.GET) publicDeferredResult >getResult(){ log.info("接收請求,開始處理..."); //建立DeferredResult對象,設(shè)置超時時間,以及超時返回超時結(jié)果 DeferredResult >result=newDeferredResult<>(OUT_OF_TIME,OUT_OF_TIME_RESULT); result.onTimeout(()->{ log.info("調(diào)用超時"); }); result.onCompletion(()->{ log.info("調(diào)用完成"); }); //并發(fā),加鎖 synchronized(taskQueue){ taskQueue.put(result); } log.info("接收任務(wù)線程完成并退出"); returnresult; } }
這里我們同樣注入了taskQueue。請求方法就只有一個getResult,返回值為DeferredResult
。這里我們首先創(chuàng)建了DeferredResult對象result并且設(shè)定超時時間和超時返回結(jié)果;隨后設(shè)定result的onTimeout和onCompletion方法,其實就是傳入兩個Runnable對象來實現(xiàn)回調(diào)的效果;之后就是加鎖并且將result加入任務(wù)隊列中。 總體來說,場景不算非常復雜,看到這里大家應(yīng)該都能基本了解了。然后我們來跑一下測試一下。
我們在application.yml中設(shè)定端口為8082:
server: port:8082
啟動模塊,控制臺信息如下:
2018-06-2421:49:28.815INFO11322---[main]com.sunny.DeferredResultApplication:StartingDeferredResultApplicationonxdeMacBook-Pro.localwithPID11322(/Users/zsunny/IdeaProjects/asynchronoustask/deferredresultdemo/target/classesstartedbyzsunnyin/Users/zsunny/IdeaProjects/asynchronoustask) 2018-06-2421:49:28.821INFO11322---[main]com.sunny.DeferredResultApplication:Noactiveprofileset,fallingbacktodefaultprofiles:default 2018-06-2421:49:29.010INFO11322---[main]ationConfigEmbeddedWebApplicationContext:Refreshingorg.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@5ccddd20:startupdate[SunJun2421:49:28CST2018];rootofcontexthierarchy 2018-06-2421:49:30.971INFO11322---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatinitializedwithport(s):8082(http) 2018-06-242130.980INFO11322---[main]o.apache.catalina.core.StandardService:Startingservice[Tomcat] 2018-06-242130.981INFO11322---[main]org.apache.catalina.core.StandardEngine:StartingServletEngine:ApacheTomcat/8.5.23 2018-06-242131.062INFO11322---[ost-startStop-1]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpringembeddedWebApplicationContext 2018-06-242131.063INFO11322---[ost-startStop-1]o.s.web.context.ContextLoader:RootWebApplicationContext:initializationcompletedin2066ms 2018-06-242131.207INFO11322---[ost-startStop-1]o.s.b.w.servlet.ServletRegistrationBean:Mappingservlet:'dispatcherServlet'to[/] 2018-06-242131.212INFO11322---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'characterEncodingFilter'to:[/*] 2018-06-242131.213INFO11322---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'hiddenHttpMethodFilter'to:[/*] 2018-06-242131.213INFO11322---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'httpPutFormContentFilter'to:[/*] 2018-06-242131.213INFO11322---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'requestContextFilter'to:[/*] 2018-06-242131.247INFO11322---[main]com.sunny.bean.TaskExecute:開始持續(xù)處理任務(wù) 2018-06-242131.249INFO11322---[Thread-8]com.sunny.bean.TaskQueue:獲得任務(wù):null 2018-06-242131.250INFO11322---[Thread-8]com.sunny.bean.TaskExecute:處理間隔:6秒 2018-06-242131.498INFO11322---[main]s.w.s.m.m.a.RequestMappingHandlerAdapter:Lookingfor@ControllerAdvice:org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@5ccddd20:startupdate[SunJun242128CST2018];rootofcontexthierarchy 2018-06-242131.572INFO11322---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/get],methods=[GET]}"ontopublicorg.springframework.web.context.request.async.DeferredResult>com.sunny.controller.TaskController.getResult() 2018-06-242131.576INFO11322---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error]}"ontopublicorg.springframework.http.ResponseEntity >org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest) 2018-06-242131.577INFO11322---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error],produces=[text/html]}"ontopublicorg.springframework.web.servlet.ModelAndVieworg.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse) 2018-06-242131.602INFO11322---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/webjars/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-242131.602INFO11322---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-242131.628INFO11322---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**/favicon.ico]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-242131.811INFO11322---[main]o.s.j.e.a.AnnotationMBeanExporter:RegisteringbeansforJMXexposureonstartup 2018-06-242131.892INFO11322---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatstartedonport(s):8082(http) 2018-06-242131.897INFO11322---[main]com.sunny.DeferredResultApplication:StartedDeferredResultApplicationin3.683seconds(JVMrunningfor4.873) 2018-06-242137.254INFO11322---[Thread-8]com.sunny.bean.TaskQueue:獲得任務(wù):null 2018-06-242137.254INFO11322---[Thread-8]com.sunny.bean.TaskExecute:處理間隔:6秒
首先程序完美啟動,這沒有問題,然后我們注意這幾條信息:
2018-06-2421:49:31.247INFO11322---[main]com.sunny.bean.TaskExecute:開始持續(xù)處理任務(wù) 2018-06-2421:49:31.249INFO11322---[Thread-8]com.sunny.bean.TaskQueue:獲得任務(wù):null 2018-06-2421:49:31.250INFO11322---[Thread-8]com.sunny.bean.TaskExecute:處理間隔:6秒
這說明我們TaskExecute中已經(jīng)成功啟動了持續(xù)獲取任務(wù)的線程。
接著,我們還是訪問一下:
http://localhost:8082/get
這一回等待了若干秒就出現(xiàn)了結(jié)果:
{"code":0,"msg":"success","data":"CEUO2lmMJr"}
可以看到我們的隨機串是CEUO2lmMJr。再一次請求又會出現(xiàn)不同的隨機串。再看一下我們控制臺的相關(guān)信息:
2018-06-2421:51:04.303INFO11322---[nio-8082-exec-1]com.sunny.controller.TaskController:接收請求,開始處理... 2018-06-2421:51:04.304INFO11322---[nio-8082-exec-1]com.sunny.bean.TaskQueue:任務(wù)加入隊列,id為:1 2018-06-2421:51:04.304INFO11322---[nio-8082-exec-1]com.sunny.controller.TaskController:接收任務(wù)線程完成并退出 2018-06-2421:51:04.323INFO11322---[Thread-8]com.sunny.bean.TaskQueue:獲得任務(wù):Task{taskId=1,taskResult{responseMsg=null}} 2018-06-2421:51:04.323INFO11322---[Thread-8]com.sunny.bean.TaskExecute:返回結(jié)果:ResponseMsg(code=0,msg=success,data=CEUO2lmMJr)
也是符合我們的預期,請求進來進入隊列中,由TaskExecute獲取請求并進行處理結(jié)果返回。
場景二
用戶發(fā)送請求到TaskController的getResult方法,該方法接收到請求,創(chuàng)建一個DeferredResult,設(shè)定超時時間和超時返回對象
設(shè)定DeferredResult的超時回調(diào)方法和完成回調(diào)方法,超時和完成都會將本次請求產(chǎn)生的DeferredResult從集合中remove
將DeferredResult放入集合中
另有一個TaskExecuteController,訪問其中一個方法,可取出集合中的等待返回的DeferredResult對象,并將傳入的參數(shù)設(shè)定為結(jié)果
首先我們來看看DeferredResult的集合類:
@Component @Data publicclassTaskSet{ privateSet>>set=newHashSet<>(); }
非常簡單,只包含了一個HashSet的成員變量。這里可以考慮用ConcurrentHashMap來實現(xiàn)高效并發(fā),Sunny這里簡單實用HashSet,配合加鎖實現(xiàn)并發(fā)處理。
然后我們看看發(fā)起調(diào)用的Controller代碼:
@RestController publicclassTaskController{ privateLoggerlog=LoggerFactory.getLogger(TaskController.class); //超時結(jié)果 privatestaticfinalResponseMsgOUT_OF_TIME_RESULT=newResponseMsg<>(-1,"超時","outoftime"); //超時時間 privatestaticfinallongOUT_OF_TIME=60000L; @Autowired privateTaskSettaskSet; @RequestMapping(value="/get",method=RequestMethod.GET) publicDeferredResult >getResult(){ log.info("接收請求,開始處理..."); //建立DeferredResult對象,設(shè)置超時時間,以及超時返回超時結(jié)果 DeferredResult >result=newDeferredResult<>(OUT_OF_TIME,OUT_OF_TIME_RESULT); result.onTimeout(()->{ log.info("調(diào)用超時,移除任務(wù),此時隊列長度為{}",taskSet.getSet().size()); synchronized(taskSet.getSet()){ taskSet.getSet().remove(result); } }); result.onCompletion(()->{ log.info("調(diào)用完成,移除任務(wù),此時隊列長度為{}",taskSet.getSet().size()); synchronized(taskSet.getSet()){ taskSet.getSet().remove(result); } }); //并發(fā),加鎖 synchronized(taskSet.getSet()){ taskSet.getSet().add(result); } log.info("加入任務(wù)集合,集合大小為:{}",taskSet.getSet().size()); log.info("接收任務(wù)線程完成并退出"); returnresult; } }
和場景一中有些類似,但是注意這里在onTimeout和onCompletion中都多了一個移除元素的操作,這也就是每次調(diào)用結(jié)束,需要將集合中的DeferredResult對象移除,即集合中保存的都是等待請求結(jié)果的DeferredResult對象。
然后我們看處理請求結(jié)果的Controller:
@RestController publicclassTaskExecuteController{ privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskExecuteController.class); @Autowired privateTaskSettaskSet; @RequestMapping(value="/set/{result}",method=RequestMethod.GET) publicStringsetResult(@PathVariable("result")Stringresult){ ResponseMsgres=newResponseMsg<>(0,"success",result); log.info("結(jié)果處理開始,得到輸入結(jié)果為:{}",res); Set >>set=taskSet.getSet(); synchronized(set){ set.forEach((deferredResult)->{deferredResult.setResult(res);}); } return"Successfullysetresult:"+result; } }
看起來非常簡單,只是做了兩個操作,接收得到的參數(shù)并利用參數(shù)生成一個ResponseMsg對象,隨后將集合中的所有DeferredResult都設(shè)定結(jié)果為根據(jù)參數(shù)生成的ResponseMsg對象。最后返回一個提示:成功設(shè)置結(jié)果…
好了,話不多說,我們來啟動測試驗證一下。我們說一下驗證的過程,我們同時打開兩個請求,然后再設(shè)定一個結(jié)果,最后兩個請求都會得到這個結(jié)果。當然同時多個或者一個請求也是一樣。這里有一個地方需要注意一下:
瀏覽器可能會對相同的url請求有緩存策略,也就是同時兩個標簽向同一個url發(fā)送請求,瀏覽器只會先發(fā)送一個請求,等一個請求結(jié)束才會再發(fā)送另外一個請求。
這樣,我們考慮從兩個瀏覽器中發(fā)送請求:
localhost:8083/get
然后隨便找其中一個,發(fā)送請求來設(shè)置結(jié)果:
http://localhost:8083/set/aaa
首先我們先啟動模塊,可以從控制臺中看到完美啟動管理了:
2018-06-2500:18:44.379INFO12688---[main]com.sunny.DeferredResultApplication:StartingDeferredResultApplicationonxdeMacBook-Pro.localwithPID12688(/Users/zsunny/IdeaProjects/asynchronoustask/deferredresultdemo2/target/classesstartedbyzsunnyin/Users/zsunny/IdeaProjects/asynchronoustask) 2018-06-2500:18:44.382INFO12688---[main]com.sunny.DeferredResultApplication:Noactiveprofileset,fallingbacktodefaultprofiles:default 2018-06-2500:18:44.489INFO12688---[main]ationConfigEmbeddedWebApplicationContext:Refreshingorg.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@96def03:startupdate[MonJun2500:18:44CST2018];rootofcontexthierarchy 2018-06-2500:18:45.650INFO12688---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatinitializedwithport(s):8083(http) 2018-06-250045.658INFO12688---[main]o.apache.catalina.core.StandardService:Startingservice[Tomcat] 2018-06-250045.659INFO12688---[main]org.apache.catalina.core.StandardEngine:StartingServletEngine:ApacheTomcat/8.5.23 2018-06-250045.722INFO12688---[ost-startStop-1]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpringembeddedWebApplicationContext 2018-06-250045.723INFO12688---[ost-startStop-1]o.s.web.context.ContextLoader:RootWebApplicationContext:initializationcompletedin1241ms 2018-06-250045.817INFO12688---[ost-startStop-1]o.s.b.w.servlet.ServletRegistrationBean:Mappingservlet:'dispatcherServlet'to[/] 2018-06-250045.821INFO12688---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'characterEncodingFilter'to:[/*] 2018-06-250045.821INFO12688---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'hiddenHttpMethodFilter'to:[/*] 2018-06-250045.821INFO12688---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'httpPutFormContentFilter'to:[/*] 2018-06-250045.821INFO12688---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'requestContextFilter'to:[/*] 2018-06-250046.150INFO12688---[main]s.w.s.m.m.a.RequestMappingHandlerAdapter:Lookingfor@ControllerAdvice:org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@96def03:startupdate[MonJun250044CST2018];rootofcontexthierarchy 2018-06-250046.197INFO12688---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/get],methods=[GET]}"ontopublicorg.springframework.web.context.request.async.DeferredResult>com.sunny.controller.TaskController.getResult() 2018-06-250046.199INFO12688---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/set/{result}],methods=[GET]}"ontopublicjava.lang.Stringcom.sunny.controller.TaskExecuteController.setResult(java.lang.String) 2018-06-250046.202INFO12688---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error]}"ontopublicorg.springframework.http.ResponseEntity >org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest) 2018-06-250046.202INFO12688---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error],produces=[text/html]}"ontopublicorg.springframework.web.servlet.ModelAndVieworg.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse) 2018-06-250046.237INFO12688---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/webjars/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-250046.238INFO12688---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-250046.262INFO12688---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**/favicon.ico]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-250046.362INFO12688---[main]o.s.j.e.a.AnnotationMBeanExporter:RegisteringbeansforJMXexposureonstartup 2018-06-250046.467INFO12688---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatstartedonport(s):8083(http) 2018-06-250046.472INFO12688---[main]com.sunny.DeferredResultApplication:StartedDeferredResultApplicationin2.675seconds(JVMrunningfor3.623)
完美啟動,接下來Sunny在火狐中發(fā)起一個請求
可以看到正在等待請求結(jié)果。隨后我們在谷歌瀏覽器中發(fā)起請求
兩個請求同時處于等待狀態(tài),這時候我們看一下控制臺信息:
2018-06-2500:22:34.642INFO12688---[nio-8083-exec-6]com.sunny.controller.TaskController:接收請求,開始處理... 2018-06-2500:22:34.642INFO12688---[nio-8083-exec-6]com.sunny.controller.TaskController:加入任務(wù)集合,集合大小為:1 2018-06-2500:22:34.642INFO12688---[nio-8083-exec-6]com.sunny.controller.TaskController:接收任務(wù)線程完成并退出 2018-06-2500:22:37.332INFO12688---[nio-8083-exec-7]com.sunny.controller.TaskController:接收請求,開始處理... 2018-06-2500:22:37.332INFO12688---[nio-8083-exec-7]com.sunny.controller.TaskController:加入任務(wù)集合,集合大小為:2 2018-06-2500:22:37.332INFO12688---[nio-8083-exec-7]com.sunny.controller.TaskController:接收任務(wù)線程完成并退出
可以看到兩個請求都已經(jīng)接收到了,并且加入了隊列。這時候,我們再發(fā)送一個設(shè)置結(jié)果的請求。
隨后我們查看兩個調(diào)用請求的頁面,發(fā)現(xiàn)頁面已經(jīng)不在等待狀態(tài)中了,都已經(jīng)得到了結(jié)果。
另外,再給大家展示一下超時的結(jié)果,即我們發(fā)起調(diào)用請求,但是不發(fā)起設(shè)置結(jié)果的請求,等待時間結(jié)束。
查看控制臺信息:
2018-06-2500:26:15.898INFO12688---[nio-8083-exec-4]com.sunny.controller.TaskController:接收請求,開始處理... 2018-06-2500:26:15.898INFO12688---[nio-8083-exec-4]com.sunny.controller.TaskController:加入任務(wù)集合,集合大小為:1 2018-06-2500:26:15.898INFO12688---[nio-8083-exec-4]com.sunny.controller.TaskController:接收任務(wù)線程完成并退出 2018-06-2500:27:16.014INFO12688---[nio-8083-exec-5]com.sunny.controller.TaskController:調(diào)用超時,移除任務(wù),此時隊列長度為1 2018-06-2500:27:16.018INFO12688---[nio-8083-exec-5]com.sunny.controller.TaskController:調(diào)用完成,移除任務(wù),此時隊列長度為0
后記
想要完整代碼的童鞋,點這里:
https://gitee.com/sunnymore/asynchronous_task
-
吞吐量
+關(guān)注
關(guān)注
0文章
47瀏覽量
12344 -
代碼
+關(guān)注
關(guān)注
30文章
4791瀏覽量
68694 -
異步請求
+關(guān)注
關(guān)注
0文章
2瀏覽量
1129
原文標題:提高系統(tǒng)吞吐量的一把利器:DeferredResult 到底有多強?
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論