本文主要介紹了一個(gè)在 GreptimeDB 中遇到的一個(gè)關(guān)于異步取消 (async cancellation) 的“奇怪”問題[1]。
The Problem
針對這個(gè)問題,我們首先描述一個(gè)簡化的場景:在一個(gè)長時(shí)間運(yùn)行的測試中存在元信息損壞的問題,有一個(gè)應(yīng)該單調(diào)遞增的序列號出現(xiàn)了重復(fù)。
序列號的更新邏輯非常簡單:從一個(gè)原子變量中讀取當(dāng)前值,然后通過異步 I/O 方法 persist_number()將新值寫入文件里,最后更新這個(gè)原子變量。整個(gè)流程都是串行化的(file 是一個(gè)獨(dú)占引用)。
asyncfnupdate_metadata(file:&mutFile,counter:AtomicU64)->Result<()>{
letnext_number=counter.load(Ordering::Relaxed)+1;
persist_number(file,next_number).await?;
counter.fetch_add(1,Ordering::Relaxed);
}
由于一些原因,我們在這里使用了 load 函數(shù)而非 fetch_add(雖然單就這里來說可以用fetch_add,并且用了還不會(huì)引發(fā)這次的問題)。當(dāng)這個(gè)更新流程在中間出現(xiàn)錯(cuò)誤時(shí),我們不希望更新內(nèi)存中的計(jì)數(shù)器。我們清楚如果persist_number() 寫入文件時(shí)失敗就能夠從 ?提前返回,并且會(huì)提早結(jié)束執(zhí)行來傳播錯(cuò)誤,所以編碼的時(shí)候會(huì)注意這些問題。
但是到了 .await 這里事情就變得奇妙了起來,因?yàn)?async cancellation 帶來了一個(gè)隱藏的控制流。
Async Cancellation
async task and runtime
如果這時(shí)候你已經(jīng)猜到了到底是什么引發(fā)了這個(gè)問題,可以跳過這一章節(jié)。如果沒有,就讓我從一些偽代碼開始解釋在 await point 那里到底發(fā)生了什么,以及 runtime 是如何參與其中的。
-
poll_future
首先是poll_future,對應(yīng)到Future的 poll[2] 方法。我們寫的異步方法都會(huì)被轉(zhuǎn)化成類似這樣子的一個(gè)匿名的Future實(shí)現(xiàn)。
fnpoll_future()->FutureOutput{
matchstatus_of_the_task{
Ready(output)=>{
//thetaskisfinished,andwehaveitoutput.
//somelogic
returnour_output;
},
Pending=>{
//itisnotready,wedon'thavetheoutput.
//thuswecannotmakeprogressandneedtowait
returnPending;
}
}
}
?
async塊通常包含其他的異步方法,比如update_metadata和persist_number。這里把persist_number稱為update_metadata的子異步任務(wù)。每個(gè).await都會(huì)被展開成類似poll_future的東西,等待子任務(wù)的結(jié)果并繼續(xù)執(zhí)行。在這個(gè)例子中就是等待persist_number的結(jié)果返回Ready再更新計(jì)數(shù)器,否則不更新。
[2] poll:
https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll
-
runtime
第二段偽代碼是一個(gè)簡化的 runtime,它負(fù)責(zé)輪詢 (poll) 異步任務(wù)直到它們完成(考慮到接下來的文章內(nèi)容,“直到……完成”這種表述并不適合于所有情況)。在 GreptimeDB 中我們使用tokio[3] 作為 runtime?,F(xiàn)在的異步 runtime 可能有很多特性和功能,其中最基礎(chǔ)的就是輪詢這些任務(wù)。
fnruntime(&self){
loop{
letfuture_tasks:Vec=self.get_tasks();
fortaskintasks{
matchtask.poll_future(){
Ready(output)=>{
//thistaskisfinished.wakeitwiththeresult
task.wake(output);
},
Pending=>{
//thistaskneedssometimetorun.pollitlater
self.poll_later(task);
}
}
}
}
}
通過結(jié)合上述兩個(gè)簡化的 future 和 runtime 模型,我們得到如下這個(gè)循環(huán)(真實(shí)的 runtime 非常復(fù)雜,這里為了內(nèi)容集中省略了很多)。
fnrun()->Output{
loop{
ifletReady(result)=task.poll(){
returnresult;
}
}
}
需要強(qiáng)調(diào)的是,每個(gè) .await 都代表著一個(gè)或者多個(gè)函數(shù)調(diào)用 (調(diào)用到 poll() 或者說是 poll_future() )。這就是標(biāo)題中“隱藏的控制流”,以及 cancellation 發(fā)生的地方。
我們再看一段簡單的程序來探測 runtime 的行為(可以直接在 playground[4]里面運(yùn)行這段代碼):
usetokio::{sleep,Duration,timeout};
#[tokio::main]
asyncfnmain(){
letf=async{
print(1).await;
println!("1isdone");
print(2).await;
println!("2isdone");
print(3).await;
println!("3isdone");
};
ifletErr(_)=timeout(Duration::from_millis(150),f).await{
println!("timeout");
}
sleep(Duration::from_millis(300)).await;
println!("exit")
}
asyncfnprint(val:u32){
sleep(Duration::from_millis(100)).await;
println!("valis{}",val);
}
只要花幾分鐘時(shí)間猜測一下上方代碼的輸出結(jié)果,如果和下面的一致,相信你已經(jīng)知道問題出在哪里。
valis1
1isdone
timeout
exit
之后的語句都因?yàn)槌瑫r(shí)而被 runtime 取消執(zhí)行了。
這個(gè)問題其中的原理并不復(fù)雜,但是(對我來說)能夠定位到它并不輕易。在把其他問題都排除掉之后我知道問題就發(fā)生在這里,在這個(gè) .await 上。也許是太多次成功的異步函數(shù)調(diào)用麻痹了注意,亦或是我的心智模型中沒有把這兩點(diǎn)聯(lián)系起來,聯(lián)想到這點(diǎn)著實(shí)費(fèi)了一番心思。
[3]tokio:
https://docs.rs/tokio/latest/tokio/
[4]playground:
https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=40220605392e951e833a0b45719ed1e1
cancellation
目前為止的內(nèi)容是問題復(fù)盤的標(biāo)準(zhǔn)流程,接下來,讓我們來展開討論一下 cancellation,它是與 runtime 的行為相關(guān)的。
雖然 Rust 中的很多 runtime 都有類似的行為,但是這不是一個(gè)必須的特性,比如這個(gè)自己寫的runtime[5] 就不支持 cancellation。因?yàn)閱栴}發(fā)生在 tokio 上,因此這里會(huì)以它為例,而其他的 runtime 也是類似的。在 tokio 中,可以使用 JoinHandle::abort()[6] 來取消一個(gè) task。task 結(jié)構(gòu)中有一個(gè)“cancel marker bit”來跟蹤一個(gè)任務(wù)是否被取消了。如果它發(fā)現(xiàn)一個(gè) task 被取消了,就會(huì)停止執(zhí)行這個(gè) task。
//Ifthetaskisrunning,wemarkitascancelled.Thethread
//runningthetaskwillnoticethecancelledbitwhenit
//stopspollinganditwillkillthetask.
//
//Theset_notified()callisnotstrictlynecessarybutitwill
//insomecasesletawake_by_refcallreturnwithouthaving
//toperformacompare_exchange.
snapshot.set_notified();
snapshot.set_cancelled();
背后的邏輯也很簡單,就是 runtime 放棄了繼續(xù)輪詢你的 task,就和 ? 差不多。某種程度上可能更棘手一點(diǎn),因?yàn)槲覀儾荒芟?Err 那樣處理這個(gè) cancellation。不過這代表我們需要考慮每一個(gè) .await都有可能隨時(shí)被 cancel 掉嗎?這也太麻煩了。
以本文這個(gè) metadata 更新的情況為例,如果把 cancel 納入考慮范圍,我們需要檢查文件是否和內(nèi)存中的狀態(tài)一致,如果不一致就要回滾持久化的改動(dòng)等等。壞消息是,在某些方面答案是肯定的,runtime 可以對你的 future 做任何事情。不過好在大多數(shù)情況下它們都還是很遵守規(guī)矩的。
[5]試驗(yàn) runtime:
https://github.com/waynexia/texn
[6] JoinHandle::abort():
https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html#method.abort
Current Solution
explicit detach
現(xiàn)在是否有手段能防止 task 被取消呢?在 tokio 中我們可以通過 drop JoinHandle來 detach 一個(gè)任務(wù)到后臺(tái)。一個(gè) detached task 意味著沒有前臺(tái)的 handle 來控制這個(gè)任務(wù),從某種意義上來說也就使得其他人不能在外面套一層timeout或select,從而間接地使它不會(huì)被取消執(zhí)行。并且開頭提到的問題就是通過這種方式解決的。
JoinHandle detaches the associated task when it is dropped, which means that there is no longer any handle to the task, and no way to joinon it.
不過雖然有辦法能夠?qū)崿F(xiàn)這個(gè)功能,是否像 glommio[7]一樣有一個(gè)顯式的 detach 方法,類似一個(gè)不返回 JoinHandle 的 spawn 方法會(huì)更好。但這些都是瑣碎的事情,一個(gè) runtime 通常不會(huì)完全沒有理由就取消一個(gè) task,并且在大多數(shù)情況下都是出于用戶的要求,只不過有時(shí)候可能沒有注意到,就像 select 中的那些“未選中的分支”或者 tonic 中請求處理的邏輯那樣。所以如果我們確定一個(gè) task 是不能被取消的話,顯式地 detach 可能能預(yù)防某些悲劇的發(fā)生。
Our solution
目前為止所有問題都清晰了,讓我們開始修復(fù)這個(gè) bug 吧!
首先,為什么我們的 future 會(huì)被取消呢?通過函數(shù)調(diào)用鏈路很容易就能發(fā)現(xiàn)整個(gè)處理過程都是在tonic的請求執(zhí)行邏輯中就地執(zhí)行的,而對于一個(gè)網(wǎng)絡(luò)請求來說有一個(gè)超時(shí)行為是很常見的。解決方案也很簡單,就是將服務(wù)器處理邏輯提交到另一個(gè) runtime 中執(zhí)行,從而防止它被取消。只需要幾行代碼[8]就能完成。
@@-30,12+40,24@@implBatchHandler{
}
batch_resp.admins.push(admin_resp);
-fordb_reqinbatch_req.databases{
-forobj_exprindb_req.exprs{
-letobject_resp=self.query_handler.do_query(obj_expr).await?;
-db_resp.results.push(object_resp);
+let(tx,rx)=oneshot::channel();
+letquery_handler=self.query_handler.clone();
+let_=self.runtime.spawn(asyncmove{
+//executerequestinanotherruntimetopreventtheexecutionfrombeingcancelledunexpectedbytonicruntime.
+letmutresult=vec![];
+fordb_reqinbatch_req.databases{
+forobj_exprindb_req.exprs{
+letobject_resp=query_handler.do_query(obj_expr).await;
+
+result.push(object_resp);
+}
}
這個(gè)問題到這里就修復(fù)完了,不過并不是從根本上解決 async cancellation 帶來的 bug,而是采用間接手段去規(guī)避任務(wù)由于超時(shí)而被提前取消的問題,畢竟我們的這些異步邏輯還是需要被完整執(zhí)行的。
但是這樣的處理會(huì)放大另外一些問題,比如我們也無法提前取消掉對于已經(jīng)不用執(zhí)行或資源消耗特別大的任務(wù),從而導(dǎo)致系統(tǒng)資源的浪費(fèi)。這些是我們之后需要持續(xù)改進(jìn)的地方。接下來會(huì)就這方面繼續(xù)展開,從 async 生態(tài)的方面討論有哪些可能能提升 async cancellation 的使用體驗(yàn)。
[7] glommio's:
https://docs.rs/glommio/0.7.0/glommio/struct.Task.html#method.detach
[8] 解決方案代碼:
https://github.com/GreptimeTeam/greptimedb/pull/376/files#diff-9756dcef86f5ba1d60e01e41bf73c65f72039f9aaa057ffd03f3fc2f7dadfbd0R46-R54
Runtime Behavior
-
marker trait
首先,我們自然希望 runtime 不要無條件地取消我的 task,而是嘗試通過類型系統(tǒng)來變得更友好,比如借助類似 CancelSafe 的 marker trait 。對于 cancellation safety 這個(gè)詞,tokio 在它的文檔[9]中有提到:
To determine whether your own methods are cancellation safe, look for the location of uses of.await. This is because when an asynchronous method is cancelled, that always happens at an.await. If your function behaves correctly even if it is restarted while waiting at an.await, then it is cancellation safe.
簡單來說就是用來描述一個(gè) task 是否可以安全地被取消掉,這是 async task 的屬性之一。tokio 維護(hù)了一個(gè)很長的列表,列出了哪些是安全的以及哪些是不安全的??雌饋磉@和UnwindSafe[10]這個(gè) marker trait 很像。兩者都是描述“這種控制流程并不總是被預(yù)料到的”,并且“有可能導(dǎo)致一些微妙的 bug”的這樣一種屬性。
如果有這樣一個(gè) CancelSafe 的 trait,我們就有途徑可以告訴 runtime 我們的異步任務(wù)是否可以安全地被取消掉,同時(shí)也是一種方式讓用戶承諾 cancelling 這個(gè)控制流程是被仔細(xì)處理過的。如果發(fā)現(xiàn)沒有實(shí)現(xiàn)這個(gè) trait,那就意味著我們不希望這個(gè) task 被取消掉,簡單而清晰。以 timeout()為例:
///Themarkertrait
traitCancelSafe{}
///Onlycancellabletaskcanbetimeout-ed
pubfntimeout(duration:Duration,future:F)->Timeoutwhere
F:Future+CancelSafe
{}
-
volunteer cancel
另一個(gè)方式是讓任務(wù)自愿地取消。就像 Kotlin 中的 cooperative cancellation[11] 一樣,它有一個(gè) isActive 方法來檢查一個(gè) task 是否被取消掉。這只是一個(gè)檢測方法,是否要取消完全取決于 task 本身。下面是 Kotlin 文檔中的一個(gè)例子, cooperative cancellation 發(fā)生在第 5 行。這種方式把“隱藏的控制流程”放在了明面上,讓我們能以一種更自然的方式來考慮和處理 cancellation,就像 Option 或 Result 一樣。
valstartTime=System.currentTimeMillis()
valjob=launch(Dispatchers.Default){
varnextPrintTime=startTime
vari=0
while(isActive){//cancellablecomputationloop
//printamessagetwiceasecond
if(System.currentTimeMillis()>=nextPrintTime){
println("job:I'msleeping${i++}...")
nextPrintTime+=500L
}
}
}
delay(1300L)//delayabit
println("main:I'mtiredofwaiting!")
job.cancelAndJoin()//cancelsthejobandwaitsforitscompletion
println("main:NowIcanquit.")
并且我認(rèn)為這也不難實(shí)現(xiàn),Tokio 現(xiàn)在已經(jīng)有了 Cancelled bit[12] 和 CancellationToken,只是看起來和期望的還有點(diǎn)不一樣。最后還是需要 runtime 把 cancellation 的權(quán)利交給 task,否則情況可能沒有什么大的不同。
[9] tokio 文檔:
https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
[10]UnwindSafe:
https://doc.rust-lang.org/std/panic/trait.UnwindSafe.html
[11]cooperative cancellation:
https://kotlinlang.org/docs/cancellation-and-timeouts.html#cancellation-is-cooperative
[12]Cancelledbit:
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/is-active.html
異步取消問題作為 Rust 語言中一個(gè)經(jīng)典問題,暫時(shí)還沒有很完美的解法。如果你正被這個(gè)問題困擾,或者有一些自己的見解,歡迎在評論區(qū)留言。期待隨著社區(qū)生態(tài)的共同努力,很快能共建一個(gè)更優(yōu)的解決方案~
關(guān)于 Greptime
Greptime 格睿科技于2022年創(chuàng)立,目前正在完善和打造時(shí)序性數(shù)據(jù)庫 GreptimeDB 和格睿云 Greptime Cloud 這兩款產(chǎn)品。GreptimeDB 是款用 Rust 語言編寫的時(shí)序數(shù)據(jù)庫。具有分布式,開源,云原生,兼容性強(qiáng)等特點(diǎn),幫助企業(yè)實(shí)時(shí)讀寫,處理和分析時(shí)序數(shù)據(jù)的同時(shí)降低長期存儲(chǔ)的成本。
-
官網(wǎng):https://greptime.com/
-
GitHub:https://github.com/GreptimeTeam/greptimedb
-
文檔:https://docs.greptime.com/
-
Twitter:https://twitter.com/Greptime
-
Slack:https://greptime.com/slack
-
LinkedIn:https://www.linkedin.com/company/greptime/
審核編輯 :李倩
-
數(shù)據(jù)庫
+關(guān)注
關(guān)注
7文章
3817瀏覽量
64492 -
函數(shù)
+關(guān)注
關(guān)注
3文章
4333瀏覽量
62721 -
Rust
+關(guān)注
關(guān)注
1文章
229瀏覽量
6619
原文標(biāo)題:看不見的控制流 — Rust 異步取消的幾點(diǎn)思考
文章出處:【微信號:Rust語言中文社區(qū),微信公眾號:Rust語言中文社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論