前言
最近常常需要處理大量的crash數(shù)據(jù),對(duì)這些數(shù)據(jù)進(jìn)行分析,在此之前需要將存量的數(shù)據(jù)導(dǎo)入自己的數(shù)據(jù)庫(kù),開始一天一天的去導(dǎo),發(fā)現(xiàn)太慢了,后來嘗試通過python多線程并行導(dǎo)入多天數(shù)據(jù),以此記錄對(duì)于Python多線程的使用。
進(jìn)程與線程
在介紹Python的多線程之前,我們需要先明確一下線程和進(jìn)程的概念,其實(shí)線程和進(jìn)程是操作系統(tǒng)的基本概念,都是實(shí)現(xiàn)并發(fā)的方式,其二者的區(qū)別可以用一句話概括:進(jìn)程是資源分配的最小單位,而線程是調(diào)度的最小單位。 線程是進(jìn)程的一部分,一個(gè)進(jìn)程含有一個(gè)或多個(gè)線程。
threading的使用
Python提供了threading庫(kù)來實(shí)現(xiàn)多線程,其使用多線程的方式有兩種,一種是直接調(diào)用如下:
import threading
import time
def say(index):
print("thread%s is running" % index)
time.sleep(1)
print("thread%s is over" % index)
if __name__ == "__main__":
threading.Thread(target=say, args=(1,)).start()
?需要注意的是函數(shù)入?yún)⒌膫魅胧峭ㄟ^元組實(shí)現(xiàn)的,如果只有一個(gè)參數(shù),","是不能省略的。
?
除了以上方法,還可以通過繼承threading.Thread來實(shí)現(xiàn),代碼如下。
import threading
import time
class MyThread(threading.Thread):
def __init__(self, index):
threading.Thread.__init__(self) # 必須的步驟
self.index = index
def run(self):
print("thread%s is running" % self.index)
time.sleep(1)
print("thread%s is over" % self.index)
if __name__ == "__main__":
myThread = MyThread(1)
myThread.start()
在threading中提供了很多方法,主要可以分為兩個(gè)部分,一部分是關(guān)于線程信息的函數(shù),一部分是線程對(duì)象的函數(shù)。
線程信息的函數(shù)
函數(shù) | 說明 |
---|---|
threading.active_count() | 活躍線程Thread的數(shù)量 |
threading.current_thread() | 返回當(dāng)前線程的thread對(duì)象 |
threading.enumerate() | 返回當(dāng)前存活線程的列表 |
threading.main_thread() | 返回當(dāng)前主線程的Thread對(duì)象 |
線程對(duì)象Thread的函數(shù)和屬性
函數(shù) | 說明 |
---|---|
Thread.name | 線程名,可相同 |
Thread.ident | 線程標(biāo)識(shí)符,非零整數(shù) |
Thread.Daemon | 是否為守護(hù)線程 |
Thread.is_alive() | 是否存活 |
Thread.start() | 開啟線程,多次調(diào)用會(huì)報(bào)錯(cuò) |
Thread.join(timeout=None) | 等待線程結(jié)束 |
Thread(group=None, target=None, name=None, args=(), kwargs={}, *, deamon=None) | 構(gòu)造函數(shù) |
Thread.run() | 用來重載 |
線程池
線程可以提高程序的并行性,提高程序執(zhí)行的效率,雖然python的多線程只是一種假象的多線程,但是在一些io密集的程序中還是可以提高執(zhí)行效率,其中的細(xì)節(jié)會(huì)在后面詳細(xì)解釋。在多線程中線程的調(diào)度也會(huì)造成一定的開銷,線程數(shù)量越多,調(diào)度開銷越大,所以我們需要控制線程的數(shù)量,使用join可以在主線程等待子線程執(zhí)行結(jié)束,從而控制線程的數(shù)量。其代碼如下
import threading
import time
def say(index):
print("thread%s is running" % index)
time.sleep(1)
print("thread%s is over" % index)
if __name__ == "__main__":
for i in range(1, 4, 2):
thread1 = threading.Thread(target=say, args=(i,))
thread2 = threading.Thread(target=say, args=(i + 1,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
結(jié)果如下
thread1 is running
thread2 is running
thread1 is over
thread2 is over
thread3 is running
thread4 is running
thread3 is over
thread4 is over
如果不使用join其結(jié)果如下:
thread1 is running
thread2 is running
thread3 is running
thread4 is running
thread1 is over
thread2 is over
thread4 is over
thread3 is over
這時(shí)候是同時(shí)啟動(dòng)了四個(gè)線程
使用join來控制線程數(shù)量雖然可以達(dá)到目的,但是這樣的實(shí)現(xiàn)確實(shí)很不優(yōu)雅,而且線程的創(chuàng)建和銷毀也是很大的開銷,所以針對(duì)一些執(zhí)行頻率高且執(zhí)行時(shí)間短的情況,可以使用線程池,線程池顧名思義就是一個(gè)包含固定數(shù)量線程的池子,線程池里面的線程是可以重復(fù)利用的,執(zhí)行完任務(wù)后不會(huì)立刻銷毀而且返回線程池中等待,如果有任務(wù)則立即執(zhí)行下一個(gè)任務(wù)。
python中的concurrent.futures模塊提供了ThreadPoolExector類來創(chuàng)建線程池,其提供了以下方法:
函數(shù) | 說明 |
---|---|
submit(fn, *args, **kwargs) | 將 fn 函數(shù)提交給線程池。args 代表傳給 fn 函數(shù)的參數(shù),kwargs 代表以關(guān)鍵字參數(shù)的形式為 fn 函數(shù)傳入?yún)?shù)。 |
map(func, *iterables, timeout=None, chunksize=1) | 該函數(shù)將會(huì)啟動(dòng)多個(gè)線程,以異步方式立即對(duì) iterables 執(zhí)行 map 處理。超時(shí)拋出TimeoutError錯(cuò)誤。返回每個(gè)函數(shù)的結(jié)果,注意不是返回future。 |
shutdown(wait=True) | 關(guān)閉線程池。關(guān)閉之后線程池不再接受新任務(wù),但會(huì)將之前提交的任務(wù)完成。 |
有一些函數(shù)的執(zhí)行是有返回值的,將任務(wù)通過submit提交給線程池后,會(huì)返回一個(gè)Future對(duì)象,F(xiàn)uture有以下幾個(gè)方法:
函數(shù) | 說明 |
---|---|
cancel() | 取消該 Future 代表的線程任務(wù)。如果該任務(wù)正在執(zhí)行,不可取消,則該方法返回 False;否則,程序會(huì)取消該任務(wù),并返回 True。 |
cancelled() | 如果該 Future 代表的線程任務(wù)正在執(zhí)行、不可被取消,該方法返回 True。 |
running() | 如果該 Future 代表的線程任務(wù)正在執(zhí)行、不可被取消,該方法返回 True。 |
done() | 如果該 Funture 代表的線程任務(wù)被成功取消或執(zhí)行完成,則該方法返回 True。 |
result(timeout=None) | 獲取該 Future 代表的線程任務(wù)最后返回的結(jié)果。如果 Future 代表的線程任務(wù)還未完成,該方法將會(huì)阻塞當(dāng)前線程,其中 timeout 參數(shù)指定最多阻塞多少秒。超時(shí)拋出TimeoutError,取消拋出CancelledError。 |
exception(timeout=None) | 獲取該 Future 代表的線程任務(wù)所引發(fā)的異常。如果該任務(wù)成功完成,沒有異常,則該方法返回 None。 |
add_done_callback(fn) | 為該 Future 代表的線程任務(wù)注冊(cè)一個(gè)“回調(diào)函數(shù)”,當(dāng)該任務(wù)成功完成時(shí),程序會(huì)自動(dòng)觸發(fā)該 fn 函數(shù),參數(shù)是future。 |
之前的問題可以用線程池,代碼如下
import time
from concurrent.futures import ThreadPoolExecutor
def say(index):
print("thread%s is running" % index)
time.sleep(1)
print("thread%s is over" % index)
if __name__ == "__main__":
params = tuple()
for i in range(1, 11):
params = params + (i,)
pool = ThreadPoolExecutor(max_workers=2)
pool.map(say, params)
線程安全與鎖
正如之前所提到的,線程之間是共享資源的,所以當(dāng)多個(gè)線程同時(shí)訪問或處理同一資源時(shí)會(huì)產(chǎn)生一定的問題,會(huì)造成資源損壞或者與預(yù)期不一致。例如以下程序最后執(zhí)行結(jié)果是157296且每次結(jié)果都不一樣。
import threading
import time
lock = threading.Lock()
def task():
global a
for i in range(100000):
a = a + 1
if i == 50:
time.sleep(1)
if __name__ == "__main__":
global a
a = 0
thread1 = threading.Thread(target=task)
thread2 = threading.Thread(target=task)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(a)
這時(shí)候就需要用到鎖,是使用之前將資源鎖定,鎖定期間不允許其他線程訪問,使用完之后再釋放鎖。在python的threading模塊中有Lock和RLock兩個(gè)類,它們都有兩個(gè)方法,Lock.acquire(blocking=True, timeout=-1) 獲取鎖。Lock.release() 釋放鎖。其二者的區(qū)別在于RLock是可重入鎖,一個(gè)線程可以多次獲取,主要是為了避免死鎖。一個(gè)簡(jiǎn)單的例子,以下代碼會(huì)死鎖
Lock.acquire()
Lock.acquire()
Lock.release()
Lock.release()
用RLock則不會(huì)死鎖
RLock.acquire()
RLock.acquire()
RLock.release()
RLock.release()
?死鎖(Deadlock)是指兩個(gè)或兩個(gè)以上的線程在執(zhí)行過程中,由于競(jìng)爭(zhēng)資源或者由于彼此通信而造成的一種阻塞的現(xiàn)象,若無外力作用,它們都將無法推進(jìn)下去。此時(shí)稱系統(tǒng)處于死鎖狀態(tài)或系統(tǒng)產(chǎn)生了死鎖,這些永遠(yuǎn)在互相等待的進(jìn)程稱為死鎖進(jìn)程。
?
以上代碼加鎖后就可以得到想要的結(jié)果了,其代碼如下
import threading
import time
lock = threading.Lock()
def task():
global a
for i in range(100000):
lock.acquire()
a = a + 1
lock.release()
if i == 50:
time.sleep(1)
if __name__ == "__main__":
global a
a = 0
thread1 = threading.Thread(target=task)
thread2 = threading.Thread(target=task)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(a)
假的多線程
關(guān)于python多線程的簡(jiǎn)單使用已經(jīng)講完了,現(xiàn)在回到之前文中提到的,python的多線程是假的多線程,為什么這么說呢,因?yàn)镻ython中有一個(gè)GIL,GIL的全稱是Global Interpreter Lock(全局解釋器鎖),并且由于GIL鎖存在,python里一個(gè)進(jìn)程永遠(yuǎn)只能同時(shí)執(zhí)行一個(gè)線程(拿到GIL的線程才能執(zhí)行),這就是為什么在多核CPU上,python的多線程效率并不高。對(duì)于計(jì)算密集型的Python多線程并不會(huì)提高執(zhí)行效率,甚至可能因?yàn)榫€程切換開銷過大導(dǎo)致性能還不如單線程。但是對(duì)于IO密集型的任務(wù),Python多線程還是可以提高效率。
-
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
7035瀏覽量
89045 -
多線程
+關(guān)注
關(guān)注
0文章
278瀏覽量
19963 -
python
+關(guān)注
關(guān)注
56文章
4797瀏覽量
84694
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論