作者簡(jiǎn)介:余華兵,在網(wǎng)絡(luò)通信行業(yè)工作十多年,負(fù)責(zé)IPv4協(xié)議棧、IPv6協(xié)議棧和Linux內(nèi)核。在工作中看著2.6版本的專業(yè)書籍維護(hù)3.x和4.x版本的Linux內(nèi)核,感覺不方便,于是自己分析4.x版本的Linux內(nèi)核整理出一本書,書名叫《Linux內(nèi)核深度解析》,2019年5月出版,希望對(duì)同行有幫助。
自旋鎖用于處理器之間的互斥,適合保護(hù)很短的臨界區(qū),并且不允許在臨界區(qū)睡眠。申請(qǐng)自旋鎖的時(shí)候,如果自旋鎖被其他處理器占有,本處理器自旋等待(也稱為忙等待)。
進(jìn)程、軟中斷和硬中斷都可以使用自旋鎖。
自旋鎖的實(shí)現(xiàn)經(jīng)歷了3個(gè)階段:
(1) 最早的自旋鎖是無序競(jìng)爭(zhēng)的,不保證先申請(qǐng)的進(jìn)程先獲得鎖。
(2) 第2個(gè)階段是入場(chǎng)券自旋鎖,進(jìn)程按照申請(qǐng)鎖的順序排隊(duì),先申請(qǐng)的進(jìn)程先獲得鎖。
(3) 第3個(gè)階段是MCS自旋鎖。入場(chǎng)券自旋鎖存在性能問題:所有申請(qǐng)鎖的處理器在同一個(gè)變量上自旋等待,緩存同步的開銷大,不適合處理器很多的系統(tǒng)。MCS自旋鎖的策略是為每個(gè)處理器創(chuàng)建一個(gè)變量副本,每個(gè)處理器在自己的本地變量上自旋等待,解決了性能問題。
入場(chǎng)券自旋鎖和MCS自旋鎖都屬于排隊(duì)自旋鎖(queued spinlock),進(jìn)程按照申請(qǐng)鎖的順序排隊(duì),先申請(qǐng)的進(jìn)程先獲得鎖。
1. 數(shù)據(jù)結(jié)構(gòu)
自旋鎖的定義如下:
include/linux/spinlock_types.h
typedef struct spinlock {
union {
struct raw_spinlock rlock;
…
};
} spinlock_t;
typedef struct raw_spinlock {
arch_spinlock_t raw_lock;
…
} raw_spinlock_t;
可以看到,數(shù)據(jù)類型spinlock對(duì)raw_spinlock做了封裝,然后數(shù)據(jù)類型raw_spinlock對(duì)arch_spinlock_t做了封裝,各種處理器架構(gòu)需要自定義數(shù)據(jù)類型arch_spinlock_t。
spinlock和raw_spinlock(原始自旋鎖)有什么關(guān)系?
Linux內(nèi)核有一個(gè)實(shí)時(shí)內(nèi)核分支(開啟配置宏CONFIG_PREEMPT_RT)來支持硬實(shí)時(shí)特性,內(nèi)核主線只支持軟實(shí)時(shí)。
對(duì)于沒有打上實(shí)時(shí)內(nèi)核補(bǔ)丁的內(nèi)核,spinlock只是封裝raw_spinlock,它們完全一樣。如果打上實(shí)時(shí)內(nèi)核補(bǔ)丁,那么spinlock使用實(shí)時(shí)互斥鎖保護(hù)臨界區(qū),在臨界區(qū)內(nèi)可以被搶占和睡眠,但raw_spinlock還是自旋鎖。
目前主線版本還沒有合并實(shí)時(shí)內(nèi)核補(bǔ)丁,說不定哪天就會(huì)合并進(jìn)來,為了使代碼可以兼容實(shí)時(shí)內(nèi)核,最好堅(jiān)持3個(gè)原則:
(1)盡可能使用spinlock。
(2)絕對(duì)不允許被搶占和睡眠的地方,使用raw_spinlock,否則使用spinlock。
(3)如果臨界區(qū)足夠小,使用raw_spinlock。
2. 使用方法
定義并且初始化靜態(tài)自旋鎖的方法是:
DEFINE_SPINLOCK(x);
在運(yùn)行時(shí)動(dòng)態(tài)初始化自旋鎖的方法是:
spin_lock_init(x);
申請(qǐng)自旋鎖的函數(shù)是:
(1)void spin_lock(spinlock_t *lock);
申請(qǐng)自旋鎖,如果鎖被其他處理器占有,當(dāng)前處理器自旋等待。
(2)void spin_lock_bh(spinlock_t *lock);
申請(qǐng)自旋鎖,并且禁止當(dāng)前處理器的軟中斷。
(3)void spin_lock_irq(spinlock_t *lock);
申請(qǐng)自旋鎖,并且禁止當(dāng)前處理器的硬中斷。
(4)spin_lock_irqsave(lock, flags);
申請(qǐng)自旋鎖,保存當(dāng)前處理器的硬中斷狀態(tài),并且禁止當(dāng)前處理器的硬中斷。
(5)int spin_trylock(spinlock_t *lock);
申請(qǐng)自旋鎖,如果申請(qǐng)成功,返回1;如果鎖被其他處理器占有,當(dāng)前處理器不等待,立即返回0。
釋放自旋鎖的函數(shù)是:
(1)void spin_unlock(spinlock_t *lock);
(2)void spin_unlock_bh(spinlock_t *lock);
釋放自旋鎖,并且開啟當(dāng)前處理器的軟中斷。
(3)void spin_unlock_irq(spinlock_t *lock);
釋放自旋鎖,并且開啟當(dāng)前處理器的硬中斷。
(4)void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
釋放自旋鎖,并且恢復(fù)當(dāng)前處理器的硬中斷狀態(tài)。
定義并且初始化靜態(tài)原始自旋鎖的方法是:
DEFINE_RAW_SPINLOCK(x);
在運(yùn)行時(shí)動(dòng)態(tài)初始化原始自旋鎖的方法是:
raw_spin_lock_init (x);
申請(qǐng)?jiān)甲孕i的函數(shù)是:
(1)raw_spin_lock(lock)
申請(qǐng)?jiān)甲孕i,如果鎖被其他處理器占有,當(dāng)前處理器自旋等待。
(2)raw_spin_lock_bh(lock)
申請(qǐng)?jiān)甲孕i,并且禁止當(dāng)前處理器的軟中斷。
(3)raw_spin_lock_irq(lock)
申請(qǐng)?jiān)甲孕i,并且禁止當(dāng)前處理器的硬中斷。
(4)raw_spin_lock_irqsave(lock, flags)
申請(qǐng)?jiān)甲孕i,保存當(dāng)前處理器的硬中斷狀態(tài),并且禁止當(dāng)前處理器的硬中斷。
(5)raw_spin_trylock(lock)
申請(qǐng)?jiān)甲孕i,如果申請(qǐng)成功,返回1;如果鎖被其他處理器占有,當(dāng)前處理器不等待,立即返回0。
釋放原始自旋鎖的函數(shù)是:
(1)raw_spin_unlock(lock)
(2)raw_spin_unlock_bh(lock)
釋放原始自旋鎖,并且開啟當(dāng)前處理器的軟中斷。
(3)raw_spin_unlock_irq(lock)
釋放原始自旋鎖,并且開啟當(dāng)前處理器的硬中斷。
(4)raw_spin_unlock_irqrestore(lock, flags)
釋放原始自旋鎖,并且恢復(fù)當(dāng)前處理器的硬中斷狀態(tài)。
3. 入場(chǎng)券自旋鎖
入場(chǎng)券自旋鎖(ticket spinlock)的算法類似于銀行柜臺(tái)的排隊(duì)叫號(hào):
(1)鎖擁有排隊(duì)號(hào)和服務(wù)號(hào),服務(wù)號(hào)是當(dāng)前占有鎖的進(jìn)程的排隊(duì)號(hào)。
(2)每個(gè)進(jìn)程申請(qǐng)鎖的時(shí)候,首先申請(qǐng)一個(gè)排隊(duì)號(hào),然后輪詢鎖的服務(wù)號(hào)是否等于自己的排隊(duì)號(hào),如果等于,表示自己占有鎖,可以進(jìn)入臨界區(qū),否則繼續(xù)輪詢。
(3)當(dāng)進(jìn)程釋放鎖時(shí),把服務(wù)號(hào)加一,下一個(gè)進(jìn)程看到服務(wù)號(hào)等于自己的排隊(duì)號(hào),退出自旋,進(jìn)入臨界區(qū)。
ARM64架構(gòu)定義的數(shù)據(jù)類型arch_spinlock_t如下所示:
arch/arm64/include/asm/spinlock_types.h
typedef struct {
#ifdef __AARCH64EB__ /* 大端字節(jié)序(高位存放在低地址) */
u16 next;
u16 owner;
#else /* 小端字節(jié)序(低位存放在低地址) */
u16 owner;
u16 next;
#endif
} __aligned(4) arch_spinlock_t;
成員next是排隊(duì)號(hào),成員owner是服務(wù)號(hào)。
在多處理器系統(tǒng)中,函數(shù)spin_lock()負(fù)責(zé)申請(qǐng)自旋鎖,ARM64架構(gòu)的代碼如下所示:
spin_lock() -》 raw_spin_lock() -》 _raw_spin_lock() -》 __raw_spin_lock() -》 do_raw_spin_lock() -》 arch_spin_lock()
arch/arm64/include/asm/spinlock.h
1 static inline void arch_spin_lock(arch_spinlock_t *lock)
2 {
3 unsigned int tmp;
4 arch_spinlock_t lockval, newval;
5
6 asm volatile(
7 ARM64_LSE_ATOMIC_INSN(
8 /* LL/SC */
9 “ prfm pstl1strm, %3\n”
10 “1: ldaxr %w0, %3\n”
11 “ add %w1, %w0, %w5\n”
12 “ stxr %w2, %w1, %3\n”
13 “ cbnz %w2, 1b\n”,
14 /* 大系統(tǒng)擴(kuò)展的原子指令 */
15 “ mov %w2, %w5\n”
16 “ ldadda %w2, %w0, %3\n”
17 __nops(3)
18 )
19
20 /* 我們得到鎖了嗎?*/
21 “ eor %w1, %w0, %w0, ror #16\n”
22 “ cbz %w1, 3f\n”
23 “ sevl\n”
24 “2: wfe\n”
25 “ ldaxrh %w2, %4\n”
26 “ eor %w1, %w2, %w0, lsr #16\n”
27 “ cbnz %w1, 2b\n”
28 /* 得到鎖,臨界區(qū)從這里開始*/
29 “3:”
30 : “=&r” (lockval), “=&r” (newval), “=&r” (tmp), “+Q” (*lock)
31 : “Q” (lock-》owner), “I” (1 《《 TICKET_SHIFT)
32 : “memory”);
33 }
第6~18行代碼,申請(qǐng)排隊(duì)號(hào),然后把自旋鎖的排隊(duì)號(hào)加1,這是一個(gè)原子操作,有兩種實(shí)現(xiàn)方法:
1)第9~13行代碼,使用指令ldaxr(帶有獲取語義的獨(dú)占加載)和stxr(獨(dú)占存儲(chǔ))實(shí)現(xiàn),指令ldaxr帶有獲取語義,后面的加載/存儲(chǔ)指令必須在指令ldaxr完成之后開始執(zhí)行。
2)第15~16行代碼,如果處理器支持大系統(tǒng)擴(kuò)展,那么使用帶有獲取語義的原子加法指令ldadda實(shí)現(xiàn),指令ldadda帶有獲取語義,后面的加載/存儲(chǔ)指令必須在指令ldadda完成之后開始執(zhí)行。
第21~22行代碼,如果服務(wù)號(hào)等于當(dāng)前進(jìn)程的排隊(duì)號(hào),進(jìn)入臨界區(qū)。
第24~27行代碼,如果服務(wù)號(hào)不等于當(dāng)前進(jìn)程的排隊(duì)號(hào),那么自旋等待。使用指令ldaxrh(帶有獲取語義的獨(dú)占加載,h表示halfword,即2字節(jié))讀取服務(wù)號(hào),指令ldaxrh帶有獲取語義,后面的加載/存儲(chǔ)指令必須在指令ldaxrh完成之后開始執(zhí)行。
第23行代碼,sevl(send event local)指令的功能是發(fā)送一個(gè)本地事件,避免錯(cuò)過其他處理器釋放自旋鎖時(shí)發(fā)送的事件。
第24行代碼,wfe(wait for event)指令的功能是使處理器進(jìn)入低功耗狀態(tài),等待事件。
函數(shù)spin_unlock()負(fù)責(zé)釋放自旋鎖,ARM64架構(gòu)的代碼如下所示:
spin_unlock() -》 raw_spin_unlock() -》 _raw_spin_unlock() -》 __raw_spin_unlock() -》 do_raw_spin_unlock() -》 arch_spin_unlock()
arch/arm64/include/asm/spinlock.h
1 static inline void arch_spin_unlock(arch_spinlock_t *lock)
2 {
3 unsigned long tmp;
4
5 asm volatile(ARM64_LSE_ATOMIC_INSN(
6 /* LL/SC */
7 “ ldrh %w1, %0\n”
8 “ add %w1, %w1, #1\n”
9 “ stlrh %w1, %0”,
10 /* 大多統(tǒng)擴(kuò)展的原子指令 */
11 “ mov %w1, #1\n”
12 “ staddlh %w1, %0\n”
13 __nops(1))
14 : “=Q” (lock-》owner), “=&r” (tmp)
15 :
16 : “memory”);
17 }
把自旋鎖的服務(wù)號(hào)加1,有兩種實(shí)現(xiàn)方法:
(1)第7~9行代碼,使用指令ldrh(加載,h表示halfword,即2字節(jié))和stlrh(帶有釋放語義的存儲(chǔ))實(shí)現(xiàn),指令stlrh帶有釋放語義,前面的加載/存儲(chǔ)指令必須在指令stlrh開始執(zhí)行之前執(zhí)行完。因?yàn)橐淮沃荒苡幸粋€(gè)進(jìn)程進(jìn)入臨界區(qū),所以只有一個(gè)進(jìn)程把自旋鎖的服務(wù)號(hào)加1,不需要是原子操作。
(2)第11~12行代碼,如果處理器支持大系統(tǒng)擴(kuò)展,那么使用帶有釋放語義的原子加法指令staddlh實(shí)現(xiàn),指令staddlh帶有釋放語義,前面的加載/存儲(chǔ)指令必須在指令staddlh開始執(zhí)行之前執(zhí)行完。
在單處理器系統(tǒng)中,自旋鎖是空的。
include/linux/spinlock_types_up.h
typedef struct { } arch_spinlock_t;
函數(shù)spin_lock()只是禁止內(nèi)核搶占。
spin_lock() -》 raw_spin_lock() -》 _raw_spin_lock()
include/linux/spinlock_api_up.h
#define _raw_spin_lock(lock) __LOCK(lock)
#define __LOCK(lock) \
do { preempt_disable(); ___LOCK(lock); } while (0)
#define ___LOCK(lock) \
do { __acquire(lock); (void)(lock); } while (0)
4. MCS自旋鎖
入場(chǎng)券自旋鎖存在性能問題:所有等待同一個(gè)自旋鎖的處理器在同一個(gè)變量上自旋等待,申請(qǐng)或者釋放鎖的時(shí)候會(huì)修改鎖,導(dǎo)致其他處理器存放自旋鎖的緩存行失效,在擁有幾百甚至幾千個(gè)處理器的大型系統(tǒng)中,處理器申請(qǐng)自旋鎖時(shí)競(jìng)爭(zhēng)可能很激烈,緩存同步的開銷很大,導(dǎo)致系統(tǒng)性能大幅度下降。
MCS(MCS是“Mellor-Crummey”和“Scott”這兩個(gè)發(fā)明人的名字的首字母縮寫)自旋鎖解決了這個(gè)缺點(diǎn),它的策略是為每個(gè)處理器創(chuàng)建一個(gè)變量副本,每個(gè)處理器在申請(qǐng)自旋鎖的時(shí)候在自己的本地變量上自旋等待,避免緩存同步的開銷。
4.1. 傳統(tǒng)的MCS自旋鎖
傳統(tǒng)的MCS自旋鎖包含:
(1)一個(gè)指針tail指向隊(duì)列的尾部。
(2)每個(gè)處理器對(duì)應(yīng)一個(gè)隊(duì)列節(jié)點(diǎn),即mcs_lock_node結(jié)構(gòu)體,其中成員next指向隊(duì)列的下一個(gè)節(jié)點(diǎn),成員locked指示鎖是否被其他處理器占有,如果成員locked的值為1,表示鎖被其他處理器占有。
結(jié)構(gòu)體的定義如下所示:
typedef struct __mcs_lock_node {
struct __mcs_lock_node *next;
int locked;
} ____cacheline_aligned_in_smp mcs_lock_node;
typedef struct {
mcs_lock_node *tail;
mcs_lock_node nodes[NR_CPUS];/* NR_CPUS是處理器的數(shù)量 */
} spinlock_t;
其中“____cacheline_aligned_in_smp”的作用是:在多處理器系統(tǒng)中,結(jié)構(gòu)體的起始地址和長(zhǎng)度都是一級(jí)緩存行長(zhǎng)度的整數(shù)倍。
當(dāng)沒有處理器占有或者等待自旋鎖的時(shí)候,隊(duì)列是空的,tail是空指針。
圖 4.1 處理器0申請(qǐng)MCS自旋鎖
如圖 4.1所示,當(dāng)處理器0申請(qǐng)自旋鎖的時(shí)候,執(zhí)行原子交換操作,使tail指向處理器0的mcs_lock_node結(jié)構(gòu)體,并且返回tail的舊值。tail的舊值是空指針,說明自旋鎖處于空閑狀態(tài),那么處理器0獲得自旋鎖。
圖 4.2 處理器1申請(qǐng)MCS自旋鎖
如圖 4.2所示,當(dāng)處理器0占有自旋鎖的時(shí)候,處理器1申請(qǐng)自旋鎖,執(zhí)行原子交換操作,使tail指向處理器1的mcs_lock_node結(jié)構(gòu)體,并且返回tail的舊值。tail的舊值是處理器0的mcs_lock_node結(jié)構(gòu)體的地址,說明自旋鎖被其他處理器占有,那么使處理器0的mcs_lock_node結(jié)構(gòu)體的成員next指向處理器1的mcs_lock_node結(jié)構(gòu)體,把處理器1的mcs_lock_node結(jié)構(gòu)體的成員locked設(shè)置為1,然后處理器1在自己的mcs_lock_node結(jié)構(gòu)體的成員locked上面自旋等待,等待成員locked的值變成0。
圖 4.3 處理器0釋放MCS自旋鎖
如圖 4.3所示,處理器0釋放自旋鎖,發(fā)現(xiàn)自己的mcs_lock_node結(jié)構(gòu)體的成員next不是空指針,說明有申請(qǐng)者正在等待鎖,于是把下一個(gè)節(jié)點(diǎn)的成員locked設(shè)置為0,處理器1獲得自旋鎖。
處理器1釋放自旋鎖,發(fā)現(xiàn)自己的mcs_lock_node結(jié)構(gòu)體的成員next是空指針,說明自己是最后一個(gè)申請(qǐng)者,于是執(zhí)行原子比較交換操作:如果tail指向自己的mcs_lock_node結(jié)構(gòu)體,那么把tail設(shè)置為空指針。
4.2. 小巧的MCS自旋鎖
傳統(tǒng)的MCS自旋鎖存在的缺陷是:結(jié)構(gòu)體的長(zhǎng)度太大,因?yàn)閙cs_lock_node結(jié)構(gòu)體的起始地址和長(zhǎng)度都必須是一級(jí)緩存行長(zhǎng)度的整數(shù)倍,所以MCS自旋鎖的長(zhǎng)度是(一級(jí)緩存行長(zhǎng)度 + 處理器數(shù)量 * 一級(jí)緩存行長(zhǎng)度),而入場(chǎng)券自旋鎖的長(zhǎng)度只有4字節(jié)。自旋鎖被嵌入到內(nèi)核的很多結(jié)構(gòu)體中,如果自旋鎖的長(zhǎng)度增加,會(huì)導(dǎo)致這些結(jié)構(gòu)體的長(zhǎng)度增加。
經(jīng)過內(nèi)核社區(qū)技術(shù)專家的努力,成功地把MCS自旋鎖放進(jìn)4個(gè)字節(jié),實(shí)現(xiàn)了小巧的MCS自旋鎖。自旋鎖的定義如下所示:
include/asm-generic/qspinlock_types.h
typedef struct qspinlock {
atomic_t val;
} arch_spinlock_t;
另外,為每個(gè)處理器定義1個(gè)隊(duì)列節(jié)點(diǎn)數(shù)組,如下所示:
kernel/locking/qspinlock.c
#ifdef CONFIG_PARAVIRT_SPINLOCKS
#define MAX_NODES 8
#else
#define MAX_NODES 4
#endif
static DEFINE_PER_CPU_ALIGNED(struct mcs_spinlock, mcs_nodes[MAX_NODES]);
配置宏CONFIG_PARAVIRT_SPINLOCKS用來啟用半虛擬化的自旋鎖,給虛擬機(jī)使用,本文不考慮這種使用場(chǎng)景。每個(gè)處理器需要4個(gè)隊(duì)列節(jié)點(diǎn),原因如下:
(1) 申請(qǐng)自旋鎖的函數(shù)禁止內(nèi)核搶占,所以進(jìn)程在等待自旋鎖的過程中不會(huì)被其他進(jìn)程搶占。
(2) 進(jìn)程在等待自旋鎖的過程中可能被軟中斷搶占,然后軟中斷等待另一個(gè)自旋鎖。
(3) 軟中斷在等待自旋鎖的過程中可能被硬中斷搶占,然后硬中斷等待另一個(gè)自旋鎖。
(4) 硬中斷在等待自旋鎖的過程中可能被不可屏蔽中斷搶占,然后不可屏蔽中斷等待另一個(gè)自旋鎖。
綜上所述,一個(gè)處理器最多同時(shí)等待4個(gè)自旋鎖。
和入場(chǎng)券自旋鎖相比,MCS自旋鎖增加的內(nèi)存開銷是數(shù)組mcs_nodes。
隊(duì)列節(jié)點(diǎn)的定義如下所示:
kernel/locking/mcs_spinlock.h
struct mcs_spinlock {
struct mcs_spinlock *next;
int locked;
int count;
};
其中成員next指向隊(duì)列的下一個(gè)節(jié)點(diǎn);成員locked指示鎖是否被前一個(gè)等待者占有,如果值為1,表示鎖被前一個(gè)等待者占有;成員count是嵌套層數(shù),也就是數(shù)組mcs_nodes已分配的數(shù)組項(xiàng)的數(shù)量。
自旋鎖的32個(gè)二進(jìn)制位被劃分成4個(gè)字段:
(1) locked字段,指示鎖已經(jīng)被占有,長(zhǎng)度是一個(gè)字節(jié),占用第0~7位。
(2) 一個(gè)pending位,占用第8位,第1個(gè)等待自旋鎖的處理器設(shè)置pending位。
(3) index字段,是數(shù)組索引,指示隊(duì)列的尾部節(jié)點(diǎn)使用數(shù)組mcs_nodes的哪一項(xiàng)。
(4) cpu字段,存放隊(duì)列的尾部節(jié)點(diǎn)的處理器編號(hào),實(shí)際存儲(chǔ)的值是處理器編號(hào)加上1,cpu字段減去1才是真實(shí)的處理器編號(hào)。
index字段和cpu字段合起來稱為tail字段,存放隊(duì)列的尾部節(jié)點(diǎn)的信息,布局分兩種情況:
(1) 如果處理器的數(shù)量小于2的14次方,那么第9~15位沒有使用,第16~17位是index字段,第18~31位是cpu字段。
(2) 如果處理器的數(shù)量大于或等于2的14次方,那么第9~10位是index字段,第11~31位是cpu字段。
把MCS自旋鎖放進(jìn)4個(gè)字節(jié)的關(guān)鍵是:存儲(chǔ)處理器編號(hào)和數(shù)組索引,而不是存儲(chǔ)尾部節(jié)點(diǎn)的地址。
內(nèi)核對(duì)MCS自旋鎖做了優(yōu)化:第1個(gè)等待自旋鎖的處理器直接在鎖自身上面自旋等待,不是在自己的mcs_spinlock結(jié)構(gòu)體上自旋等待。這個(gè)優(yōu)化帶來的好處是:當(dāng)鎖被釋放的時(shí)候,不需要訪問mcs_spinlock結(jié)構(gòu)體的緩存行,相當(dāng)于減少了一次緩存沒命中。后續(xù)的處理器在自己的mcs_spinlock結(jié)構(gòu)體上面自旋等待,直到它們移動(dòng)到隊(duì)列的首部為止。
自旋鎖的pending位進(jìn)一步擴(kuò)展這個(gè)優(yōu)化策略。第1個(gè)等待自旋鎖的處理器簡(jiǎn)單地設(shè)置pending位,不需要使用自己的mcs_spinlock結(jié)構(gòu)體。第2個(gè)處理器看到pending被設(shè)置,開始創(chuàng)建等待隊(duì)列,在自己的mcs_spinlock結(jié)構(gòu)體的locked字段上自旋等待。這種做法消除了兩個(gè)等待者之間的緩存同步,而且第1個(gè)等待者沒使用自己的mcs_spinlock結(jié)構(gòu)體,減少了一次緩存行沒命中。
在多處理器系統(tǒng)中,申請(qǐng)MCS自旋鎖的代碼如下所示:
spin_lock() -》 raw_spin_lock() -》 _raw_spin_lock() -》 __raw_spin_lock() -》 do_raw_spin_lock() -》 arch_spin_lock()
include/asm-generic/qspinlock.h
1 #define arch_spin_lock(l) queued_spin_lock(l)
2
3 static __always_inline void queued_spin_lock(struct qspinlock *lock)
4 {
5 u32 val;
6
7 val = atomic_cmpxchg_acquire(&lock-》val, 0, _Q_LOCKED_VAL);
8 if (likely(val == 0))
9 return;
10 queued_spin_lock_slowpath(lock, val);
11 }
第7行代碼,執(zhí)行帶有獲取語義的原子比較交換操作,如果鎖的值是0,那么把鎖的locked字段設(shè)置為1。獲取語義保證后面的加載/存儲(chǔ)指令必須在函數(shù)atomic_cmpxchg_acquire()完成之后開始執(zhí)行。函數(shù)atomic_cmpxchg_acquire()返回鎖的舊值。
第8~9行代碼,如果鎖的舊值是0,說明申請(qǐng)鎖的時(shí)候鎖處于空閑狀態(tài),那么成功地獲得鎖。
第10行代碼,如果鎖的舊值不是0,說明鎖不是處于空閑狀態(tài),那么執(zhí)行申請(qǐng)自旋鎖的慢速路徑。
申請(qǐng)MCS自旋鎖的慢速路徑如下所示:
kernel/locking/qspinlock.c
1 void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
2 {
3 struct mcs_spinlock *prev, *next, *node;
4 u32 new, old, tail;
5 int idx;
6
7 。。.
8 if (val == _Q_PENDING_VAL) {
9 while ((val = atomic_read(&lock-》val)) == _Q_PENDING_VAL)
10 cpu_relax();
11 }
12
13 for (;;) {
14 if (val & ~_Q_LOCKED_MASK)
15 goto queue;
16
17 new = _Q_LOCKED_VAL;
18 if (val == new)
19 new |= _Q_PENDING_VAL;
20
21 old = atomic_cmpxchg_acquire(&lock-》val, val, new);
22 if (old == val)
23 break;
24
25 val = old;
26 }
27
28 if (new == _Q_LOCKED_VAL)
29 return;
30
31 smp_cond_load_acquire(&lock-》val.counter, ?。╒AL & _Q_LOCKED_MASK));
32
33 clear_pending_set_locked(lock);
34 return;
35
36 queue:
37 node = this_cpu_ptr(&mcs_nodes[0]);
38 idx = node-》count++;
39 tail = encode_tail(smp_processor_id(), idx);
40
41 node += idx;
42 node-》locked = 0;
43 node-》next = NULL;
44 。。.
45
46 if (queued_spin_trylock(lock))
47 goto release;
48
49 old = xchg_tail(lock, tail);
50 next = NULL;
51
52 if (old & _Q_TAIL_MASK) {
53 prev = decode_tail(old);
54 smp_read_barrier_depends();
55
56 WRITE_ONCE(prev-》next, node);
57
58 。。.
59 arch_mcs_spin_lock_contended(&node-》locked);
60
61 next = READ_ONCE(node-》next);
62 if (next)
63 prefetchw(next);
64 }
65
66 。。.
67 val = smp_cond_load_acquire(&lock-》val.counter, ?。╒AL & _Q_LOCKED_PENDING_MASK));
68
69 locked:
70 for (;;) {
71 if ((val & _Q_TAIL_MASK) != tail) {
72 set_locked(lock);
73 break;
74 }
75
76 old = atomic_cmpxchg_relaxed(&lock-》val, val, _Q_LOCKED_VAL);
77 if (old == val)
78 goto release;
79
80 val = old;
81 }
82
83 if (!next) {
84 while (?。╪ext = READ_ONCE(node-》next)))
85 cpu_relax();
86 }
87
88 arch_mcs_spin_unlock_contended(&next-》locked);
89 。。.
90
91 release:
92 __this_cpu_dec(mcs_nodes[0].count);
93 }
第8~11行代碼,如果鎖的狀態(tài)是pending,即{tail=0,pending=1,locked=0},那么等待鎖的狀態(tài)變成locked,即{tail=0,pending=0,locked=1}。
第14~15行代碼,如果鎖的tail字段不是0或者pending位是1,說明已經(jīng)有處理器在等待自旋鎖,那么跳轉(zhuǎn)到標(biāo)號(hào)queue,本處理器加入等待隊(duì)列。
第17~21行代碼,如果鎖處于locked狀態(tài),那么把鎖的狀態(tài)設(shè)置為locked & pending,即{tail=0,pending=1,locked=1};如果鎖處于空閑狀態(tài)(占有鎖的處理器剛剛釋放自旋鎖),那么把鎖的狀態(tài)設(shè)置為locked。
第28~29行代碼,如果上一步鎖的狀態(tài)從空閑變成locked,那么成功地獲得鎖。
第31行代碼,等待占有鎖的處理器釋放自旋鎖,即鎖的locked字段變成0。
第32行代碼,成功地獲得鎖,把鎖的狀態(tài)從pending改成locked,即清除pending位,把locked字段設(shè)置為1。
從第2個(gè)等待自旋鎖的處理器開始,需要加入等待隊(duì)列,處理如下:
(1) 第37~43行代碼,從本處理器的數(shù)組mcs_nodes分配一個(gè)數(shù)組項(xiàng),然后初始化。
(2) 第46~47行代碼,如果鎖處于空閑狀態(tài),那么獲得鎖。
(3) 第49行代碼,把自旋鎖的tail字段設(shè)置為本處理器的隊(duì)列節(jié)點(diǎn)的信息,并且返回前一個(gè)隊(duì)列節(jié)點(diǎn)的信息。
(4) 第52行代碼,如果本處理器的隊(duì)列節(jié)點(diǎn)不是隊(duì)列首部,那么處理如下:
1)第56行代碼,把前一個(gè)隊(duì)列節(jié)點(diǎn)的next字段設(shè)置為本處理器的隊(duì)列節(jié)點(diǎn)的地址。
2)第59行代碼,本處理器在自己的隊(duì)列節(jié)點(diǎn)的locked字段上面自旋等待,等待locked字段從0變成1,也就是等待本處理器的隊(duì)列節(jié)點(diǎn)移動(dòng)到隊(duì)列首部。
(5) 第67行代碼,本處理器的隊(duì)列節(jié)點(diǎn)移動(dòng)到隊(duì)列首部以后,在鎖自身上面自旋等待,等待自旋鎖的pending位和locked字段都變成0,也就是等待鎖的狀態(tài)變成空閑。
(6) 鎖的狀態(tài)變成空閑以后,本處理器把鎖的狀態(tài)設(shè)置為locked,分兩種情況:
1)第71行代碼,如果隊(duì)列還有其他節(jié)點(diǎn),即還有其他處理器在等待鎖,那么處理如下:
q第72行代碼,把鎖的locked字段設(shè)置為1。
q第83~86行代碼,等待下一個(gè)等待者設(shè)置本處理器的隊(duì)列節(jié)點(diǎn)的next字段。
q第88行代碼,把下一個(gè)隊(duì)列節(jié)點(diǎn)的locked字段設(shè)置為1。
2)第76行代碼,如果隊(duì)列只有一個(gè)節(jié)點(diǎn),即本處理器是唯一的等待者,那么把鎖的tail字段設(shè)置為0,把locked字段設(shè)置為1。
(7) 第92行代碼,釋放本處理器的隊(duì)列節(jié)點(diǎn)。
釋放MCS自旋鎖的代碼如下所示:
spin_unlock() -》 raw_spin_unlock() -》 _raw_spin_unlock() -》 __raw_spin_unlock() -》 do_raw_spin_unlock() -》 arch_spin_unlock()
include/asm-generic/qspinlock.h
1 #define arch_spin_unlock(l) queued_spin_unlock(l)
2
3 static __always_inline void queued_spin_unlock(struct qspinlock *lock)
4 {
5 (void)atomic_sub_return_release(_Q_LOCKED_VAL, &lock-》val);
6 }
第5行代碼,執(zhí)行帶釋放語義的原子減法操作,把鎖的locked字段設(shè)置為0,釋放語義保證前面的加載/存儲(chǔ)指令在函數(shù)atomic_sub_return_release()開始執(zhí)行之前執(zhí)行完。
MCS自旋鎖的配置宏是CONFIG_ARCH_USE_QUEUED_SPINLOCKS 和CONFIG_QUEUED_SPINLOCKS,目前只有x86處理器架構(gòu)使用MCS自旋鎖,默認(rèn)開啟MCS自旋鎖的配置宏,如下所示:
arch/x86/kconfig
config X86
def_bool y
。。.
select ARCH_USE_QUEUED_SPINLOCKS
。。.
kernel/kconfig.locks
config ARCH_USE_QUEUED_SPINLOCKS
bool
config QUEUED_SPINLOCKS
def_bool y if ARCH_USE_QUEUED_SPINLOCKS
depends on SMP
評(píng)論
查看更多