std::thread::hardware_concurrency()在新版C++標(biāo)準(zhǔn)庫中是一個很有用的函數(shù)。這個函數(shù)將返回能同時并發(fā)在一個程序中的線程數(shù)量。例如,多核系統(tǒng)中,返回值可以是CPU核芯的數(shù)量。返回值也僅僅是一個提示,當(dāng)系統(tǒng)信息無法獲取時,函數(shù)也會返回0。但是,這也無法掩蓋這個函數(shù)對啟動線程數(shù)量的幫助。
清單2.8實現(xiàn)了一個并行版的std::accumulate。代碼中將整體工作拆分成小任務(wù)交給每個線程去做,其中設(shè)置最小任務(wù)數(shù),是為了避免產(chǎn)生太多的線程。程序可能會在操作數(shù)量為0的時候拋出異常。比如,std::thread構(gòu)造函數(shù)無法啟動一個執(zhí)行線程,就會拋出一個異常。在這個算法中討論異常處理,已經(jīng)超出現(xiàn)階段的討論范圍,這個問題我們將在第8章中再來討論。
清單2.8 原生并行版的std::accumulate
template<typename Iterator,typename T>
struct accumulate_block
{
void operator()(Iterator first,Iterator last,T& result)
{
result=std::accumulate(first,last,result);
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);
if(!length) // 1
return init;
unsigned long const min_per_thread=25;
unsigned long const max_threads=
(length+min_per_thread-1)/min_per_thread; // 2
unsigned long const hardware_threads=
std::thread::hardware_concurrency();
unsigned long const num_threads= // 3
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size=length/num_threads; // 4
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads-1); // 5
Iterator block_start=first;
for(unsigned long i=0; i < (num_threads-1); ++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size); // 6
threads[i]=std::thread( // 7
accumulate_block<Iterator,T>(),
block_start,block_end,std::ref(results[i]));
block_start=block_end; // 8
}
accumulate_block<Iterator,T>()(
block_start,last,results[num_threads-1]); // 9
std::for_each(threads.begin(),threads.end(),
std::mem_fn(&std::thread::join)); // 10
return std::accumulate(results.begin(),results.end(),init); // 11
}
函數(shù)看起來很長,但不復(fù)雜。如果輸入的范圍為空①,就會得到init的值。反之,如果范圍內(nèi)多于一個元素時,都需要用范圍內(nèi)元素的總數(shù)量除以線程(塊)中最小任務(wù)數(shù),從而確定啟動線程的最大數(shù)量②,這樣能避免無謂的計算資源的浪費。比如,一臺32芯的機(jī)器上,只有5個數(shù)需要計算,卻啟動了32個線程。
計算量的最大值和硬件支持線程數(shù)中,較小的值為啟動線程的數(shù)量③。因為上下文頻繁的切換會降低線程的性能,所以你肯定不想啟動的線程數(shù)多于硬件支持的線程數(shù)量。當(dāng)std::thread::hardware_concurrency()返回0,你可以選擇一個合適的數(shù)作為你的選擇;在本例中,我選擇了"2"。你也不想在一臺單核機(jī)器上啟動太多的線程,因為這樣反而會降低性能,有可能最終讓你放棄使用并發(fā)。
每個線程中處理的元素數(shù)量,是范圍中元素的總量除以線程的個數(shù)得出的④。對于分配是否得當(dāng),我們會在后面討論。
現(xiàn)在,確定了線程個數(shù),通過創(chuàng)建一個std::vector<T>容器存放中間結(jié)果,并為線程創(chuàng)建一個std::vector<std::thread>容器⑤。這里需要注意的是,啟動的線程數(shù)必須比num_threads少1個,因為在啟動之前已經(jīng)有了一個線程(主線程)。
使用簡單的循環(huán)來啟動線程:block_end迭代器指向當(dāng)前塊的末尾⑥,并啟動一個新線程為當(dāng)前塊累加結(jié)果⑦。當(dāng)?shù)髦赶虍?dāng)前塊的末尾時,啟動下一個塊⑧。
啟動所有線程后,⑨中的線程會處理最終塊的結(jié)果。對于分配不均,因為知道最終塊是哪一個,那么這個塊中有多少個元素就無所謂了。
當(dāng)累加最終塊的結(jié)果后,可以等待std::for_each⑩創(chuàng)建線程的完成(如同在清單2.7中做的那樣),之后使用std::accumulate將所有結(jié)果進(jìn)行累加?。
結(jié)束這個例子之前,需要明確:T類型的加法運算不滿足結(jié)合律(比如,對于float型或double型,在進(jìn)行加法操作時,系統(tǒng)很可能會做截斷操作),因為對范圍中元素的分組,會導(dǎo)致parallel_accumulate得到的結(jié)果可能與std::accumulate得到的結(jié)果不同。同樣的,這里對迭代器的要求更加嚴(yán)格:必須都是向前迭代器,而std::accumulate可以在只傳入迭代器的情況下工作。對于創(chuàng)建出results容器,需要保證T有默認(rèn)構(gòu)造函數(shù)。對于算法并行,通常都要這樣的修改;不過,需要根據(jù)算法本身的特性,選擇不同的并行方式。算法并行會在第8章有更加深入的討論。需要注意的:因為不能直接從一個線程中返回一個值,所以需要傳遞results容器的引用到線程中去。另一個辦法,通過地址來獲取線程執(zhí)行的結(jié)果;第4章中,我們將使用期望(futures)完成這種方案。
當(dāng)線程運行時,所有必要的信息都需要傳入到線程中去,包括存儲計算結(jié)果的位置。不過,并非總需如此:有時候這是識別線程的可行方案,可以傳遞一個標(biāo)識數(shù),例如清單2.7中的i。不過,當(dāng)需要標(biāo)識的函數(shù)在調(diào)用棧的深層,同時其他線程也可調(diào)用該函數(shù),那么標(biāo)識數(shù)就會變的捉襟見肘。好消息是在設(shè)計C++的線程庫時,就有預(yù)見了這種情況,在之后的實現(xiàn)中就給每個線程附加了唯一標(biāo)識符。