最近在開(kāi)發(fā) 延保服務(wù) 頻道頁(yè)時(shí),為了提高查詢(xún)效率,使用到了多線(xiàn)程技術(shù)。為了對(duì)多線(xiàn)程方案設(shè)計(jì)有更加充分的了解,在業(yè)余時(shí)間讀完了《圖解 Java 多線(xiàn)程設(shè)計(jì)模式》這本書(shū),覺(jué)得收獲良多。本篇文章將介紹其中提到的 Future 模式,以及在實(shí)際業(yè)務(wù)開(kāi)發(fā)中對(duì)該模式的應(yīng)用,而這些內(nèi)容對(duì)于本書(shū)來(lái)說(shuō)只是冰山一角,還是推薦大家有時(shí)間去閱讀原書(shū)。
1. Future 模式:“先給您提貨單”
我們先來(lái)看一個(gè)場(chǎng)景:假如我們?nèi)サ案獾曩I(mǎi)蛋糕,下單后,店員會(huì)遞給我們提貨單并告知“請(qǐng)您傍晚來(lái)取蛋糕”。到了傍晚我們拿著提貨單去取蛋糕,店員會(huì)先和我們說(shuō)“您的蛋糕已經(jīng)做好了”,然后將蛋糕拿給我們。
如果將下單蛋糕到取蛋糕的過(guò)程抽象成一個(gè)方法的話(huà),那么意味著這個(gè)方法需要花很長(zhǎng)的時(shí)間才能獲取執(zhí)行結(jié)果,與其一直等待結(jié)果,不如先拿著一張“提貨單”,到我們需要取貨的時(shí)候,再通過(guò)它去取,而獲取“提貨單”的過(guò)程是幾乎不耗時(shí)的,而這個(gè)提貨單對(duì)象就被稱(chēng)為 Future
,后續(xù)便可以通過(guò)它來(lái)獲取方法的返回值。用 Java 來(lái)表示這個(gè)過(guò)程的話(huà),需要使用到 FutureTask
和 Callable
兩個(gè)類(lèi),如下:
public class Example {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 預(yù)定蛋糕,并定義“提貨單”
System.out.println("我:預(yù)定蛋糕");
FutureTask future = new FutureTask?>(() -> {
System.out.println("店員:請(qǐng)您傍晚來(lái)取蛋糕");
Thread.sleep(2000);
System.out.println("店員:您的蛋糕已經(jīng)做好了");
return "Holiland";
});
// 開(kāi)始做蛋糕
new Thread(future).start();
// 去做其他事情
Thread.sleep(1000);
System.out.println("我:忙碌中...");
// 取蛋糕
System.out.println("我:取蛋糕 " + future.get());
}
}
// 運(yùn)行結(jié)果:
// 我:預(yù)定蛋糕
// 店員:請(qǐng)您傍晚來(lái)取蛋糕
// 我:忙碌中...
// 店員:您的蛋糕已經(jīng)做好了
// 我:取蛋糕 Holiland
方法的調(diào)用者可以將任務(wù)交給其他線(xiàn)程去處理,無(wú)需阻塞等待方法的執(zhí)行,這樣調(diào)用者便可以繼續(xù)執(zhí)行其他任務(wù),并能通過(guò) Future
對(duì)象獲取執(zhí)行結(jié)果。
它的運(yùn)行原理如下:創(chuàng)建 FutureTask
實(shí)例時(shí),Callable
對(duì)象會(huì)被傳遞給構(gòu)造函數(shù),當(dāng)線(xiàn)程調(diào)用 FutureTask
的 run
方法時(shí),Callable
對(duì)象的 call
方法也會(huì)被執(zhí)行。調(diào)用 call
方法的線(xiàn)程會(huì)同步地獲取結(jié)果,并通過(guò) FutureTask
的 set
方法來(lái)記錄結(jié)果對(duì)象,如果 call
方法執(zhí)行期間發(fā)生了異常,則會(huì)調(diào)用 setException
方法記錄異常。最后,通過(guò)調(diào)用 get
方法獲取方法的結(jié)果,注意這里可能會(huì)拋出方法執(zhí)行時(shí)產(chǎn)生的異常。
public void run() {
// ...
try {
// “提貨任務(wù)”
Callable c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 調(diào)用 callable 的 call 方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 捕獲并設(shè)置異常
setException(ex);
}
if (ran)
// 為結(jié)果賦值
set(result);
}
} finally {
// ...
}
}
protected void set(V v) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 將結(jié)果賦值給 outcome 全局變量,供 get 時(shí)獲取
outcome = v;
// 修改狀態(tài)為 NORMAL
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 將異常賦值給 outcome 變量,供 get 時(shí)拋出
outcome = t;
// 修改狀態(tài)為 EXCEPTIONAL
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();
}
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 未完成時(shí)阻塞等一等
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
// 正常結(jié)束的話(huà)能正常獲取到結(jié)果
if (s == NORMAL)
return (V)x;
// 否則會(huì)拋出異常,注意如果執(zhí)行中出現(xiàn)異常,調(diào)用 get 時(shí)會(huì)被拋出
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
現(xiàn)在對(duì) Future 模式 已經(jīng)有了基本的了解:它通過(guò) Future
接口來(lái)表示未來(lái)的結(jié)果,實(shí)現(xiàn) 調(diào)用者與執(zhí)行者之間的解耦,提高系統(tǒng)的吞吐量和響應(yīng)速度,那在實(shí)踐中對(duì)該模式是如何使用的呢?
2. 對(duì) Future 模式的實(shí)踐
因?yàn)?延保服務(wù) 頻道頁(yè)訪問(wèn)量大且對(duì)接口性能要求較高,單線(xiàn)程處理并不能滿(mǎn)足性能要求,所以應(yīng)用了 Future 模式 來(lái)提高查詢(xún)效率,但是并沒(méi)有借助上文所述的 FutureTask
來(lái)實(shí)現(xiàn),而是使用了 CompletableFuture
工具類(lèi),它們的實(shí)現(xiàn)原理基本一致,但是后者提供的方法和對(duì) 鏈?zhǔn)?a target="_blank">編程 的支持使代碼更加簡(jiǎn)潔,實(shí)現(xiàn)更加容易(相關(guān) API 參考見(jiàn)文末)。
如下是使用 CompletableFuture
異步多線(xiàn)程查詢(xún)訂單列表的邏輯,根據(jù)配置的 pageNo
分多條線(xiàn)程查詢(xún)各頁(yè)的訂單數(shù)據(jù):
List result = new ArrayList?>();
// 并發(fā)查詢(xún)訂單列表
List>> futureList = new ArrayList?>();
try {
// 配置需要查詢(xún)的頁(yè)數(shù) pageNo,并發(fā)查詢(xún)不同頁(yè)碼的訂單
for (int i = 1; i <= pageNo; i++) {
int curPageNo = i;
CompletableFuture> future = CompletableFuture.supplyAsync(
() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor);
futureList.add(future);
}
// 等待所有線(xiàn)程處理完畢,并封裝結(jié)果值
for (CompletableFuture> future : futureList) {
result.addAll(future.get());
}
} catch (Exception e) {
log.error("并發(fā)查詢(xún)用戶(hù)訂單信息異常", e);
}
這段代碼中對(duì)異常的處理能進(jìn)行優(yōu)化:第 15 行代碼,如果某條線(xiàn)程查詢(xún)訂單列表時(shí)發(fā)生異常,那么在調(diào)用 get
方法時(shí)會(huì)拋出該異常,被 catch
后返回空結(jié)果,即使有其他線(xiàn)程查詢(xún)成功,這些訂單結(jié)果值也會(huì)被忽略掉,可以針對(duì)這一點(diǎn)進(jìn)行優(yōu)化,如下:
List result = new ArrayList?>();
// 并發(fā)查詢(xún)訂單列表
List>> futureList = new ArrayList?>();
try {
// 配置需要查詢(xún)的頁(yè)數(shù) pageNo,并發(fā)查詢(xún)不同頁(yè)碼的訂單
for (int i = 1; i <= pageNo; i++) {
int curPageNo = i;
CompletableFuture> future = CompletableFuture
.supplyAsync(() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor)
// 添加異常處理
.exceptionally(e -> {
log.error("查詢(xún)用戶(hù)訂單信息異常", e);
return Collections.emptyList();
});
futureList.add(future);
}
// 等待所有線(xiàn)程處理完畢,并封裝結(jié)果值
for (CompletableFuture> future : futureList) {
result.addAll(future.get());
}
} catch (Exception e) {
log.error("并發(fā)查詢(xún)用戶(hù)訂單信息異常", e);
}
優(yōu)化后針對(duì)查詢(xún)發(fā)生異常的任務(wù)打印異常日志,并返回空集合,這樣即使單線(xiàn)程查詢(xún)失敗,也不會(huì)影響到其他線(xiàn)程查詢(xún)成功的結(jié)果。
CompletableFuture
還提供了 allOf
方法,它返回的 CompletableFuture
對(duì)象在所有 CompletableFuture
執(zhí)行完成時(shí)完成,相比于對(duì)每個(gè)任務(wù)都調(diào)用 get
阻塞等待任務(wù)完成的實(shí)現(xiàn)可讀性更好,改造后代碼如下:
List result = new ArrayList?>();
// 并發(fā)查詢(xún)訂單列表
CompletableFuture>[] futures = new CompletableFuture[pageNo];
// 配置需要查詢(xún)的頁(yè)數(shù) pageNo,并發(fā)查詢(xún)不同頁(yè)碼的訂單
for (int i = 1; i <= pageNo; i++) {
int curPageNo = i;
CompletableFuture> future = CompletableFuture
.supplyAsync(() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor)
// 添加異常處理
.exceptionally(e -> {
log.error("查詢(xún)用戶(hù)訂單信息異常", e);
return Collections.emptyList();
});
futures[i - 1] = future;
}
try {
// 等待所有線(xiàn)程處理完畢
CompletableFuture.allOf(futures).get();
for (CompletableFuture> future : futures) {
List orderInfoList = future.get();
if (CollectionUtils.isEmpty(orderInfoList)) {
result.addAll(orderInfoList);
}
}
} catch (Exception e) {
log.error("處理用戶(hù)訂單結(jié)果信息異常", e);
}
Tips:
CompletableFuture
的設(shè)計(jì)初衷是支持異步編程,所以應(yīng)盡量避免在CompletableFuture
鏈中使用get()/join()
方法,因?yàn)檫@些方法會(huì)阻塞當(dāng)前線(xiàn)程直到CompletableFuture
完成,應(yīng)該在必須使用該結(jié)果值時(shí)才調(diào)用它們。
相關(guān)的模式:命令模式
命令模式能將操作的調(diào)用者和執(zhí)行者解耦,它能很容易的與 Future 模式 結(jié)合,以查詢(xún)訂單的任務(wù)為例,我們可以將該任務(wù)封裝為“命令”對(duì)象的形式,執(zhí)行時(shí)為每個(gè)線(xiàn)程提交一個(gè)命令,實(shí)現(xiàn)解耦并提高擴(kuò)展性。在命令模式中,命令對(duì)象需要 支持撤銷(xiāo)和重做,那么這便在查詢(xún)出現(xiàn)異常時(shí),提供了補(bǔ)償處理的可能,命令模式類(lèi)圖關(guān)系如下:
3.《圖解Java多線(xiàn)程設(shè)計(jì)模式》書(shū)籍推薦
我覺(jué)得本書(shū)算得上是一本老書(shū):05 年出版的基于 JDK1.5 的Java多線(xiàn)程書(shū)籍,相比于目前我們常用的 JDK1.8 和時(shí)髦的 JDK21,在讀之前總會(huì)讓人覺(jué)得有一種過(guò)時(shí)的感覺(jué)。但是當(dāng)我讀完時(shí),發(fā)現(xiàn)其中的模式能對(duì)應(yīng)上代碼中的處理邏輯:對(duì) CompletableFuture
的使用正對(duì)應(yīng)了其中的 Future 模式(異步獲取其他線(xiàn)程的執(zhí)行結(jié)果)等等,所以我覺(jué)得模式的應(yīng)用不會(huì)局限于技術(shù)的新老,它是在某種情況下,研發(fā)人員共識(shí)或通用的解決方案,在知曉某種模式,采用已有的技術(shù)實(shí)現(xiàn)它是容易的,而反過(guò)來(lái)在只掌握技術(shù)去探索模式是困難且沒(méi)有方向的。
同時(shí),我也在考慮一個(gè)問(wèn)題:對(duì)于新人學(xué)習(xí)多線(xiàn)程技術(shù)來(lái)說(shuō),究竟適不適合直接從模式入門(mén)呢?因?yàn)槲覍?duì)設(shè)計(jì)模式有了比較多的實(shí)踐經(jīng)驗(yàn),所以對(duì)“模式”相關(guān)的內(nèi)容足夠敏感,如果新人沒(méi)有這些經(jīng)驗(yàn)的話(huà),這對(duì)他們來(lái)說(shuō)會(huì)不會(huì)更像是一個(gè)個(gè)知識(shí)點(diǎn)的堆砌呢?好在的是,本書(shū)除了模式相關(guān)的內(nèi)容,對(duì)基礎(chǔ)知識(shí)也做足了鋪墊,而且提出的關(guān)于多線(xiàn)程編程的思考點(diǎn)也是非常值得參考和學(xué)習(xí)的,以線(xiàn)程互斥和協(xié)同為例,書(shū)中談到:在對(duì)線(xiàn)程進(jìn)行互斥處理時(shí)需要考慮 “要保護(hù)的東西是什么”,這樣便能夠 清晰的確定鎖的粒度;對(duì)于線(xiàn)程的協(xié)同,書(shū)中提到的是需要考慮 “放在中間的東西是什么”,直接的拋出這個(gè)觀點(diǎn)是不容易理解的,“中間的東西”是在多線(xiàn)程的 生產(chǎn)者和消費(fèi)者模式 中提出的,部分線(xiàn)程負(fù)責(zé)生產(chǎn),生產(chǎn)完成后將對(duì)象放在“中間”,部分線(xiàn)程負(fù)責(zé)消費(fèi),消費(fèi)時(shí)取的便是“中間”的對(duì)象,而合理規(guī)劃這些中間的東西便能 消除生產(chǎn)者和消費(fèi)者之間的速度差異,提高系統(tǒng)的吞吐量和響應(yīng)速度。而再深入考慮這兩個(gè)角度時(shí),線(xiàn)程的互斥和協(xié)同其實(shí)是內(nèi)外統(tǒng)一的:為了讓線(xiàn)程協(xié)調(diào)運(yùn)行,必須執(zhí)行互斥處理,以防止共享的內(nèi)容被破壞,而線(xiàn)程的互斥是為了線(xiàn)程的協(xié)調(diào)運(yùn)行才進(jìn)行的必要操作。
附:CompletableFuture 常用 API
使用 supplyAsync 方法異步執(zhí)行任務(wù),并返回 CompletableFuture 對(duì)象
如下代碼所示,調(diào)用 CompletableFuture.supplyAsync
靜態(tài)方法異步執(zhí)行查詢(xún)邏輯,并返回一個(gè)新的 CompletableFuture
對(duì)象
CompletableFuture> future = CompletableFuture.supplyAsync(() -> doQuery(), executor);
使用 join 方法阻塞獲取完成結(jié)果
如下代碼所示,在封裝結(jié)果前,調(diào)用 join
方法阻塞等待獲取結(jié)果
futureList.forEach(CompletableFuture::join);
它與 get
方法的主要區(qū)別在于,join
方法拋出的是未經(jīng)檢查的異常 CompletionException
,并將原始異常作為其原因,這意味著我們可以不需要在方法簽名中聲明它或在調(diào)用 join
方法的地方進(jìn)行異常處理,而 get
方法會(huì)拋出 InterruptedException
和 ExecutionException
異常,我們必須對(duì)它進(jìn)行處理,get
方法源碼如下:
public T get() throws InterruptedException, ExecutionException {
Object r;
if ((r = result) == null)
r = waitingGet(true);
return (T) reportGet(r);
}
用 thenApply(Function) 和 thenAccept(Consumer) 等回調(diào)函數(shù)處理結(jié)果
如下是使用 thenApply()
方法對(duì) CompletableFuture
的結(jié)果進(jìn)行轉(zhuǎn)換的操作:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(greeting -> greeting + " World");
使用 exceptionally() 處理 CompletableFuture 中的異常
CompletableFuture
提供了exceptionally()
方法來(lái)處理異常,這是一個(gè)非常重要的步驟。如果在 CompletableFuture
的運(yùn)行過(guò)程中拋出異常,那么這個(gè)異常會(huì)被傳遞到最終的結(jié)果中。如果沒(méi)有適當(dāng)?shù)漠惓L幚恚敲丛谡{(diào)用 get()
或 join()
方法時(shí)可能會(huì)拋出異常。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("Exception occurred");
}
return "Hello, World!";
}).exceptionally(e -> "An error occurred");
使用 allOf() 和 anyOf() 處理多個(gè) CompletableFuture
如果有多個(gè) CompletableFuture
需要處理,可以使用 CompletableFuture.allOf()
或者 CompletableFuture.anyOf()
。allOf()
在所有的 CompletableFuture
完成時(shí)完成,而 anyOf()
則會(huì)在任意一個(gè) CompletableFuture
完成時(shí)完成。
complete()、completeExceptionally()、cancel() 方法
CompletableFuture
的運(yùn)行是在調(diào)用了 complete()
、completeExceptionally()
、cancel()
等方法后才會(huì)被標(biāo)記為完成。如果沒(méi)有正確地完成 CompletableFuture
,那么在調(diào)用 get()
方法時(shí)可能會(huì)永久阻塞。這三個(gè)方法在 Java 并發(fā)編程中有著重要的應(yīng)用。以下是這三個(gè)方法的常見(jiàn)使用場(chǎng)景:
complete(T value)
: 此方法用于顯式地完成一個(gè) CompletableFuture
,并設(shè)置它的結(jié)果值。這在你需要在某個(gè)計(jì)算完成時(shí),手動(dòng)設(shè)置 CompletableFuture
的結(jié)果值的場(chǎng)景中非常有用。例如,你可能在一個(gè)異步操作完成時(shí),需要設(shè)置 CompletableFuture
的結(jié)果值。
CompletableFuture future = new CompletableFuture?>();
// Some asynchronous operation
future.complete("Operation Result");
completeExceptionally(Throwable ex)
: 此方法用于顯式地以異常完成一個(gè) CompletableFuture
。這在你需要在某個(gè)計(jì)算失敗時(shí),手動(dòng)設(shè)置 CompletableFuture
的異常的場(chǎng)景中非常有用。例如,你可能在一個(gè)異步操作失敗時(shí),需要設(shè)置 CompletableFuture
的異常。
CompletableFuture future = new CompletableFuture?>();
// Some asynchronous operation
future.completeExceptionally(new RuntimeException("Operation Failed"));
cancel(boolean mayInterruptIfRunning)
: 此方法用于取消與 CompletableFuture
關(guān)聯(lián)的計(jì)算。這在你需要取消一個(gè)長(zhǎng)時(shí)間運(yùn)行的或者不再需要的計(jì)算的場(chǎng)景中非常有用。例如,你可能在用戶(hù)取消操作或者超時(shí)的情況下,需要取消 CompletableFuture
的計(jì)算。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
// Long running operation
});
// Some condition
future.cancel(true);
這些方法都是線(xiàn)程安全的,可以從任何線(xiàn)程中調(diào)用。
使用 thenCompose() 處理嵌套的 CompletableFuture
如果在處理 CompletableFuture
的結(jié)果時(shí)又創(chuàng)建了新的CompletableFuture
,那么就會(huì)產(chǎn)生嵌套的 CompletableFuture
。這時(shí)可以使用 thenCompose()
方法來(lái)避免 CompletableFuture
的嵌套,如下代碼所示:
CompletableFuture completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
使用 thenCombine() 處理兩個(gè) CompletableFuture 的結(jié)果
CompletableFuture completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2);
審核編輯 黃宇
-
JAVA
+關(guān)注
關(guān)注
19文章
2967瀏覽量
104751 -
API
+關(guān)注
關(guān)注
2文章
1501瀏覽量
62017 -
多線(xiàn)程
+關(guān)注
關(guān)注
0文章
278瀏覽量
19956
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論