作為在日常開(kāi)發(fā)生產(chǎn)中非常實(shí)用的語(yǔ)言,有必要掌握一些python用法,比如爬蟲、網(wǎng)絡(luò)請(qǐng)求等場(chǎng)景,很是實(shí)用。但python是單線程的,如何提高python的處理速度,是一個(gè)很重要的問(wèn)題,這個(gè)問(wèn)題的一個(gè)關(guān)鍵技術(shù),叫協(xié)程。本篇文章,講講python協(xié)程的理解與使用,主要是針對(duì)網(wǎng)絡(luò)請(qǐng)求這個(gè)模塊做一個(gè)梳理,希望能幫到有需要的同學(xué)。
概念篇
在理解協(xié)程這個(gè)概念及其作用場(chǎng)景前,先要了解幾個(gè)基本的關(guān)于操作系統(tǒng)的概念,主要是進(jìn)程、線程、同步、異步、阻塞、非阻塞,了解這幾個(gè)概念,不僅是對(duì)協(xié)程這個(gè)場(chǎng)景,諸如消息隊(duì)列、緩存等,都有一定的幫助。接下來(lái),編者就自己的理解和網(wǎng)上查詢的材料,做一個(gè)總結(jié)。
進(jìn)程
在面試的時(shí)候,我們都會(huì)記住一個(gè)概念,進(jìn)程是系統(tǒng)資源分配的最小單位。是的,系統(tǒng)由一個(gè)個(gè)程序,也就是進(jìn)程組成的,一般情況下,分為文本區(qū)域、數(shù)據(jù)區(qū)域和堆棧區(qū)域。
文本區(qū)域存儲(chǔ)處理器執(zhí)行的代碼(機(jī)器碼),通常來(lái)說(shuō),這是一個(gè)只讀區(qū)域,防止運(yùn)行的程序被意外修改。
數(shù)據(jù)區(qū)域存儲(chǔ)所有的變量和動(dòng)態(tài)分配的內(nèi)存,又細(xì)分為初始化的數(shù)據(jù)區(qū)(所有初始化的全局、靜態(tài)、常量,以及外部變量)和為初始化的數(shù)據(jù)區(qū)(初始化為0的全局變量和靜態(tài)變量),初始化的變量最初保存在文本區(qū),程序啟動(dòng)后被拷貝到初始化的數(shù)據(jù)區(qū)。
堆棧區(qū)域存儲(chǔ)著活動(dòng)過(guò)程調(diào)用的指令和本地變量,在地址空間里,棧區(qū)緊連著堆區(qū),他們的增長(zhǎng)方向相反,內(nèi)存是線性的,所以我們代碼放在低地址的地方,由低向高增長(zhǎng),棧區(qū)大小不可預(yù)測(cè),隨開(kāi)隨用,因此放在高地址的地方,由高向低增長(zhǎng)。當(dāng)堆和棧指針重合的時(shí)候,意味著內(nèi)存耗盡,造成內(nèi)存溢出。
進(jìn)程的創(chuàng)建和銷毀都是相對(duì)于系統(tǒng)資源,非常消耗資源,是一種比較昂貴的操作。進(jìn)程為了自身能得到運(yùn)行,必須要搶占式的爭(zhēng)奪CPU。對(duì)于單核CPU來(lái)說(shuō),在同一時(shí)間只能執(zhí)行一個(gè)進(jìn)程的代碼,所以在單核CPU上實(shí)現(xiàn)多進(jìn)程,是通過(guò)CPU快速的切換不同進(jìn)程,看上去就像是多個(gè)進(jìn)程在同時(shí)進(jìn)行。
由于進(jìn)程間是隔離的,各自擁有自己的內(nèi)存內(nèi)存資源,相比于線程的共同共享內(nèi)存來(lái)說(shuō),相對(duì)安全,不同進(jìn)程之間的數(shù)據(jù)只能通過(guò) IPC(Inter-Process Communication) 進(jìn)行通信共享。
線程
線程是CPU調(diào)度的最小單位。如果進(jìn)程是一個(gè)容器,線程就是運(yùn)行在容器里面的程序,線程是屬于進(jìn)程的,同個(gè)進(jìn)程的多個(gè)線程共享進(jìn)程的內(nèi)存地址空間。
線程間的通信可以直接通過(guò)全局變量進(jìn)行通信,所以相對(duì)來(lái)說(shuō),線程間通信是不太安全的,因此引入了各種鎖的場(chǎng)景,不在這里闡述。
當(dāng)一個(gè)線程崩潰了,會(huì)導(dǎo)致整個(gè)進(jìn)程也崩潰了,即其他線程也掛了, 但多進(jìn)程而不會(huì),一個(gè)進(jìn)程掛了,另一個(gè)進(jìn)程依然照樣運(yùn)行。
在多核操作系統(tǒng)中,默認(rèn)進(jìn)程內(nèi)只有一個(gè)線程,所以對(duì)多進(jìn)程的處理就像是一個(gè)進(jìn)程一個(gè)核心。
同步和異步
同步和異步關(guān)注的是消息通信機(jī)制,所謂同步,就是在發(fā)出一個(gè)函數(shù)調(diào)用時(shí),在沒(méi)有得到結(jié)果之前,該調(diào)用不會(huì)返回。一旦調(diào)用返回,就立即得到執(zhí)行的返回值,即調(diào)用者主動(dòng)等待調(diào)用結(jié)果。所謂異步,就是在請(qǐng)求發(fā)出去后,這個(gè)調(diào)用就立即返回,沒(méi)有返回結(jié)果,通過(guò)回調(diào)等方式告知該調(diào)用的實(shí)際結(jié)果。
同步的請(qǐng)求,需要主動(dòng)讀寫數(shù)據(jù),并且等待結(jié)果;異步的請(qǐng)求,調(diào)用者不會(huì)立刻得到結(jié)果。而是在調(diào)用發(fā)出后,被調(diào)用者通過(guò)狀態(tài)、通知來(lái)通知調(diào)用者,或通過(guò)回調(diào)函數(shù)處理這個(gè)調(diào)用。
阻塞和非阻塞
關(guān)注的是程序在等待調(diào)用結(jié)果(消息,返回值)時(shí)的狀態(tài)。
阻塞調(diào)用是指調(diào)用結(jié)果返回之前,當(dāng)前線程會(huì)被掛起。調(diào)用線程只有在得到結(jié)果之后才會(huì)返回。非阻塞調(diào)用指在不能立刻得到結(jié)果之前,該調(diào)用不會(huì)阻塞當(dāng)前線程。所以,區(qū)分的條件在于,進(jìn)程/線程要訪問(wèn)的數(shù)據(jù)是否就緒,進(jìn)程/線程是否需要等待。
非阻塞一般通過(guò)多路復(fù)用實(shí)現(xiàn),多路復(fù)用有 select、poll、epoll幾種實(shí)現(xiàn)方式。
協(xié)程
在了解前面的幾個(gè)概念后,我們?cè)賮?lái)看協(xié)程的概念。
協(xié)程是屬于線程的,又稱微線程,纖程,英文名Coroutine。舉個(gè)例子,在執(zhí)行函數(shù)A時(shí),我希望隨時(shí)中斷去執(zhí)行函數(shù)B,然后中斷B的執(zhí)行,切換回來(lái)執(zhí)行A。這就是協(xié)程的作用,由調(diào)用者自由切換。這個(gè)切換過(guò)程并不是等同于函數(shù)調(diào)用,因?yàn)樗鼪](méi)有調(diào)用語(yǔ)句。執(zhí)行方式與多線程類似,但是協(xié)程只有一個(gè)線程執(zhí)行。
協(xié)程的優(yōu)點(diǎn)是執(zhí)行效率非常高,因?yàn)閰f(xié)程的切換由程序自身控制,不需要切換線程,即沒(méi)有切換線程的開(kāi)銷。同時(shí),由于只有一個(gè)線程,不存在沖突問(wèn)題,不需要依賴鎖(加鎖與釋放鎖存在很多資源消耗)。
協(xié)程主要的使用場(chǎng)景在于處理IO密集型程序,解決效率問(wèn)題,不適用于CPU密集型程序的處理。然而實(shí)際場(chǎng)景中這兩種場(chǎng)景非常多,如果要充分發(fā)揮CPU利用率,可以結(jié)合多進(jìn)程+協(xié)程的方式。后續(xù)我們會(huì)講到結(jié)合點(diǎn)。
原理篇
根據(jù)wikipedia的定義,協(xié)程是一個(gè)無(wú)優(yōu)先級(jí)的子程序調(diào)度組件,允許子程序在特點(diǎn)的地方掛起恢復(fù)。所以理論上,只要內(nèi)存足夠,一個(gè)線程中可以有任意多個(gè)協(xié)程,但同一時(shí)刻只能有一個(gè)協(xié)程在運(yùn)行,多個(gè)協(xié)程分享該線程分配到的計(jì)算機(jī)資源。協(xié)程是為了充分發(fā)揮異步調(diào)用的優(yōu)勢(shì),異步操作則是為了避免IO操作阻塞線程。
知識(shí)準(zhǔn)備
在了解原理前,我們先做一個(gè)知識(shí)的準(zhǔn)備工作。
1)現(xiàn)代主流的操作系統(tǒng)幾乎都是分時(shí)操作系統(tǒng),即一臺(tái)計(jì)算機(jī)采用時(shí)間片輪轉(zhuǎn)的方式為多個(gè)用戶服務(wù),系統(tǒng)資源分配的基本單位是進(jìn)程,CPU調(diào)度的基本單位是線程。
2)運(yùn)行時(shí)內(nèi)存空間分為變量區(qū),棧區(qū),堆區(qū)。內(nèi)存地址分配上,堆區(qū)從低地到高,棧區(qū)從高往低。
3)計(jì)算機(jī)執(zhí)行時(shí)一條條指令讀取執(zhí)行,執(zhí)行到當(dāng)前指令時(shí),下一條指令的地址在指令寄存器的IP中,ESP寄存值指向當(dāng)前棧頂?shù)刂?,EBP指向當(dāng)前活動(dòng)棧幀的基地址。
4)系統(tǒng)發(fā)生函數(shù)調(diào)用時(shí)操作為:先將入?yún)挠彝笠来螇簵?,然后把返回地址壓棧,最后將?dāng)前EBP寄存器的值壓棧,修改ESP寄存器的值,在棧區(qū)分配當(dāng)前函數(shù)局部變量所需的空間。
5)協(xié)程的上下文包含屬于當(dāng)前協(xié)程的棧區(qū)和寄存器里面存放的值。
事件循環(huán)
在python3.3中,通過(guò)關(guān)鍵字yield from使用協(xié)程,在3.5中,引入了關(guān)于協(xié)程的語(yǔ)法糖async和await,我們主要看async/await的原理解析。其中,事件循環(huán)是一個(gè)核心所在,編寫過(guò) js的同學(xué),會(huì)對(duì)事件循環(huán)Eventloop更加了解, 事件循環(huán)是一種等待程序分配事件或消息的編程架構(gòu)(維基百科)。在python中,asyncio.coroutine 修飾器用來(lái)標(biāo)記作為協(xié)程的函數(shù), 這里的協(xié)程是和asyncio及其事件循環(huán)一起使用的,而在后續(xù)的發(fā)展中,async/await被使用的越來(lái)越廣泛。
async/await
async/await是使用python協(xié)程的關(guān)鍵,從結(jié)構(gòu)上來(lái)看,asyncio 實(shí)質(zhì)上是一個(gè)異步框架,async/await 是為異步框架提供的 API已方便使用者調(diào)用,所以使用者要想使用async/await 編寫協(xié)程代碼,目前必須機(jī)遇 asyncio 或其他異步庫(kù)。
Future
在實(shí)際開(kāi)發(fā)編寫異步代碼時(shí),為了避免太多的回調(diào)方法導(dǎo)致的回調(diào)地獄,但又需要獲取異步調(diào)用的返回結(jié)果結(jié)果,聰明的語(yǔ)言設(shè)計(jì)者設(shè)計(jì)了一個(gè) 叫Future的對(duì)象,封裝了與loop 的交互行為。其大致執(zhí)行過(guò)程為:程序啟動(dòng)后,通過(guò)add_done_callback 方法向 epoll 注冊(cè)回調(diào)函數(shù),當(dāng) result 屬性得到返回值后,主動(dòng)運(yùn)行之前注冊(cè)的回調(diào)函數(shù),向上傳遞給 coroutine。這個(gè)Future對(duì)象為asyncio.Future。
但是,要想取得返回值,程序必須恢復(fù)恢復(fù)工作狀態(tài),而由于Future 對(duì)象本身的生存周期比較短,每一次注冊(cè)回調(diào)、產(chǎn)生事件、觸發(fā)回調(diào)過(guò)程后工作可能已經(jīng)完成,所以用 Future 向生成器 send result 并不合適。所以這里又引入一個(gè)新的對(duì)象 Task,保存在Future 對(duì)象中,對(duì)生成器協(xié)程進(jìn)行狀態(tài)管理。
Python 里另一個(gè) Future 對(duì)象是 concurrent.futures.Future,與 asyncio.Future 互不兼容,容易產(chǎn)生混淆。區(qū)別點(diǎn)在于,concurrent.futures 是線程級(jí)的 Future 對(duì)象,當(dāng)使用 concurrent.futures.Executor 進(jìn)行多線程編程時(shí),該對(duì)象用于在不同的 thread 之間傳遞結(jié)果。
Task
上文中提到,Task是維護(hù)生成器協(xié)程狀態(tài)處理執(zhí)行邏輯的的任務(wù)對(duì)象,Task 中有一個(gè)_step 方法,負(fù)責(zé)生成器協(xié)程與 EventLoop 交互過(guò)程的狀態(tài)遷移,整個(gè)過(guò)程可以理解為:Task向協(xié)程 send 一個(gè)值,恢復(fù)其工作狀態(tài)。當(dāng)協(xié)程運(yùn)行到斷點(diǎn)后,得到新的Future對(duì)象,再處理 future 與 loop 的回調(diào)注冊(cè)過(guò)程。
Loop
在日常開(kāi)發(fā)中,會(huì)有一個(gè)誤區(qū),認(rèn)為每個(gè)線程都可以有一個(gè)獨(dú)立的 loop。實(shí)際運(yùn)行時(shí),主線程才能通過(guò) asyncio.get_event_loop() 創(chuàng)建一個(gè)新的 loop,而在其他線程時(shí),使用 get_event_loop() 卻會(huì)拋錯(cuò)。正確的做法為通過(guò) asyncio.set_event_loop() ,將當(dāng)前線程與 主線程的loop 顯式綁定。
Loop有一個(gè)很大的缺陷,就是 loop 的運(yùn)行狀態(tài)不受 Python 代碼控制,所以在業(yè)務(wù)處理中,無(wú)法穩(wěn)定的將協(xié)程拓展到多線程中運(yùn)行。
總結(jié)
實(shí)戰(zhàn)篇
介紹完概念和原理,我來(lái)看看如何使用,這里,舉一個(gè)實(shí)際場(chǎng)景的例子,來(lái)看看如何使用python的協(xié)程。
場(chǎng)景
外部接收一些文件,每個(gè)文件里有一組數(shù)據(jù),其中,這組數(shù)據(jù)需要通過(guò)http的方式,發(fā)向第三方平臺(tái),并獲得結(jié)果。
分析
由于同一個(gè)文件的每一組數(shù)據(jù)沒(méi)有前后的處理邏輯,在之前通過(guò)Requests庫(kù)發(fā)送的網(wǎng)絡(luò)請(qǐng)求,串行執(zhí)行,下一組數(shù)據(jù)的發(fā)送需要等待上一組數(shù)據(jù)的返回,顯得整個(gè)文件的處理時(shí)間長(zhǎng),這種請(qǐng)求方式,完全可以由協(xié)程來(lái)實(shí)現(xiàn)。
為了更方便的配合協(xié)程發(fā)請(qǐng)求,我們使用aiohttp庫(kù)來(lái)代替requests庫(kù),關(guān)于aiohttp,這里不做過(guò)多剖析,僅做下簡(jiǎn)單介紹。
aiohttp
aiohttp是asyncio和Python的異步HTTP客戶端/服務(wù)器,由于是異步的,經(jīng)常用在服務(wù)區(qū)端接收請(qǐng)求,和客戶端爬蟲應(yīng)用,發(fā)起異步請(qǐng)求,這里我們主要用來(lái)發(fā)請(qǐng)求。
aiohttp支持客戶端和HTTP服務(wù)器,可以實(shí)現(xiàn)單線程并發(fā)IO操作,無(wú)需使用Callback Hell即可支持Server WebSockets和Client WebSockets,且具有中間件。
代碼實(shí)現(xiàn)
直接上代碼了,talk is cheap, show me the code~
import aiohttp
import asyncio
from inspect import isfunction
import time
import logger
@logging_utils.exception(logger)
def request(pool, data_list):
loop = asyncio.get_event_loop()
loop.run_until_complete(exec(pool, data_list))
async def exec(pool, data_list):
tasks = []
sem = asyncio.Semaphore(pool)
tasks.append(
control_sem(sem,
item.get(“method”, “GET”),
item.get(“url”),
item.get(“data”),
item.get(“headers”),
item.get(“callback”)))
await asyncio.wait(tasks)
async def control_sem(sem, method, url, data, headers, callback):
async with sem:
count = 0
flag = False
while not flag and count 《 4:
flag = await fetch(method, url, data, headers, callback)
count = count + 1
print(“flag:{},count:{}”.format(flag, count))
if count == 4 and not flag:
raise Exception(‘EAS service not responding after 4 times of retry.’)
async def fetch(method, url, data, headers, callback):
async with aiohttp.request(method, url=url, data=data, headers=headers) as resp:
try:
json = await resp.read()
print(json)
if resp.status != 200:
return False
if isfunction(callback):
callback(json)
return True
except Exception as e:
print(e)
這里,我們封裝了對(duì)外發(fā)送批量請(qǐng)求的request方法,接收一次性發(fā)送的數(shù)據(jù)多少,和數(shù)據(jù)綜合,在外部使用時(shí),只需要構(gòu)建好網(wǎng)絡(luò)請(qǐng)求對(duì)象的數(shù)據(jù),設(shè)定好請(qǐng)求池大小即可,同時(shí),設(shè)置了重試功能,進(jìn)行了4次重試,防止在網(wǎng)絡(luò)抖動(dòng)的時(shí)候,單個(gè)數(shù)據(jù)的網(wǎng)絡(luò)請(qǐng)求發(fā)送失敗。
最終效果
在使用協(xié)程重構(gòu)網(wǎng)絡(luò)請(qǐng)求模塊之后,當(dāng)數(shù)據(jù)量在1000的時(shí)候,由之前的816s,提升到424s,快了一倍,且請(qǐng)求池大小加大的時(shí)候,效果更明顯,由于第三方平臺(tái)同時(shí)建立連接的數(shù)據(jù)限制,我們?cè)O(shè)定了40的閥值??梢钥吹?,優(yōu)化的程度很顯著。
責(zé)任編輯:ct
評(píng)論
查看更多