在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ C/ 9.2 中斷線程
3.4 本章總結(jié)
6.3 基于鎖設計更加復雜的數(shù)據(jù)結(jié)構(gòu)
6.1 為并發(fā)設計的意義何在?
5.2 <code>C++</code>中的原子操作和原子類型
A.7 自動推導變量類型
2.1 線程管理的基礎
8.5 在實踐中設計并發(fā)代碼
2.4 運行時決定線程數(shù)量
2.2 向線程函數(shù)傳遞參數(shù)
第4章 同步并發(fā)操作
2.3 轉(zhuǎn)移線程所有權(quán)
8.3 為多線程性能設計數(shù)據(jù)結(jié)構(gòu)
6.4 本章總結(jié)
7.3 對于設計無鎖數(shù)據(jù)結(jié)構(gòu)的指導建議
關(guān)于這本書
A.1 右值引用
2.6 本章總結(jié)
D.2 &lt;condition_variable&gt;頭文件
A.6 變參模板
6.2 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)
4.5 本章總結(jié)
A.9 本章總結(jié)
前言
第10章 多線程程序的測試和調(diào)試
5.4 本章總結(jié)
第9章 高級線程管理
5.1 內(nèi)存模型基礎
2.5 識別線程
第1章 你好,C++的并發(fā)世界!
1.2 為什么使用并發(fā)?
A.5 Lambda函數(shù)
第2章 線程管理
4.3 限定等待時間
D.3 &lt;atomic&gt;頭文件
10.2 定位并發(fā)錯誤的技術(shù)
附錄B 并發(fā)庫的簡單比較
5.3 同步操作和強制排序
A.8 線程本地變量
第8章 并發(fā)代碼設計
3.3 保護共享數(shù)據(jù)的替代設施
附錄D C++線程庫參考
第7章 無鎖并發(fā)數(shù)據(jù)結(jié)構(gòu)設計
D.7 &lt;thread&gt;頭文件
D.1 &lt;chrono&gt;頭文件
4.1 等待一個事件或其他條件
A.3 默認函數(shù)
附錄A 對<code>C++</code>11語言特性的簡要介紹
第6章 基于鎖的并發(fā)數(shù)據(jù)結(jié)構(gòu)設計
封面圖片介紹
7.2 無鎖數(shù)據(jù)結(jié)構(gòu)的例子
8.6 本章總結(jié)
8.1 線程間劃分工作的技術(shù)
4.2 使用期望等待一次性事件
8.4 設計并發(fā)代碼的注意事項
D.5 &lt;mutex&gt;頭文件
3.1 共享數(shù)據(jù)帶來的問題
資源
9.3 本章總結(jié)
10.3 本章總結(jié)
10.1 與并發(fā)相關(guān)的錯誤類型
D.4 &lt;future&gt;頭文件
3.2 使用互斥量保護共享數(shù)據(jù)
9.1 線程池
1.1 何謂并發(fā)
9.2 中斷線程
4.4 使用同步操作簡化代碼
A.2 刪除函數(shù)
1.3 C++中的并發(fā)和多線程
1.4 開始入門
第5章 C++內(nèi)存模型和原子類型操作
消息傳遞框架與完整的ATM示例
8.2 影響并發(fā)代碼性能的因素
7.1 定義和意義
D.6 &lt;ratio&gt;頭文件
A.4 常量表達式函數(shù)
7.4 本章總結(jié)
1.5 本章總結(jié)
第3章 線程間共享數(shù)據(jù)

9.2 中斷線程

很多情況下,使用信號來終止一個長時間運行的線程是合理的。這種線程的存在,可能是因為工作線程所在的線程池被銷毀,或是用戶顯式的取消了這個任務,亦或其他各種原因。不管是什么原因,原理都一樣:需要使用信號來讓未結(jié)束線程停止運行。這里需要一種合適的方式讓線程主動的停下來,而非讓線程戛然而止。

你可能會給每種情況制定一個獨立的機制,這樣做的意義不大。不僅因為用統(tǒng)一的機制會更容易在之后的場景中實現(xiàn),而且寫出來的中斷代碼不用擔心在哪里使用。C++11標準沒有提供這樣的機制,不過實現(xiàn)這樣的機制也并不困難。

在了解一下應該如何實現(xiàn)這種機制前,先來了解一下啟動和中斷線程的接口。

9.2.1 啟動和中斷線程

先看一下外部接口,需要從可中斷線程上獲取些什么?最起碼需要和std::thread相同的接口,還要多加一個interrupt()函數(shù):

class interruptible_thread
{
public:
  template<typename FunctionType>
  interruptible_thread(FunctionType f);
  void join();
  void detach();
  bool joinable() const;
  void interrupt();
};

類內(nèi)部可以使用std::thread來管理線程,并且使用一些自定義數(shù)據(jù)結(jié)構(gòu)來處理中斷?,F(xiàn)在,從線程的角度能看到什么呢?“能用這個類來中斷線程”——需要一個斷點(interruption point)。在不添加多余的數(shù)據(jù)的前提下,為了使斷點能夠正常使用,就需要使用一個沒有參數(shù)的函數(shù):interruption_point()。這意味著中斷數(shù)據(jù)結(jié)構(gòu)可以訪問thread_local變量,并在線程運行時,對變量進行設置,因此當線程調(diào)用interruption_point()函數(shù)時,就會去檢查當前運行線程的數(shù)據(jù)結(jié)構(gòu)。我們將在后面看到interruption_point()的具體實現(xiàn)。

thread_local標志是不能使用普通的std::thread管理線程的主要原因;需要使用一種方法分配出一個可訪問的interruptible_thread實例,就像新啟動一個線程一樣。在使用已提供函數(shù)來做這件事情前,需要將interruptible_thread實例傳遞給std::thread的構(gòu)造函數(shù),創(chuàng)建一個能夠執(zhí)行的線程,就像下面的代碼清單所實現(xiàn)。

清單9.9 interruptible_thread的基本實現(xiàn)

class interrupt_flag
{
public:
  void set();
  bool is_set() const;
};
thread_local interrupt_flag this_thread_interrupt_flag;  // 1

class interruptible_thread
{
  std::thread internal_thread;
  interrupt_flag* flag;
public:
  template<typename FunctionType>
  interruptible_thread(FunctionType f)
  {
    std::promise<interrupt_flag*> p;  // 2
    internal_thread=std::thread([f,&p]{  // 3
      p.set_value(&this_thread_interrupt_flag);
      f();  // 4
    });
    flag=p.get_future().get();  // 5
  }
  void interrupt()
  {
    if(flag)
    {
      flag->set();  // 6
    }
  }
};

提供函數(shù)f是包裝了一個lambda函數(shù)③,線程將會持有f副本和本地promise變量(p)的引用②。在新線程中,lambda函數(shù)設置promise變量的值到this_thread_interrupt_flag(在thread_local①中聲明)的地址中,為的是讓線程能夠調(diào)用提供函數(shù)的副本④。調(diào)用線程會等待與其future相關(guān)的promise就緒,并且將結(jié)果存入到flag成員變量中⑤。注意,即使lambda函數(shù)在新線程上執(zhí)行,對本地變量p進行懸空引用,都沒有問題,因為在新線程返回之前,interruptible_thread構(gòu)造函數(shù)會等待變量p,直到變量p不被引用。實現(xiàn)沒有考慮處理匯入線程,或分離線程。所以,需要flag變量在線程退出或分離前已經(jīng)聲明,這樣就能避免懸空問題。

interrupt()函數(shù)相對簡單:需要一個線程去做中斷時,需要一個合法指針作為一個中斷標志,所以可以僅對標志進行設置⑥。

9.2.2 檢查線程是否中斷

現(xiàn)在就可以設置中斷標志了,不過不檢查線程是否被中斷,這樣的意義就不大了。使用interruption_point()函數(shù)最簡單的情況;可以在一個安全的地方調(diào)用這個函數(shù),如果標志已經(jīng)設置,就可以拋出一個thread_interrupted異常:

void interruption_point()
{
  if(this_thread_interrupt_flag.is_set())
  {
    throw thread_interrupted();
  }
}

代碼中可以在適當?shù)牡胤绞褂眠@個函數(shù):

void foo()
{
  while(!done)
  {
    interruption_point();
    process_next_item();
  }
}

雖然也能工作,但不理想。最好實在線程等待或阻塞的時候中斷線程,因為這時的線程不能運行,也就不能調(diào)用interruption_point()函數(shù)!在線程等待的時候,什么方式才能去中斷線程呢?

9.2.3 中斷等待——條件變量

OK,需要仔細選擇中斷的位置,并通過顯式調(diào)用interruption_point()進行中斷,不過在線程阻塞等待的時候,這種辦法就顯得蒼白無力了,例如:等待條件變量的通知。就需要一個新函數(shù)——interruptible_wait()——就可以運行各種需要等待的任務,并且可以知道如何中斷等待。之前提到,可能會等待一個條件變量,所以就從它開始:如何做才能中斷一個等待的條件變量呢?最簡單的方式是,當設置中斷標志時,需要提醒條件變量,并在等待后立即設置斷點。為了讓其工作,需要提醒所有等待對應條件變量的線程,就能確保感謝興趣的線程能夠蘇醒。偽蘇醒是無論如何都要處理的,所以其他線程(非感興趣線程)將會被當作偽蘇醒處理——兩者之間沒什么區(qū)別。interrupt_flag結(jié)構(gòu)需要存儲一個指針指向一個條件變量,所以用set()函數(shù)對其進行提醒。為條件變量實現(xiàn)的interruptible_wait()可能會看起來像下面清單中所示。

清單9.10 為std::condition_variable實現(xiàn)的interruptible_wait有問題版

void interruptible_wait(std::condition_variable& cv,
std::unique_lock<std::mutex>& lk)
{
  interruption_point();
  this_thread_interrupt_flag.set_condition_variable(cv);  // 1
  cv.wait(lk);  // 2
  this_thread_interrupt_flag.clear_condition_variable();  // 3
  interruption_point();
}

假設函數(shù)能夠設置和清除相關(guān)條件變量上的中斷標志,代碼會檢查中斷,通過interrupt_flag為當前線程關(guān)聯(lián)條件變量①,等待條件變量②,清理相關(guān)條件變量③,并且再次檢查中斷。如果線程在等待期間被條件變量所中斷,中斷線程將廣播條件變量,并喚醒等待該條件變量的線程,所以這里就可以檢查中斷。不幸的是,代碼有兩個問題。第一個問題比較明顯,如果想要線程安全:std::condition_variable::wait()可以拋出異常,所以這里會直接退出,而沒有通過條件變量刪除相關(guān)的中斷標志。這個問題很容易修復,就是在析構(gòu)函數(shù)中添加相關(guān)刪除操作即可。

第二個問題就不大明顯了,這段代碼存在條件競爭。雖然,線程可以通過調(diào)用interruption_point()被中斷,不過在調(diào)用wait()后,條件變量和相關(guān)中斷標志就沒有什么系了,因為線程不是等待狀態(tài),所以不能通過條件變量的方式喚醒。就需要確保線程不會在最后一次中斷檢查和調(diào)用wait()間被喚醒。這里,不對std::condition_variable的內(nèi)部結(jié)構(gòu)進行研究;不過,可通過一種方法來解決這個問題:使用lk上的互斥量對線程進行保護,這就需要將lk傳遞到set_condition_variable()函數(shù)中去。不幸的是,這將產(chǎn)生兩個新問題:需要傳遞一個互斥量的引用到一個不知道生命周期的線程中去(這個線程做中斷操作)為該線程上鎖(調(diào)用interrupt()的時候)。這里可能會死鎖,并且可能訪問到一個已經(jīng)銷毀的互斥量,所以這種方法不可取。當不能完全確定能中斷條件變量等待——沒有interruptible_wait()情況下也可以時(可能有些嚴格),那有沒有其他選擇呢?一個選擇就是放置超時等待,使用wait_for()并帶有一個簡單的超時量(比如,1ms)。在線程被中斷前,算是給了線程一個等待的上限(以時鐘刻度為基準)。如果這樣做了,等待線程將會看到更多因為超時而“偽”蘇醒的線程,不過超時也不輕易的就幫助到我們。與interrupt_flag相關(guān)的實現(xiàn)的一個實現(xiàn)放在下面的清單中展示。

清單9.11 為std::condition_variable在interruptible_wait中使用超時

class interrupt_flag
{
  std::atomic<bool> flag;
  std::condition_variable* thread_cond;
  std::mutex set_clear_mutex;

public:
  interrupt_flag():
    thread_cond(0)
  {}

  void set()
  {
    flag.store(true,std::memory_order_relaxed);
    std::lock_guard<std::mutex> lk(set_clear_mutex);
    if(thread_cond)
    {
      thread_cond->notify_all();
    }
  }

  bool is_set() const
  {
    return flag.load(std::memory_order_relaxed);
  }

  void set_condition_variable(std::condition_variable& cv)
  {
    std::lock_guard<std::mutex> lk(set_clear_mutex);
    thread_cond=&cv;
  }

  void clear_condition_variable()
  {
    std::lock_guard<std::mutex> lk(set_clear_mutex);
    thread_cond=0;
  }

  struct clear_cv_on_destruct
  {
    ~clear_cv_on_destruct()
    {
      this_thread_interrupt_flag.clear_condition_variable();
    }
  };
};

void interruptible_wait(std::condition_variable& cv,
  std::unique_lock<std::mutex>& lk)
{
  interruption_point();
  this_thread_interrupt_flag.set_condition_variable(cv);
  interrupt_flag::clear_cv_on_destruct guard;
  interruption_point();
  cv.wait_for(lk,std::chrono::milliseconds(1));
  interruption_point();
}

如果有謂詞(相關(guān)函數(shù))進行等待,1ms的超時將會完全在謂詞循環(huán)中完全隱藏:

template<typename Predicate>
void interruptible_wait(std::condition_variable& cv,
                        std::unique_lock<std::mutex>& lk,
                        Predicate pred)
{
  interruption_point();
  this_thread_interrupt_flag.set_condition_variable(cv);
  interrupt_flag::clear_cv_on_destruct guard;
  while(!this_thread_interrupt_flag.is_set() && !pred())
  {
    cv.wait_for(lk,std::chrono::milliseconds(1));
  }
  interruption_point();
}

這會讓謂詞被檢查的次數(shù)增加許多,不過對于簡單調(diào)用wait()這套實現(xiàn)還是很好用的。超時變量很容易實現(xiàn):通過制定時間,比如:1ms或更短。OK,對于std::condition_variable的等待,就需要小心應對了;std::condition_variable_any呢?還是能做的更好嗎?

9.2.4 使用std::condition_variable_any中斷等待

std::condition_variable_anystd::condition_variable的不同在于,std::condition_variable_any可以使用任意類型的鎖,而不僅有std::unique_lock<std::mutex>??梢宰屖虑樽銎饋砀雍唵危⑶?code>std::condition_variable_any可以比std::condition_variable做的更好。因為能與任意類型的鎖一起工作,就可以設計自己的鎖,上鎖/解鎖interrupt_flag的內(nèi)部互斥量set_clear_mutex,并且鎖也支持等待調(diào)用,就像下面的代碼。

清單9.12 為std::condition_variable_any設計的interruptible_wait

class interrupt_flag
{
  std::atomic<bool> flag;
  std::condition_variable* thread_cond;
  std::condition_variable_any* thread_cond_any;
  std::mutex set_clear_mutex;

public:
  interrupt_flag(): 
    thread_cond(0),thread_cond_any(0)
  {}

  void set()
  {
    flag.store(true,std::memory_order_relaxed);
    std::lock_guard<std::mutex> lk(set_clear_mutex);
    if(thread_cond)
    {
      thread_cond->notify_all();
    }
    else if(thread_cond_any)
    {
      thread_cond_any->notify_all();
    }
  }

  template<typename Lockable>
  void wait(std::condition_variable_any& cv,Lockable& lk)
  {
    struct custom_lock
    {
      interrupt_flag* self;
      Lockable& lk;

      custom_lock(interrupt_flag* self_,
                  std::condition_variable_any& cond,
                  Lockable& lk_):
        self(self_),lk(lk_)
      {
        self->set_clear_mutex.lock();  // 1
        self->thread_cond_any=&cond;  // 2
      }

      void unlock()  // 3
      {
        lk.unlock();
        self->set_clear_mutex.unlock();
      }

      void lock()
      {
        std::lock(self->set_clear_mutex,lk);  // 4
      }

      ~custom_lock()
      {
        self->thread_cond_any=0;  // 5
        self->set_clear_mutex.unlock();
      }
    };
    custom_lock cl(this,cv,lk);
    interruption_point();
    cv.wait(cl);
    interruption_point();
  }
  // rest as before
};

template<typename Lockable>
void interruptible_wait(std::condition_variable_any& cv,
                        Lockable& lk)
{
  this_thread_interrupt_flag.wait(cv,lk);
}

自定義的鎖類型在構(gòu)造的時候,需要所鎖住內(nèi)部set_clear_mutex①,對thread_cond_any指針進行設置,并引用std::condition_variable_any傳入鎖的構(gòu)造函數(shù)中②。Lockable引用將會在之后進行存儲,其變量必須被鎖住?,F(xiàn)在可以安心的檢查中斷,不用擔心競爭了。如果這時中斷標志已經(jīng)設置,那么標志一定是在鎖住set_clear_mutex時設置的。當條件變量調(diào)用自定義鎖的unlock()函數(shù)中的wait()時,就會對Lockable對象和set_clear_mutex進行解鎖③。這就允許線程可以嘗試中斷其他線程獲取set_clear_mutex鎖;以及在內(nèi)部wait()調(diào)用之后,檢查thread_cond_any指針。這就是在替換std::condition_variable后,所擁有的功能(不包括管理)。當wait()結(jié)束等待(因為等待,或因為偽蘇醒),因為線程將會調(diào)用lock()函數(shù),這里依舊要求鎖住內(nèi)部set_clear_mutex,并且鎖住Lockable對象④。現(xiàn)在,在wait()調(diào)用時,custom_lock的析構(gòu)函數(shù)中⑤清理thread_cond_any指針(同樣會解鎖set_clear_mutex)之前,可以再次對中斷進行檢查。

9.2.5 中斷其他阻塞調(diào)用

這次輪到中斷條件變量的等待了,不過其他阻塞情況,比如:互斥鎖,等待future等等,該怎么辦呢?通常情況下,可以使用std::condition_variable的超時選項,因為在實際運行中不可能很快的將條件變量的等待終止(不訪問內(nèi)部互斥量或future的話)。不過,在某些情況下,你知道知道你在等待什么,這樣就可以讓循環(huán)在interruptible_wait()函數(shù)中運行。作為一個例子,這里為std::future<>重載了interruptible_wait()的實現(xiàn):

template<typename T>
void interruptible_wait(std::future<T>& uf)
{
  while(!this_thread_interrupt_flag.is_set())
  {
    if(uf.wait_for(lk,std::chrono::milliseconds(1)==
       std::future_status::ready)
      break;
  }
  interruption_point();
}

等待會在中斷標志設置好的時候,或future準備就緒的時候停止,不過實現(xiàn)中每次等待future的時間只有1ms。這就意味著,中斷請求被確定前,平均等待的時間為0.5ms(這里假設存在一個高精度的時鐘)。通常wait_for至少會等待一個時鐘周期,所以如果時鐘周期為15ms,那么結(jié)束等待的時間將會是15ms,而不是1ms。接受與不接受這種情況,都得視情況而定。如果這必要,且時鐘支持的話,可以持續(xù)削減超時時間。這種方式將會讓線程蘇醒很多次,來檢查標志,并且增加線程切換的開銷。

OK,我們已經(jīng)了解如何使用interruption_point()和interruptible_wait()函數(shù)檢查中斷。

當中斷被檢查出來了,要如何處理它呢?

9.2.6 處理中斷

從中斷線程的角度看,中斷就是thread_interrupted異常,因此能像處理其他異常那樣進行處理。

特別是使用標準catch塊對其進行捕獲:

try
{
  do_something();
}
catch(thread_interrupted&)
{
  handle_interruption();
}

捕獲中斷,進行處理。其他線程再次調(diào)用interrupt()時,線程將會再次被中斷,這就被稱為斷點(interruption point)。如果線程執(zhí)行的是一系列獨立的任務,就會需要斷點;中斷一個任務,就意味著這個任務被丟棄,并且該線程就會執(zhí)行任務列表中的其他任務。

因為thread_interrupted是一個異常,在能夠被中斷的代碼中,之前線程安全的注意事項都是適用的,就是為了確保資源不會泄露,并在數(shù)據(jù)結(jié)構(gòu)中留下對應的退出狀態(tài)。通常,讓線程中斷是可行的,所以只需要讓異常傳播即可。不過,當異常傳入std::thread的析構(gòu)函數(shù)時,std::terminate()將會調(diào)用,并且整個程序?qū)K止。為了避免這種情況,需要在每個將interruptible_thread變量作為參數(shù)傳入的函數(shù)中放置catch(thread_interrupted)處理塊,可以將catch塊包裝進interrupt_flag的初始化過程中。因為異常將會終止獨立進程,就能保證未處理的中斷是異常安全的。interruptible_thread構(gòu)造函數(shù)中對線程的初始化,實現(xiàn)如下:

internal_thread=std::thread([f,&p]{
        p.set_value(&this_thread_interrupt_flag);

        try
        {
          f();
        }
        catch(thread_interrupted const&)
        {}
      });

下面,我們來看個更加復雜的例子。

9.2.7 應用退出時中斷后臺任務

試想,在桌面上查找一個應用。這就需要與用戶互動,應用的狀態(tài)需要能在顯示器上顯示,就能看出應用有什么改變。為了避免影響GUI的響應時間,通常會將處理線程放在后臺運行。后臺進程需要一直執(zhí)行,直到應用退出;后臺線程會作為應用啟動的一部分被啟動,并且在應用終止的時候停止運行。通常這樣的應用只有在機器關(guān)閉時,才會退出,因為應用需要更新應用最新的狀態(tài),就需要全時間運行。在某些情況下,當應用被關(guān)閉,需要使用有序的方式將后臺線程關(guān)閉,其中一種方式就是中斷。

下面清單中為一個系統(tǒng)實現(xiàn)了簡單的線程管理部分。

清單9.13 在后臺監(jiān)視文件系統(tǒng)

std::mutex config_mutex;
std::vector<interruptible_thread> background_threads;

void background_thread(int disk_id)
{
  while(true)
  {
    interruption_point();  // 1
    fs_change fsc=get_fs_changes(disk_id);  // 2
    if(fsc.has_changes())
    {
      update_index(fsc);  // 3
    }
  }
}

void start_background_processing()
{
  background_threads.push_back(
    interruptible_thread(background_thread,disk_1));
  background_threads.push_back(
    interruptible_thread(background_thread,disk_2));
}

int main()
{
  start_background_processing();  // 4
  process_gui_until_exit();  // 5
  std::unique_lock<std::mutex> lk(config_mutex);
  for(unsigned i=0;i<background_threads.size();++i)
  {
    background_threads[i].interrupt();  // 6
  }
  for(unsigned i=0;i<background_threads.size();++i)
  {
    background_threads[i].join(); // 7
  }
}

啟動時,后臺線程就已經(jīng)啟動④。之后,對應線程將會處理GUI⑤。當用戶要求進程退出時,后臺進程將會被中斷⑥,并且主線程會等待每一個后臺線程結(jié)束后才退出⑦。后臺線程運行在一個循環(huán)中,并時刻檢查磁盤的變化②,對其序號進行更新③。調(diào)用interruption_point()函數(shù),可以在循環(huán)中對中斷進行檢查。

為什么中斷線程前,對線程進行等待?為什么不中斷每個線程,讓它們執(zhí)行下一個任務?答案就是“并發(fā)”。線程被中斷后,不會馬上結(jié)束,因為需要對下一個斷點進行處理,并且在退出前執(zhí)行析構(gòu)函數(shù)和代碼異常處理部分。因為需要匯聚每個線程,所以就會讓中斷線程等待,即使線程還在做著有用的工作——中斷其他線程。只有當沒有工作時(所有線程都被中斷),不需要等待。這就允許中斷線程并行的處理自己的中斷,并更快的完成中斷。

中斷機制很容易擴展到更深層次的中斷調(diào)用,或在特定的代碼塊中禁用中斷,這就當做留給讀者的作業(yè)吧。

上一篇:1.4 開始入門下一篇:資源