當(dāng)為一個(gè)特殊的任務(wù)設(shè)計(jì)并發(fā)代碼時(shí),需要根據(jù)任務(wù)本身來(lái)考慮之前所提到的問(wèn)題。為了展示以上的注意事項(xiàng)是如何應(yīng)用的,我們將看一下在C++標(biāo)準(zhǔn)庫(kù)中三個(gè)標(biāo)準(zhǔn)函數(shù)的并行實(shí)現(xiàn)。當(dāng)你遇到問(wèn)題時(shí),這里的例子可以作為很好的參照。在有較大的并發(fā)任務(wù)進(jìn)行輔助下,我們也將實(shí)現(xiàn)一些函數(shù)。
我主要演示這些實(shí)現(xiàn)使用的技術(shù),不過(guò)可能這些技術(shù)并不是最先進(jìn)的;更多優(yōu)秀的實(shí)現(xiàn)可以更好的利用硬件并發(fā),不過(guò)這些實(shí)現(xiàn)可能需要到與并行算法相關(guān)的學(xué)術(shù)文獻(xiàn),或者是多線程的專家?guī)熘?比如:Inter的TBB[4])才能看到。
并行版的std::for_each可以看作為能最直觀體現(xiàn)并行概念,就讓我們從并行版的std::for_each開始吧!
std::for_eachstd::for_each的原理很簡(jiǎn)單:其對(duì)某個(gè)范圍中的元素,依次調(diào)用用戶提供的函數(shù)。并行和串行調(diào)用的最大區(qū)別就是函數(shù)的調(diào)用順序。std::for_each是對(duì)范圍中的第一個(gè)元素調(diào)用用戶函數(shù),接著是第二個(gè),以此類推,而在并行實(shí)現(xiàn)中對(duì)于每個(gè)元素的處理順序就不能保證了,并且它們可能(我們希望如此)被并發(fā)的處理。
為了實(shí)現(xiàn)這個(gè)函數(shù)的并行版本,需要對(duì)每個(gè)線程上處理的元素進(jìn)行劃分。你事先知道元素?cái)?shù)量,所以可以處理前對(duì)數(shù)據(jù)進(jìn)行劃分(詳見(jiàn)8.1.1節(jié))。假設(shè)只有并行任務(wù)運(yùn)行,就可以使用std::thread::hardware_concurrency()來(lái)決定線程的數(shù)量。同樣,這些元素都能被獨(dú)立的處理,所以可以使用連續(xù)的數(shù)據(jù)塊來(lái)避免偽共享(詳見(jiàn)8.2.3節(jié))。
這里的算法有點(diǎn)類似于并行版的std::accumulate(詳見(jiàn)8.4.1節(jié)),不過(guò)比起計(jì)算每一個(gè)元素的加和,這里對(duì)每個(gè)元素僅僅使用了一個(gè)指定功能的函數(shù)。因?yàn)椴恍枰祷亟Y(jié)果,可以假設(shè)這可能會(huì)對(duì)簡(jiǎn)化代碼,不過(guò)想要將異常傳遞給調(diào)用者,就需要使用std::packaged_task和std::future機(jī)制對(duì)線程中的異常進(jìn)行轉(zhuǎn)移。這里展示一個(gè)樣本實(shí)現(xiàn)。
清單8.7 并行版std::for_each
template<typename Iterator,typename Func>
void parallel_for_each(Iterator first,Iterator last,Func f)
{
unsigned long const length=std::distance(first,last);
if(!length)
return;
unsigned long const min_per_thread=25;
unsigned long const max_threads=
(length+min_per_thread-1)/min_per_thread;
unsigned long const hardware_threads=
std::thread::hardware_concurrency();
unsigned long const num_threads=
std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long const block_size=length/num_threads;
std::vector<std::future<void> > futures(num_threads-1); // 1
std::vector<std::thread> threads(num_threads-1);
join_threads joiner(threads);
Iterator block_start=first;
for(unsigned long i=0;i<(num_threads-1);++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size);
std::packaged_task<void(void)> task( // 2
[=]()
{
std::for_each(block_start,block_end,f);
});
futures[i]=task.get_future();
threads[i]=std::thread(std::move(task)); // 3
block_start=block_end;
}
std::for_each(block_start,last,f);
for(unsigned long i=0;i<(num_threads-1);++i)
{
futures[i].get(); // 4
}
}
代碼結(jié)構(gòu)與清單8.4的差不多。最重要的不同在于futures向量對(duì)std::future<void>類型①變量進(jìn)行存儲(chǔ),因?yàn)楣ぷ骶€程不會(huì)返回值,并且簡(jiǎn)單的lambda函數(shù)會(huì)對(duì)block_start到block_end上的任務(wù)②執(zhí)行f函數(shù)。這是為了避免傳入線程的構(gòu)造函數(shù)③。當(dāng)工作線程不需要返回一個(gè)值時(shí),調(diào)用futures[i].get()④只是提供檢索工作線程異常的方法;如果不想把異常傳遞出去,就可以省略這一步。
實(shí)現(xiàn)并行std::accumulate的時(shí)候,使用std::async會(huì)簡(jiǎn)化代碼;同樣,parallel_for_each也可以使用std::async。實(shí)現(xiàn)如下所示。
清單8.8 使用std::async實(shí)現(xiàn)std::for_each
template<typename Iterator,typename Func>
void parallel_for_each(Iterator first,Iterator last,Func f)
{
unsigned long const length=std::distance(first,last);
if(!length)
return;
unsigned long const min_per_thread=25;
if(length<(2*min_per_thread))
{
std::for_each(first,last,f); // 1
}
else
{
Iterator const mid_point=first+length/2;
std::future<void> first_half= // 2
std::async(¶llel_for_each<Iterator,Func>,
first,mid_point,f);
parallel_for_each(mid_point,last,f); // 3
first_half.get(); // 4
}
}
和基于std::async的parallel_accumulate(清單8.5)一樣,是在運(yùn)行時(shí)對(duì)數(shù)據(jù)進(jìn)行迭代劃分的,而非在執(zhí)行前劃分好,這是因?yàn)槟悴恢滥愕膸?kù)需要使用多少個(gè)線程。像之前一樣,當(dāng)你將每一級(jí)的數(shù)據(jù)分成兩部分,異步執(zhí)行另外一部分②,剩下的部分就不能再進(jìn)行劃分了,所以直接運(yùn)行這一部分③;這樣就可以直接對(duì)std::for_each①進(jìn)行使用了。這里再次使用std::async和std::future的get()成員函數(shù)④來(lái)提供對(duì)異常的傳播。
回到算法,函數(shù)需要對(duì)每一個(gè)元素執(zhí)行同樣的操作(這樣的操作有很多種,初學(xué)者可能會(huì)想到std::count和std::replace),一個(gè)稍微復(fù)雜一些的例子就是使用std::find。
std::find接下來(lái)是std::find算法,因?yàn)檫@是一種不需要對(duì)數(shù)據(jù)元素做任何處理的算法。比如,當(dāng)?shù)谝粋€(gè)元素就滿足查找標(biāo)準(zhǔn),那就沒(méi)有必要對(duì)其他元素進(jìn)行搜索了。將會(huì)看到,算法屬性對(duì)于性能具有很大的影響,并且對(duì)并行實(shí)現(xiàn)的設(shè)計(jì)有著直接的影響。這個(gè)算法是一個(gè)很特別的例子,數(shù)據(jù)訪問(wèn)模式都會(huì)對(duì)代碼的設(shè)計(jì)產(chǎn)生影響(詳見(jiàn)8.3.2節(jié))。該類中的另一些算法包括std::equal和std::any_of。
當(dāng)你和妻子或者搭檔,在一個(gè)紀(jì)念盒中找尋一張老照片,當(dāng)找到這張照片時(shí),就不會(huì)再看另外的照片了。不過(guò),你得讓其他人知道你已經(jīng)找到照片了(比如,大喊一聲“找到了!”),這樣其他人就會(huì)停止搜索了。很多算法的特性就是要對(duì)每一個(gè)元素進(jìn)行處理,所以它們沒(méi)有辦法像std::find一樣,一旦找到合適數(shù)據(jù)就停止執(zhí)行。因此,你需要設(shè)計(jì)代碼對(duì)其進(jìn)行使用——當(dāng)?shù)玫较胍拇鸢妇椭袛嗥渌蝿?wù)的執(zhí)行,所以不能等待線程處理對(duì)剩下的元素進(jìn)行處理。
如果不中斷其他線程,那么串行版本的性能可能會(huì)超越并行版,因?yàn)榇兴惴梢栽谡业狡ヅ湓氐臅r(shí)候,停止搜索并返回。如果系統(tǒng)能支持四個(gè)并發(fā)線程,那么每個(gè)線程就可以對(duì)總數(shù)據(jù)量的1/4進(jìn)行檢查,并且在我們的實(shí)現(xiàn)只需要單核完成的1/4的時(shí)間,就能完成對(duì)所有元素的查找。如果匹配的元素在第一個(gè)1/4塊中,串行算法將會(huì)返回第一個(gè),因?yàn)樗惴ú恍枰獙?duì)剩下的元素進(jìn)行處理了。
一種辦法,中斷其他線程的一個(gè)辦法就是使用一個(gè)原子變量作為一個(gè)標(biāo)識(shí),在處理過(guò)每一個(gè)元素后就對(duì)這個(gè)標(biāo)識(shí)進(jìn)行檢查。如果標(biāo)識(shí)被設(shè)置,那么就有線程找到了匹配元素,所以算法就可以停止并返回了。用這種方式來(lái)中斷線程,就可以將那些沒(méi)有處理的數(shù)據(jù)保持原樣,并且在更多的情況下,相較于串行方式,性能能提升很多。缺點(diǎn)就是,加載原子變量是一個(gè)很慢的操作,會(huì)阻礙每個(gè)線程的運(yùn)行。
如何返回值和傳播異常呢?現(xiàn)在你有兩個(gè)選擇。你可以使用一個(gè)future數(shù)組,使用std::packaged_task來(lái)轉(zhuǎn)移值和異常,在主線程上對(duì)返回值和異常進(jìn)行處理;或者使用std::promise對(duì)工作線程上的最終結(jié)果直接進(jìn)行設(shè)置。這完全依賴于你想怎么樣處理工作線程上的異常。如果想停止第一個(gè)異常(即使還沒(méi)有對(duì)所有元素進(jìn)行處理),就可以使用std::promise對(duì)異常和最終值進(jìn)行設(shè)置。另外,如果想要讓其他工作線程繼續(xù)查找,可以使用std::packaged_task來(lái)存儲(chǔ)所有的異常,當(dāng)線程沒(méi)有找到匹配元素時(shí),異常將再次拋出。
這種情況下,我會(huì)選擇std::promise,因?yàn)槠湫袨楹?code>std::find更為接近。這里需要注意一下搜索的元素是不是在提供的搜索范圍內(nèi)。因此,在所有線程結(jié)束前,獲取future上的結(jié)果。如果被future阻塞住,所要查找的值不在范圍內(nèi),就會(huì)持續(xù)的等待下去。實(shí)現(xiàn)代碼如下。
清單8.9 并行find算法實(shí)現(xiàn)
template<typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{
struct find_element // 1
{
void operator()(Iterator begin,Iterator end,
MatchType match,
std::promise<Iterator>* result,
std::atomic<bool>* done_flag)
{
try
{
for(;(begin!=end) && !done_flag->load();++begin) // 2
{
if(*begin==match)
{
result->set_value(begin); // 3
done_flag->store(true); // 4
return;
}
}
}
catch(...) // 5
{
try
{
result->set_exception(std::current_exception()); // 6
done_flag->store(true);
}
catch(...) // 7
{}
}
}
};
unsigned long const length=std::distance(first,last);
if(!length)
return last;
unsigned long const min_per_thread=25;
unsigned long const max_threads=
(length+min_per_thread-1)/min_per_thread;
unsigned long const hardware_threads=
std::thread::hardware_concurrency();
unsigned long const num_threads=
std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long const block_size=length/num_threads;
std::promise<Iterator> result; // 8
std::atomic<bool> done_flag(false); // 9
std::vector<std::thread> threads(num_threads-1);
{ // 10
join_threads joiner(threads);
Iterator block_start=first;
for(unsigned long i=0;i<(num_threads-1);++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size);
threads[i]=std::thread(find_element(), // 11
block_start,block_end,match,
&result,&done_flag);
block_start=block_end;
}
find_element()(block_start,last,match,&result,&done_flag); // 12
}
if(!done_flag.load()) //13
{
return last;
}
return result.get_future().get(); // 14
}
清單8.9中的函數(shù)主體與之前的例子相似。這次,由find_element類①的函數(shù)調(diào)用操作實(shí)現(xiàn),來(lái)完成查找工作的。循環(huán)通過(guò)在給定數(shù)據(jù)塊中的元素,檢查每一步上的標(biāo)識(shí)②。如果匹配的元素被找到,就將最終的結(jié)果設(shè)置到promise③當(dāng)中,并且在返回前對(duì)done_flag④進(jìn)行設(shè)置。
如果有一個(gè)異常被拋出,那么它就會(huì)被通用處理代碼⑤捕獲,并且在promise⑥嘗中試存儲(chǔ)前,對(duì)done_flag進(jìn)行設(shè)置。如果對(duì)應(yīng)promise已經(jīng)被設(shè)置,設(shè)置在promise上的值可能會(huì)拋出一個(gè)異常,所以這里⑦發(fā)生的任何異常,都可以捕獲并丟棄。
這意味著,當(dāng)線程調(diào)用find_element查詢一個(gè)值,或者拋出一個(gè)異常時(shí),如果其他線程看到done_flag被設(shè)置,那么其他線程將會(huì)終止。如果多線程同時(shí)找到匹配值或拋出異常,它們將會(huì)對(duì)promise產(chǎn)生競(jìng)爭(zhēng)。不過(guò),這是良性的條件競(jìng)爭(zhēng);因?yàn)椋晒Φ母?jìng)爭(zhēng)者會(huì)作為“第一個(gè)”返回線程,因此這個(gè)結(jié)果可以接受。
回到parallel_find函數(shù)本身,其擁有用來(lái)停止搜索的promise⑧和標(biāo)識(shí)⑨;隨著對(duì)范圍內(nèi)的元素的查找?,promise和標(biāo)識(shí)會(huì)傳遞到新線程中。主線程也使用find_element來(lái)對(duì)剩下的元素進(jìn)行查找?。像之前提到的,需要在全部線程結(jié)束前,對(duì)結(jié)果進(jìn)行檢查,因?yàn)榻Y(jié)果可能是任意位置上的匹配元素。這里將“啟動(dòng)-匯入”代碼放在一個(gè)塊中⑩,所以所有線程都會(huì)在找到匹配元素時(shí)?進(jìn)行匯入。如果找到匹配元素,就可以調(diào)用std::future<Iterator>(來(lái)自promise?)的成員函數(shù)get()來(lái)獲取返回值或異常。
不過(guò),這里假設(shè)你會(huì)使用硬件上所有可用的的并發(fā)線程,或使用其他機(jī)制對(duì)線程上的任務(wù)進(jìn)行提前劃分。就像之前一樣,可以使用std::async,以及遞歸數(shù)據(jù)劃分的方式來(lái)簡(jiǎn)化實(shí)現(xiàn)(同時(shí)使用C++標(biāo)準(zhǔn)庫(kù)中提供的自動(dòng)縮放工具)。使用std::async的parallel_find實(shí)現(xiàn)如下所示。
清單8.10 使用std::async實(shí)現(xiàn)的并行find算法
template<typename Iterator,typename MatchType> // 1
Iterator parallel_find_impl(Iterator first,Iterator last,MatchType match,
std::atomic<bool>& done)
{
try
{
unsigned long const length=std::distance(first,last);
unsigned long const min_per_thread=25; // 2
if(length<(2*min_per_thread)) // 3
{
for(;(first!=last) && !done.load();++first) // 4
{
if(*first==match)
{
done=true; // 5
return first;
}
}
return last; // 6
}
else
{
Iterator const mid_point=first+(length/2); // 7
std::future<Iterator> async_result=
std::async(¶llel_find_impl<Iterator,MatchType>, // 8
mid_point,last,match,std::ref(done));
Iterator const direct_result=
parallel_find_impl(first,mid_point,match,done); // 9
return (direct_result==mid_point)?
async_result.get():direct_result; // 10
}
}
catch(...)
{
done=true; // 11
throw;
}
}
template<typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{
std::atomic<bool> done(false);
return parallel_find_impl(first,last,match,done); // 12
}
如果想要在找到匹配項(xiàng)時(shí)結(jié)束,就需要在線程之間設(shè)置一個(gè)標(biāo)識(shí)來(lái)表明匹配項(xiàng)已經(jīng)被找到。因此,需要將這個(gè)標(biāo)識(shí)遞歸的傳遞。通過(guò)函數(shù)①的方式來(lái)實(shí)現(xiàn)是最簡(jiǎn)單的辦法,只需要增加一個(gè)參數(shù)——一個(gè)done標(biāo)識(shí)的引用,這個(gè)表示通過(guò)程序的主入口點(diǎn)傳入?。
核心實(shí)現(xiàn)和之前的代碼一樣。通常函數(shù)的實(shí)現(xiàn)中,會(huì)讓單個(gè)線程處理最少的數(shù)據(jù)項(xiàng)②;如果數(shù)據(jù)塊大小不足于分成兩半,就要讓當(dāng)前線程完成所有的工作了③。實(shí)際算法在一個(gè)簡(jiǎn)單的循環(huán)當(dāng)中(給定范圍),直到在循環(huán)到指定范圍中的最后一個(gè),或找到匹配項(xiàng),并對(duì)標(biāo)識(shí)進(jìn)行設(shè)置④。如果找到匹配項(xiàng),標(biāo)識(shí)done就會(huì)在返回前進(jìn)行設(shè)置⑤。無(wú)論是因?yàn)橐呀?jīng)查找到最后一個(gè),還是因?yàn)槠渌€程對(duì)done進(jìn)行了設(shè)置,都會(huì)停止查找。如果沒(méi)有找到,會(huì)將最后一個(gè)元素last進(jìn)行返回⑥。
如果給定范圍可以進(jìn)行劃分,首先要在st::async在對(duì)第二部分進(jìn)行查找⑧前,要找數(shù)據(jù)中點(diǎn)⑦,而且需要使用std::ref將done以引用的方式傳遞。同時(shí),可以通過(guò)對(duì)第一部分直接進(jìn)行遞歸查找。兩部分都是異步的,并且在原始范圍過(guò)大時(shí),直接遞歸查找的部分可能會(huì)再細(xì)化。
如果直接查找返回的是mid_point,這就意味著沒(méi)有找到匹配項(xiàng),所以就要從異步查找中獲取結(jié)果。如果在另一半中沒(méi)有匹配項(xiàng)的話,返回的結(jié)果就一定是last,這個(gè)值的返回就代表了沒(méi)有找到匹配的元素⑩。如果“異步”調(diào)用被延遲(非真正的異步),那么實(shí)際上這里會(huì)運(yùn)行g(shù)et();這種情況下,如果對(duì)下半部分的元素搜索成功,那么就不會(huì)執(zhí)行對(duì)上半部分元素的搜索了。如果異步查找真實(shí)的運(yùn)行在其他線程上,那么async_result變量的析構(gòu)函數(shù)將會(huì)等待該線程完成,所以這里不會(huì)有線程泄露。
像之前一樣,std::async可以用來(lái)提供“異常-安全”和“異常-傳播”特性。如果直接遞歸拋出異常,future的析構(gòu)函數(shù)就能讓異步執(zhí)行的線程提前結(jié)束;如果異步調(diào)用拋出異常,那么這個(gè)異常將會(huì)通過(guò)對(duì)get()成員函數(shù)的調(diào)用進(jìn)行傳播⑩。使用try/catch塊只能捕捉在done發(fā)生的異常,并且當(dāng)有異常拋出?時(shí),所有線程都能很快的終止運(yùn)行。不過(guò),不使用try/catch的實(shí)現(xiàn)依舊沒(méi)問(wèn)題,不同的就是要等待所有線程的工作是否完成。
實(shí)現(xiàn)中一個(gè)重要的特性就是,不能保證所有數(shù)據(jù)都能被std::find串行處理。其他并行算法可以借鑒這個(gè)特性,因?yàn)橐屢粋€(gè)算法并行起來(lái)這是必須具有的特性。如果有順序問(wèn)題,元素就不能并發(fā)的處理了。如果每個(gè)元素獨(dú)立,雖然對(duì)于parallel_for_each不是很重要,不過(guò)對(duì)于parallel_find,即使在開始部分已經(jīng)找到了匹配元素,也有可能返回范圍中最后一個(gè)元素;如果在知道結(jié)果的前提下,這樣的結(jié)果會(huì)讓人很驚訝。
OK,現(xiàn)在你已經(jīng)使用了并行化的std::find。如在本節(jié)開始說(shuō)的那樣,其他相似算法不需要對(duì)每一個(gè)數(shù)據(jù)元素進(jìn)行處理,并且同樣的技術(shù)可以使用到這些類似的算法上去。我們將在第9章中看到“中斷線程”的問(wèn)題。
為了完成我們的并行“三重奏”,我們將換一個(gè)角度來(lái)看一下std::partial_sum。對(duì)于這個(gè)算法,沒(méi)有太多的文獻(xiàn)可參考,不過(guò)讓這個(gè)算法并行起來(lái)是一件很有趣的事。
std::partial_sumstd::partial_sum會(huì)計(jì)算給定范圍中的每個(gè)元素,并用計(jì)算后的結(jié)果將原始序列中的值替換掉。比如,有一個(gè)序列[1,2,3,4,5],在執(zhí)行該算法后會(huì)成為:[1,3(1+2),6(1+2+3),10(1+2+3+4),15(1+2+3+4+5)]。讓這樣一個(gè)算法并行起來(lái)會(huì)很有趣,因?yàn)檫@里不能講任務(wù)分塊,對(duì)每一塊進(jìn)行獨(dú)立的計(jì)算。比如,原始序列中的第一個(gè)元素需要加到后面的一個(gè)元素中去。
確定某個(gè)范圍部分和的一種的方式,就是在獨(dú)立塊中計(jì)算部分和,然后將第一塊中最后的元素的值,與下一塊中的所有元素進(jìn)行相加,依次類推。如果有個(gè)序列[1,2,3,4,5,6,7,8,9],然后將其分為三塊,那么在第一次計(jì)算后就能得到[{1,3,6},{4,9,15},{7,15,24}]。然后將6(第一塊的最后一個(gè)元素)加到第二個(gè)塊中,那么就得到[{1,3,6},{10,15,21},{7,15,24}]。然后再將第二塊的最后一個(gè)元素21加到第三塊中去,就得到[{1,3,6},{10,15,21},{28,36,55}]。
將原始數(shù)據(jù)分割成塊,加上之前塊的部分和就能夠并行了。如果每個(gè)塊中的末尾元素都是第一個(gè)被更新的,那么塊中其他的元素就能被其他線程所更新,同時(shí)另一個(gè)線程對(duì)下一塊進(jìn)行更新,等等。當(dāng)處理的元素比處理核心的個(gè)數(shù)多的時(shí)候,這樣完成工作沒(méi)問(wèn)題,因?yàn)槊恳粋€(gè)核芯在每一個(gè)階段都有合適的數(shù)據(jù)可以進(jìn)行處理。
如果有很多的處理器(就是要比處理的元素個(gè)數(shù)多),那么之前的方式就無(wú)法正常工作了。如果還是將工作劃分給每個(gè)處理器,那么在第一步就沒(méi)必要去做了。這種情況下,傳遞結(jié)果就意味著讓處理器進(jìn)行等待,這時(shí)需要給這些處于等待中的處理器一些工作。所以,可以采用完全不同的方式來(lái)處理這個(gè)問(wèn)題。比起將數(shù)據(jù)塊中的最后一個(gè)元素的結(jié)果向后面的元素塊傳遞,可以對(duì)部分結(jié)果進(jìn)行傳播:第一次與相鄰的元素(距離為1)相加和(和之前一樣),之后和距離為2的元素相加,在后來(lái)和距離為4的元素相加,以此類推。比如,初始序列為[1,2,3,4,5,6,7,8,9],第一次后為[1,3,5,7,9,11,13,15,17],第二次后為[1,3,6,10,14,18, 22,26,30],下一次就要隔4個(gè)元素了。第三次后[1, 3, 6, 10, 15, 21, 28, 36, 44],下一次就要隔8個(gè)元素了。第四次后[1, 3, 6, 10, 15, 21, 28, 36, 45],這就是最終的結(jié)果。雖然,比起第一種方法多了很多步驟,不過(guò)在可并發(fā)平臺(tái)下,這種方法提高了并行的可行性;每個(gè)處理器可在每一步中處理一個(gè)數(shù)據(jù)項(xiàng)。
總體來(lái)說(shuō),當(dāng)有N個(gè)操作時(shí)(每步使用一個(gè)處理器)第二種方法需要log(N)[底為2]步;在本節(jié)中,N就相當(dāng)于數(shù)據(jù)鏈表的長(zhǎng)度。比起第一種,每個(gè)線程對(duì)分配塊做N/k個(gè)操作,然后在做N/k次結(jié)果傳遞(這里的k是線程的數(shù)量)。因此,第一種方法的時(shí)間復(fù)雜度為O(N),不過(guò)第二種方法的時(shí)間復(fù)雜度為Q(Nlog(N))。當(dāng)數(shù)據(jù)量和處理器數(shù)量相近時(shí),第二種方法需要每個(gè)處理器上log(N)個(gè)操作,第一種方法中每個(gè)處理器上執(zhí)行的操作數(shù)會(huì)隨著k的增加而增多,因?yàn)樾枰獙?duì)結(jié)果進(jìn)行傳遞。對(duì)于處理單元較少的情況,第一種方法會(huì)比較合適;對(duì)于大規(guī)模并行系統(tǒng),第二種方法比較合適。
不管怎么樣,先將效率問(wèn)題放一邊,讓我們來(lái)看一些代碼。下面清單實(shí)現(xiàn)的,就是第一種方法。
清單8.11 使用劃分的方式來(lái)并行的計(jì)算部分和
template<typename Iterator>
void parallel_partial_sum(Iterator first,Iterator last)
{
typedef typename Iterator::value_type value_type;
struct process_chunk // 1
{
void operator()(Iterator begin,Iterator last,
std::future<value_type>* previous_end_value,
std::promise<value_type>* end_value)
{
try
{
Iterator end=last;
++end;
std::partial_sum(begin,end,begin); // 2
if(previous_end_value) // 3
{
value_type& addend=previous_end_value->get(); // 4
*last+=addend; // 5
if(end_value)
{
end_value->set_value(*last); // 6
}
std::for_each(begin,last,[addend](value_type& item) // 7
{
item+=addend;
});
}
else if(end_value)
{
end_value->set_value(*last); // 8
}
}
catch(...) // 9
{
if(end_value)
{
end_value->set_exception(std::current_exception()); // 10
}
else
{
throw; // 11
}
}
}
};
unsigned long const length=std::distance(first,last);
if(!length)
return last;
unsigned long const min_per_thread=25; // 12
unsigned long const max_threads=
(length+min_per_thread-1)/min_per_thread;
unsigned long const hardware_threads=
std::thread::hardware_concurrency();
unsigned long const num_threads=
std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long const block_size=length/num_threads;
typedef typename Iterator::value_type value_type;
std::vector<std::thread> threads(num_threads-1); // 13
std::vector<std::promise<value_type> >
end_values(num_threads-1); // 14
std::vector<std::future<value_type> >
previous_end_values; // 15
previous_end_values.reserve(num_threads-1); // 16
join_threads joiner(threads);
Iterator block_start=first;
for(unsigned long i=0;i<(num_threads-1);++i)
{
Iterator block_last=block_start;
std::advance(block_last,block_size-1); // 17
threads[i]=std::thread(process_chunk(), // 18
block_start,block_last,
(i!=0)?&previous_end_values[i-1]:0,
&end_values[i]);
block_start=block_last;
++block_start; // 19
previous_end_values.push_back(end_values[i].get_future()); // 20
}
Iterator final_element=block_start;
std::advance(final_element,std::distance(block_start,last)-1); // 21
process_chunk()(block_start,final_element, // 22
(num_threads>1)?&previous_end_values.back():0,
0);
}
這個(gè)實(shí)現(xiàn)中,使用的結(jié)構(gòu)體和之前算法中的一樣,將問(wèn)題進(jìn)行分塊解決,每個(gè)線程處理最小的數(shù)據(jù)塊?。其中,有一組線程?和一組promise?,用來(lái)存儲(chǔ)每塊中的最后一個(gè)值;并且實(shí)現(xiàn)中還有一組future?,用來(lái)對(duì)前一塊中的最后一個(gè)值進(jìn)行檢索。可以為future?做些儲(chǔ)備,以避免生成新線程時(shí),再分配內(nèi)存。
主循環(huán)和之前一樣,不過(guò)這次是讓迭代器指向了每個(gè)數(shù)據(jù)塊的最后一個(gè)元素,而不是作為一個(gè)普通值傳遞到最后?,這樣就方便向其他塊傳遞當(dāng)前塊的最后一個(gè)元素了。實(shí)際處理是在process_chunk函數(shù)對(duì)象中完成的,這個(gè)結(jié)構(gòu)體看上去不是很長(zhǎng);當(dāng)前塊的開始和結(jié)束迭代器和前塊中最后一個(gè)值的future一起,作為參數(shù)進(jìn)行傳遞,并且promise用來(lái)保留當(dāng)前范圍內(nèi)最后一個(gè)值的原始值?。
生成新的線程后,就對(duì)開始?jí)K的ID進(jìn)行更新,別忘了傳遞最后一個(gè)元素?,并且將當(dāng)前塊的最后一個(gè)元素存儲(chǔ)到future,上面的數(shù)據(jù)將在循環(huán)中再次使用到?。
在處理最后一個(gè)數(shù)據(jù)塊前,需要獲取之前數(shù)據(jù)塊中最后一個(gè)元素的迭代器(21),這樣就可以將其作為參數(shù)傳入process_chunk(22)中了。std::partial_sum不會(huì)返回一個(gè)值,所以在最后一個(gè)數(shù)據(jù)塊被處理后,就不用再做任何事情了。當(dāng)所有線程的操作完成時(shí),求部分和的操作也就算完成了。
OK,現(xiàn)在來(lái)看一下process_chunk函數(shù)對(duì)象①。對(duì)于整塊的處理是始于對(duì)std::partial_sum的調(diào)用,包括對(duì)于最后一個(gè)值的處理②,不過(guò)得要知道當(dāng)前塊是否是第一塊③。如果當(dāng)前塊不是第一塊,就會(huì)有一個(gè)previous_end_value值從前面的塊傳過(guò)來(lái),所以這里需要等待這個(gè)值的產(chǎn)生④。為了將算法最大程度的并行,首先需要對(duì)最后一個(gè)元素進(jìn)行更新⑤,這樣你就能將這個(gè)值傳遞給下一個(gè)數(shù)據(jù)塊(如果有下一個(gè)數(shù)據(jù)塊的話)⑥。當(dāng)完成這個(gè)操作,就可以使用std::for_each和簡(jiǎn)單的lambda函數(shù)⑦對(duì)剩余的數(shù)據(jù)項(xiàng)進(jìn)行更新。
如果previous_end_value值為空,當(dāng)前數(shù)據(jù)塊就是第一個(gè)數(shù)據(jù)塊,所以只需要為下一個(gè)數(shù)據(jù)塊更新end_value⑧(如果有下一個(gè)數(shù)據(jù)塊的話——當(dāng)前數(shù)據(jù)塊可能是唯一的數(shù)據(jù)塊)。
最后,如果有任意一個(gè)操作拋出異常,就可以將其捕獲⑨,并且存入promise⑩,如果下一個(gè)數(shù)據(jù)塊嘗試獲取前一個(gè)數(shù)據(jù)塊的最后一個(gè)值④時(shí),異常會(huì)再次拋出。處理最后一個(gè)數(shù)據(jù)塊時(shí),異常會(huì)全部重新拋出?,因?yàn)閽伋鰟?dòng)作一定會(huì)在主線程上進(jìn)行。
因?yàn)榫€程間需要同步,這里的代碼就不容易使用std::async重寫。任務(wù)等待會(huì)讓線程中途去執(zhí)行其他的任務(wù),所以所有的任務(wù)必須同時(shí)執(zhí)行。
基于塊,以傳遞末尾元素值的方法就介紹到這里,讓我們來(lái)看一下第二種計(jì)算方式。
實(shí)現(xiàn)以2的冪級(jí)數(shù)為距離部分和算法
第二種算法通過(guò)增加距離的方式,讓更多的處理器充分發(fā)揮作用。在這種情況下,沒(méi)有進(jìn)一步同步的必要了,因?yàn)樗兄虚g結(jié)果都直接傳遞到下一個(gè)處理器上去了。不過(guò),在實(shí)際中我們很少見(jiàn)到,單個(gè)處理器處理對(duì)一定數(shù)量的元素執(zhí)行同一條指令,這種方式成為單指令-多數(shù)據(jù)流(SIMD)。因此,代碼必須能處理通用情況,并且需要在每步上對(duì)線程進(jìn)行顯式同步。
完成這種功能的一種方式是使用柵欄(barrier)——一種同步機(jī)制:只有所有線程都到達(dá)柵欄處,才能進(jìn)行之后的操作;先到達(dá)的線程必須等待未到達(dá)的線程。C++11標(biāo)準(zhǔn)庫(kù)沒(méi)有直接提供這樣的工具,所以你得自行設(shè)計(jì)一個(gè)。
試想游樂(lè)場(chǎng)中的過(guò)山車。如果有適量的游客在等待,那么過(guò)山車管理員就要保證,在過(guò)山車啟動(dòng)前,每一個(gè)位置都得坐一個(gè)游客。柵欄的工作原理也一樣:你已經(jīng)知道了“座位”的數(shù)量,線程就是要等待所有“座位”都坐滿。當(dāng)?shù)却€程夠數(shù),那么它們可以繼續(xù)運(yùn)行;這時(shí),柵欄會(huì)重置,并且會(huì)讓下一撥線程開始扥帶。通常,會(huì)在循環(huán)中這樣做,當(dāng)同一個(gè)線程再次到達(dá)柵欄處,它會(huì)再次等待。這種方法是為了讓線程同步,所以不會(huì)有線程在其他未完成的情況下,就去完成下一個(gè)任務(wù)。如果有線程提前執(zhí)行,對(duì)于這樣一個(gè)算法,就是一場(chǎng)災(zāi)難,因?yàn)樘崆俺霭l(fā)的線程可能會(huì)修改要被其他線程使用到的數(shù)據(jù),后面線程獲取到的數(shù)據(jù)就不是正確數(shù)據(jù)了。
下面的代碼就簡(jiǎn)單的實(shí)現(xiàn)了一個(gè)柵欄。
清單8.12 簡(jiǎn)單的柵欄類
class barrier
{
unsigned const count;
std::atomic<unsigned> spaces;
std::atomic<unsigned> generation;
public:
explicit barrier(unsigned count_): // 1
count(count_),spaces(count),generation(0)
{}
void wait()
{
unsigned const my_generation=generation; // 2
if(!--spaces) // 3
{
spaces=count; // 4
++generation; // 5
}
else
{
while(generation==my_generation) // 6
std::this_thread::yield(); // 7
}
}
};
這個(gè)實(shí)現(xiàn)中,用一定數(shù)量的“座位”構(gòu)造了一個(gè)barrier①,這個(gè)數(shù)量將會(huì)存儲(chǔ)count變量中。起初,柵欄中的spaces與count數(shù)量相當(dāng)。當(dāng)有線程都在等待時(shí),spaces的數(shù)量就會(huì)減少③。當(dāng)spaces的數(shù)量減到0時(shí),spaces的值將會(huì)重置為count④,并且generation變量會(huì)增加,以向線程發(fā)出信號(hào),讓這些等待線程能夠繼續(xù)運(yùn)行⑤。如果spaces沒(méi)有到達(dá)0,那么線程會(huì)繼續(xù)等待。這個(gè)實(shí)現(xiàn)使用了一個(gè)簡(jiǎn)單的自旋鎖⑥,對(duì)generation的檢查會(huì)在wait()開始的時(shí)候進(jìn)行②。因?yàn)間eneration只會(huì)在所有線程都到達(dá)柵欄的時(shí)候更新⑤,在等待的時(shí)候使用yield()⑦就不會(huì)讓CPU處于忙等待的狀態(tài)。
這個(gè)實(shí)現(xiàn)比較“簡(jiǎn)單”的真實(shí)意義:使用自旋等待的情況下,如果讓線程等待很長(zhǎng)時(shí)間就不會(huì)很理想,并且如果超過(guò)count數(shù)量的線程對(duì)wait()進(jìn)行調(diào)用,這個(gè)實(shí)現(xiàn)就沒(méi)有辦法工作了。如果想要很好的處理這樣的情況,必須使用一個(gè)更加健壯(更加復(fù)雜)的實(shí)現(xiàn)。我依舊堅(jiān)持對(duì)原子變量操作順序的一致性,因?yàn)檫@會(huì)讓事情更加簡(jiǎn)單,不過(guò)有時(shí)還是需要放松這樣的約束。全局同步對(duì)于大規(guī)模并行架構(gòu)來(lái)說(shuō)是消耗巨大的,因?yàn)橄嚓P(guān)處理器會(huì)穿梭于存儲(chǔ)柵欄狀態(tài)的緩存行中(可見(jiàn)8.2.2中對(duì)乒乓緩存的討論),所以需要格外的小心,來(lái)確保使用的是最佳同步方法。
不論怎么樣,這些都需要你考慮到;需要有固定數(shù)量的線程執(zhí)行同步循環(huán)。好吧,大多數(shù)情況下線程數(shù)量都是固定的。你可能還記得,代碼起始部分的幾個(gè)數(shù)據(jù)項(xiàng),只需要幾步就能得到其最終值。這就意味著,無(wú)論是讓所有線程循環(huán)處理范圍內(nèi)的所有元素,還是讓柵欄來(lái)同步線程,都會(huì)遞減count的值。我會(huì)選擇后者,因?yàn)槠淠鼙苊饩€程做不必要的工作,僅僅是等待最終步驟完成。
這意味著你要將count改為一個(gè)原子變量,這樣在多線程對(duì)其進(jìn)行更新的時(shí)候,就不需要添加額外的同步:
std::atomic<unsigned> count;
初始化保持不變,不過(guò)當(dāng)spaces的值被重置后,你需要顯式的對(duì)count進(jìn)行l(wèi)oad()操作:
spaces=count.load();
這就是要對(duì)wait()函數(shù)的改動(dòng);現(xiàn)在需要一個(gè)新的成員函數(shù)來(lái)遞減count。這個(gè)函數(shù)命名為done_waiting(),因?yàn)楫?dāng)一個(gè)線程完成其工作,并在等待的時(shí)候,才能對(duì)其進(jìn)行調(diào)用它:
void done_waiting()
{
--count; // 1
if(!--spaces) // 2
{
spaces=count.load(); // 3
++generation;
}
}
實(shí)現(xiàn)中,首先要減少count①,所以下一次spaces將會(huì)被重置為一個(gè)較小的數(shù)。然后,需要遞減spaces的值②。如果不做這些操作,有些線程將會(huì)持續(xù)等待,因?yàn)閟paces被舊的count初始化,大于期望值。一組當(dāng)中最后一個(gè)線程需要對(duì)計(jì)數(shù)器進(jìn)行重置,并且遞增generation的值③,就像在wait()里面做的那樣。最重要的區(qū)別:最后一個(gè)線程不需要等待。當(dāng)最后一個(gè)線程結(jié)束,整個(gè)等待也就隨之結(jié)束!
現(xiàn)在就準(zhǔn)備開始寫部分和的第二個(gè)實(shí)現(xiàn)吧。在每一步中,每一個(gè)線程都在柵欄出調(diào)用wait(),來(lái)保證線程所處步驟一致,并且當(dāng)所有線程都結(jié)束,那么最后一個(gè)線程會(huì)調(diào)用done_waiting()來(lái)減少count的值。如果使用兩個(gè)緩存對(duì)原始數(shù)據(jù)進(jìn)行保存,柵欄也可以提供你所需要的同步。每一步中,線程都會(huì)從原始數(shù)據(jù)或是緩存中讀取數(shù)據(jù),并且將新值寫入對(duì)應(yīng)位置。如果有線程先從原始數(shù)據(jù)處獲取數(shù)據(jù),那下一步就從緩存上獲取數(shù)據(jù)(或相反)。這就能保證在讀與寫都是由獨(dú)立線程完成,并不存在條件競(jìng)爭(zhēng)。當(dāng)線程結(jié)束等待循環(huán),就能保證正確的值最終被寫入到原始數(shù)據(jù)當(dāng)中。下面的代碼就是這樣的實(shí)現(xiàn)。
清單8.13 通過(guò)兩兩更新對(duì)的方式實(shí)現(xiàn)partial_sum
struct barrier
{
std::atomic<unsigned> count;
std::atomic<unsigned> spaces;
std::atomic<unsigned> generation;
barrier(unsigned count_):
count(count_),spaces(count_),generation(0)
{}
void wait()
{
unsigned const gen=generation.load();
if(!--spaces)
{
spaces=count.load();
++generation;
}
else
{
while(generation.load()==gen)
{
std::this_thread::yield();
}
}
}
void done_waiting()
{
--count;
if(!--spaces)
{
spaces=count.load();
++generation;
}
}
};
template<typename Iterator>
void parallel_partial_sum(Iterator first,Iterator last)
{
typedef typename Iterator::value_type value_type;
struct process_element // 1
{
void operator()(Iterator first,Iterator last,
std::vector<value_type>& buffer,
unsigned i,barrier& b)
{
value_type& ith_element=*(first+i);
bool update_source=false;
for(unsigned step=0,stride=1;stride<=i;++step,stride*=2)
{
value_type const& source=(step%2)? // 2
buffer[i]:ith_element;
value_type& dest=(step%2)?
ith_element:buffer[i];
value_type const& addend=(step%2)? // 3
buffer[i-stride]:*(first+i-stride);
dest=source+addend; // 4
update_source=!(step%2);
b.wait(); // 5
}
if(update_source) // 6
{
ith_element=buffer[i];
}
b.done_waiting(); // 7
}
};
unsigned long const length=std::distance(first,last);
if(length<=1)
return;
std::vector<value_type> buffer(length);
barrier b(length);
std::vector<std::thread> threads(length-1); // 8
join_threads joiner(threads);
Iterator block_start=first;
for(unsigned long i=0;i<(length-1);++i)
{
threads[i]=std::thread(process_element(),first,last, // 9
std::ref(buffer),i,std::ref(b));
}
process_element()(first,last,buffer,length-1,b); // 10
}
代碼的整體結(jié)構(gòu)應(yīng)該不用說(shuō)了。process_element類有函數(shù)調(diào)用操作可以用來(lái)做具體的工作①,就是運(yùn)行一組線程⑨,并將線程存儲(chǔ)到vector中⑧,同樣還需要在主線程中對(duì)其進(jìn)行調(diào)用⑩。這里與之前最大的區(qū)別就是,線程的數(shù)量是根據(jù)列表中的數(shù)據(jù)量來(lái)定的,而非根據(jù)std::thread::hardware_concurrency。如我之前所說(shuō),除非你使用的是一個(gè)大規(guī)模并行的機(jī)器,因?yàn)檫@上面的線程都十分廉價(jià)(雖然這樣的方式并不是很好),還能為我們展示了其整體結(jié)構(gòu)。這個(gè)結(jié)構(gòu)在有較少線程的時(shí)候,每一個(gè)線程只能處理源數(shù)據(jù)中的部分?jǐn)?shù)據(jù),當(dāng)沒(méi)有足夠的線程支持該結(jié)構(gòu)時(shí),效率要比傳遞算法低。
不管怎樣,主要的工作都是調(diào)用process_element的函數(shù)操作符來(lái)完成的。每一步,都會(huì)從原始數(shù)據(jù)或緩存中獲取第i個(gè)元素②,并且將獲取到的元素加到指定stride的元素中去③,如果從原始數(shù)據(jù)開始讀取的元素,加和后的數(shù)需要存儲(chǔ)在緩存中④。然后,在開始下一步前,會(huì)在柵欄處等待⑤。當(dāng)stride超出了給定數(shù)據(jù)的范圍,當(dāng)最終結(jié)果已經(jīng)存在緩存中時(shí),就需要更新原始數(shù)據(jù)中的數(shù)據(jù),同樣這也意味著本次加和結(jié)束。最后,在調(diào)用柵欄中的done_waiting()函數(shù)⑦。
注意這個(gè)解決方案并不是異常安全的。如果某個(gè)線程在process_element執(zhí)行時(shí)拋出一個(gè)異常,其就會(huì)終止整個(gè)應(yīng)用。這里可以使用一個(gè)std::promise來(lái)存儲(chǔ)異常,就像在清單8.9中parallel_find的實(shí)現(xiàn),或僅使用一個(gè)被互斥量保護(hù)的std::exception_ptr即可。
總結(jié)下這三個(gè)例子。希望其能保證我們了解8.1、8.2、8.3和8.4節(jié)中提到的設(shè)計(jì)考量,并且證明了這些技術(shù)在真實(shí)的代碼中,需要承擔(dān)些什么責(zé)任。