為了演示一些在設(shè)計(jì)無鎖數(shù)據(jù)結(jié)構(gòu)中所使用到的技術(shù),我們將看到一些無鎖實(shí)現(xiàn)的簡(jiǎn)單數(shù)據(jù)結(jié)構(gòu)。這里不僅要在每個(gè)例子中描述一個(gè)有用的數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn),還將使用這些例子的某些特別之處來闡述對(duì)于無鎖數(shù)據(jù)結(jié)構(gòu)的設(shè)計(jì)。
如之前所提到的,無鎖結(jié)構(gòu)依賴與原子操作和內(nèi)存序及相關(guān)保證,以確保多線程以正確的順序訪問數(shù)據(jù)結(jié)構(gòu)。最初,所有原子操作默認(rèn)使用的是memory_order_seq_cst內(nèi)存序;因?yàn)楹?jiǎn)單,所以使用(所有memory_order_seq_cst都遵循一種順序)。不過,在后面的例子中,我們將會(huì)降低內(nèi)存序的要求,使用memory_order_acquire, memory_order_release, 甚至memory_order_relaxed。雖然這個(gè)例子中沒有直接的使用鎖,但需要注意的是對(duì)std::atomic_flag的使用。一些平臺(tái)上的無鎖結(jié)構(gòu)實(shí)現(xiàn)(實(shí)際上在C++的標(biāo)準(zhǔn)庫(kù)的實(shí)現(xiàn)中),使用了內(nèi)部鎖(詳見第5章)。另一些平臺(tái)上,基于鎖的簡(jiǎn)單數(shù)據(jù)結(jié)構(gòu)可能會(huì)更適合;當(dāng)然,還有很多平臺(tái)不能一一說明;在選擇一種實(shí)現(xiàn)前,需要明確需求,并且配置各種選項(xiàng)以滿足要求。
那么,回到數(shù)據(jù)結(jié)構(gòu)上來吧,最簡(jiǎn)單的數(shù)據(jù)結(jié)構(gòu)——棧。
棧的要求很簡(jiǎn)單:查詢順序是添加順序的逆序——先入后出(LIFO)。所以,要確保一個(gè)值安全的添加入棧就十分重要,因?yàn)楹芸赡茉谔砑雍?,馬上被其他線程索引,同時(shí)確保只有一個(gè)線程能索引到給定值也是很重要。最簡(jiǎn)單的棧就是鏈表,head指針指向第一個(gè)節(jié)點(diǎn)(可能是下一個(gè)被索引到的節(jié)點(diǎn)),并且每個(gè)節(jié)點(diǎn)依次指向下一個(gè)節(jié)點(diǎn)。
在這樣的情況下,添加一個(gè)節(jié)點(diǎn)相對(duì)來說很簡(jiǎn)單:
在單線程的上下文中,這種方式?jīng)]有問題,不過當(dāng)多線程對(duì)棧進(jìn)行修改時(shí),這幾步就不夠用了。至關(guān)重要的是,當(dāng)有兩個(gè)線程同時(shí)添加節(jié)點(diǎn)的時(shí)候,在第2步和第3步的時(shí)候會(huì)產(chǎn)生條件競(jìng)爭(zhēng):一個(gè)線程可能在修改head的值時(shí),另一個(gè)線程正在執(zhí)行第2步,并且在第3步中對(duì)head進(jìn)行更新。這就會(huì)使之前那個(gè)線程的工作被丟棄,亦或是造成更加糟糕的后果。在了解如何解決這個(gè)條件競(jìng)爭(zhēng)之前,還要注意一個(gè)很重要的事:當(dāng)head更新,并指向了新節(jié)點(diǎn)時(shí),另一個(gè)線程就能讀取到這個(gè)節(jié)點(diǎn)了。因此,在head設(shè)置為指向新節(jié)點(diǎn)前,讓新節(jié)點(diǎn)完全準(zhǔn)備就緒就變得很重要了;因?yàn)?,在這之后就不能對(duì)節(jié)點(diǎn)進(jìn)行修改了。
OK,那如何應(yīng)對(duì)討厭的條件競(jìng)爭(zhēng)呢?答案就是:在第3步的時(shí)候使用一個(gè)原子“比較/交換”操作,來保證當(dāng)步驟2對(duì)head進(jìn)行讀取時(shí),不會(huì)對(duì)head進(jìn)行修改。當(dāng)有修改時(shí),可以循環(huán)“比較/交換”操作。下面的代碼就展示了,不用鎖來實(shí)現(xiàn)線程安全的push()函數(shù)。
清單7.2 不用鎖實(shí)現(xiàn)push()
template<typename T>
class lock_free_stack
{
private:
struct node
{
T data;
node* next;
node(T const& data_): // 1
data(data_)
{}
};
std::atomic<node*> head;
public:
void push(T const& data)
{
node* const new_node=new node(data); // 2
new_node->next=head.load(); // 3
while(!head.compare_exchange_weak(new_node->next,new_node)); // 4
}
};
上面代碼近乎能匹配之前所說的三個(gè)步驟:創(chuàng)建一個(gè)新節(jié)點(diǎn)②,設(shè)置新節(jié)點(diǎn)的next指針指向當(dāng)前head③,并設(shè)置head指針指向新節(jié)點(diǎn)④。node結(jié)構(gòu)用其自身的構(gòu)造函數(shù)來進(jìn)行數(shù)據(jù)填充①,必須保證節(jié)點(diǎn)在構(gòu)造完成后隨時(shí)能被彈出。之后需要使用compare_exchange_weak()來保證在被存儲(chǔ)到new_node->next的head指針和之前的一樣③。代碼的亮點(diǎn)是使用“比較/交換”操作:當(dāng)其返回false時(shí),因?yàn)楸容^失敗(例如,head被其他線程鎖修改),new_node->next作為操作的第一個(gè)參數(shù),將會(huì)更新head。循環(huán)中不需要每次都重新加載head指針,因?yàn)榫幾g器會(huì)幫你完成這件事。同樣,因?yàn)檠h(huán)可能直接就失敗了,所以這里使用compare_exchange_weak要好于使用compare_exchange_strong(詳見第5章)。
所以,這里暫時(shí)不需要pop()操作,可以先快速檢查一下push()的實(shí)現(xiàn)是否有違指導(dǎo)意見。這里唯一一個(gè)能拋出異常的地方就構(gòu)造新node的時(shí)候①,不過其會(huì)自行處理,且鏈表中的內(nèi)容沒有被修改,所以這里是安全的。因?yàn)樵跇?gòu)建數(shù)據(jù)的時(shí)候,是將其作為node的一部分作為存儲(chǔ)的,并且使用compare_exchange_weak()來更新head指針,所以這里沒有惡性的條件競(jìng)爭(zhēng)。“比較/交換”成功時(shí),節(jié)點(diǎn)已經(jīng)準(zhǔn)備就緒,且隨時(shí)可以提取。因?yàn)檫@里沒有鎖,所以就不存在死鎖的情況,這里的push()函數(shù)實(shí)現(xiàn)的很成功。
那么,你現(xiàn)在已經(jīng)有往棧中添加數(shù)據(jù)的方法了,現(xiàn)在需要?jiǎng)h除數(shù)據(jù)的方法。其步驟如下,也很簡(jiǎn)單:
但在多線程環(huán)境下,就不像看起來那么簡(jiǎn)單了。當(dāng)有兩個(gè)線程要從棧中移除數(shù)據(jù),兩個(gè)線程可能在步驟1中讀取到同一個(gè)head(值相同)。當(dāng)其中一個(gè)線程處理到步驟5,而另一個(gè)線程還在處理步驟2時(shí),這個(gè)還在處理步驟2的線程將會(huì)解引用一個(gè)懸空指針。這只是寫無鎖代碼所遇到的最大問題之一,所以現(xiàn)在只能跳過步驟5,讓節(jié)點(diǎn)泄露。
另一個(gè)問題就是:當(dāng)兩個(gè)線程讀取到同一個(gè)head值,他們將返回同一個(gè)節(jié)點(diǎn)。這就違反了棧結(jié)構(gòu)的意圖,所以你需要避免這樣的問題產(chǎn)生。你可以像在push()函數(shù)中解決條件競(jìng)爭(zhēng)那樣來解決這個(gè)問題:使用“比較/交換”操作更新head。當(dāng)“比較/交換”操作失敗時(shí),不是一個(gè)新節(jié)點(diǎn)已被推入,就是其他線程已經(jīng)彈出了想要彈出的節(jié)點(diǎn)。無論是那種情況,都得返回步驟1(“比較/交換”操作將會(huì)重新讀取head)。
當(dāng)“比較/交換”成功,就可以確定當(dāng)前線程是彈出給定節(jié)點(diǎn)的唯一線程,之后就可以放心的執(zhí)行步驟4了。這里先看一下pop()的雛形:
template<typename T>
class lock_free_stack
{
public:
void pop(T& result)
{
node* old_head=head.load();
while(!head.compare_exchange_weak(old_head,old_head->next));
result=old_head->data;
}
};
雖然這段代碼很優(yōu)雅,但這里還有兩個(gè)節(jié)點(diǎn)泄露的問題。首先,這段代碼在空鏈表的時(shí)候不工作:當(dāng)head指針式一個(gè)空指針時(shí),當(dāng)要訪問next指針時(shí),將引起未定義行為。這很容易通過對(duì)nullptr的檢查進(jìn)行修復(fù)(在while循環(huán)中),要不對(duì)空棧拋出一個(gè)異常,要不返回一個(gè)bool值來表明成功與否。
第二個(gè)問題就是異常安全問題。當(dāng)在第3章中介紹棧結(jié)構(gòu)時(shí),了解了在返回值的時(shí)候會(huì)出現(xiàn)異常安全問題:當(dāng)有異常被拋出時(shí),復(fù)制的值將丟失。在這種情況下,傳入引用是一種可以接受的解決方案;因?yàn)檫@樣就能保證,當(dāng)有異常拋出時(shí),棧上的數(shù)據(jù)不會(huì)丟失。不幸的是,不能這樣做;只能在單一線程對(duì)值進(jìn)行返回的時(shí)候,才進(jìn)行拷貝,以確??截惒僮鞯陌踩?,這就意味著在拷貝結(jié)束后這個(gè)節(jié)點(diǎn)就被刪除了。因此,通過引用獲取返回值的方式就沒有任何優(yōu)勢(shì):直接返回也是可以的。若想要安全的返回值,你必須使用第3章中的其他方法:返回指向數(shù)據(jù)值的(智能)指針。
當(dāng)返回的是智能指針時(shí),就能返回nullptr以表明沒有值可返回,但是要求在堆上對(duì)智能指針進(jìn)行內(nèi)存分配。將分配過程做為pop()的一部分時(shí)(也沒有更好的選擇了),堆分配時(shí)可能會(huì)拋出一個(gè)異常。與此相反,可以在push()操作中對(duì)內(nèi)存進(jìn)行分配——無論怎樣,都得對(duì)node進(jìn)行內(nèi)存分配。返回一個(gè)std::shared_ptr<>不會(huì)拋出異常,所以在pop()中進(jìn)行分配就是安全的。將上面的觀點(diǎn)放在一起,就能看到如下的代碼。
清單7.3 帶有節(jié)點(diǎn)泄露的無鎖棧
template<typename T>
class lock_free_stack
{
private:
struct node
{
std::shared_ptr<T> data; // 1 指針獲取數(shù)據(jù)
node* next;
node(T const& data_):
data(std::make_shared<T>(data_)) // 2 讓std::shared_ptr指向新分配出來的T
{}
};
std::atomic<node*> head;
public:
void push(T const& data)
{
node* const new_node=new node(data);
new_node->next=head.load();
while(!head.compare_exchange_weak(new_node->next,new_node));
}
std::shared_ptr<T> pop()
{
node* old_head=head.load();
while(old_head && // 3 在解引用前檢查old_head是否為空指針
!head.compare_exchange_weak(old_head,old_head->next));
return old_head ? old_head->data : std::shared_ptr<T>(); // 4
}
};
智能指針指向當(dāng)前數(shù)據(jù)①,這里必須在堆上為數(shù)據(jù)分配內(nèi)存(在node結(jié)構(gòu)體中)②。而后,在compare_exchage_weak()循環(huán)中③,需要在old_head指針前,檢查指針是否為空。最終,如果存在相關(guān)節(jié)點(diǎn),那么將會(huì)返回相關(guān)節(jié)點(diǎn)的值;當(dāng)不存在時(shí),將返回一個(gè)空指針④。注意,結(jié)構(gòu)是無鎖的,但并不是無等待的,因?yàn)樵趐ush()和pop()函數(shù)中都有while循環(huán),當(dāng)compare_exchange_weak()總是失敗的時(shí)候,循環(huán)將會(huì)無限循環(huán)下去。
第一次了解pop()時(shí),為了避免條件競(jìng)爭(zhēng)(當(dāng)有線程刪除一個(gè)節(jié)點(diǎn)的同時(shí),其他線程還持有指向該節(jié)點(diǎn)的指針,并且要解引用)選擇了帶有內(nèi)存泄露的節(jié)點(diǎn)。但是,不論什么樣的C++程序,存在內(nèi)存泄露都不可接受。所以,現(xiàn)在來解決這個(gè)問題!
基本問題在于,當(dāng)要釋放一個(gè)節(jié)點(diǎn)時(shí),需要確認(rèn)其他線程沒有持有這個(gè)節(jié)點(diǎn)。當(dāng)只有一個(gè)線程調(diào)用pop(),就可以放心的進(jìn)行釋放。當(dāng)節(jié)點(diǎn)添加入棧后,push()就不會(huì)與節(jié)點(diǎn)有任何的關(guān)系了,所以只有調(diào)用pop()函數(shù)的線程與已加入節(jié)點(diǎn)有關(guān),并且能夠安全的將節(jié)點(diǎn)刪除。
另一方面,當(dāng)棧同時(shí)處理多線程對(duì)pop()的調(diào)用時(shí),就需要知道節(jié)點(diǎn)在什么時(shí)候被刪除。這實(shí)際上就需要你寫一個(gè)節(jié)點(diǎn)專用的垃圾收集器。這聽起來有些可怖,同時(shí)也相當(dāng)棘手,不過并不是多么糟糕:這里需要檢查節(jié)點(diǎn),并且檢查哪些節(jié)點(diǎn)被pop()訪問。不需要對(duì)push()中的節(jié)點(diǎn)有所擔(dān)心,因?yàn)檫@些節(jié)點(diǎn)推到棧上以后,才能被訪問到,而多線程只能通過pop()訪問同一節(jié)點(diǎn)。
當(dāng)沒有線程調(diào)用pop()時(shí),這時(shí)可以刪除棧上的任意節(jié)點(diǎn)。因此,當(dāng)添加節(jié)點(diǎn)到“可刪除”列表中時(shí),就能從中提取數(shù)據(jù)了。而后,當(dāng)沒有線程通過pop()訪問節(jié)點(diǎn)時(shí),就可以安全的刪除這些節(jié)點(diǎn)了。那怎么知道沒有線程調(diào)用pop()了呢?很簡(jiǎn)單——計(jì)數(shù)即可。當(dāng)計(jì)數(shù)器數(shù)值增加時(shí),就是有節(jié)點(diǎn)推入;當(dāng)減少時(shí),就是有節(jié)點(diǎn)被刪除。這樣從“可刪除”列表中刪除節(jié)點(diǎn)就很安全了,直到計(jì)數(shù)器的值為0為止。當(dāng)然,這個(gè)計(jì)數(shù)器必須是原子的,這樣它才能在多線程的情況下正確的進(jìn)行計(jì)數(shù)。下面的清單中,展示了修改后的pop()函數(shù),有些支持功能的實(shí)現(xiàn)將在清單7.5中給出。
清單7.4 沒有線程通過pop()訪問節(jié)點(diǎn)時(shí),就對(duì)節(jié)點(diǎn)進(jìn)行回收
template<typename T>
class lock_free_stack
{
private:
std::atomic<unsigned> threads_in_pop; // 1 原子變量
void try_reclaim(node* old_head);
public:
std::shared_ptr<T> pop()
{
++threads_in_pop; // 2 在做事之前,計(jì)數(shù)值加1
node* old_head=head.load();
while(old_head &&
!head.compare_exchange_weak(old_head,old_head->next));
std::shared_ptr<T> res;
if(old_head)
{
res.swap(old_head->data); // 3 回收刪除的節(jié)點(diǎn)
}
try_reclaim(old_head); // 4 從節(jié)點(diǎn)中直接提取數(shù)據(jù),而非拷貝指針
return res;
}
};
threads_in_pop①原子變量用來記錄有多少線程試圖彈出棧中的元素。當(dāng)pop()②函數(shù)調(diào)用的時(shí)候,計(jì)數(shù)器加一;當(dāng)調(diào)用try_reclaim()時(shí),計(jì)數(shù)器減一,當(dāng)這個(gè)函數(shù)被節(jié)點(diǎn)調(diào)用時(shí),說明這個(gè)節(jié)點(diǎn)已經(jīng)被刪除④。因?yàn)闀簳r(shí)不需要將節(jié)點(diǎn)刪除,可以通過swap()函數(shù)來刪除節(jié)點(diǎn)上的數(shù)據(jù)③(而非只是拷貝指針),當(dāng)不再需要這些數(shù)據(jù)的時(shí)候,這些數(shù)據(jù)會(huì)自動(dòng)刪除,而不是持續(xù)存在著(因?yàn)檫@里還有對(duì)未刪除節(jié)點(diǎn)的引用)。接下來看一下try_reclaim()是如何實(shí)現(xiàn)的。
清單7.5 采用引用計(jì)數(shù)的回收機(jī)制
template<typename T>
class lock_free_stack
{
private:
std::atomic<node*> to_be_deleted;
static void delete_nodes(node* nodes)
{
while(nodes)
{
node* next=nodes->next;
delete nodes;
nodes=next;
}
}
void try_reclaim(node* old_head)
{
if(threads_in_pop==1) // 1
{
node* nodes_to_delete=to_be_deleted.exchange(nullptr); // 2 聲明“可刪除”列表
if(!--threads_in_pop) // 3 是否只有一個(gè)線程調(diào)用pop()?
{
delete_nodes(nodes_to_delete); // 4
}
else if(nodes_to_delete) // 5
{
chain_pending_nodes(nodes_to_delete); // 6
}
delete old_head; // 7
}
else
{
chain_pending_node(old_head); // 8
--threads_in_pop;
}
}
void chain_pending_nodes(node* nodes)
{
node* last=nodes;
while(node* const next=last->next) // 9 讓next指針指向鏈表的末尾
{
last=next;
}
chain_pending_nodes(nodes,last);
}
void chain_pending_nodes(node* first,node* last)
{
last->next=to_be_deleted; // 10
while(!to_be_deleted.compare_exchange_weak( // 11 用循環(huán)來保證last->next的正確性
last->next,first));
}
void chain_pending_node(node* n)
{
chain_pending_nodes(n,n); // 12
}
};
回收節(jié)點(diǎn)時(shí)①,threads_in_pop的數(shù)值是1,也就是當(dāng)前線程正在對(duì)pop()進(jìn)行訪問,這時(shí)就可以安全的將節(jié)點(diǎn)進(jìn)行刪除了⑦(將等待節(jié)點(diǎn)刪除也是安全的)。當(dāng)數(shù)值不是1時(shí),刪除任何節(jié)點(diǎn)都不安全,所以需要向等待列表中繼續(xù)添加節(jié)點(diǎn)⑧。
假設(shè)在某一時(shí)刻,threads_in_pop的值為1。那就可以嘗試回收等待列表,如果不回收,節(jié)點(diǎn)就會(huì)繼續(xù)等待,直到整個(gè)棧被銷毀。要做到回收,首先要通過一個(gè)原子exchange操作聲明②刪除列表,并將計(jì)數(shù)器減一③。如果之后計(jì)數(shù)的值為0,就意味著沒有其他線程訪問等待節(jié)點(diǎn)鏈表。出現(xiàn)新的等待節(jié)點(diǎn)時(shí),不必為其煩惱,因?yàn)樗鼈儗⒈话踩幕厥铡6?,可以使用delete_nodes對(duì)鏈表進(jìn)行迭代,并將其刪除④。
當(dāng)計(jì)數(shù)值在減后不為0,回收節(jié)點(diǎn)就不安全;所以如果存在⑤,就需要將其掛在等待刪除鏈表之后⑥,這種情況會(huì)發(fā)生在多個(gè)線程同時(shí)訪問數(shù)據(jù)結(jié)構(gòu)的時(shí)候。一些線程在第一次測(cè)試threads_in_pop①和對(duì)“回收”鏈表的聲明②操作間調(diào)用pop(),這可能新填入一個(gè)已經(jīng)被線程訪問的節(jié)點(diǎn)到鏈表中。在圖7.1中,線程C添加節(jié)點(diǎn)Y到to_be_deleted鏈表中,即使線程B仍將其引用作為old_head,之后會(huì)嘗試訪問其next指針。在線程A刪除節(jié)點(diǎn)的時(shí)候,會(huì)造成線程B發(fā)生未定義的行為。
http://wiki.jikexueyuan.com/project/cplusplus-concurrency-action/images/chapter7/7-1.png" alt="" />
圖7.1 三個(gè)線程同時(shí)調(diào)用pop(),說明為什么要在try_reclaim()對(duì)聲明節(jié)點(diǎn)進(jìn)行刪除前,對(duì)threads_in_pop進(jìn)行檢查。
為了將等待刪除的節(jié)點(diǎn)添加入等待刪除鏈表,需要復(fù)用節(jié)點(diǎn)的next指針將等待刪除節(jié)點(diǎn)鏈接在一起。在這種情況下,將已存在的鏈表鏈接到刪除鏈表后面,通過遍歷的方式找到鏈表的末尾⑨,將最后一個(gè)節(jié)點(diǎn)的next指針替換為當(dāng)前to_be_deleted指針⑩,并且將鏈表中的第一個(gè)節(jié)點(diǎn)作為新的to_be_deleted指針進(jìn)行存儲(chǔ)?。這里需要在循環(huán)中使用compare_exchange_weak來保證,通過其他線程添加進(jìn)來的節(jié)點(diǎn)不會(huì)發(fā)生內(nèi)存泄露。這樣,在鏈表發(fā)生改變時(shí),更新next指針很方便。添加單個(gè)節(jié)點(diǎn)是一種特殊情況,因?yàn)檫@需要將這個(gè)節(jié)點(diǎn)作為第一個(gè)節(jié)點(diǎn),同時(shí)也是最后一個(gè)節(jié)點(diǎn)進(jìn)行添加?。
在低負(fù)荷的情況下,這種方式?jīng)]有問題,因?yàn)樵跊]有線程訪問pop(),有一個(gè)合適的靜態(tài)指針。不過,這只是一個(gè)瞬時(shí)的狀態(tài),也就是為什么在回收前,需要檢查threads_in_pop計(jì)數(shù)為0③的原因;同樣也是刪除節(jié)點(diǎn)⑦前進(jìn)行對(duì)計(jì)數(shù)器檢查的原因。刪除節(jié)點(diǎn)是一項(xiàng)耗時(shí)的工作,并且希望其他線程能對(duì)鏈表做的修改越小越好。從第一次發(fā)現(xiàn)threads_in_pop是1,到嘗試刪除節(jié)點(diǎn),會(huì)用很長(zhǎng)的時(shí)間,這樣就會(huì)讓線程有機(jī)會(huì)調(diào)用pop(),會(huì)讓threads_in_pop不為0,阻止節(jié)點(diǎn)的刪除操作。
在高負(fù)荷的情況,不會(huì)存在靜態(tài);因?yàn)?,其他線程在初始化之后,都能進(jìn)入pop()。在這樣的情況下,to_ne_deleted鏈表將會(huì)無界的增加,并且會(huì)再次泄露。當(dāng)這里不存在任何靜態(tài)的情況時(shí),就得為回收節(jié)點(diǎn)尋找替代機(jī)制。關(guān)鍵是要確定沒有線程訪問一個(gè)給定節(jié)點(diǎn),那么這個(gè)節(jié)點(diǎn)就能被回收?,F(xiàn)在,最簡(jiǎn)單的替換機(jī)制就是使用風(fēng)險(xiǎn)指針(hazard pointer)。
“風(fēng)險(xiǎn)指針”這個(gè)術(shù)語引用于Maged Michael的技術(shù)發(fā)現(xiàn)[1]。之所以這樣叫,是因?yàn)閯h除一個(gè)節(jié)點(diǎn)可能會(huì)讓其他引用其的線程處于危險(xiǎn)之中。當(dāng)其他線程持有這個(gè)刪除的節(jié)點(diǎn)的指針,并且解引用進(jìn)行操作的時(shí)候,將會(huì)出現(xiàn)未定義行為。這里的基本觀點(diǎn)就是,當(dāng)有線程去訪問要被(其他線程)刪除的對(duì)象時(shí),會(huì)先設(shè)置對(duì)這個(gè)對(duì)象設(shè)置一個(gè)風(fēng)險(xiǎn)指針,而后通知其他線程,刪除這個(gè)指針是一個(gè)危險(xiǎn)的行為。一旦這個(gè)對(duì)象不再被需要,那么就可以清除風(fēng)險(xiǎn)指針了。如果了解牛津/劍橋的龍舟比賽,那么這里使用到的機(jī)制和龍舟比賽開賽時(shí)差不多:每個(gè)船上的舵手都舉起手來,以表示他們還沒有準(zhǔn)備好。只要有舵手的手是舉著的,那么裁判就不能讓比賽開始。當(dāng)所有舵手的手都放下后,比賽才能開始;在比賽還未開始或感覺自己船隊(duì)的情況有變時(shí),舵手可以再次舉手。
當(dāng)線程想要?jiǎng)h除一個(gè)對(duì)象,那么它就必須檢查系統(tǒng)中其他線程是否持有風(fēng)險(xiǎn)指針。當(dāng)沒有風(fēng)險(xiǎn)指針的時(shí)候,那么它就可以安全刪除對(duì)象。否則,它就必須等待風(fēng)險(xiǎn)指針的消失了。這樣,線程就得周期性的檢查其想要?jiǎng)h除的對(duì)象是否能安全刪除。
看起來很簡(jiǎn)單,在C++中應(yīng)該怎么做呢?
首先,需要一個(gè)地點(diǎn)能存儲(chǔ)指向訪問對(duì)象的指針,這個(gè)地點(diǎn)就是風(fēng)險(xiǎn)指針。這個(gè)地點(diǎn)必須能讓所有線程看到,需要其中一些線程可以對(duì)數(shù)據(jù)結(jié)構(gòu)進(jìn)行訪問。如何正確和高效的分配這些線程,的確是一個(gè)挑戰(zhàn),所以這個(gè)問題可以放在后面解決,而后假設(shè)你有一個(gè)get_hazard_pointer_for_current_thread()的函數(shù),這個(gè)函數(shù)可以返回風(fēng)險(xiǎn)指針的引用。當(dāng)你讀取一個(gè)指針,并且想要解引用它的時(shí)候,你就需要這個(gè)函數(shù)——在這種情況下head數(shù)值源于下面的列表:
std::shared_ptr<T> pop()
{
std::atomic<void*>& hp=get_hazard_pointer_for_current_thread();
node* old_head=head.load(); // 1
node* temp;
do
{
temp=old_head;
hp.store(old_head); // 2
old_head=head.load();
} while(old_head!=temp); // 3
// ...
}
在while循環(huán)中就能保證node不會(huì)在讀取舊head指針①時(shí),以及在設(shè)置風(fēng)險(xiǎn)指針的時(shí)被刪除了。這種模式下,其他線程不知道有線程對(duì)這個(gè)給定的節(jié)點(diǎn)進(jìn)行了訪問。幸運(yùn)的是,當(dāng)舊head節(jié)點(diǎn)要被刪除時(shí),head本身是要改變的,所以需要對(duì)head進(jìn)行檢查,并持續(xù)循環(huán),直到head指針中的值與風(fēng)險(xiǎn)指針中的值相同③。使用風(fēng)險(xiǎn)指針,如同依賴對(duì)已刪除對(duì)象的引用。當(dāng)使用默認(rèn)的new和delete操作對(duì)風(fēng)險(xiǎn)指針進(jìn)行操作時(shí),會(huì)出現(xiàn)未定義行為,所以需要確定實(shí)現(xiàn)是否支持這樣的操作,或使用自定義分配器來保證這種用法的正確性。
現(xiàn)在已經(jīng)設(shè)置了風(fēng)險(xiǎn)指針,那就可以對(duì)pop()進(jìn)行處理了,基于現(xiàn)在了解到的安全知識(shí),這里不會(huì)有其他線程來刪除節(jié)點(diǎn)。啊哈!這里每一次重新加載old_head時(shí),解引用剛剛讀取到的指針時(shí),就需要更新風(fēng)險(xiǎn)指針。當(dāng)從鏈表中提取一個(gè)節(jié)點(diǎn)時(shí),就可以將風(fēng)險(xiǎn)指針清除了。如果沒有其他風(fēng)險(xiǎn)指針引用節(jié)點(diǎn),就可以安全的刪除節(jié)點(diǎn)了;否則,就需要將其添加到鏈表中,之后再將其刪除。下面的代碼就是對(duì)該方案的完整實(shí)現(xiàn)。
清單7.6 使用風(fēng)險(xiǎn)指針實(shí)現(xiàn)的pop()
std::shared_ptr<T> pop()
{
std::atomic<void*>& hp=get_hazard_pointer_for_current_thread();
node* old_head=head.load();
do
{
node* temp;
do // 1 直到將風(fēng)險(xiǎn)指針設(shè)為head指針
{
temp=old_head;
hp.store(old_head);
old_head=head.load();
} while(old_head!=temp);
}
while(old_head &&
!head.compare_exchange_strong(old_head,old_head->next));
hp.store(nullptr); // 2 當(dāng)聲明完成,清除風(fēng)險(xiǎn)指針
std::shared_ptr<T> res;
if(old_head)
{
res.swap(old_head->data);
if(outstanding_hazard_pointers_for(old_head)) // 3 在刪除之前對(duì)風(fēng)險(xiǎn)指針引用的節(jié)點(diǎn)進(jìn)行檢查
{
reclaim_later(old_head); // 4
}
else
{
delete old_head; // 5
}
delete_nodes_with_no_hazards(); // 6
}
return res;
}
首先,循環(huán)內(nèi)部會(huì)對(duì)風(fēng)險(xiǎn)指針進(jìn)行設(shè)置,在當(dāng)“比較/交換”操作失敗會(huì)重載old_head,再次進(jìn)行設(shè)置①。使用compare_exchange_strong(),是因?yàn)樾枰谘h(huán)內(nèi)部做一些實(shí)際的工作:當(dāng)compare_exchange_weak()偽失敗后,風(fēng)險(xiǎn)指針將被重置(沒有必要)。這個(gè)過程能保證風(fēng)險(xiǎn)指針在解引用(old_head)之前,能被正確的設(shè)置。當(dāng)已聲明了一個(gè)風(fēng)險(xiǎn)指針,那么就可以將其清除了②。如果想要獲取一個(gè)節(jié)點(diǎn),就需要檢查其他線程上的風(fēng)險(xiǎn)指針,檢查是否有其他指針引用該節(jié)點(diǎn)③。如果有,就不能刪除節(jié)點(diǎn),只能將其放在鏈表中,之后再進(jìn)行回收④;如果沒有,就能直接將這個(gè)節(jié)點(diǎn)刪除了⑤。最后,如果需要對(duì)任意節(jié)點(diǎn)進(jìn)行檢查,可以調(diào)用reclaim_later()。如果鏈表上沒有任何風(fēng)險(xiǎn)指針引用節(jié)點(diǎn),就可以安全的刪除這些節(jié)點(diǎn)⑥。當(dāng)有節(jié)點(diǎn)持有風(fēng)險(xiǎn)指針,就只能讓下一個(gè)調(diào)用pop()的線程離開。
當(dāng)然,這些函數(shù)——get_hazard_pointer_for_current_thread(), reclaim_later(), outstanding_hazard_pointers_for(), 和delete_nodes_with_no_hazards()——的實(shí)現(xiàn)細(xì)節(jié)我們還沒有看到,先來看看它們是如何工作的。
為線程分配風(fēng)險(xiǎn)指針實(shí)例的具體方案:使用get_hazard_pointer_for_current_thread()與程序邏輯的關(guān)系并不大(不過會(huì)影響效率,接下會(huì)看到具體的情況)??梢允褂靡粋€(gè)簡(jiǎn)單的結(jié)構(gòu)體:固定長(zhǎng)度的“線程ID-指針”數(shù)組。get_hazard_pointer_for_curent_thread()就可以通過這個(gè)數(shù)據(jù)來找到第一個(gè)釋放槽,并將當(dāng)前線程的ID放入到這個(gè)槽中。當(dāng)線程退出時(shí),槽就再次置空,可以通過默認(rèn)構(gòu)造std::thread::id()將線程ID放入槽中。這個(gè)實(shí)現(xiàn)就如下所示:
清單7.7 get_hazard_pointer_for_current_thread()函數(shù)的簡(jiǎn)單實(shí)現(xiàn)
unsigned const max_hazard_pointers=100;
struct hazard_pointer
{
std::atomic<std::thread::id> id;
std::atomic<void*> pointer;
};
hazard_pointer hazard_pointers[max_hazard_pointers];
class hp_owner
{
hazard_pointer* hp;
public:
hp_owner(hp_owner const&)=delete;
hp_owner operator=(hp_owner const&)=delete;
hp_owner():
hp(nullptr)
{
for(unsigned i=0;i<max_hazard_pointers;++i)
{
std::thread::id old_id;
if(hazard_pointers[i].id.compare_exchange_strong( // 6 嘗試聲明風(fēng)險(xiǎn)指針的所有權(quán)
old_id,std::this_thread::get_id()))
{
hp=&hazard_pointers[i];
break; // 7
}
}
if(!hp) // 1
{
throw std::runtime_error("No hazard pointers available");
}
}
std::atomic<void*>& get_pointer()
{
return hp->pointer;
}
~hp_owner() // 2
{
hp->pointer.store(nullptr); // 8
hp->id.store(std::thread::id()); // 9
}
};
std::atomic<void*>& get_hazard_pointer_for_current_thread() // 3
{
thread_local static hp_owner hazard; // 4 每個(gè)線程都有自己的風(fēng)險(xiǎn)指針
return hazard.get_pointer(); // 5
}
get_hazard_pointer_for_current_thread()的實(shí)現(xiàn)看起來很簡(jiǎn)單③:一個(gè)hp_owner④類型的thread_local(本線程所有)變量,用來存儲(chǔ)當(dāng)前線程的風(fēng)險(xiǎn)指針,可以返回這個(gè)變量所持有的指針⑤。之后的工作:第一次有線程調(diào)用這個(gè)函數(shù)時(shí),新hp_owner實(shí)例就被創(chuàng)建。這個(gè)實(shí)例的構(gòu)造函數(shù)⑥,會(huì)通過查詢“所有者/指針”表,尋找沒有所有者的記錄。其用compare_exchange_strong()來檢查某個(gè)記錄是否有所有者,并進(jìn)行析構(gòu)②。當(dāng)compare_exchange_strong()失敗,其他線程的擁有這個(gè)記錄,所以可以繼續(xù)執(zhí)行下去。當(dāng)交換成功,當(dāng)前線程就擁有了這條記錄,而后對(duì)其進(jìn)行存儲(chǔ),并停止搜索⑦。當(dāng)遍歷了列表也沒有找到物所有權(quán)的記錄①,就說明有很多線程在使用風(fēng)險(xiǎn)指針,所以這里將拋出一個(gè)異常。
一旦hp_owner實(shí)例被一個(gè)給定的線程所創(chuàng)建,那么之后的訪問將會(huì)很快,因?yàn)橹羔樤诰彺嬷校员聿恍枰俅伪闅v。
當(dāng)線程退出時(shí),hp_owner的實(shí)例將會(huì)被銷毀。析構(gòu)函數(shù)會(huì)在std::thread::id()設(shè)置擁有者ID前,將指針重置為nullptr,這樣就允許其他線程對(duì)這條記錄進(jìn)行復(fù)用⑧⑨。
實(shí)現(xiàn)get_hazard_pointer_for_current_thread()后,outstanding_hazard_pointer_for()實(shí)現(xiàn)就簡(jiǎn)單了:只需要對(duì)風(fēng)險(xiǎn)指針表進(jìn)行搜索,就可以找到對(duì)應(yīng)記錄。
bool outstanding_hazard_pointers_for(void* p)
{
for(unsigned i=0;i<max_hazard_pointers;++i)
{
if(hazard_pointers[i].pointer.load()==p)
{
return true;
}
}
return false;
}
實(shí)現(xiàn)都不需要對(duì)記錄的所有者進(jìn)行驗(yàn)證:沒有所有者的記錄會(huì)是一個(gè)空指針,所以比較代碼將總返回false,通過這種方式將代碼簡(jiǎn)化。
reclaim_later()和delete_nodes_with_no_hazards()可以對(duì)簡(jiǎn)單的鏈表進(jìn)行操作;reclaim_later()只是將節(jié)點(diǎn)添加到列表中,delete_nodes_with_no_hazards()就是搜索整個(gè)列表,并將無風(fēng)險(xiǎn)指針的記錄進(jìn)行刪除。下面將展示它們的具體實(shí)現(xiàn)。
清單7.8 回收函數(shù)的簡(jiǎn)單實(shí)現(xiàn)
template<typename T>
void do_delete(void* p)
{
delete static_cast<T*>(p);
}
struct data_to_reclaim
{
void* data;
std::function<void(void*)> deleter;
data_to_reclaim* next;
template<typename T>
data_to_reclaim(T* p): // 1
data(p),
deleter(&do_delete<T>),
next(0)
{}
~data_to_reclaim()
{
deleter(data); // 2
}
};
std::atomic<data_to_reclaim*> nodes_to_reclaim;
void add_to_reclaim_list(data_to_reclaim* node) // 3
{
node->next=nodes_to_reclaim.load();
while(!nodes_to_reclaim.compare_exchange_weak(node->next,node));
}
template<typename T>
void reclaim_later(T* data) // 4
{
add_to_reclaim_list(new data_to_reclaim(data)); // 5
}
void delete_nodes_with_no_hazards()
{
data_to_reclaim* current=nodes_to_reclaim.exchange(nullptr); // 6
while(current)
{
data_to_reclaim* const next=current->next;
if(!outstanding_hazard_pointers_for(current->data)) // 7
{
delete current; // 8
}
else
{
add_to_reclaim_list(current); // 9
}
current=next;
}
}
首先,reclaim_later()是一個(gè)函數(shù)模板④。因?yàn)轱L(fēng)險(xiǎn)指針是一個(gè)通用解決方案,所以這里就不能將棧節(jié)點(diǎn)的類型寫死。使用std::atomic<void*>對(duì)風(fēng)險(xiǎn)指針進(jìn)行存儲(chǔ)。需要對(duì)任意類型的指針進(jìn)行處理,不過不能使用void*形式,因?yàn)楫?dāng)要?jiǎng)h除數(shù)據(jù)項(xiàng)時(shí),delete操作只能對(duì)實(shí)際類型指針進(jìn)行操作。data_to_reclaim的構(gòu)造函數(shù)處理的就很優(yōu)雅:reclaim_later()只是為指針創(chuàng)建一個(gè)data_to_reclaim的實(shí)例,并且將實(shí)例添加到回收鏈表中⑤。add_to_reclaim_list()③就是使用compare_exchange_weak()循環(huán)來訪問鏈表頭(就如你之前看到的那樣)。
當(dāng)將節(jié)點(diǎn)添加入鏈表時(shí),data_to_reclaim的析構(gòu)函數(shù)不會(huì)被調(diào)用;析構(gòu)函數(shù)會(huì)在沒有風(fēng)險(xiǎn)指針指向節(jié)點(diǎn)的時(shí)候調(diào)用,這也就是delete_nodes_with_no_hazards()的作用。
delete_nodes_with_no_hazards()將已聲明的鏈表節(jié)點(diǎn)進(jìn)行回收,使用的是exchange()函數(shù)⑥(這個(gè)步驟簡(jiǎn)單且關(guān)鍵,是為了保證只有一個(gè)線程回收這些節(jié)點(diǎn))。這樣,其他線程就能自由將節(jié)點(diǎn)添加到鏈表中,或在不影響回收指定節(jié)點(diǎn)線程的情況下,對(duì)節(jié)點(diǎn)進(jìn)行回收。
只要有節(jié)點(diǎn)存在于鏈表中,就需要檢查每個(gè)節(jié)點(diǎn),查看節(jié)點(diǎn)是否被風(fēng)險(xiǎn)指針?biāo)赶颌摺H绻麤]有風(fēng)險(xiǎn)指針,那么就可以安全的將記錄刪除(并且清除存儲(chǔ)的數(shù)據(jù))⑧。否則,就只能將這個(gè)節(jié)點(diǎn)添加到鏈表的后面,再進(jìn)行回收⑨。
雖然這個(gè)實(shí)現(xiàn)很簡(jiǎn)單,也的確安全的回收了被刪除的節(jié)點(diǎn),不過這個(gè)過程增加了很多開銷。遍歷風(fēng)險(xiǎn)指針數(shù)組需要檢查max_hazard_pointers原子變量,并且每次pop()調(diào)用時(shí),都需要再檢查一遍。原子操作很耗時(shí)——在臺(tái)式CPU上,100次原子操作要比100次非原子操作慢——所以,這里pop()成為了性能瓶頸。這種方式,不僅需要遍歷節(jié)點(diǎn)的風(fēng)險(xiǎn)指針鏈表,還要遍歷等待鏈表上的每一個(gè)節(jié)點(diǎn)。顯然,這種方式很糟糕。當(dāng)有max_hazard_pointers在鏈表中,那么就需要檢查max_hazard_pointers多個(gè)已存儲(chǔ)的風(fēng)險(xiǎn)指針。我去!還有更好一點(diǎn)的方法嗎?
對(duì)風(fēng)險(xiǎn)指針(較好)的回收策略
當(dāng)然有更好的辦法。這里只展示一個(gè)風(fēng)險(xiǎn)指針的簡(jiǎn)單實(shí)現(xiàn),來幫助解釋技術(shù)問題。首先,要考慮的是內(nèi)存性能。比起對(duì)回收鏈表上的每個(gè)節(jié)點(diǎn)進(jìn)行檢查都要調(diào)用pop(),除非有超過max_hazard_pointer數(shù)量的節(jié)點(diǎn)存在于鏈表之上,要不就不需要嘗試回收任何節(jié)點(diǎn)。這樣就能保證至少有一個(gè)節(jié)點(diǎn)能夠回收,如果只是等待鏈表中的節(jié)點(diǎn)數(shù)量達(dá)到max_hazard_pointers+1,那比之前的方案也沒好到哪里去。當(dāng)獲取了max_hazard_pointers數(shù)量的節(jié)點(diǎn)時(shí),可以調(diào)用pop()對(duì)節(jié)點(diǎn)進(jìn)行回收,所以這樣也不是很好。不過,當(dāng)有2max_hazard_pointers個(gè)節(jié)點(diǎn)在列表中時(shí),就能保證至少有max_hazard_pointers可以被回收,在再次嘗試回收任意節(jié)點(diǎn)前,至少會(huì)對(duì)pop()有max_hazard_pointers次調(diào)用。這就很不錯(cuò)了。比起檢查max_hazard_pointers個(gè)節(jié)點(diǎn)就調(diào)用max_hazard_pointers次pop()(而且還不一定能回收節(jié)點(diǎn)),當(dāng)檢查2max_hazard_pointers個(gè)節(jié)點(diǎn)時(shí),每max_hazard_pointers次對(duì)pop()的調(diào)用,就會(huì)有max_hazard_pointers個(gè)節(jié)點(diǎn)能被回收。這就意味著,對(duì)兩個(gè)節(jié)點(diǎn)檢查調(diào)用pop(),其中就有一個(gè)節(jié)點(diǎn)能被回收。
這個(gè)方法有個(gè)缺點(diǎn)(有增加內(nèi)存使用的情況):就是得對(duì)回收鏈表上的節(jié)點(diǎn)進(jìn)行計(jì)數(shù),這就意味著要使用原子變量,并且還有很多線程爭(zhēng)相對(duì)回收鏈表進(jìn)行訪問。如果還有多余的內(nèi)存,可以增加內(nèi)存的使用來實(shí)現(xiàn)更好的回收策略:每個(gè)線程中的都擁有其自己的回收鏈表,作為線程的本地變量。這樣就不需要原子變量進(jìn)行計(jì)數(shù)了。這樣的話,就需要分配max_hazard_pointers x max_hazard_pointers個(gè)節(jié)點(diǎn)。所有節(jié)點(diǎn)被回收完畢前時(shí),有線程退出,那么其本地鏈表可以像之前一樣保存在全局中,并且添加到下一個(gè)線程的回收鏈表中,讓下一個(gè)線程對(duì)這些節(jié)點(diǎn)進(jìn)行回收。
風(fēng)險(xiǎn)指針另一個(gè)缺點(diǎn):與IBM申請(qǐng)的專利所沖突[2]。要讓寫的軟件在一個(gè)國(guó)家中使用,那么就必須擁有合法的知識(shí)產(chǎn)權(quán),所以需要擁有合適的許可證。這對(duì)所有無鎖的內(nèi)存回收技術(shù)都適用(這是一個(gè)活躍的研究領(lǐng)域),所以很多大公司都會(huì)有自己的專利。你可能會(huì)問,“為什么用了這么大的篇幅來介紹一個(gè)大多數(shù)人都沒辦法的技術(shù)呢?”,這公平性的問題。首先,使用這種技術(shù)可能不需要買一個(gè)許可證。比如,當(dāng)你使用GPL下的免費(fèi)軟件許可來進(jìn)行軟件開發(fā),那么你的軟件將會(huì)包含到IBM不主張聲明中。其次,也是很重要的,在設(shè)計(jì)無鎖代碼的時(shí)候,還需要從使用的技術(shù)角度進(jìn)行思考,比如,高消耗的原子操作。
所以,是否有非專利的內(nèi)存回收技術(shù),且能被大多數(shù)人所使用呢?很幸運(yùn),的確有。引用計(jì)數(shù)就是這樣一種機(jī)制。
回到7.2.2節(jié)的問題,“想要?jiǎng)h除節(jié)點(diǎn)還能被其他讀者線程訪問,應(yīng)該怎么辦?"。當(dāng)能安全并精確的識(shí)別,節(jié)點(diǎn)是否還被引用,以及沒有線程訪問這些節(jié)點(diǎn)的具體時(shí)間,以便將對(duì)應(yīng)節(jié)點(diǎn)進(jìn)行刪除。風(fēng)險(xiǎn)指針是通過將使用中的節(jié)點(diǎn)存放到鏈表中,解決問題。而引用計(jì)數(shù)是通過對(duì)每個(gè)節(jié)點(diǎn)上訪問的線程數(shù)量進(jìn)行統(tǒng)計(jì),解決問題。
看起來簡(jiǎn)單粗暴……不,優(yōu)雅;實(shí)際上管理起來卻是很困難:首先,你會(huì)想到的就是由std::shared_ptr<>來完成這個(gè)任務(wù),其是有內(nèi)置引用計(jì)數(shù)的指針。不幸的是,雖然std::shared_ptr<>上的一些操作是原子的,不過其也不能保證是無鎖的。智能指針上的原子操作和對(duì)其他原子類型的操作并沒有什么不同,但是std::shared_ptr<>旨在用于有多個(gè)上下文的情況下,并且在無鎖結(jié)構(gòu)中使用原子操作,無異于對(duì)該類增加了很多性能開銷。如果平臺(tái)支持std::atomic_is_lock_free(&some_shared_ptr)實(shí)現(xiàn)返回true,那么所有內(nèi)存回收問題就都迎刃而解了。使用std::shared_ptr<node>構(gòu)成的鏈表實(shí)現(xiàn),如下所示:
清單7.9 無鎖?!褂脽o鎖std::shared_ptr<>的實(shí)現(xiàn)
template<typename T>
class lock_free_stack
{
private:
struct node
{
std::shared_ptr<T> data;
std::shared_ptr<node> next;
node(T const& data_):
data(std::make_shared<T>(data_))
{}
};
std::shared_ptr<node> head;
public:
void push(T const& data)
{
std::shared_ptr<node> const new_node=std::make_shared<node>(data);
new_node->next=head.load();
while(!std::atomic_compare_exchange_weak(&head,
&new_node->next,new_node));
}
std::shared_ptr<T> pop()
{
std::shared_ptr<node> old_head=std::atomic_load(&head);
while(old_head && !std::atomic_compare_exchange_weak(&head,
&old_head,old_head->next));
return old_head ? old_head->data : std::shared_ptr<T>();
}
};
在一些情況下,使用std::shared_ptr<>實(shí)現(xiàn)的結(jié)構(gòu)并非無鎖,這就需要手動(dòng)管理引用計(jì)數(shù)。
一種方式是對(duì)每個(gè)節(jié)點(diǎn)使用兩個(gè)引用計(jì)數(shù):內(nèi)部計(jì)數(shù)和外部計(jì)數(shù)。兩個(gè)值的總和就是對(duì)這個(gè)節(jié)點(diǎn)的引用數(shù)。外部計(jì)數(shù)記錄有多少指針指向節(jié)點(diǎn),即在指針每次進(jìn)行讀取的時(shí)候,外部計(jì)數(shù)加一。當(dāng)線程結(jié)束對(duì)節(jié)點(diǎn)的訪問時(shí),內(nèi)部計(jì)數(shù)減一。指針在讀取時(shí),外部計(jì)數(shù)加一;在讀取結(jié)束時(shí),內(nèi)部計(jì)數(shù)減一。
當(dāng)不需要“外部計(jì)數(shù)-指針”對(duì)時(shí)(該節(jié)點(diǎn)就不能被多線程所訪問了),在外部計(jì)數(shù)減一和在被棄用的時(shí)候,內(nèi)部計(jì)數(shù)將會(huì)增加。當(dāng)內(nèi)部計(jì)數(shù)等于0,那么就沒有指針對(duì)該節(jié)點(diǎn)進(jìn)行引用,就可以將該節(jié)點(diǎn)安全的刪除。使用原子操作來更新共享數(shù)據(jù)也很重要?,F(xiàn)在,就讓我們來看一下使用這種技術(shù)實(shí)現(xiàn)的無鎖棧,只有確定節(jié)點(diǎn)能安全刪除的情況下,才能進(jìn)行節(jié)點(diǎn)回收。
下面程序清單中就展示了內(nèi)部數(shù)據(jù)結(jié)構(gòu),以及對(duì)push()簡(jiǎn)單優(yōu)雅的實(shí)現(xiàn)。
清單7.10 使用分離引用計(jì)數(shù)的方式推送一個(gè)節(jié)點(diǎn)到無鎖棧中
template<typename T>
class lock_free_stack
{
private:
struct node;
struct counted_node_ptr // 1
{
int external_count;
node* ptr;
};
struct node
{
std::shared_ptr<T> data;
std::atomic<int> internal_count; // 2
counted_node_ptr next; // 3
node(T const& data_):
data(std::make_shared<T>(data_)),
internal_count(0)
{}
};
std::atomic<counted_node_ptr> head; // 4
public:
~lock_free_stack()
{
while(pop());
}
void push(T const& data) // 5
{
counted_node_ptr new_node;
new_node.ptr=new node(data);
new_node.external_count=1;
new_node.ptr->next=head.load();
while(!head.compare_exchange_weak(new_node.ptr->next,new_node));
}
};
外部計(jì)數(shù)包含在counted_node_ptr的指針中①,且這個(gè)結(jié)構(gòu)體會(huì)被node中的next指針③和內(nèi)部計(jì)數(shù)②用到。counted_node_ptr是一個(gè)簡(jiǎn)單的結(jié)構(gòu)體,所以可以使用特化std::atomic<>模板來對(duì)鏈表的頭指針進(jìn)行聲明④。
且counted_node_ptr體積夠小,能夠讓std::atomic<counted_node_ptr>無鎖。在一些平臺(tái)上支持雙字比較和交換操作,可以直接對(duì)結(jié)構(gòu)體進(jìn)行操作。當(dāng)你的平臺(tái)不支持這樣的操作時(shí),最好使用std::shared_ptr<>變量(如清單7.9那樣);當(dāng)類型的體積過大,超出了平臺(tái)支持指令,那么原子std::atomic<>將使用鎖來保證其操作的原子性(從而會(huì)讓你的“無鎖”算法“基于鎖”來完成)。另外,如果想要限制計(jì)數(shù)器的大小,需要已知平臺(tái)上指針?biāo)嫉目臻g(比如,地址空間只剩下48位,而一個(gè)指針就要占64位),可以將計(jì)數(shù)存在一個(gè)指針空間內(nèi),不過為了適應(yīng)平臺(tái),也可以存在一個(gè)機(jī)器字當(dāng)中。這樣的技巧需要對(duì)特定系統(tǒng)有足夠的了解,當(dāng)然已經(jīng)超出本書討論的范圍。
push()相對(duì)簡(jiǎn)單⑤,可以構(gòu)造一個(gè)counted_node_ptr實(shí)例,去引用新分配出來的(帶有相關(guān)數(shù)據(jù)的)node,并且將node中的next指針設(shè)置為當(dāng)前head。之后使用compare_exchange_weak()對(duì)head的值進(jìn)行設(shè)置,就像之前代碼清單中所示。因?yàn)閕nternal_count剛被設(shè)置,所以其值為0,并且external_count是1。因?yàn)檫@是一個(gè)新節(jié)點(diǎn),那么這個(gè)節(jié)點(diǎn)只有一個(gè)外部引用(head指針)。
通常,pop()都有一個(gè)從繁到簡(jiǎn)的過程,實(shí)現(xiàn)代碼如下。
清單7.11 使用分離引用計(jì)數(shù)從無鎖棧中彈出一個(gè)節(jié)點(diǎn)
template<typename T>
class lock_free_stack
{
private:
void increase_head_count(counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter=old_counter;
++new_counter.external_count;
}
while(!head.compare_exchange_strong(old_counter,new_counter)); // 1
old_counter.external_count=new_counter.external_count;
}
public:
std::shared_ptr<T> pop()
{
counted_node_ptr old_head=head.load();
for(;;)
{
increase_head_count(old_head);
node* const ptr=old_head.ptr; // 2
if(!ptr)
{
return std::shared_ptr<T>();
}
if(head.compare_exchange_strong(old_head,ptr->next)) // 3
{
std::shared_ptr<T> res;
res.swap(ptr->data); // 4
int const count_increase=old_head.external_count-2; // 5
if(ptr->internal_count.fetch_add(count_increase)== // 6
-count_increase)
{
delete ptr;
}
return res; // 7
}
else if(ptr->internal_count.fetch_sub(1)==1)
{
delete ptr; // 8
}
}
}
};
當(dāng)加載head的值之后,就必須將外部引用加一,是為了表明這個(gè)節(jié)點(diǎn)正在引用,并且保證在解引用時(shí)的安全性。在引用計(jì)數(shù)增加前解引用指針,那么就會(huì)有線程能夠訪問這個(gè)節(jié)點(diǎn),從而當(dāng)前引用指針就成為了一個(gè)懸空指針。這就是將引用計(jì)數(shù)分離的主要原因:通過增加外部引用計(jì)數(shù),保證指針在訪問期間的合法性。在compare_exchange_strong()的循環(huán)中①完成增加,通過比較和設(shè)置整個(gè)結(jié)構(gòu)體來保證指針不會(huì)在同一時(shí)間內(nèi)被其他線程修改。
當(dāng)計(jì)數(shù)增加,就能安全的解引用ptr,并讀取head指針的值,就能訪問指向的節(jié)點(diǎn)②。如果指針是空指針,那么將會(huì)訪問到鏈表的最后。當(dāng)指針不為空時(shí),就能嘗試對(duì)head調(diào)用compare_exchange_strong()來刪除這個(gè)節(jié)點(diǎn)③。
當(dāng)compare_exchange_strong()成功時(shí),就擁有對(duì)應(yīng)節(jié)點(diǎn)的所有權(quán),并且可以和data進(jìn)行交換④,然后返回。這樣數(shù)據(jù)就不會(huì)持續(xù)保存,因?yàn)槠渌€程也會(huì)對(duì)棧進(jìn)行訪問,所以會(huì)有其他指針指向這個(gè)節(jié)點(diǎn)。而后,可以使用原子操作fetch_add⑥,將外部計(jì)數(shù)加到內(nèi)部計(jì)數(shù)中去。如果現(xiàn)在引用計(jì)數(shù)為0,那么之前的值(fetch_add返回的值),在相加之前肯定是一個(gè)負(fù)數(shù),這種情況下就可以將節(jié)點(diǎn)刪除。這里需要注意的是,相加的值要比外部引用計(jì)數(shù)少2⑤;當(dāng)節(jié)點(diǎn)已經(jīng)從鏈表中刪除,就要減少一次計(jì)數(shù),并且這個(gè)線程無法再次訪問指定節(jié)點(diǎn),所以還要再減一。無論節(jié)點(diǎn)是否被刪除,都能完成操作,所以可以將獲取的數(shù)據(jù)進(jìn)行返回⑦。
當(dāng)“比較/交換”③失敗,就說明其他線程在之前把對(duì)應(yīng)節(jié)點(diǎn)刪除了,或者其他線程添加了一個(gè)新的節(jié)點(diǎn)到棧中。無論是哪種原因,需要通過“比較/交換”的調(diào)用,對(duì)具有新值的head重新進(jìn)行操作。不過,首先需要減少節(jié)點(diǎn)(要?jiǎng)h除的節(jié)點(diǎn))上的引用計(jì)數(shù)。這個(gè)線程將再也沒有辦法訪問這個(gè)節(jié)點(diǎn)了。如果當(dāng)前線程是最后一個(gè)持有引用(因?yàn)槠渌€程已經(jīng)將這個(gè)節(jié)點(diǎn)從棧上刪除了)的線程,那么內(nèi)部引用計(jì)數(shù)將會(huì)為1,所以減一的操作將會(huì)讓計(jì)數(shù)器為0。這樣,你就能在循環(huán)⑧進(jìn)行之前將對(duì)應(yīng)節(jié)點(diǎn)刪除了。
目前,使用默認(rèn)std::memory_order_seq_cst內(nèi)存序來規(guī)定原子操作的執(zhí)行順序。在大多數(shù)系統(tǒng)中,這種操作方式都很耗時(shí),且同步操作的開銷要高于內(nèi)存序?,F(xiàn)在,就可以考慮對(duì)數(shù)據(jù)結(jié)構(gòu)的邏輯進(jìn)行修改,對(duì)數(shù)據(jù)結(jié)構(gòu)的部分放寬內(nèi)存序要求;就沒有必要在棧上增加過度的開銷了?,F(xiàn)在讓我們來檢查一下棧的操作,并且捫心自問,這里能對(duì)一些操作使用更加寬松的內(nèi)存序么?如果使用了,能確保同級(jí)安全嗎?
在修改內(nèi)存序之前,需要檢查一下操作之間的依賴關(guān)系。而后,再去確定適合這種需求關(guān)系的最小內(nèi)存序。為了保證這種方式能夠工作,需要在從線程的視角進(jìn)行觀察。其中最簡(jiǎn)單的視角就是,向棧中推入一個(gè)數(shù)據(jù)項(xiàng),之后讓其他線程從棧中彈出這個(gè)數(shù)據(jù)。
即使在簡(jiǎn)單的例子中,都需要三個(gè)重要的數(shù)據(jù)參與。1、counted_node_ptr轉(zhuǎn)移的數(shù)據(jù)head。2、head引用的node。3、節(jié)點(diǎn)所指向的數(shù)據(jù)項(xiàng)。
做push()的線程,會(huì)先構(gòu)造數(shù)據(jù)項(xiàng)和節(jié)點(diǎn),再設(shè)置head。做pop()的線程,會(huì)先加載head的值,再做在循環(huán)中對(duì)head做“比較/交換”操作,并增加引用計(jì)數(shù),再讀取對(duì)應(yīng)的node節(jié)點(diǎn),獲取next的指向的值,現(xiàn)在就可以看到一組需求關(guān)系。next的值是普通的非原子對(duì)象,所以為了保證讀取安全,這里必須確定存儲(chǔ)(推送線程)和加載(彈出線程)的先行關(guān)系。因?yàn)槲ㄒ坏脑硬僮骶褪莗ush()函數(shù)中的compare_exchange_weak(),這里需要釋放操作來獲取兩個(gè)線程間的先行關(guān)系,這里compare_exchange_weak()必須是std::memory_order_release或更嚴(yán)格的內(nèi)存序。當(dāng)compare_exchange_weak()調(diào)用失敗,什么都不會(huì)改變,并且可以持續(xù)循環(huán)下去,所以使用std::memory_order_relaxed就足夠了。
void push(T const& data)
{
counted_node_ptr new_node;
new_node.ptr=new node(data);
new_node.external_count=1;
new_node.ptr->next=head.load(std::memory_order_relaxed)
while(!head.compare_exchange_weak(new_node.ptr->next,new_node,
std::memory_order_release,std::memory_order_relaxed));
}
那pop()的實(shí)現(xiàn)呢?為了確定先行關(guān)系,必須在訪問next值之前使用std::memory_order_acquire或更嚴(yán)格內(nèi)存序的操作。因?yàn)?,在increase_head_count()中使用compare_exchange_strong()就獲取next指針指向的舊值,所以想要其獲取成功就需要確定內(nèi)存序。如同調(diào)用push()那樣,當(dāng)交換失敗,循環(huán)會(huì)繼續(xù),所以在失敗的時(shí)候使用松散的內(nèi)存序:
void increase_head_count(counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter=old_counter;
++new_counter.external_count;
}
while(!head.compare_exchange_strong(old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed));
old_counter.external_count=new_counter.external_count;
}
當(dāng)compare_exchange_strong()調(diào)用成功,那么ptr中的值就被存到old_counter中。存儲(chǔ)操作是push()中的一個(gè)釋放操作,并且compare_exchange_strong()操作是一個(gè)獲取操作,現(xiàn)在存儲(chǔ)同步于加載,并且能夠獲取先行關(guān)系。因此,在push()中存儲(chǔ)ptr的值,要先行于在pop()中對(duì)ptr->next的訪問?,F(xiàn)在的操作就安全了。
要注意的是,內(nèi)存序?qū)ead.load()的初始化并不妨礙分析,所以現(xiàn)在就可以使用std::memory_order_relaxed。
接下來,compare_exchange_strong()將old_head.ptr->next設(shè)置為head。是否需要做什么來保證操作線程中的數(shù)據(jù)完整性呢?當(dāng)交換成功,你就能訪問ptr->data,所以這里需要保證在push()線程中已經(jīng)對(duì)ptr->data進(jìn)行了存儲(chǔ)(在加載之前)。在increase_head_count()中的獲取操作,能保證與push()線程中的存儲(chǔ)和“比較/交換”同步。這里的先行關(guān)系是:在push()線程中存儲(chǔ)數(shù)據(jù),先行于存儲(chǔ)head指針;調(diào)用increase_head_count()先行于對(duì)ptr->data的加載。即使,pop()中的“比較/交換”操作使用std::memory_order_relaxed,這些操作還是能正常運(yùn)行。唯一不同的地方就是,調(diào)用swap()讓ptr->data有所變化,且沒有其他線程可以對(duì)同一節(jié)點(diǎn)進(jìn)行操作(這就是“比較/交換”操作的作用)。
當(dāng)compare_exchange_strong()失敗,那么新值就不會(huì)去更新old_head,繼續(xù)循環(huán)。這里,已確定在increase_head_count()中使用std::memory_order_acquire內(nèi)存序的可行性,所以這里使用std::memory_order_relaxed也可以。
其他線程呢?是否需要設(shè)置一些更為嚴(yán)格的內(nèi)存序來保證其他線程的安全呢?回答是“不用”。因?yàn)?,head只會(huì)因“比較/交換”操作有所改變;對(duì)于“讀-改-寫”操作來說,push()中的“比較/交換”操作是構(gòu)成釋放序列的一部分。因此,即使有很多線程在同一時(shí)間對(duì)head進(jìn)行修改,push()中的compare_exchange_weak()與increase_head_count()(讀取已存儲(chǔ)的值)中的compare_exchange_strong()也是同步的。
剩余操作就可以用來處理fetch_add()操作(用來改變引用計(jì)數(shù)的操作),因?yàn)橐阎渌€程不可能對(duì)該節(jié)點(diǎn)的數(shù)據(jù)進(jìn)行修改,所以從節(jié)點(diǎn)中返回?cái)?shù)據(jù)的線程可以繼續(xù)執(zhí)行。不過,當(dāng)線程獲取其他線程修改后的值時(shí),就代表操作失敗(swap()是用來提取數(shù)據(jù)項(xiàng)的引用)。那么,為了避免數(shù)據(jù)競(jìng)爭(zhēng),需要保證swap()先行于delete操作。一種簡(jiǎn)單的解決辦法,在“成功返回”分支中對(duì)fetch_add()使用std::memory_order_release內(nèi)存序,在“再次循環(huán)”分支中對(duì)fetch_add()使用std::memory_order_qcquire內(nèi)存序。不過,這就有點(diǎn)矯枉過正:只有一個(gè)線程做delete操作(將引用計(jì)數(shù)設(shè)置為0的線程),所以只有這個(gè)線程需要獲取操作。幸運(yùn)的是,因?yàn)閒etch_add()是一個(gè)“讀-改-寫”操作,是釋放序列的一部分,所以可以使用一個(gè)額外的load()做獲取。當(dāng)“再次循環(huán)”分支將引用計(jì)數(shù)減為0時(shí),fetch_add()可以重載引用計(jì)數(shù),這里使用std::memory_order_acquire為了保持需求的同步關(guān)系;并且,fetch_add()本身可以使用std::memory_order_relaxed。使用新pop()的棧實(shí)現(xiàn)如下。
清單7.12 基于引用計(jì)數(shù)和松散原子操作的無鎖棧
template<typename T>
class lock_free_stack
{
private:
struct node;
struct counted_node_ptr
{
int external_count;
node* ptr;
};
struct node
{
std::shared_ptr<T> data;
std::atomic<int> internal_count;
counted_node_ptr next;
node(T const& data_):
data(std::make_shared<T>(data_)),
internal_count(0)
{}
};
std::atomic<counted_node_ptr> head;
void increase_head_count(counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter=old_counter;
++new_counter.external_count;
}
while(!head.compare_exchange_strong(old_counter,new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
old_counter.external_count=new_counter.external_count;
}
public:
~lock_free_stack()
{
while(pop());
}
void push(T const& data)
{
counted_node_ptr new_node;
new_node.ptr=new node(data);
new_node.external_count=1;
new_node.ptr->next=head.load(std::memory_order_relaxed)
while(!head.compare_exchange_weak(new_node.ptr->next,new_node,
std::memory_order_release,
std::memory_order_relaxed));
}
std::shared_ptr<T> pop()
{
counted_node_ptr old_head=
head.load(std::memory_order_relaxed);
for(;;)
{
increase_head_count(old_head);
node* const ptr=old_head.ptr;
if(!ptr)
{
return std::shared_ptr<T>();
}
if(head.compare_exchange_strong(old_head,ptr->next,
std::memory_order_relaxed))
{
std::shared_ptr<T> res;
res.swap(ptr->data);
int const count_increase=old_head.external_count-2;
if(ptr->internal_count.fetch_add(count_increase,
std::memory_order_release)==-count_increase)
{
delete ptr;
}
return res;
}
else if(ptr->internal_count.fetch_add(-1,
std::memory_order_relaxed)==1)
{
ptr->internal_count.load(std::memory_order_acquire);
delete ptr;
}
}
}
};
這是一種鍛煉,不過鍛煉要告一段落了,我們已經(jīng)獲得比之前好很多的棧實(shí)現(xiàn)。在深思熟慮后,通過使用更多的松散操作,在不影響并發(fā)性的同時(shí)提高性能。實(shí)現(xiàn)中的pop()有37行,而功能等同于清單6.1中的那7行的基于鎖的棧實(shí)現(xiàn),和清單7.2中無內(nèi)存管理的無鎖棧實(shí)現(xiàn)。對(duì)于接下來要設(shè)計(jì)的無鎖隊(duì)列,將看到類似的情況:無鎖結(jié)構(gòu)的復(fù)雜性,主要在于內(nèi)存的管理。
隊(duì)列的提供的挑戰(zhàn)與棧的有些不同,因?yàn)閜ush()和pop()在隊(duì)列中,操作的不是同一個(gè)地方。因此,同步的需求就不一樣了。需要保證對(duì)一端的修改是正確的,且對(duì)另一端是可見的。不過,在清單6.6中隊(duì)列有一個(gè)try_pop()成員函數(shù),其作用和清單7.2中簡(jiǎn)單的無鎖棧的pop()功能差不多,那么就可以合理的假設(shè)無鎖代碼都很相似。這是為什么呢?
如果將清單6.6中的代碼作為基礎(chǔ),就需要兩個(gè)node指針:head和tail??梢宰尪嗑€程對(duì)它們進(jìn)行訪問,所以這兩個(gè)節(jié)點(diǎn)最好是原子的,這樣就不用考慮互斥問題了。讓我們對(duì)清單6.6中的代碼做一些修改,并且看一下應(yīng)該從哪里開始設(shè)計(jì)。先來看一下下面的代碼。
清單7.13 單生產(chǎn)者/單消費(fèi)者模型下的無鎖隊(duì)列
template<typename T>
class lock_free_queue
{
private:
struct node
{
std::shared_ptr<T> data;
node* next;
node():
next(nullptr)
{}
};
std::atomic<node*> head;
std::atomic<node*> tail;
node* pop_head()
{
node* const old_head=head.load();
if(old_head==tail.load()) // 1
{
return nullptr;
}
head.store(old_head->next);
return old_head;
}
public:
lock_free_queue():
head(new node),tail(head.load())
{}
lock_free_queue(const lock_free_queue& other)=delete;
lock_free_queue& operator=(const lock_free_queue& other)=delete;
~lock_free_queue()
{
while(node* const old_head=head.load())
{
head.store(old_head->next);
delete old_head;
}
}
std::shared_ptr<T> pop()
{
node* old_head=pop_head();
if(!old_head)
{
return std::shared_ptr<T>();
}
std::shared_ptr<T> const res(old_head->data); // 2
delete old_head;
return res;
}
void push(T new_value)
{
std::shared_ptr<T> new_data(std::make_shared<T>(new_value));
node* p=new node; // 3
node* const old_tail=tail.load(); // 4
old_tail->data.swap(new_data); // 5
old_tail->next=p; // 6
tail.store(p); // 7
}
};
一眼望去,這個(gè)實(shí)現(xiàn)并沒什么不好,當(dāng)只有一個(gè)線程調(diào)用一次push(),且只有一個(gè)線程調(diào)用pop()。在這種情況下,隊(duì)列完美工作。push()和pop()之間的先行關(guān)系就很重要了,這直接關(guān)系到獲取到的data。對(duì)tail的存儲(chǔ)⑦同步于對(duì)tail的加載①;存儲(chǔ)之前節(jié)點(diǎn)的data指針⑤先行于存儲(chǔ)tail;并且,加載tail先行于加載data指針②,所以對(duì)data的存儲(chǔ)要先行于加載,一切都沒問題。因此,這是一個(gè)完美的單生產(chǎn)者,單消費(fèi)者(SPSC, single-producer, single-consume)隊(duì)列。
問題在于當(dāng)多線程對(duì)push()或pop()并發(fā)調(diào)用。先看一下push():如果有兩個(gè)線程并發(fā)調(diào)用push(),那么它們會(huì)新分配兩個(gè)節(jié)點(diǎn)作為虛擬節(jié)點(diǎn)③,也會(huì)讀取到相同的tail值④,因此也會(huì)同時(shí)修改同一個(gè)節(jié)點(diǎn),同時(shí)設(shè)置data和next指針⑤⑥。明顯的數(shù)據(jù)競(jìng)爭(zhēng)!
pop_head()函數(shù)也有類似的問題。當(dāng)有兩個(gè)線程并發(fā)的調(diào)用這個(gè)函數(shù)時(shí),這兩個(gè)線程就會(huì)讀取到同一個(gè)head中同樣的值,并且會(huì)同時(shí)通過next指針去復(fù)寫舊值。兩個(gè)線程現(xiàn)在都能索引到同一個(gè)節(jié)點(diǎn)——真是一場(chǎng)災(zāi)難!這里,不僅要保證只有一個(gè)pop()線程可以訪問給定項(xiàng),還要保證其他線程在讀取head指針時(shí),可以安全的訪問節(jié)點(diǎn)中的next。這就和無鎖棧中pop()的問題一樣了,那么就有很多解決方案可以在這里使用。
pop()的問題解決了,那么push()呢?問題在于為了獲取push()和pop()間的先行關(guān)系,就需要在為虛擬節(jié)點(diǎn)設(shè)置數(shù)據(jù)項(xiàng)前,更新tail指針。這就意味著,并發(fā)訪問push()時(shí),因?yàn)槊總€(gè)線程所讀取到的是同一個(gè)tail指針,所以線程會(huì)為同一個(gè)數(shù)據(jù)項(xiàng)進(jìn)行競(jìng)爭(zhēng)。
多線程下的push()
第一個(gè)選擇是在兩個(gè)