很多情況下,使用信號來終止一個長時間運行的線程是合理的。這種線程的存在,可能是因為工作線程所在的線程池被銷毀,或是用戶顯式的取消了這個任務,亦或其他各種原因。不管是什么原因,原理都一樣:需要使用信號來讓未結(jié)束線程停止運行。這里需要一種合適的方式讓線程主動的停下來,而非讓線程戛然而止。
你可能會給每種情況制定一個獨立的機制,這樣做的意義不大。不僅因為用統(tǒng)一的機制會更容易在之后的場景中實現(xiàn),而且寫出來的中斷代碼不用擔心在哪里使用。C++11標準沒有提供這樣的機制,不過實現(xiàn)這樣的機制也并不困難。
在了解一下應該如何實現(xiàn)這種機制前,先來了解一下啟動和中斷線程的接口。
先看一下外部接口,需要從可中斷線程上獲取些什么?最起碼需要和std::thread相同的接口,還要多加一個interrupt()函數(shù):
class interruptible_thread
{
public:
template<typename FunctionType>
interruptible_thread(FunctionType f);
void join();
void detach();
bool joinable() const;
void interrupt();
};
類內(nèi)部可以使用std::thread來管理線程,并且使用一些自定義數(shù)據(jù)結(jié)構(gòu)來處理中斷?,F(xiàn)在,從線程的角度能看到什么呢?“能用這個類來中斷線程”——需要一個斷點(interruption point)。在不添加多余的數(shù)據(jù)的前提下,為了使斷點能夠正常使用,就需要使用一個沒有參數(shù)的函數(shù):interruption_point()。這意味著中斷數(shù)據(jù)結(jié)構(gòu)可以訪問thread_local變量,并在線程運行時,對變量進行設置,因此當線程調(diào)用interruption_point()函數(shù)時,就會去檢查當前運行線程的數(shù)據(jù)結(jié)構(gòu)。我們將在后面看到interruption_point()的具體實現(xiàn)。
thread_local標志是不能使用普通的std::thread管理線程的主要原因;需要使用一種方法分配出一個可訪問的interruptible_thread實例,就像新啟動一個線程一樣。在使用已提供函數(shù)來做這件事情前,需要將interruptible_thread實例傳遞給std::thread的構(gòu)造函數(shù),創(chuàng)建一個能夠執(zhí)行的線程,就像下面的代碼清單所實現(xiàn)。
清單9.9 interruptible_thread的基本實現(xiàn)
class interrupt_flag
{
public:
void set();
bool is_set() const;
};
thread_local interrupt_flag this_thread_interrupt_flag; // 1
class interruptible_thread
{
std::thread internal_thread;
interrupt_flag* flag;
public:
template<typename FunctionType>
interruptible_thread(FunctionType f)
{
std::promise<interrupt_flag*> p; // 2
internal_thread=std::thread([f,&p]{ // 3
p.set_value(&this_thread_interrupt_flag);
f(); // 4
});
flag=p.get_future().get(); // 5
}
void interrupt()
{
if(flag)
{
flag->set(); // 6
}
}
};
提供函數(shù)f是包裝了一個lambda函數(shù)③,線程將會持有f副本和本地promise變量(p)的引用②。在新線程中,lambda函數(shù)設置promise變量的值到this_thread_interrupt_flag(在thread_local①中聲明)的地址中,為的是讓線程能夠調(diào)用提供函數(shù)的副本④。調(diào)用線程會等待與其future相關(guān)的promise就緒,并且將結(jié)果存入到flag成員變量中⑤。注意,即使lambda函數(shù)在新線程上執(zhí)行,對本地變量p進行懸空引用,都沒有問題,因為在新線程返回之前,interruptible_thread構(gòu)造函數(shù)會等待變量p,直到變量p不被引用。實現(xiàn)沒有考慮處理匯入線程,或分離線程。所以,需要flag變量在線程退出或分離前已經(jīng)聲明,這樣就能避免懸空問題。
interrupt()函數(shù)相對簡單:需要一個線程去做中斷時,需要一個合法指針作為一個中斷標志,所以可以僅對標志進行設置⑥。
現(xiàn)在就可以設置中斷標志了,不過不檢查線程是否被中斷,這樣的意義就不大了。使用interruption_point()函數(shù)最簡單的情況;可以在一個安全的地方調(diào)用這個函數(shù),如果標志已經(jīng)設置,就可以拋出一個thread_interrupted異常:
void interruption_point()
{
if(this_thread_interrupt_flag.is_set())
{
throw thread_interrupted();
}
}
代碼中可以在適當?shù)牡胤绞褂眠@個函數(shù):
void foo()
{
while(!done)
{
interruption_point();
process_next_item();
}
}
雖然也能工作,但不理想。最好實在線程等待或阻塞的時候中斷線程,因為這時的線程不能運行,也就不能調(diào)用interruption_point()函數(shù)!在線程等待的時候,什么方式才能去中斷線程呢?
OK,需要仔細選擇中斷的位置,并通過顯式調(diào)用interruption_point()進行中斷,不過在線程阻塞等待的時候,這種辦法就顯得蒼白無力了,例如:等待條件變量的通知。就需要一個新函數(shù)——interruptible_wait()——就可以運行各種需要等待的任務,并且可以知道如何中斷等待。之前提到,可能會等待一個條件變量,所以就從它開始:如何做才能中斷一個等待的條件變量呢?最簡單的方式是,當設置中斷標志時,需要提醒條件變量,并在等待后立即設置斷點。為了讓其工作,需要提醒所有等待對應條件變量的線程,就能確保感謝興趣的線程能夠蘇醒。偽蘇醒是無論如何都要處理的,所以其他線程(非感興趣線程)將會被當作偽蘇醒處理——兩者之間沒什么區(qū)別。interrupt_flag結(jié)構(gòu)需要存儲一個指針指向一個條件變量,所以用set()函數(shù)對其進行提醒。為條件變量實現(xiàn)的interruptible_wait()可能會看起來像下面清單中所示。
清單9.10 為std::condition_variable實現(xiàn)的interruptible_wait有問題版
void interruptible_wait(std::condition_variable& cv,
std::unique_lock<std::mutex>& lk)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv); // 1
cv.wait(lk); // 2
this_thread_interrupt_flag.clear_condition_variable(); // 3
interruption_point();
}
假設函數(shù)能夠設置和清除相關(guān)條件變量上的中斷標志,代碼會檢查中斷,通過interrupt_flag為當前線程關(guān)聯(lián)條件變量①,等待條件變量②,清理相關(guān)條件變量③,并且再次檢查中斷。如果線程在等待期間被條件變量所中斷,中斷線程將廣播條件變量,并喚醒等待該條件變量的線程,所以這里就可以檢查中斷。不幸的是,代碼有兩個問題。第一個問題比較明顯,如果想要線程安全:std::condition_variable::wait()可以拋出異常,所以這里會直接退出,而沒有通過條件變量刪除相關(guān)的中斷標志。這個問題很容易修復,就是在析構(gòu)函數(shù)中添加相關(guān)刪除操作即可。
第二個問題就不大明顯了,這段代碼存在條件競爭。雖然,線程可以通過調(diào)用interruption_point()被中斷,不過在調(diào)用wait()后,條件變量和相關(guān)中斷標志就沒有什么系了,因為線程不是等待狀態(tài),所以不能通過條件變量的方式喚醒。就需要確保線程不會在最后一次中斷檢查和調(diào)用wait()間被喚醒。這里,不對std::condition_variable的內(nèi)部結(jié)構(gòu)進行研究;不過,可通過一種方法來解決這個問題:使用lk上的互斥量對線程進行保護,這就需要將lk傳遞到set_condition_variable()函數(shù)中去。不幸的是,這將產(chǎn)生兩個新問題:需要傳遞一個互斥量的引用到一個不知道生命周期的線程中去(這個線程做中斷操作)為該線程上鎖(調(diào)用interrupt()的時候)。這里可能會死鎖,并且可能訪問到一個已經(jīng)銷毀的互斥量,所以這種方法不可取。當不能完全確定能中斷條件變量等待——沒有interruptible_wait()情況下也可以時(可能有些嚴格),那有沒有其他選擇呢?一個選擇就是放置超時等待,使用wait_for()并帶有一個簡單的超時量(比如,1ms)。在線程被中斷前,算是給了線程一個等待的上限(以時鐘刻度為基準)。如果這樣做了,等待線程將會看到更多因為超時而“偽”蘇醒的線程,不過超時也不輕易的就幫助到我們。與interrupt_flag相關(guān)的實現(xiàn)的一個實現(xiàn)放在下面的清單中展示。
清單9.11 為std::condition_variable在interruptible_wait中使用超時
class interrupt_flag
{
std::atomic<bool> flag;
std::condition_variable* thread_cond;
std::mutex set_clear_mutex;
public:
interrupt_flag():
thread_cond(0)
{}
void set()
{
flag.store(true,std::memory_order_relaxed);
std::lock_guard<std::mutex> lk(set_clear_mutex);
if(thread_cond)
{
thread_cond->notify_all();
}
}
bool is_set() const
{
return flag.load(std::memory_order_relaxed);
}
void set_condition_variable(std::condition_variable& cv)
{
std::lock_guard<std::mutex> lk(set_clear_mutex);
thread_cond=&cv;
}
void clear_condition_variable()
{
std::lock_guard<std::mutex> lk(set_clear_mutex);
thread_cond=0;
}
struct clear_cv_on_destruct
{
~clear_cv_on_destruct()
{
this_thread_interrupt_flag.clear_condition_variable();
}
};
};
void interruptible_wait(std::condition_variable& cv,
std::unique_lock<std::mutex>& lk)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv);
interrupt_flag::clear_cv_on_destruct guard;
interruption_point();
cv.wait_for(lk,std::chrono::milliseconds(1));
interruption_point();
}
如果有謂詞(相關(guān)函數(shù))進行等待,1ms的超時將會完全在謂詞循環(huán)中完全隱藏:
template<typename Predicate>
void interruptible_wait(std::condition_variable& cv,
std::unique_lock<std::mutex>& lk,
Predicate pred)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv);
interrupt_flag::clear_cv_on_destruct guard;
while(!this_thread_interrupt_flag.is_set() && !pred())
{
cv.wait_for(lk,std::chrono::milliseconds(1));
}
interruption_point();
}
這會讓謂詞被檢查的次數(shù)增加許多,不過對于簡單調(diào)用wait()這套實現(xiàn)還是很好用的。超時變量很容易實現(xiàn):通過制定時間,比如:1ms或更短。OK,對于std::condition_variable的等待,就需要小心應對了;std::condition_variable_any呢?還是能做的更好嗎?
std::condition_variable_any中斷等待std::condition_variable_any與std::condition_variable的不同在于,std::condition_variable_any可以使用任意類型的鎖,而不僅有std::unique_lock<std::mutex>??梢宰屖虑樽銎饋砀雍唵危⑶?code>std::condition_variable_any可以比std::condition_variable做的更好。因為能與任意類型的鎖一起工作,就可以設計自己的鎖,上鎖/解鎖interrupt_flag的內(nèi)部互斥量set_clear_mutex,并且鎖也支持等待調(diào)用,就像下面的代碼。
清單9.12 為std::condition_variable_any設計的interruptible_wait
class interrupt_flag
{
std::atomic<bool> flag;
std::condition_variable* thread_cond;
std::condition_variable_any* thread_cond_any;
std::mutex set_clear_mutex;
public:
interrupt_flag():
thread_cond(0),thread_cond_any(0)
{}
void set()
{
flag.store(true,std::memory_order_relaxed);
std::lock_guard<std::mutex> lk(set_clear_mutex);
if(thread_cond)
{
thread_cond->notify_all();
}
else if(thread_cond_any)
{
thread_cond_any->notify_all();
}
}
template<typename Lockable>
void wait(std::condition_variable_any& cv,Lockable& lk)
{
struct custom_lock
{
interrupt_flag* self;
Lockable& lk;
custom_lock(interrupt_flag* self_,
std::condition_variable_any& cond,
Lockable& lk_):
self(self_),lk(lk_)
{
self->set_clear_mutex.lock(); // 1
self->thread_cond_any=&cond; // 2
}
void unlock() // 3
{
lk.unlock();
self->set_clear_mutex.unlock();
}
void lock()
{
std::lock(self->set_clear_mutex,lk); // 4
}
~custom_lock()
{
self->thread_cond_any=0; // 5
self->set_clear_mutex.unlock();
}
};
custom_lock cl(this,cv,lk);
interruption_point();
cv.wait(cl);
interruption_point();
}
// rest as before
};
template<typename Lockable>
void interruptible_wait(std::condition_variable_any& cv,
Lockable& lk)
{
this_thread_interrupt_flag.wait(cv,lk);
}
自定義的鎖類型在構(gòu)造的時候,需要所鎖住內(nèi)部set_clear_mutex①,對thread_cond_any指針進行設置,并引用std::condition_variable_any傳入鎖的構(gòu)造函數(shù)中②。Lockable引用將會在之后進行存儲,其變量必須被鎖住?,F(xiàn)在可以安心的檢查中斷,不用擔心競爭了。如果這時中斷標志已經(jīng)設置,那么標志一定是在鎖住set_clear_mutex時設置的。當條件變量調(diào)用自定義鎖的unlock()函數(shù)中的wait()時,就會對Lockable對象和set_clear_mutex進行解鎖③。這就允許線程可以嘗試中斷其他線程獲取set_clear_mutex鎖;以及在內(nèi)部wait()調(diào)用之后,檢查thread_cond_any指針。這就是在替換std::condition_variable后,所擁有的功能(不包括管理)。當wait()結(jié)束等待(因為等待,或因為偽蘇醒),因為線程將會調(diào)用lock()函數(shù),這里依舊要求鎖住內(nèi)部set_clear_mutex,并且鎖住Lockable對象④。現(xiàn)在,在wait()調(diào)用時,custom_lock的析構(gòu)函數(shù)中⑤清理thread_cond_any指針(同樣會解鎖set_clear_mutex)之前,可以再次對中斷進行檢查。
這次輪到中斷條件變量的等待了,不過其他阻塞情況,比如:互斥鎖,等待future等等,該怎么辦呢?通常情況下,可以使用std::condition_variable的超時選項,因為在實際運行中不可能很快的將條件變量的等待終止(不訪問內(nèi)部互斥量或future的話)。不過,在某些情況下,你知道知道你在等待什么,這樣就可以讓循環(huán)在interruptible_wait()函數(shù)中運行。作為一個例子,這里為std::future<>重載了interruptible_wait()的實現(xiàn):
template<typename T>
void interruptible_wait(std::future<T>& uf)
{
while(!this_thread_interrupt_flag.is_set())
{
if(uf.wait_for(lk,std::chrono::milliseconds(1)==
std::future_status::ready)
break;
}
interruption_point();
}
等待會在中斷標志設置好的時候,或future準備就緒的時候停止,不過實現(xiàn)中每次等待future的時間只有1ms。這就意味著,中斷請求被確定前,平均等待的時間為0.5ms(這里假設存在一個高精度的時鐘)。通常wait_for至少會等待一個時鐘周期,所以如果時鐘周期為15ms,那么結(jié)束等待的時間將會是15ms,而不是1ms。接受與不接受這種情況,都得視情況而定。如果這必要,且時鐘支持的話,可以持續(xù)削減超時時間。這種方式將會讓線程蘇醒很多次,來檢查標志,并且增加線程切換的開銷。
OK,我們已經(jīng)了解如何使用interruption_point()和interruptible_wait()函數(shù)檢查中斷。
當中斷被檢查出來了,要如何處理它呢?
從中斷線程的角度看,中斷就是thread_interrupted異常,因此能像處理其他異常那樣進行處理。
特別是使用標準catch塊對其進行捕獲:
try
{
do_something();
}
catch(thread_interrupted&)
{
handle_interruption();
}
捕獲中斷,進行處理。其他線程再次調(diào)用interrupt()時,線程將會再次被中斷,這就被稱為斷點(interruption point)。如果線程執(zhí)行的是一系列獨立的任務,就會需要斷點;中斷一個任務,就意味著這個任務被丟棄,并且該線程就會執(zhí)行任務列表中的其他任務。
因為thread_interrupted是一個異常,在能夠被中斷的代碼中,之前線程安全的注意事項都是適用的,就是為了確保資源不會泄露,并在數(shù)據(jù)結(jié)構(gòu)中留下對應的退出狀態(tài)。通常,讓線程中斷是可行的,所以只需要讓異常傳播即可。不過,當異常傳入std::thread的析構(gòu)函數(shù)時,std::terminate()將會調(diào)用,并且整個程序?qū)K止。為了避免這種情況,需要在每個將interruptible_thread變量作為參數(shù)傳入的函數(shù)中放置catch(thread_interrupted)處理塊,可以將catch塊包裝進interrupt_flag的初始化過程中。因為異常將會終止獨立進程,就能保證未處理的中斷是異常安全的。interruptible_thread構(gòu)造函數(shù)中對線程的初始化,實現(xiàn)如下:
internal_thread=std::thread([f,&p]{
p.set_value(&this_thread_interrupt_flag);
try
{
f();
}
catch(thread_interrupted const&)
{}
});
下面,我們來看個更加復雜的例子。
試想,在桌面上查找一個應用。這就需要與用戶互動,應用的狀態(tài)需要能在顯示器上顯示,就能看出應用有什么改變。為了避免影響GUI的響應時間,通常會將處理線程放在后臺運行。后臺進程需要一直執(zhí)行,直到應用退出;后臺線程會作為應用啟動的一部分被啟動,并且在應用終止的時候停止運行。通常這樣的應用只有在機器關(guān)閉時,才會退出,因為應用需要更新應用最新的狀態(tài),就需要全時間運行。在某些情況下,當應用被關(guān)閉,需要使用有序的方式將后臺線程關(guān)閉,其中一種方式就是中斷。
下面清單中為一個系統(tǒng)實現(xiàn)了簡單的線程管理部分。
清單9.13 在后臺監(jiān)視文件系統(tǒng)
std::mutex config_mutex;
std::vector<interruptible_thread> background_threads;
void background_thread(int disk_id)
{
while(true)
{
interruption_point(); // 1
fs_change fsc=get_fs_changes(disk_id); // 2
if(fsc.has_changes())
{
update_index(fsc); // 3
}
}
}
void start_background_processing()
{
background_threads.push_back(
interruptible_thread(background_thread,disk_1));
background_threads.push_back(
interruptible_thread(background_thread,disk_2));
}
int main()
{
start_background_processing(); // 4
process_gui_until_exit(); // 5
std::unique_lock<std::mutex> lk(config_mutex);
for(unsigned i=0;i<background_threads.size();++i)
{
background_threads[i].interrupt(); // 6
}
for(unsigned i=0;i<background_threads.size();++i)
{
background_threads[i].join(); // 7
}
}
啟動時,后臺線程就已經(jīng)啟動④。之后,對應線程將會處理GUI⑤。當用戶要求進程退出時,后臺進程將會被中斷⑥,并且主線程會等待每一個后臺線程結(jié)束后才退出⑦。后臺線程運行在一個循環(huán)中,并時刻檢查磁盤的變化②,對其序號進行更新③。調(diào)用interruption_point()函數(shù),可以在循環(huán)中對中斷進行檢查。
為什么中斷線程前,對線程進行等待?為什么不中斷每個線程,讓它們執(zhí)行下一個任務?答案就是“并發(fā)”。線程被中斷后,不會馬上結(jié)束,因為需要對下一個斷點進行處理,并且在退出前執(zhí)行析構(gòu)函數(shù)和代碼異常處理部分。因為需要匯聚每個線程,所以就會讓中斷線程等待,即使線程還在做著有用的工作——中斷其他線程。只有當沒有工作時(所有線程都被中斷),不需要等待。這就允許中斷線程并行的處理自己的中斷,并更快的完成中斷。
中斷機制很容易擴展到更深層次的中斷調(diào)用,或在特定的代碼塊中禁用中斷,這就當做留給讀者的作業(yè)吧。