基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)設(shè)計,需要確保訪問線程持有鎖的時間最短。對于只有一個互斥量的數(shù)據(jù)結(jié)構(gòu)來說,這十分困難。需要保證數(shù)據(jù)不被鎖之外的操作所訪問到,并且還要保證不會在固有結(jié)構(gòu)上產(chǎn)生條件競爭(如第3章所述)。當你使用多個互斥量來保護數(shù)據(jù)結(jié)構(gòu)中不同的區(qū)域時,問題會暴露的更加明顯,當操作需要獲取多個互斥鎖時,就有可能產(chǎn)生死鎖。所以,在設(shè)計時,使用多個互斥量時需要格外小心。
在本節(jié)中,你將使用6.1.1節(jié)中的指導(dǎo)建議,來設(shè)計一些簡單的數(shù)據(jù)結(jié)構(gòu)——使用互斥量和鎖的方式來保護數(shù)據(jù)。每一個例子中,都是在保證數(shù)據(jù)結(jié)構(gòu)是線程安全的前提下,對數(shù)據(jù)結(jié)構(gòu)并發(fā)訪問的概率(機會)進行提高。
我們先來看看在第3章中棧的實現(xiàn),這個實現(xiàn)就是一個十分簡單的數(shù)據(jù)結(jié)構(gòu),它只使用了一個互斥量。但是,這個結(jié)構(gòu)是線程安全的嗎?它離真正的并發(fā)訪問又有多遠呢?
我們先把第3章中線程安全的棧拿過來看看:(這里試圖實現(xiàn)一個線程安全版的std:stack<>)
清單6.1 線程安全棧的類定義
#include <exception>
struct empty_stack: std::exception
{
const char* what() const throw();
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack(){}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
data=other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value)); // 1
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack(); // 2
std::shared_ptr<T> const res(
std::make_shared<T>(std::move(data.top()))); // 3
data.pop(); // 4
return res;
}
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
value=std::move(data.top()); // 5
data.pop(); // 6
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
來看看指導(dǎo)意見是如何應(yīng)用的。
首先,互斥量m能保證基本的線程安全,那就是對每個成員函數(shù)進行加鎖保護。這就保證在同一時間內(nèi),只有一個線程可以訪問到數(shù)據(jù),所以能夠保證,數(shù)據(jù)結(jié)構(gòu)的“不變量”被破壞時,不會被其他線程看到。
其次,在empty()和pop()成員函數(shù)之間會存在潛在的競爭,不過代碼會在pop()函數(shù)上鎖時,顯式的查詢棧是否為空,所以這里的競爭是非惡性的。pop()通過對彈出值的直接返回,就可避免std::stack<>中top()和pop()兩成員函數(shù)之間的潛在競爭。
再次,這個類中也有一些異常源。對互斥量上鎖可能會拋出異常,因為上鎖操作是每個成員函數(shù)所做的第一個操作,所以這是極其罕見的(因為這意味這問題不在鎖上,就是在系統(tǒng)資源上)。因無數(shù)據(jù)修改,所以其是安全的。因解鎖一個互斥量是不會失敗的,所以段代碼很安全,并且使用std::lock_guard<>也能保證互斥量上鎖的狀態(tài)。
對data.push()①的調(diào)用可能會拋出一個異常,不是拷貝/移動數(shù)據(jù)值時,就是內(nèi)存不足的時候。不管是哪種,std::stack<>都能保證其實安全的,所以這里也沒有問題。
在第一個重載pop()中,代碼可能會拋出一個empty_stack的異常②,不過數(shù)據(jù)沒有被修改,所以其是安全的。對于res的創(chuàng)建③,也可能會拋出一個異常,這有兩方面的原因:對std::make_shared的調(diào)用,可能無法分配出足夠的內(nèi)存去創(chuàng)建新的對象,并且內(nèi)部數(shù)據(jù)需要對新對象進行引用;或者,在拷貝或移動構(gòu)造到新分配的內(nèi)存中返回時拋出異常。兩種情況下,c++運行庫和標準庫能確保這里不會出現(xiàn)內(nèi)存泄露,并且新創(chuàng)建的對象(如果有的話)都能被正確銷毀。因為沒有對棧進行任何修改,所以這里也不會有問題。當調(diào)用data.pop()④時,其能確保不拋出異常,并且返回結(jié)果,所以這個重載pop()函數(shù)“異常-安全”。
第二個重載pop()類似,除了在拷貝賦值或移動賦值的時候會拋出異常⑤,當構(gòu)造一個新對象和一個std::shared_ptr實例時都不會拋出異常。同樣,在調(diào)用data.pop()⑥(這個成員函數(shù)保證不會拋出異常)之前,依舊沒有對數(shù)據(jù)結(jié)構(gòu)進行修改,所以這個函數(shù)也為“異常-安全”。
最后,empty()也不會修改任何數(shù)據(jù),所以也是“異常-安全”函數(shù)。
當調(diào)用持有一個鎖的用戶代碼時,這里有兩個地方可能會產(chǎn)生死鎖:進行拷貝構(gòu)造或移動構(gòu)造(①,③)和在對數(shù)據(jù)項進行拷貝賦值或移動賦值操作⑤的時候;還有一個潛在死鎖的地方在于用戶定義的操作符new。當這些函數(shù),無論是以直接調(diào)用棧的成員函數(shù)的方式,還是在成員函數(shù)進行操作時,對已經(jīng)插入或刪除的數(shù)據(jù)進行操作的方式,對鎖進行獲取,都可能造成死鎖。不過,用戶要對棧負責(zé),當棧未對一個數(shù)據(jù)進行拷貝或分配時,用戶就不能想當然的將其添加到棧中。
所有成員函數(shù)都使用st::lock_guard<>來保護數(shù)據(jù),所以棧的成員函數(shù)能有“線程安全”的表現(xiàn)。當然,構(gòu)造與析構(gòu)函數(shù)不是“線程安全”的,不過這也不成問題,因為對實例的構(gòu)造與析構(gòu)只能有一次。調(diào)用一個不完全構(gòu)造對象或是已銷毀對象的成員函數(shù),無論在那種編程方式下,都不可取。所以,用戶就要保證在棧對象完成構(gòu)建前,其他線程無法對其進行訪問;并且,一定要保證在棧對象銷毀后,所有線程都要停止對其進行訪問。
即使在多線程情況下,并發(fā)的調(diào)用成員函數(shù)是安全的(因為使用鎖),也要保證在單線程的情況下,數(shù)據(jù)結(jié)構(gòu)做出正確反應(yīng)。序列化線程會隱性的限制程序性能,這就是棧爭議聲最大的地方:當一個線程在等待鎖時,它就會無所事事。同樣的,對于棧來說,等待添加元素也是沒有意義的,所以當一個線程需要等待時,其會定期檢查empty()或pop(),以及對empty_stack異常進行關(guān)注。這樣的現(xiàn)實會限制棧的實現(xiàn)的方式,在線程等待的時候,會浪費寶貴的資源去檢查數(shù)據(jù),或是要求用戶寫寫外部等待和提示代碼(例如,使用條件變量),這就使內(nèi)部鎖失去存在的意義——這就意味著資源的浪費。第4章中的隊列,就是一種使用條件內(nèi)部變量進行等待的數(shù)據(jù)結(jié)構(gòu),接下來我們就來了解一下。
第4章中的線程安全隊列,在清單6.2中重現(xiàn)一下。和使用仿std::stack<>建立的棧很像,這里隊列的建立也是參照了std::queue<>。不過,與標準容器的接口不同,我們要設(shè)計的是能在多線程下安全并發(fā)訪問的數(shù)據(jù)結(jié)構(gòu)。
清單6.2 使用條件變量實現(xiàn)的線程安全隊列
template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue()
{}
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(std::move(data));
data_cond.notify_one(); // 1
}
void wait_and_pop(T& value) // 2
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=std::move(data_queue.front());
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop() // 3
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();}); // 4
std::shared_ptr<T> res(
std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return false;
value=std::move(data_queue.front());
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>(); // 5
std::shared_ptr<T> res(
std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
除了在push()①中調(diào)用data_cond.notify_one(),以及wait_and_pop()②③,6.2中對隊列的實現(xiàn)與6.1中對棧的實現(xiàn)十分相近。兩個重載try_pop()除了在隊列為空時拋出異常,其他的與6.1中pop()函數(shù)完全一樣。不同的是,在6.1中對值的檢索會返回一個bool值,而在6.2中,當指針指向空值的時候會返回NULL指針⑤,這同樣也是實現(xiàn)棧的一個有效途徑。所以,即使排除掉wait_and_pop()函數(shù),之前對棧的分析依舊適用于這里。
wiat_and_pop()函數(shù)是等待隊列向棧進行輸入的一個解決方案;比起持續(xù)調(diào)用empty(),等待線程調(diào)用wait_and_pop()函數(shù)和數(shù)據(jù)結(jié)構(gòu)處理等待中的條件變量的方式要好很多。對于data_cond.wait()的調(diào)用,直到隊列中有一個元素的時候,才會返回,所以你就不用擔(dān)心會出現(xiàn)一個空隊列的情況了,還有,數(shù)據(jù)會一直被互斥鎖保護。因為不變量這里并未發(fā)生變化,所以函數(shù)不會添加新的條件競爭或是死鎖的可能。
異常安全在這里的會有一些變化,當不止一個線程等待對隊列進行推送操作是,只會有一個線程,因得到data_cond.notify_one(),而繼續(xù)工作著。但是,如果這個工作線程在wait_and_pop()中拋出一個異常,例如:構(gòu)造新的std::shared_ptr<>對象④時拋出異常,那么其他線程則會永世長眠。當這種情況是不可接受時,這里的調(diào)用就需要改成data_cond.notify_all(),這個函數(shù)將喚醒所有的工作線程,不過,當大多線程發(fā)現(xiàn)隊列依舊是空時,又會耗費很多資源讓線程重新進入睡眠狀態(tài)。第二種替代方案是,當有異常拋出的時候,讓wait_and_pop()函數(shù)調(diào)用notify_one(),從而讓個另一個線程可以去嘗試索引存儲的值。第三種替代方案就是,將std::shared_ptr<>的初始化過程移到push()中,并且存儲std::shared_ptr<>實例,而非直接使用數(shù)據(jù)的值。將std::shared_ptr<>拷貝到內(nèi)部std::queue<>中,就不會拋出異常了,這樣wait_and_pop()又是安全的了。下面的程序清單,就是根據(jù)第三種方案進行修改的。
清單6.3 持有std::shared_ptr<>實例的線程安全隊列
template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T> > data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue()
{}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=std::move(*data_queue.front()); // 1
data_queue.pop();
}
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return false;
value=std::move(*data_queue.front()); // 2
data_queue.pop();
return true;
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
std::shared_ptr<T> res=data_queue.front(); // 3
data_queue.pop();
return res;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res=data_queue.front(); // 4
data_queue.pop();
return res;
}
void push(T new_value)
{
std::shared_ptr<T> data(
std::make_shared<T>(std::move(new_value))); // 5
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
為讓std::shared_ptr<>持有數(shù)據(jù)的結(jié)果顯而易見:彈出函數(shù)會持有一個變量的引用,為了接收這個新值,必須對存儲的指針進行解引用①,②;并且,在返回到調(diào)用函數(shù)前,彈出函數(shù)都會返回一個std::shared_ptr<>實例,這里實例可以在隊列中做檢索③,④。
std::shared_ptr<>持有數(shù)據(jù)的好處:新的實例分配結(jié)束時,不會被鎖在push()⑤當中(而在清單6.2中,只能在pop()持有鎖時完成)。因為內(nèi)存分配操作的需要在性能上付出很高的代價(性能較低),所以使用std::shared_ptr<>的方式對隊列的性能有很大的提升,其減少了互斥量持有的時間,允許其他線程在分配內(nèi)存的同時,對隊列進行其他的操作。
如同棧的例子,使用互斥量保護整個數(shù)據(jù)結(jié)構(gòu),不過會限制隊列對并發(fā)的支持;雖然,多線程可能被隊列中的各種成員函數(shù)所阻塞,但是仍有一個線程能在任意時間內(nèi)進行工作。不過,這種限制的部分來源是因為在實現(xiàn)中使用了std::queue<>;因為使用標準容器的原因,數(shù)據(jù)處于保護中。要對數(shù)據(jù)結(jié)構(gòu)實現(xiàn)進行具體的控制,需要提供更多細粒度鎖,來完成更高級的并發(fā)。
在清單6.2和6.3中,使用一個互斥量對一個數(shù)據(jù)隊列(data_queue)進行保護。為了使用細粒度鎖,需要看一下隊列內(nèi)部的組成結(jié)構(gòu),并且將一個互斥量與每個數(shù)據(jù)相關(guān)聯(lián)。
對于隊列來說,最簡單的數(shù)據(jù)結(jié)構(gòu)就是單鏈表了,就如圖6.1那樣。隊列里包含一個頭指針,其指向鏈表中的第一個元素,并且每一個元素都會指向下一個元素。從隊列中刪除數(shù)據(jù),其實就是將頭指針指向下一個元素,并將之前頭指針指向的值進行返回。
向隊列中添加元素是要從結(jié)尾進行的。為了做到這點,隊列里還有一個尾指針,其指向鏈表中的最后一個元素。新節(jié)點的加入將會改變尾指針的next指針,之前最后一個元素將會指向新添加進來的元素,新添加進來的元素的next將會使新的尾指針。當鏈表為空時,頭/尾指針皆為NULL。
http://wiki.jikexueyuan.com/project/cplusplus-concurrency-action/images/chapter6/6-1.png" alt="" />
圖6.1 用單鏈表表示的隊列
下面的清單中的代碼,是一個簡單隊列的實現(xiàn),基于清單6.2代碼的精簡版本;因為這個隊列僅供單線程使用,所以這實現(xiàn)中只有一個try_pop()函數(shù);并且,沒有wait_and_pop()函數(shù)。
清單6.4 隊列實現(xiàn)——單線程版
template<typename T>
class queue
{
private:
struct node
{
T data;
std::unique_ptr<node> next;
node(T data_):
data(std::move(data_))
{}
};
std::unique_ptr<node> head; // 1
node* tail; // 2
public:
queue()
{}
queue(const queue& other)=delete;
queue& operator=(const queue& other)=delete;
std::shared_ptr<T> try_pop()
{
if(!head)
{
return std::shared_ptr<T>();
}
std::shared_ptr<T> const res(
std::make_shared<T>(std::move(head->data)));
std::unique_ptr<node> const old_head=std::move(head);
head=std::move(old_head->next); // 3
return res;
}
void push(T new_value)
{
std::unique_ptr<node> p(new node(std::move(new_value)));
node* const new_tail=p.get();
if(tail)
{
tail->next=std::move(p); // 4
}
else
{
head=std::move(p); // 5
}
tail=new_tail; // 6
}
};
首先,注意在清單呢6.4中使用了std::unique_ptr<node>來管理節(jié)點,因為其能保證節(jié)點(其引用數(shù)據(jù)的值)在刪除時候,不需要使用delete操作顯式刪除。這樣的關(guān)系鏈表,管理著從頭結(jié)點到尾節(jié)點的每一個原始指針。
雖然,這種實現(xiàn)對于單線程來說沒什么問題,但是,當你在多線程情況下,嘗試使用細粒度鎖時,就會出現(xiàn)問題。因為在給定的實現(xiàn)中有兩個數(shù)據(jù)項(head①和tail②);即使,使用兩個互斥量,來保護頭指針和尾指針,也會出現(xiàn)問題。
顯而易見的問題就是push()可以同時修改頭指針⑤和尾指針⑥,所以push()函數(shù)會同時獲取兩個互斥量。雖然會將兩個互斥量都上鎖,但這還不是太糟糕的問題。糟糕的問題是push()和pop()都能訪問next指針指向的節(jié)點:push()可更新tail->next④,而后try_pop()讀取read->next③。當隊列中只有一個元素時,head==tail,所以head->next和tail->next是同一個對象,并且這個對象需要保護。不過,“在同一個對象在未被head和tail同時訪問時,push()和try_pop()鎖住的是同一個鎖”,就不對了。所以,你就沒有比之間實現(xiàn)更好的選擇了。這里會“柳暗花明又一村”嗎?
通過分離數(shù)據(jù)實現(xiàn)并發(fā)
你可以使用“預(yù)分配一個虛擬節(jié)點(無數(shù)據(jù)),確保這個節(jié)點永遠在隊列的最后,用來分離頭尾指針能訪問的節(jié)點”的辦法,走出這個困境。對于一個空隊列來說,head和tail都屬于虛擬指針,而非空指針。這個辦法挺好,因為當隊列為空時,try_pop()不能訪問head->next了。當添加一個節(jié)點入隊列時(這時有真實節(jié)點了),head和tail現(xiàn)在指向不同的節(jié)點,所以就不會在head->next和tail->next上產(chǎn)生競爭。這里的缺點是,你必須額外添加一個間接層次的指針數(shù)據(jù),來做虛擬節(jié)點。下面的代碼描述了這個方案如何實現(xiàn)。
清單6.5 帶有虛擬節(jié)點的隊列
template<typename T>
class queue
{
private:
struct node
{
std::shared_ptr<T> data; // 1
std::unique_ptr<node> next;
};
std::unique_ptr<node> head;
node* tail;
public:
queue():
head(new node),tail(head.get()) // 2
{}
queue(const queue& other)=delete;
queue& operator=(const queue& other)=delete;
std::shared_ptr<T> try_pop()
{
if(head.get()==tail) // 3
{
return std::shared_ptr<T>();
}
std::shared_ptr<T> const res(head->data); // 4
std::unique_ptr<node> old_head=std::move(head);
head=std::move(old_head->next); // 5
return res; // 6
}
void push(T new_value)
{
std::shared_ptr<T> new_data(
std::make_shared<T>(std::move(new_value))); // 7
std::unique_ptr<node> p(new node); //8
tail->data=new_data; // 9
node* const new_tail=p.get();
tail->next=std::move(p);
tail=new_tail;
}
};
try_pop()不需要太多的修改。首先,你可以拿head和tail③進行比較,這就要比檢查指針是否為空的好,因為虛擬節(jié)點意味著head不可能是空指針。head是一個std::unique_ptr<node>對象,你需要使用head.get()來做比較。其次,因為node現(xiàn)在存在數(shù)據(jù)指針中①,你就可以對指針進行直接檢索④,而非構(gòu)造一個T類型的新實例。push()函數(shù)改動最大:首先,你必須在堆上創(chuàng)建一個T類型的實例,并且讓其與一個std::shared_ptr<>對象相關(guān)聯(lián)⑦(節(jié)點使用std::make_shared就是為了避免內(nèi)存二次分配,避免增加引用次數(shù))。創(chuàng)建的新節(jié)點就成為了虛擬節(jié)點,所以你不需要為new_value提供構(gòu)造函數(shù)⑧。反而這里你需要將new_value的副本賦給之前的虛擬節(jié)點⑨。最終,為了讓虛擬節(jié)點存在在隊列中,你不得不使用構(gòu)造函數(shù)來創(chuàng)建它②。
那么現(xiàn)在,我確信你會對如何對如何修改隊列,讓其變成一個線程安全的隊列感到驚訝。好吧,現(xiàn)在的push()只能訪問tail,而不能訪問head,這就是一個進步try_pop()可以訪問head和tail,但是tail只需在最初進行比較,所以所存在的時間很短。重大的提升在于,虛擬節(jié)點意味著try_pop()和push()不能對同一節(jié)點進行操作,所以這里已經(jīng)不再需要互斥了。那么,你只需要使用一個互斥量來保護head和tail就夠了。那么,現(xiàn)在應(yīng)該鎖哪里?
我們的目的是為了最大程度的并發(fā)化,所以你需要上鎖的時間,要盡可能的小。push()很簡單:互斥量需要對tail的訪問進行上鎖,這就意味著你需要對每一個新分配的節(jié)點進行上鎖⑧,還有在你對當前尾節(jié)點進行賦值的時候⑨也需要上鎖。鎖需要持續(xù)到函數(shù)結(jié)束時才能解開。
try_pop()就不簡單了。首先,你需要使用互斥量鎖住head,一直到head彈出。實際上,互斥量決定了哪一個線程來進行彈出操作。一旦head被改變⑤,你才能解鎖互斥量;當在返回結(jié)果時,互斥量就不需要進行上鎖了⑥。這使得訪問tail需要一個尾互斥量。因為,你需要只需要訪問tail一次,且只有在訪問時才需要互斥量。這個操作最好是通過函數(shù)進行包裝。事實上,因為代碼只有在成員需要head時,互斥量才上鎖,這項也需要包含在包裝函數(shù)中。最終代碼如下所示。
清單6.6 線程安全隊列——細粒度鎖版
template<typename T>
class threadsafe_queue
{
private:
struct node
{
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};
std::mutex head_mutex;
std::unique_ptr<node> head;
std::mutex tail_mutex;
node* tail;
node* get_tail()
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
return tail;
}
std::unique_ptr<node> pop_head()
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if(head.get()==get_tail())
{
return nullptr;
}
std::unique_ptr<node> old_head=std::move(head);
head=std::move(old_head->next);
return old_head;
}
public:
threadsafe_queue():
head(new node),tail(head.get())
{}
threadsafe_queue(const threadsafe_queue& other)=delete;
threadsafe_queue& operator=(const threadsafe_queue& other)=delete;
std::shared_ptr<T> try_pop()
{
std::unique_ptr<node> old_head=pop_head();
return old_head?old_head->data:std::shared_ptr<T>();
}
void push(T new_value)
{
std::shared_ptr<T> new_data(
std::make_shared<T>(std::move(new_value)));
std::unique_ptr<node> p(new node);
node* const new_tail=p.get();
std::lock_guard<std::mutex> tail_lock(tail_mutex);
tail->data=new_data;
tail->next=std::move(p);
tail=new_tail;
}
};
讓我們用挑剔的目光來看一下上面的代碼,并考慮6.1.1節(jié)中給出的指導(dǎo)意見。在你觀察不變量前,你需要確定的狀態(tài)有:
tail->next == nullptr
tail->data == nullptr
head == taill(意味著空列表)
單元素列表 head->next = tail
在列表中的每一個節(jié)點x,x!=tail且x->data指向一個T類型的實例,并且x->next指向列表中下一個節(jié)點。x->next == tail意味著x就是列表中最后一個節(jié)點
這里的push()很簡單:僅修改了被tail_mutex的數(shù)據(jù),因為新的尾節(jié)點是一個空節(jié)點,并且其data和next都為舊的尾節(jié)點(實際上的尾節(jié)點)設(shè)置好,所以其能維持不變量的狀態(tài)。
有趣的部分在于try_pop()上。事實證明,不僅需要對tail_mutex上鎖,來保護對tail的讀?。贿€要保證在從頭讀取數(shù)據(jù)時,不會產(chǎn)生數(shù)據(jù)競爭。如果沒有這些互斥量,當一個線程調(diào)用try_pop()的同時,另一個線程調(diào)用push(),那么這里操作順序?qū)⒉豢深A(yù)測。盡管,每一個成員函數(shù)都持有一個互斥量,這些互斥量能保護數(shù)據(jù)不會同時被多個線程訪問到;并且,隊列中的所有數(shù)據(jù)來源,都是通過調(diào)用push()得到的。因為線程可能會無序的方位同一數(shù)據(jù),所以這里就會有數(shù)據(jù)競爭(正如你在第5章看到的那樣),以及未定義行為。幸運的是,在get_tail()中的tail_mutex解決了所有的問題。因為調(diào)用get_tail()將會鎖住同名鎖,就像push()一樣,這就為兩個操作規(guī)定好了順序。要不就是get_tail()在push()之前被調(diào)用,這種情況下,線程可以看到舊的尾節(jié)點,要不就是在push()之后完成,這種情況下,線程就能看到tail的新值,以及新數(shù)據(jù)前的真正tail的值。
當get_tail()調(diào)用前,head_mutex已經(jīng)上鎖,這一步也是很重要的哦。如果不這樣,調(diào)用pop_head()時就會被get_tail()和head_mutex所卡住,因為其他線程調(diào)用try_pop()(以及pop_head())時,都需要先獲取鎖,然后阻止從下面的過程中初始化線程:
std::unique_ptr<node> pop_head() // 這是個有缺陷的實現(xiàn)
{
node* const old_tail=get_tail(); // ① 在head_mutex范圍外獲取舊尾節(jié)點的值
std::lock_guard<std::mutex> head_lock(head_mutex);
if(head.get()==old_tail) // ②
{
return nullptr;
}
std::unique_ptr<node> old_head=std::move(head);
head=std::move(old_head->next); // ③
return old_head;
}
這是一個有缺陷的實現(xiàn),調(diào)用get_tail()是在鎖的范圍之外,你可能也許會發(fā)現(xiàn)head和tail,在你初始化線程,并獲取head_mutex時,發(fā)生了改變。并且,不只是返回尾節(jié)點時,返回的不是尾節(jié)點了,其值甚至都不列表中的值了。即使head是最后一個節(jié)點,這也意味著head和old_tail②比較失敗。因此,當你更新head③時,可能會將head移到tail之后,這樣的話就意味著數(shù)據(jù)結(jié)構(gòu)遭到了破壞。在正確實現(xiàn)中(清單6.6),需要保證在head_mutex保護的范圍內(nèi)調(diào)用get_tail()。這就能保證沒有其他線程能對head進行修改,并且tail會向正確的方向移動(當有新節(jié)點添加進來時),這樣就很安全了。head不會傳遞給get_tail()的返回值,所以不變量的狀態(tài)時穩(wěn)定的。
當使用pop_head()更新head時(從隊列中刪除節(jié)點),互斥量就已經(jīng)上鎖了,并且try_pop()可以提取數(shù)據(jù),并在確實有個數(shù)據(jù)的時候刪除一個節(jié)點(若沒有數(shù)據(jù),則返回std::shared_ptr<>的空實例),因為只有一個線程可以訪問這個節(jié)點,所以根據(jù)我們所掌握的知識,認為這個操作是安全的。
接下來,外部接口就相當于清單6.2代碼中的子集了,所以同樣的分析結(jié)果:對于固有接口來說,不存在條件競爭。
異常是很有趣的東西。雖然,你已經(jīng)改變了數(shù)據(jù)的分配模式,但是異??赡軓膭e的地方襲來。try_pop()中的對鎖的操作會產(chǎn)生異常,并直到鎖獲取才能對數(shù)據(jù)進行修改。因此,try_pop()是異常安全的。另一方面,push()可以在堆上新分配出一個T的實例,以及一個node的新實例,這里可能會拋出異常。但是,所有分配的對象都賦給了智能指針,那么當異常發(fā)生時,他們就會被釋放掉。一旦鎖被獲取,push()中的操作就不會拋出異常,所以push()也是異常安全的。
因為沒有修改任何接口,所以不會死鎖。在實現(xiàn)內(nèi)部也不會有死鎖;唯一需要獲取兩個鎖的是pop_head(),這個函數(shù)需要獲取head_mutex和tail_mutex,所以不會產(chǎn)生死鎖。
那么剩下的問題就都在實際并發(fā)的可行性上了。這個結(jié)構(gòu)對并發(fā)訪問的考慮要多于清單6.2中的代碼,因為這里鎖粒度更加的小,并且更多的數(shù)據(jù)不在鎖的保護范圍內(nèi)。比如,在push()中,新節(jié)點和新數(shù)據(jù)的分配都不需要鎖來保護。這就意味著多線程情況下,節(jié)點及數(shù)據(jù)的分配是“安全”并發(fā)的。同一時間內(nèi),只有一個線程可以將它的節(jié)點和數(shù)據(jù)添加到隊列中,所以代碼中只是簡單使用了指針賦值的形式,相較于基于std::queue<>的實現(xiàn)中,對于std::queue<>的內(nèi)部操作進行上鎖,這個結(jié)構(gòu)中就不需要了。
同樣,try_pop()持有tail_mutex也只有很短的時間,只為保護對tail的讀取。因此,當有數(shù)據(jù)push進隊列后,try_pop()幾乎及可以完全并發(fā)調(diào)用了。同樣在執(zhí)行中,對head_mutex的持有時間也是極短的。當并發(fā)訪問時,這就會增加對try_pop()的訪問次數(shù);且只有一個線程,在同一時間內(nèi)可以訪問pop_head(),且多線程情況下可以刪除隊列中的舊節(jié)點,并且安全的返回數(shù)據(jù)。
等待數(shù)據(jù)彈出
OK,所以清單6.6提供了一個使用細粒度鎖的線程安全隊列,不過只有try_pop()可以并發(fā)訪問(且只有一個重載存在)。那么在清單6.2中方便的wait_and_pop()呢?你能通過細粒度鎖實現(xiàn)一個相同功能的接口嗎?
當然,答案是“是的”,不過的確有些困難,困難在哪里?修改push()是相對簡單的:只需要在函數(shù)體末尾添加data_cond.notify_ont()函數(shù)的調(diào)用即可(如同清單6.2中那樣)。當然,事實并沒有那么簡單:你使用細粒度鎖,是為了保證最大程度的并發(fā)。當將互斥量和notify_one()混用的時,如果被通知的線程在互斥量解鎖后被喚醒,那么這個線程就不得不等待互斥量上鎖。另一方面,當解鎖操作在notify_one()之前調(diào)用,那么互斥量可能會等待線程醒來,來獲取互斥鎖(假設(shè)沒有其他線程對互斥量上鎖)。這可能是一個微小的改動,但是對于一些情況來說,就顯的很重要了。
wait_and_pop()就有些復(fù)雜了,因為需要確定在哪里等待,也就是函數(shù)在哪里執(zhí)行,并且需要確定哪些互斥量需要上鎖。等待的條件是“隊列非空”,這就意味著head!=tail。這樣寫的話,就需要同時獲取head_mutex和tail_mutex,并對其進行上鎖,不過在清單6.6中已經(jīng)使用tail_mutex來保護對tail的讀取,以及不用和自身記性比較,所以這種邏輯也同樣適用于這里。如果有函數(shù)讓head!=get_tail(),你只需要持有head_mutex,然后你就可以使用鎖,對data_cond.wait()的調(diào)用進行保護。當你將等待邏輯添加入結(jié)構(gòu)當中,那么實現(xiàn)的方式與try_pop()基本上是一樣的。
對于try_pop()和wait_and_pop()的重載都需要深思熟慮。當你將返回std::shared_ptr<>替換為從“old_head后索引出的值,并且拷貝賦值給value參數(shù)”進行返回時,那么這里將會存在異常安全問題。數(shù)據(jù)項在互斥鎖未上鎖的情況下被刪除,將剩下的數(shù)據(jù)返回給調(diào)用者。不過,當拷貝賦值拋出異常(可能性很大)時,數(shù)據(jù)項將會丟失,因為它沒有被返回隊列原來的位置上。
當T類型有無異常拋出的移動賦值操作,或無異常拋出的交換操作時,你可以使用它,不過,你肯定更喜歡一種通用的解決方案,無論T是什么類型,這個方案都能使用。在這種情況下,在節(jié)點從列表中刪除前,你就不得不將有可能拋出異常的代碼,放在鎖保護的范圍內(nèi),來保證異常安全性。這也就意味著你需要對pop_head()進行重載,查找索引值在列表改動前的位置。
相比之下,empty()就更加的簡單:只需要鎖住head_mutex,并且檢查head==get_tail()(詳見清單6.10)就可以了。最終的代碼,在清單6.7,6.8,6.9和6.10中。
清單6.7 可上鎖和等待的線程安全隊列——內(nèi)部機構(gòu)及接口
template<typename T>
class threadsafe_queue
{
private:
struct node
{
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};
std::mutex head_mutex;
std::unique_ptr<node> head;
std::mutex tail_mutex;
node* tail;
std::condition_variable data_cond;
public:
threadsafe_queue():
head(new node),tail(head.get())
{}
threadsafe_queue(const threadsafe_queue& other)=delete;
threadsafe_queue& operator=(const threadsafe_queue& other)=delete;
std::shared_ptr<T> try_pop();
bool try_pop(T& value);
std::shared_ptr<T> wait_and_pop();
void wait_and_pop(T& value);
void push(T new_value);
bool empty();
};
向隊列中添加新節(jié)點是相當簡單的——下面的實現(xiàn)與上面的代碼差不多。
清單6.8 可上鎖和等待的線程安全隊列——推入新節(jié)點
template<typename T>
void threadsafe_queue<T>::push(T new_value)
{
std::shared_ptr<T> new_data(
std::make_shared<T>(std::move(new_value)));
std::unique_ptr<node> p(new node);
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
tail->data=new_data;
node* const new_tail=p.get();
tail->next=std::move(p);
tail=new_tail;
}
data_cond.notify_one();
}
如同之前所提到的,復(fù)雜部分都在pop那邊,所以提供幫助性函數(shù)去簡化這部分就很重要了。下一個清單中將展示wait_and_pop()的實現(xiàn),以及先關(guān)的幫助函數(shù)。
清單6.9 可上鎖和等待的線程安全隊列——wait_and_pop()
template<typename T>
class threadsafe_queue
{
private:
node* get_tail()
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
return tail;
}
std::unique_ptr<node> pop_head() // 1
{
std::unique_ptr<node> old_head=std::move(head);
head=std::move(old_head->next);
return old_head;
}
std::unique_lock<std::mutex> wait_for_data() // 2
{
std::unique_lock<std::mutex> head_lock(head_mutex);
data_cond.wait(head_lock,[&]{return head.get()!=get_tail();});
return std::move(head_lock); // 3
}
std::unique_ptr<node> wait_pop_head()
{
std::unique_lock<std::mutex> head_lock(wait_for_data()); // 4
return pop_head();
}
std::unique_ptr<node> wait_pop_head(T& value)
{
std::unique_lock<std::mutex> head_lock(wait_for_data()); // 5
value=std::move(*head->data);
return pop_head();
}
public:
std::shared_ptr<T> wait_and_pop()
{
std::unique_ptr<node> const old_head=wait_pop_head();
return old_head->data;
}
void wait_and_pop(T& value)
{
std::unique_ptr<node> const old_head=wait_pop_head(value);
}
};
清單6.9中所示的pop部分的實現(xiàn)中有一些幫助函數(shù)來降低代碼的復(fù)雜度,例如pop_head()①和wait_for_data()②,這些函數(shù)分別是刪除頭結(jié)點和等待隊列中有數(shù)據(jù)彈出的。wait_for_data()特別值得關(guān)注,因為其不僅等待使用lambda函數(shù)對條件變量進行等待,而且它還會將鎖的實例返回給調(diào)用者③。這就需要確保同一個鎖在執(zhí)行與wait_pop_head()重載④⑤的相關(guān)操作時,已持有鎖。pop_head()是對try_pop()代碼的復(fù)用,將在下面進行展示:
清單6.10 可上鎖和等待的線程安全隊列——try_pop()和empty()
template<typename T>
class threadsafe_queue
{
private:
std::unique_ptr<node> try_pop_head()
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if(head.get()==get_tail())
{
return std::unique_ptr<node>();
}
return pop_head();
}
std::unique_ptr<node> try_pop_head(T& value)
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if(head.get()==get_tail())
{
return std::unique_ptr<node>();
}
value=std::move(*head->data);
return pop_head();
}
public:
std::shared_ptr<T> try_pop()
{
std::unique_ptr<node> old_head=try_pop_head();
return old_head?old_head->data:std::shared_ptr<T>();
}
bool try_pop(T& value)
{
std::unique_ptr<node> const old_head=try_pop_head(value);
return old_head;
}
?bool empty()
{
std::lock_guard<std::mutex> head_lock(head_mutex);
return (head.get()==get_tail());
}
};
這個隊列的實現(xiàn)將作為第7章無鎖隊列的基礎(chǔ)。這是一個無界隊列;線程可以持續(xù)向隊列中添加數(shù)據(jù)項,即使沒有元素被刪除。與之相反的就是有界隊列,在有界隊列中,隊列在創(chuàng)建的時候最大長度就已經(jīng)是固定的了。當有界隊列滿載時,嘗試在向其添加元素的操作將會失敗或者阻塞,直到有元素從隊列中彈出。在任務(wù)執(zhí)行時(詳見第8章),有界隊列對于線程間的工作花費是很有幫助的。其會阻止線程對隊列進行填充,并且可以避免線程從較遠的地方對數(shù)據(jù)項進行索引。
無界隊列的實現(xiàn),很容易擴展成,可在push()中等待跳進變量的定長隊列。相對于等待隊列中具有數(shù)據(jù)項(pop()執(zhí)行完成后),你就需要等待隊列中數(shù)據(jù)項小于最大值就可以了。對于有界隊列更多的討論,已經(jīng)超出了本書的范圍,就不再多說;現(xiàn)在越過隊列,向更加復(fù)雜的數(shù)據(jù)結(jié)構(gòu)進發(fā)。