關(guān)于CAS等原子操作
在開始說無鎖隊(duì)列之前,我們需要知道一個(gè)很重要的技術(shù)就是CAS操作——Compare & Set,或是 Compare & Swap,現(xiàn)在幾乎所有的CPU指令都支持CAS的原子操作,X86下對(duì)應(yīng)的是 CMPXCHG 匯編指令。有了這個(gè)原子操作,我們就可以用其來實(shí)現(xiàn)各種無鎖(lock free)的數(shù)據(jù)結(jié)構(gòu)。
這個(gè)操作用C語言來描述就是下面這個(gè)樣子:意思就是說,看一看內(nèi)存*reg里的值是不是oldval,如果是的話,則對(duì)其賦值newval。
{
int old_reg_val = *reg;
if (old_reg_val == oldval) {
*reg = newval;
}
return old_reg_val;
}
我們可以看到,old_reg_val 總是返回,于是,我們可以在 compare_and_swap 操作之后對(duì)其進(jìn)行測(cè)試,以查看它是否與 oldval相匹配,因?yàn)樗赡苡兴煌?,這意味著另一個(gè)并發(fā)線程已成功地競(jìng)爭(zhēng)到 compare_and_swap 并成功將 reg 值從 oldval 更改為別的值了。
這個(gè)操作可以變種為返回bool值的形式(返回 bool值的好處在于,可以調(diào)用者知道有沒有更新成功):
{
if ( *addr != oldval ) {
return false;
}
*addr = newval;
return true;
}
與CAS相似的還有下面的原子操作:
- Fetch And Add,一般用來對(duì)變量做 +1 的原子操作
- Test-and-set,寫值到某個(gè)內(nèi)存位置并傳回其舊值。匯編指令BST
- Test and Test-and-set,用來低低Test-and-Set的資源爭(zhēng)奪情況
注:在實(shí)際的C/C++程序中,CAS的各種實(shí)現(xiàn)版本如下:
1)GCC的CAS
GCC4.1+版本中支持CAS的原子操作
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
2)Windows的CAS
在Windows下,你可以使用下面的Windows API來完成CAS:
__in LONG Exchange,
__in LONG Comperand);
3) C++11中的CAS
C++11中的STL中的atomic類的函數(shù)可以讓你跨平臺(tái)。
bool atomic_compare_exchange_weak( std::atomic* obj,
T* expected, T desired );
template< class T >
bool atomic_compare_exchange_weak( volatile std::atomic* obj,
T* expected, T desired );
無鎖隊(duì)列的鏈表實(shí)現(xiàn)
初始化一個(gè)隊(duì)列的代碼很簡(jiǎn),初始化一個(gè)dummy結(jié)點(diǎn)(注:在鏈表操作中,使用一個(gè)dummy結(jié)點(diǎn),可以少掉很多邊界條件的判斷),如下所示:
{
node = new node()
node->next = NULL;
Q->head = Q->tail = node;
}
我們先來看一下進(jìn)隊(duì)列用CAS實(shí)現(xiàn)的方式,基本上來說就是鏈表的兩步操作:
第一步,把tail指針的next指向要加入的結(jié)點(diǎn)。tail->next = p;
第二步,把tail指針移到隊(duì)尾。tail = p;
{
//準(zhǔn)備新加入的結(jié)點(diǎn)數(shù)據(jù)
n = new node();
n->value = data;
n->next = NULL;
do {
p = Q->tail; //取鏈表尾指針的快照
} while( CAS(p->next, NULL, n) != TRUE);
//while條件注釋:如果沒有把結(jié)點(diǎn)鏈在尾指針上,再試
CAS(Q->tail, p, n); //置尾結(jié)點(diǎn) tail = n;
}
我們可以看到,程序中的那個(gè) do-while 的 Retry-Loop 中的 CAS 操作:如果 p->next 是 NULL,那么,把新結(jié)點(diǎn) n 加到隊(duì)尾。如果不成功,則重新再來一次!
就是說,很有可能我在準(zhǔn)備在隊(duì)列尾加入結(jié)點(diǎn)時(shí),別的線程已經(jīng)加成功了,于是tail指針就變了,于是我的CAS返回了false,于是程序再試,直到試成功為止。這個(gè)很像我們的搶電話熱線的不停重播的情況。
但是你會(huì)看到,為什么我們的“置尾結(jié)點(diǎn)”的操作不判斷是否成功,因?yàn)椋?/p>
- 如果有一個(gè)線程T1,它的while中的CAS如果成功的話,那么其它所有的 隨后線程的CAS都會(huì)失敗,然后就會(huì)再循環(huán),
- 此時(shí),如果T1 線程還沒有更新tail指針,其它的線程繼續(xù)失敗,因?yàn)閠ail->next不是NULL了。
- 直到T1線程更新完 tail 指針,于是其它的線程中的某個(gè)線程就可以得到新的 tail 指針,繼續(xù)往下走了。
- 所以,只要線程能從 while 循環(huán)中退出來,意味著,它已經(jīng)“獨(dú)占”了,tail 指針必然可以被更新。
- 這里有一個(gè)潛在的問題——如果T1線程在用CAS更新tail指針的之前,線程停掉或是掛掉了,那么其它線程就進(jìn)入死循環(huán)了。下面是改良版的EnQueue()
{
n = new node();
n->value = data;
n->next = NULL;
p = Q->tail;
oldp = p
do {
while (p->next != NULL)
p = p->next;
} while( CAS(p.next, NULL, n) != TRUE); //如果沒有把結(jié)點(diǎn)鏈在尾上,再試
CAS(Q->tail, oldp, n); //置尾結(jié)點(diǎn)
}
我們讓每個(gè)線程,自己fetch 指針 p 到鏈表尾。但是這樣的fetch會(huì)很影響性能。而且,如果一個(gè)線程不斷的EnQueue,會(huì)導(dǎo)致所有的其它線程都去 fetch 他們的 p 指針到隊(duì)尾,能不能不要所有的線程都干同一個(gè)事?這樣可以節(jié)省整體的時(shí)間?
比如:直接 fetch Q->tail 到隊(duì)尾?因?yàn)?,所有的線程都共享著 Q->tail,所以,一旦有人動(dòng)了它后,相當(dāng)于其它的線程也跟著動(dòng)了,于是,我們的代碼可以改進(jìn)成如下的實(shí)現(xiàn):
{
n = new node();
n->value = data;
n->next = NULL;
while(TRUE) {
//先取一下尾指針和尾指針的next
tail = Q->tail;
next = tail->next;
//如果尾指針已經(jīng)被移動(dòng)了,則重新開始
if ( tail != Q->tail ) continue;
//如果尾指針的 next 不為NULL,則 fetch 全局尾指針到next
if ( next != NULL ) {
CAS(Q->tail, tail, next);
continue;
}
//如果加入結(jié)點(diǎn)成功,則退出
if ( CAS(tail->next, next, n) == TRUE ) break;
}
CAS(Q->tail, tail, n); //置尾結(jié)點(diǎn)
}
上述的代碼還是很清楚的,相信你一定能看懂,而且,這也是 Java 中的 ConcurrentLinkedQueue 的實(shí)現(xiàn)邏輯,當(dāng)然,我上面的這個(gè)版本比 Java 的好一點(diǎn),因?yàn)闆]有 if 嵌套,嘿嘿。
好了,我們解決了EnQueue,我們?cè)賮砜纯碊eQueue的代碼:(很簡(jiǎn)單,我就不解釋了)
{
do{
p = Q->head;
if (p->next == NULL){
return ERR_EMPTY_QUEUE;
}
while( CAS(Q->head, p, p->next) != TRUE );
return p->next->value;
}
我們可以看到,DeQueue的代碼操作的是 head->next,而不是 head 本身。這樣考慮是因?yàn)橐粋€(gè)邊界條件,我們需要一個(gè)dummy的頭指針來解決鏈表中如果只有一個(gè)元素,head 和 tail 都指向同一個(gè)結(jié)點(diǎn)的問題,這樣 EnQueue 和 DeQueue 要互相排斥了。
但是,如果 head 和 tail 都指向同一個(gè)結(jié)點(diǎn),這意味著隊(duì)列為空,應(yīng)該返回 ERR_EMPTY_QUEUE,但是,在判斷 p->next == NULL 時(shí),另外一個(gè)EnQueue操作做了一半,此時(shí)的 p->next 不為 NULL了,但是 tail 指針還差最后一步,沒有更新到新加的結(jié)點(diǎn),這個(gè)時(shí)候就會(huì)出現(xiàn),在 EnQueue 并沒有完成的時(shí)候, DeQueue 已經(jīng)把新增加的結(jié)點(diǎn)給取走了,此時(shí),隊(duì)列為空,但是,head 與 tail 并沒有指向同一個(gè)結(jié)點(diǎn)。如下所示:
雖然,EnQueue的函數(shù)會(huì)把 tail 指針置對(duì),但是,這種情況可能還是會(huì)導(dǎo)致一些并發(fā)問題,所以,嚴(yán)謹(jǐn)來說,我們需要避免這種情況。于是,我們需要加入更多的判斷條件,還確保這個(gè)問題。下面是相關(guān)的改進(jìn)代碼:
{
while(TRUE) {
//取出頭指針,尾指針,和第一個(gè)元素的指針
head = Q->head;
tail = Q->tail;
next = head->next;
// Q->head 指針已移動(dòng),重新取 head指針
if ( head != Q->head ) continue;
// 如果是空隊(duì)列
if ( head == tail && next == NULL ) {
return ERR_EMPTY_QUEUE;
}
//如果 tail 指針落后了
if ( head == tail && next == NULL ) {
CAS(Q->tail, tail, next);
continue;
}
//移動(dòng) head 指針成功后,取出數(shù)據(jù)
if ( CAS( Q->head, head, next) == TRUE){
value = next->value;
break;
}
}
free(head); //釋放老的dummy結(jié)點(diǎn)
return value;
}
CAS的ABA問題
所謂ABA,問題基本是這個(gè)樣子:
- 進(jìn)程P1在共享變量中讀到值為A
- P1被搶占了,進(jìn)程P2執(zhí)行
- P2把共享變量里的值從A改成了B,再改回到A,此時(shí)被P1搶占。
- P1回來看到共享變量里的值沒有被改變,于是繼續(xù)執(zhí)行。
雖然P1以為變量值沒有改變,繼續(xù)執(zhí)行了,但是這個(gè)會(huì)引發(fā)一些潛在的問題。ABA問題最容易發(fā)生在lock free 的算法中的,CAS首當(dāng)其沖,因?yàn)镃AS判斷的是指針的值。很明顯,值是很容易又變成原樣的。
比如上述的DeQueue()函數(shù),因?yàn)槲覀円宧ead和tail分開,所以我們引入了一個(gè)dummy指針給head,當(dāng)我們做CAS的之前,如果head的那塊內(nèi)存被回收并被重用了,而重用的內(nèi)存又被EnQueue()進(jìn)來了,這會(huì)有很大的問題。(內(nèi)存管理中重用內(nèi)存基本上是一種很常見的行為)
這個(gè)例子你可能沒有看懂,一個(gè)活生生的例子——
你拿著一個(gè)裝滿錢的手提箱在飛機(jī)場(chǎng),此時(shí)過來了一個(gè)火辣性感的美女,然后她很暖昧地挑逗著你,并趁你不注意的時(shí)候,把用一個(gè)一模一樣的手提箱和你那裝滿錢的箱子調(diào)了個(gè)包,然后就離開了,你看到你的手提箱還在那,于是就提著手提箱去趕飛機(jī)去了。
這就是ABA的問題。
解決ABA的問題
維基百科上給了一個(gè)解——使用double-CAS(雙保險(xiǎn)的CAS),例如,在32位系統(tǒng)上,我們要檢查64位的內(nèi)容
- 一次用CAS檢查雙倍長(zhǎng)度的值,前半部是值,后半部分是一個(gè)計(jì)數(shù)器。
- 只有這兩個(gè)都一樣,才算通過檢查,要吧賦新的值。并把計(jì)數(shù)器累加1。
這樣一來,ABA發(fā)生時(shí),雖然值一樣,但是計(jì)數(shù)器就不一樣(但是在32位的系統(tǒng)上,這個(gè)計(jì)數(shù)器會(huì)溢出回來又從1開始的,這還是會(huì)有ABA的問題)
當(dāng)然,我們這個(gè)隊(duì)列的問題就是不想讓那個(gè)內(nèi)存重用,這樣明確的業(yè)務(wù)問題比較好解決。
{
loop:
p = q->next;
if (p == NULL){
return p;
}
Fetch&Add(p->refcnt, 1);
if (p == q->next){
return p;
}else{
Release(p);
}
goto loop;
}
其中的 Fetch&Add和Release分是是加引用計(jì)數(shù)和減引用計(jì)數(shù),都是原子操作,這樣就可以阻止內(nèi)存被回收了。
用數(shù)組實(shí)現(xiàn)無鎖隊(duì)列
使用數(shù)組來實(shí)現(xiàn)隊(duì)列是很常見的方法,因?yàn)闆]有內(nèi)存的分部和釋放,一切都會(huì)變得簡(jiǎn)單,實(shí)現(xiàn)的思路如下:
- 數(shù)組隊(duì)列應(yīng)該是一個(gè)ring buffer形式的數(shù)組(環(huán)形數(shù)組)
- 數(shù)組的元素應(yīng)該有三個(gè)可能的值:HEAD,TAIL,EMPTY(當(dāng)然,還有實(shí)際的數(shù)據(jù))
- 數(shù)組一開始全部初始化成EMPTY,有兩個(gè)相鄰的元素要初始化成HEAD和TAIL,這代表空隊(duì)列。
- EnQueue操作。假設(shè)數(shù)據(jù)x要入隊(duì)列,定位TAIL的位置,使用double-CAS方法把(TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到(TAIL, EMPTY),則說明隊(duì)列滿了。
- DeQueue操作。定位HEAD的位置,把(HEAD, x)更新成(EMPTY, HEAD),并把x返回。同樣需要注意,如果x是TAIL,則說明隊(duì)列為空。
算法的一個(gè)關(guān)鍵是——如何定位HEAD或TAIL?
- 我們可以聲明兩個(gè)計(jì)數(shù)器,一個(gè)用來計(jì)數(shù)EnQueue的次數(shù),一個(gè)用來計(jì)數(shù)DeQueue的次數(shù)。
- 這兩個(gè)計(jì)算器使用使用Fetch&ADD來進(jìn)行原子累加,在EnQueue或DeQueue完成的時(shí)候累加就好了。
- 累加后求個(gè)模什么的就可以知道TAIL和HEAD的位置了。
如下圖所示:
小結(jié)
以上基本上就是所有的無鎖隊(duì)列的技術(shù)細(xì)節(jié),這些技術(shù)都可以用在其它的無鎖數(shù)據(jù)結(jié)構(gòu)上。
- 無鎖隊(duì)列主要是通過CAS、FAA這些原子操作,和Retry-Loop實(shí)現(xiàn)。
- 對(duì)于Retry-Loop,我個(gè)人感覺其實(shí)和鎖什么什么兩樣。只是這種“鎖”的粒度變小了,主要是“鎖”HEAD和TAIL這兩個(gè)關(guān)鍵資源。而不是整個(gè)數(shù)據(jù)結(jié)構(gòu)
-
內(nèi)存
+關(guān)注
關(guān)注
8文章
3042瀏覽量
74177 -
數(shù)據(jù)結(jié)構(gòu)
+關(guān)注
關(guān)注
3文章
573瀏覽量
40164 -
CAS
+關(guān)注
關(guān)注
0文章
35瀏覽量
15224 -
線程
+關(guān)注
關(guān)注
0文章
505瀏覽量
19715
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論