為了實現(xiàn)對臨界資源的有效管理,應用層的程序有原子變量,條件變量,信號量來控制并發(fā),同樣的問題也存在與驅(qū)動開發(fā)中,比如一個驅(qū)動同時被多個應用層程序調(diào)用,此時驅(qū)動中的全局變量會同時屬于多個應用層進程的進程空間,這種情況下也要使用一些技術(shù)來實現(xiàn)對并發(fā)的控制。本文將討論內(nèi)核中下述并發(fā)控制技術(shù)的技術(shù)特點和應用場景。
中斷屏蔽
原子操作
原子變量操作
原子位操作
自旋鎖
傳統(tǒng)自旋鎖
讀寫自旋鎖
順序鎖
RCU
信號量
傳統(tǒng)信號量
讀寫信號量
完成量
互斥量
中斷屏蔽
顧名思義,就是屏蔽所有的中斷。在嵌入式系統(tǒng),中斷屏蔽可以有三級,1. 硬件接口的屏蔽,2. 硬件GIC的屏蔽,3. CPU(內(nèi)核)的屏蔽。如果在接口處屏蔽了,那么中斷來了就丟了,根本找不到。如果在GIC處屏蔽了,那么在屏蔽期間如果來了irq_1,irq_2,irq_3個中斷,因為只有一個pending標志位,所以最后irq_3來的時候會將pending置位,之后解除屏蔽了,CPU發(fā)現(xiàn)pending有置位,還是會處理,但是1,2就肯定丟了。在ARM處的屏蔽,即內(nèi)核中的屏蔽,看怎么設置了,如果就是local_irq_disable,那么丟了就是丟了,和在接口處屏蔽一樣,如果是local_irq_save就和第二種一樣,追到最后一個中斷,內(nèi)核也有相應的機制進行中斷計數(shù),知道這期間來了多少個中斷,但是實際操作中,大部分情況我們都不會追著執(zhí)行錯過的中斷,除非這個中斷非常重要。
我們這里討論的,就是在內(nèi)核中進行中斷屏蔽。由于內(nèi)核中很多重要的操作都要依賴于中斷,所以屏蔽所有的中斷是十分危險的,里面執(zhí)行的代碼要盡可能的快,而且,由于內(nèi)核的進程調(diào)度也是由中斷驅(qū)動的,所以中斷屏蔽中不能有可能引發(fā)休眠的代碼,否則無法被喚醒。注意,中斷屏蔽只是屏蔽了本CPU的中斷,所以并不能解決SMP引發(fā)的競泰問題,通常,中斷屏蔽要和自旋鎖聯(lián)合使用,用于防止訪問自旋鎖保護的臨界區(qū)時被中斷打斷
普通的中斷屏蔽
local_irq_disable(); //屏蔽中斷//或local_irq_save(flags); //屏蔽中斷并保存目前CPU中的中斷位信息/* 臨界區(qū) */local_irq_enable(); //解除屏蔽//或local_irq_restore(flags); //解除屏蔽并恢復中斷位信息
底半部的中斷屏蔽
local_bh_disable(); //屏蔽中斷/* 臨界區(qū) */local_bh_enable();
原子操作
原子操作即不能被打斷的操作,和應用層的概念一樣,內(nèi)核中的原子操作模板如下:
整型原子變量
//asm/atomic.h//創(chuàng)建并初始化原子變量atomic_t tv = ATOMIC_INIT(初值);//讀原子變量int atomic_read(atomic_t *v);//寫原子變量void atomic_set(atomic_t *v, int i); /** *atomic_dec_and_test - 嘗試將原子變量-1 *v:如果-1之后原子變量變?yōu)?,返回非0, 否則返回0 */int atomic_dec_and_test(volatile atomic_t *v);int atomic_inc_and_test(volatile atomic_t *v);int atomic_sub_and_test(int i, volatile atomic_t *v);//操作并返回int atomic_add_return(int i, atomic *v);int atomic_sub_return(int i, atomic *v);int atomic_inc_return(atomic *v);int atomic_dev_return(atomic *v);
模板
static atomic_t tv;static int demo_open(struct inode *inode, struct file *filp){ if(!atomic_dec_and_test(&tv)){ atomic_inc(&tv); return -EBUSY; } /* 操作代碼 */ return 0;}static int demo_release(struct inode *inode, struct file *filp){ atomic_inc(&tv); return 0;}static int __init demo_init(void){ // init atomic_t atomic_set(&tv, 1);}
位原子操作
位原子操作即原子的位操作,內(nèi)核中大量使用"位"來記錄信息,比如位圖,這些操作都必須是原子性的,內(nèi)核API如下:
//設置位void set_bit(nr,void *addr);//清除位void clear_bit(nr,void *addr);//改變位void change_bit(nr,void *addr);//測試位test_bit(nr, void *addr);//測試并操作位int test_and_set_bit(nr, void *addr);int test_and_clear_bit(nr,void *addr);int test_and_change_bit(nr,void *addr);
自旋鎖
意即"在原地打轉(zhuǎn)",當加鎖不成功時,自旋,自旋鎖會不斷的占用CPU進行變量的測試,由于屬于原子操作,所以該CPU的占用會升為100%,所以,使用自旋鎖時,臨界區(qū)的代碼需要很短,否則會影響系統(tǒng)性能,此外,作為鎖機制的一種,使用自旋鎖同樣需要注意死鎖的出現(xiàn),自旋鎖鎖定期間不能調(diào)用可能引起進程調(diào)度的函數(shù),如果進程獲得自旋鎖之后再阻塞,eg,copy_from_user(),copy_to_user(),kmalloc(),msleep()等,一旦阻塞發(fā)生就可能導致內(nèi)核 崩潰。
自旋鎖可以用來解決SMP競態(tài)問題。不同類型的自旋鎖有自己的處理機制,適用于不同的情況。包括傳統(tǒng)自旋鎖,讀寫自旋鎖,RCU機制,順序鎖等,自旋鎖是信號量,互斥體的的底層實現(xiàn)工具。
比較類型傳統(tǒng)自旋鎖讀寫自旋鎖順序鎖RCU機制應用場合需要上鎖者獨占的資源需要寫者獨占的資源很少同時讀寫的資源讀多寫少的資源讀+讀 并發(fā)×√√√讀+寫 并發(fā)××√√寫+寫 并發(fā)×××√
和其他鎖機制的一樣,使用自旋鎖保護數(shù)據(jù)分為搶鎖-操作-解鎖,下面就是一個典型的使用鎖的流程,通過自旋鎖實現(xiàn)一個文件只被一個進程打開。
int cnt=0;lock_t lock;static int my_open(){ lock(&lock); if(cnt){ unlock(&lock) } cnt++; unlock(&lock);}static int release(){ lock(&lock); cnt--; unlock(&lock);}
傳統(tǒng)自旋鎖
是一種比較粗暴的自旋鎖,使用這種鎖的時候,被鎖定的臨界區(qū)域不允許其他CPU訪問,需要注意的是,盡管獲得鎖之后執(zhí)行的臨界區(qū)操作不會被其他CPU和本CPU內(nèi)其他搶占進程的打擾,但是仍然會被中斷和底半部的影響,所以通常我們會使用下述API中的衍生版本,比如上文中提到的將自旋鎖+中斷屏蔽來防止使用自旋鎖訪問臨界資源的時候被中斷打斷,對應的宏函數(shù)就是spin_lock_irq和spin_lock_irqsave。
//定義并初始化自旋鎖spinlock_t spinlockvoid spin_lock_init(spinlock_t *);//加鎖//spin_lock - 加鎖函數(shù)(忙等)void spin_lock(spinlock_t *lock);int spin_trylock(spinlock_t *lock);spin_lock_irq(); //=spin_lock() + local_irq_disable()spin_lock_irqsave(); //= spin_lock() + lock_irq_save();spin_lock_bh(); //=spin_lock() + local_bh_disable();//解鎖void spin_unlock(spinlock_t *lock);spin_unlock_irq(); //=spin_unlock() + local_irq_enable()spin_unlock_irqrestore(); //= spin_unlock() + lock_irq_restore();spin_unlock_bh(); //=spin_unlock() + local_bh_enable();
讀寫自旋鎖
傳統(tǒng)的自旋鎖粗暴的將臨界資源劃歸給一個CPU,但是很多資源都不會因為讀而被破壞,所以我們可以允許多個CPU同時讀臨界資源,但不允許同時寫資源,類似于應用層的文件鎖,內(nèi)核的讀寫鎖機制同樣有下述互斥原則:
讀者 + 讀者 不互斥
讀者 + 寫者 互斥
寫者 + 寫者 互斥
//include/linux/rwlock.h//定義并初始化自旋鎖rwlock_t rwlock;void rwlock_init(rwlock_t *lock);//加讀鎖void read_lock(rwlock_t *lock);int read_trylock(rwlock_t *lock);void read_lock_irqsave(rwlock_t *lock,unsigned long flags);void read_lock_irq(rwlock_t *lock, unsigned long flags);void read_lock_bh(rwlock_t *lock);//解讀鎖void read_unlock(rwlock_t *lock)void read_unlock_irqrestrore(rwlock_t *lock,unsigned long flags);void read_unlock_irq(rwlock_t *lock, unsigned long flags);void read_unlock_bh(rwlock_t *lock);//加寫鎖void write_lock(rwlock_t *lock)int write_trylock(rwlock_t *lock)void write_lock_irqsave(rwlock_t *lock,unsigned long flags);void write_lock_irq(rwlock_t *lock, unsigned long flags);void write_lock_bh(rwlock_t *lock);//解寫鎖void write_unlock(rwlock_t *lock)void write_unlock_irqrestrore(rwlock_t *lock,unsigned long flags);void write_unlock_irq(rwlock_t *lock, unsigned long flags);void write_unlock_bh(rwlock_t *lock);
順序鎖
順序鎖可以看作讀寫鎖的升級版,讀寫鎖不允許同時存在讀者和寫者,順序鎖對這一情況進行了改良,它允許寫者和讀者同時訪問臨界區(qū),不必再向讀寫鎖那樣讀者要讀必須等待寫者寫完,寫者要寫必須等待讀者讀完。不過,使用順序鎖的時候,臨界區(qū)不能有指針,因為寫者可能會修改指針的指向,如果讀者去讀,就會Oops,此外,如果讀者讀過的數(shù)據(jù)被寫者改變了,讀者需要重新讀,以維持數(shù)據(jù)是最新的,雖然有這兩個約束,但是順序鎖提供了比讀寫鎖更大的靈活性。對于寫者+寫者的情況,順序鎖的機制和讀寫鎖一樣,必須等!
讀者 + 讀者 不互斥
讀者 + 寫者 不互斥 ,?臨界區(qū)沒有指針+讀者需自己注意更新
寫者 + 寫者 互斥
//include/linux/seqlock.h//定義順序鎖struct seqlock_t sl;//獲取順序鎖void write_seqlock(seqlock_t *sl);void write_tryseqlock(seqlock_t *sl);void write_seqlock_irqsave(lock,flags); //=local_irq_save() + write_seqlock()void write_seqlock_irq(seqlock_t *sl); //=local_irq_disable() + write_seqlock()void write_seqlock_bh(seqlock_t *sl); //local_bh_disable() + write_seqlock()//釋放順序鎖void write_sequnlock(seqlock_t *sl);void write_sequnlock_irqsave(lock,flags); //=local_irq_restore() + write_sequnlock()void write_sequnlock_irq(seqlock_t *sl); //=local_irq_enable() + write_sequnlock()void write_sequnlock_bh(seqlock_t *sl); //local_bh_enable() + write_sequnlock()//讀開始unsigned read_seqbegin(const seqlock_t *sl);read_seqbegin_irqsave(lock,flags); //=local_irq_save() + read_seqbegin();//重讀int read_seqretry(const seqlock_t *sl,unsigned iv);read_seqretry_irqrestore(lock,iv,flags); //=local_irq_restore() + read_seqretry();
RCU
RCU即Read-Copy Update,即讀者直接讀,寫者先拷貝再擇時更新,是另外一種讀寫鎖的升級版,這種機制在VFS層被大量使用。正如其名,讀者訪問臨界資源不需要鎖,從下面的rcu_read_lock的定義即可看出,寫者在寫之前先將臨界資源進行備份,去修改這個副本,等所有的CPU都退出對這塊臨界區(qū)的引用后,再通過回調(diào)機制,將引用這塊資源的原指針指向已經(jīng)修改的備份。從中可以看出,在RCU機制下,讀者的開銷大大降低,也沒有順序鎖的指針問題,但是寫者的開銷很大,所以RCU適用于讀多寫少的臨界資源。如果寫操作很多,就有可能將讀操作節(jié)約的性能抵消掉,得不償失。
讀者 + 讀者 不互斥
讀者 + 寫者 不互斥 , 讀者自己注意更新
寫者 + 寫者 不互斥 ,寫者之間自己去同步
內(nèi)核會為每一個CPU維護兩個數(shù)據(jù)結(jié)構(gòu)-rcu_data和rcu_bh_data,他們用于保存回調(diào)函數(shù),函數(shù)call_rcu()把回調(diào)函數(shù)注冊到rcu_data,而call_rcu_bh()則把回調(diào)函數(shù)注冊到rcu_bh_data,在每一個數(shù)據(jù)結(jié)構(gòu)上,回調(diào)函數(shù)們會組成一個隊列。
使用RCU時,讀執(zhí)行單元必須提供一個信號給寫執(zhí)行單元以便寫執(zhí)行單元能夠確定數(shù)據(jù)可以被安全地釋放或修改的時機。內(nèi)核中有一個專門的垃圾收集器來探測讀執(zhí)行單元的信號,一旦所有的讀執(zhí)行單元都已經(jīng)發(fā)送信號告訴收集器自己都沒有使用RCU的數(shù)據(jù)結(jié)構(gòu),收集器就調(diào)用回調(diào)函數(shù)完成最后的數(shù)據(jù)釋放或修改操作。
讀
讀即RCU中的R,從下面的宏定義可以看出,讀操作其實就是禁止內(nèi)核的搶占調(diào)度,并沒有使用一個鎖對象。
//讀鎖定//include/linux/rcupdate.hrcu_read_lock(); //preempt_disbale()rcu_read_lock_bh(); //local_bh_disable()//讀解鎖rcu_read_unlock() //preempt_enable()rcu_read_unlock_bh(); //local_bh_enable()
同步
同步即是RCU寫操作的最后一個步驟-Update,下面這個接口會則色寫執(zhí)行單元,直到所有的讀執(zhí)行單元已經(jīng)完成讀執(zhí)行單元臨界區(qū),寫執(zhí)行單元才可以繼續(xù)下一步操作。如果有多個RCU寫執(zhí)行單元調(diào)用該函數(shù),他們將在一個grace period(即所有的讀執(zhí)行單元已經(jīng)完成對臨界區(qū)的訪問)之后全部被喚醒。
synchrosize_rcu()
掛起回調(diào)
下面這個接口也由RCU寫執(zhí)行單元調(diào)用,它不會使寫執(zhí)行單元阻塞,因而可以在中斷上下文或軟中斷中使用,該函數(shù)把func掛接到RCU回調(diào)函數(shù)鏈上,然后立即返回。函數(shù)sychrosize_rcu()其實也會調(diào)用call_rcu()。
void call_rcu(struct rcu_head *head,void (*func)(struct rcu_head *rcu));
下面這個接口會將軟中斷的完成也當作經(jīng)歷一個quiecent state(靜默狀態(tài)),因此如果寫執(zhí)行單元調(diào)用了該函數(shù),在進程上下文的讀執(zhí)行單元必須使用rcu_read_lock_bh();
void call_rcu_bh(struct rcu_head *head,void (*func)(struct rcu_head *rcu));
RCU機制被大量的運用在內(nèi)核鏈表的讀寫中,下面這些就是內(nèi)核中使用RCU機制保護的數(shù)據(jù)結(jié)構(gòu),函數(shù)的功能和非RCU版本一樣,具體可以參考內(nèi)核文件"include/linux/list.h",只不過這里的操作都會使用RCU保護起來。
void list_add_rcu(struct list_head *new, struct list_head *head);void list_add_tail_rcu(struct list_head *new,struct list_head *head);void list_del_rcu(struct list_head *entry);void list_replace_rcu(struct list_head *old,struct list_head *new);list_for_each_rcu(pos,head);list_for_each_safe_rcu(pos,n,head);list_for_each_entry_rcu(pos,head,member);void hlist_del_rcu(struct hlist_node *n);void hlist_add_head_rcu(struct hlist_node *n, struct hlist_head *h);list_for_each_rcu(pos,head);hlist_for_each_entry_rcu(tpos,pos,head,member);
信號量
自旋鎖一節(jié)提過,如果一個CPU不能獲取臨界資源,就會造成"原地自旋",所以自旋鎖保護的臨界區(qū)的執(zhí)行時間不能太長,但如果我們的確需要保護一段執(zhí)行時間比較長的臨界區(qū)呢?答案就是信號量
,信號量的底層依托于自旋鎖來實現(xiàn)其原子性,并進一步將其提高到"進程"的維度,稱為一種可以運行在進程上下文的"鎖",正是這種能運行在進程上下文的能力賦予了信號量和自旋鎖的諸多不同。
使用信號量,如果試圖獲取信號量的進程獲取失敗,內(nèi)核就會將其調(diào)度為睡眠狀態(tài),執(zhí)行其他進程,避免了CPU的忙等。不過,進程上下文的切換也是有成本的,所以通常,信號量在內(nèi)核中都是只用于保護大塊臨界區(qū)。
此外,一個進程一旦無法獲取期待的信號量就會進入睡眠,所以信號量保護的臨界區(qū)可以有睡眠的代碼。在這方面,自旋鎖保護的區(qū)域是不能睡眠也不能執(zhí)行schedule()的,因為一旦一個搶到了鎖的CPU進行了進程上下文的切換或睡眠,那么其他等待這個自旋鎖的CPU就會一直在那忙等,就永遠無法等到這個鎖,,形成死鎖,除非有其他進程將其喚醒(通常都不會有)。
也正是由于信號量操作可能引起阻塞,所以信號量不能用于中斷上下文??偨Y(jié)一下剛才羅嗦這一段:
項目信號量自旋鎖臨界區(qū)時間進程切換時間更短臨界區(qū)執(zhí)行時間更短進程上下文臨界區(qū)可以睡眠或調(diào)度臨界區(qū)不可以睡眠或調(diào)度中斷上下文只有down_trylock()可以可以
傳統(tǒng)信號量
內(nèi)核的信號量和應用層的信號量的使用方式類似,但沒有獲取信號量這一步驟,因為內(nèi)核中中的信號量可以映射到所有調(diào)用這個模塊的用戶進程的內(nèi)核空間。這些用戶進程也就直接共享了一個信號量,所以也就沒有獲取信號量一說,相關(guān)的內(nèi)容我在"Linux IPC System V 信號量"一文中有所討論。
和應用層的信號量一樣,內(nèi)核信號量也是用于對臨界資源的互斥/順序訪問,同樣,雖然在使用信號量的時候我們可以初始化為任意值,但實際操作上我們通常只初始化為1或0,下述是Linux內(nèi)核提供的信號量API。
//include/linux/semaphore.h//定義并初始化semaphore對象struct semphore sem;//初始化信號量void sem_init(struct semaphore * sem,int val);init_MUTEX(sem);init_MUTEX_LOCKED(sem);DECLARE_MUTEX(sem);DECLARE_MUTEX_LOCKED(sem);//P操作//down()會導致睡眠,不能用于中斷上下文void down(struct semaphore *sem);//down_interruptible同樣會進入休眠,但能被打斷int down_interruptible(struct semaphore *sem);//down_trylock不能獲得鎖時會立即返回,不會睡眠,可以用在中斷上下文int down_trylock(struct semaphore *sem);//V操作void up(struct semaphore *sem);
讀寫信號量
讀寫信號量與信號量的關(guān)系 和 讀寫自旋鎖與自旋鎖的關(guān)系類似,他們的互斥邏輯都是一樣的,這里不再贅述
//定義并初始化讀寫信號量struct rw_semaphore my_rwsem;void init_rwsem(struct rw_semaphore *sem);//P讀信號量void down_read(struct rw_semaphore *sem);int down_read_trylock(struct rw_semaphore *sem);//V讀信號量void up_read(struct rw_semaphore *sem);//P寫信號量void down_write(struct rw_semaphore *sem);int down_write_trylock(struct rw_semaphore *sem);//V寫信號量void up_write(struct rw_semaphore *sem);
模板
struct rw_semaphore my_rwsem;void init_rwsem(&my_rwsem);//讀前獲取讀信號量down_read(&my_rwsem); //若要非阻塞:down_read_trylock(&my_rwsem);/* 讀臨界區(qū) *///讀完釋放讀信號量up_read(&my_rwsem); //寫前獲取寫信號量down_write(&my_rwsem); //若要非阻塞:down_write_trylock(&my_rwsem); /* 寫臨界區(qū) *///寫完釋放寫信號量up_write(&my_rwsem);
完成量
完成量用于一個執(zhí)行單元等待另一個執(zhí)行單元執(zhí)行完某事,和傳統(tǒng)信號量一樣,主要是用來實現(xiàn)隊臨界區(qū)的順序/互斥訪問。但是完成量還提供一種喚醒一個或喚醒所有等待進程的接口,有點類似與應用層的條件變量。
//定義并初始化完成量struct completion my_completion;init_completion(&my_completion);//或DECLARE_COMPLETION(my_completion)//等待completionvoid wait_for_completion(struct completion *c);//喚醒completionvoid complete(struct completion *c); //只喚醒一個等待的執(zhí)行單元void complete_all(struct completion *c); //釋放所有等待該完成量的執(zhí)行單元
互斥體
除了信號量,Linux內(nèi)核還提供了一種專門用于實現(xiàn)互斥的機制-互斥體,相關(guān)的內(nèi)核API如下:
//include/linux/mutex.h//定義并初始化mutex對象struct mutex my_mutex;mutex_init(&my_mutex);//獲取mutexvoid mutex_lock(struct mutex *lock);int mutex_trylock(struct mutex *lock);int mutex_lock_interruptible(struct mutex *lock);//釋放mutexvoid mutex_unlock(struct mutex *lock);
?
評論
查看更多