作為一個(gè)分布式異步計(jì)算框架,Celery雖然常用于Web框架中,但也可以單獨(dú)使用。 雖然常規(guī)搭配的消息隊(duì)列是RabbitMQ,但是由于某些情況下系統(tǒng)已經(jīng)包含了Redis,那就可以復(fù)用。
以下撇開Web框架,介紹基于Redis配置Celery任務(wù)的方法。
pip install celery[redis]
項(xiàng)目結(jié)構(gòu)
其中,main.py是觸發(fā)Task的業(yè)務(wù)代碼。當(dāng)然,文件名可以隨意改。celery.py是Celery的app定義的位置,tasks.py是Task定義的位置,文件名不建議修改。
配置Celery
在celery.py中寫入如下代碼:
其中,REDIS_URL從同一的配置settings.py中引入, 形式大概是redis://localhost:6379/0。這里既用Redis來當(dāng)broker,又用來當(dāng)backend。即,既當(dāng)消息隊(duì)列,又當(dāng)結(jié)果反饋的數(shù)據(jù)庫(默認(rèn)僅保存1天)。
在include=,需要填一個(gè)下游worker的包名列表。這里選擇了同一個(gè)包的tasks.py文件。
額外設(shè)置的task_track_started,是命令Worker反饋STARTED狀態(tài)。默認(rèn)情況下,是無法知道任務(wù)什么時(shí)候開始執(zhí)行的。
編寫任務(wù)并調(diào)用
在tasks.py文件中,添加異步任務(wù)的實(shí)現(xiàn)。
在需要發(fā)起任務(wù)的地方,用.apply_async可以觸發(fā)異步調(diào)用。即,實(shí)際只是向消息隊(duì)列發(fā)送消息,真正的執(zhí)行操作在遠(yuǎn)程。
運(yùn)行Worker:
celery -A your_project worker
運(yùn)行原理
一次Task從觸發(fā)到完成,序列圖如下:
其中,main代表業(yè)務(wù)代碼主進(jìn)程。它可能是Django、Flask這類Web服務(wù),也可能是一個(gè)其它類型的進(jìn)程。worker就是指Celery的Worker。
main發(fā)送消息后,會得到一個(gè)AsyncResult,其中包含task_id。僅通過task_id,也可以自己構(gòu)造一個(gè)AsyncResult,查詢相關(guān)信息。其中,代表運(yùn)行過程的,主要是state。
worker會持續(xù)保持對Redis(或其它消息隊(duì)列,如RabbitMQ)的關(guān)注,查詢新的消息。如果獲得新消息,將其消費(fèi)后,開始運(yùn)行do_sth。運(yùn)行完成會把返回值對應(yīng)的結(jié)果,以及一些運(yùn)行信息,回寫到Redis(或其它backend,如Django數(shù)據(jù)庫等)上。在系統(tǒng)的任何地方,通過對應(yīng)的AsyncResult(task_id)就可以查詢到結(jié)果。
Celery Task的狀態(tài)
以下是狀態(tài)圖:
其中,除SUCCESS外,還有失?。‵AILURE)、取消(REVOKED)兩個(gè)結(jié)束狀態(tài)。而RETRY則是在設(shè)置了重試機(jī)制后,進(jìn)入的臨時(shí)等待狀態(tài)。
另外,如果保存在Redis的結(jié)果信息被清理(默認(rèn)僅保存1天),那么任務(wù)狀態(tài)又會變成PENDING。這在設(shè)計(jì)上是個(gè)巨大的問題,使用時(shí)要做對應(yīng)容錯(cuò)。
常見控制操作
有時(shí),在業(yè)務(wù)主進(jìn)程中需要等待異步運(yùn)行的結(jié)果,這時(shí)需要使用wait。如果要取消一個(gè)排隊(duì)中、或已執(zhí)行的任務(wù),則可以使用revoke。即使任務(wù)已經(jīng)執(zhí)行完成,也可以使用revoke,但不會有任何變化。如果需要提前刪除任務(wù)記錄,可以使用forget。
責(zé)編AJX
-
Web
+關(guān)注
關(guān)注
2文章
1263瀏覽量
69515 -
分布式
+關(guān)注
關(guān)注
1文章
903瀏覽量
74542 -
Redis
+關(guān)注
關(guān)注
0文章
376瀏覽量
10882
發(fā)布評論請先 登錄
相關(guān)推薦
評論