c++11并发与多线程
文章目錄
- 前言
- 一、創建、啟動、結束線程
- 二、線程傳參、成員函數作為線程函數
- 三、數據共享問題
- 四、unique_lock類模版
- 五、多線程創建單例類對象
- 六、條件變量
- 七、async、future、packaged_task與promise
- 八、future其他成員函數、shared_future與atomic
- 九、Windows臨界區與其他各種mutex互斥量
- 十、線程池
前言
寫多線程程序時,Windows系統下用CreateThread函數創建線程,Linux系統下用pthread_create創建線程,所以這些代碼不能跨平臺使用。當然,如果使用一些跨平臺的多線程庫如POSIX_thread(pthread)是可以跨平臺的。但是使用pthread,在Windows和Linux下還是要分別配置一番,兩個系統總有一些不同的地方,所以還是不夠方便。從C++11新標準開始,C++語言本身增加了針對多線程的支持。
一、創建、啟動、結束線程
主線程是從main函數開始執行的,自己創建的線程也得從一個函數(初始函數)開始執行,函數執行完,線程也就退出了。一般來講,整個進程(程序)是否執行完畢的標志是主線程是否執行完,主線程一旦結束,子線程就會被操作系統強制終止(detach會打破這個規律)。
#include <iostream> #include <thread>using namespace std;void myprintf() {cout << "I'm a child thread." << endl; }int main() {cout << "I'm the main thread." << endl;thread threadObj(myprintf); // 創建一個線程threadObj.join();return 0; }thread是一個類,這個類就是用來創建線程的。它創建了一個對象threadObj,線程創建出來后,會執行myprintf函數。join使主線程阻塞在這里,等待子線程執行完畢。有時并不需要主線程等待子線程退出,可以使用detach。有資料解釋稱,線程detach后,與這個線程關聯的thread對象就會失去與這個線程的關聯(因為thread對象是在主線程中定義的),此時這個線程就會駐留在后臺運行,相當于被C++運行時庫接管了。這種分離的線程在Linux叫守護線程(守護進程)。針對一個線程,一旦detach(join)后就不能join(detach)了。joinable()可以判斷一個線程是否可以join或detach,可以返回true,不能返回false。thread接受的是一個可調用對象作為參數來創建線程,下面換種寫法:
#include <iostream> #include <thread>using namespace std;class TA {public:// 可調用對象,重載圓括號void operator()() { // 不帶參數cout << "TA()" << endl;} };int main() {cout << "I'm the main thread." << endl;TA ta;thread threadObj(ta);// thread threadObj(TA()); // 不能使用臨時對象,編譯無法通過threadObj.join();return 0; }類與detach結合使用,可能會帶來問題:
#include <iostream> #include <thread>using namespace std;class TA {public:TA(int &i) : m_i(i) {}// 可調用對象,重載圓括號void operator()() { // 不帶參數cout << "TA()" << endl;}int &m_i; };int main() {cout << "I'm the main thread." << endl;int i = 6;TA ta(i);thread threadObj(ta);threadObj.detach();return 0; }成員變量m_i是一個引用,綁定的是main函數里的i變量,當主線程執行結束時,i變量被銷毀,子線程可能在后臺繼續運行,繼續使用m_i,就會產生不可預料的后果。還有,當主線程結束后,ta對象被銷毀,而子線程好像正在使用這個ta對象,這樣是否會出現問題?其實ta對象是會被復制到子線程中的,但是這個對象如果有引用或指針,那就可能出現問題。
#include <iostream> #include <thread>using namespace std;class TA {public:TA(int i) : m_i(i) {printf("TA構造函數執行,m_i = %d,this = %p\n", m_i, this);}~TA() {printf("TA析構函數執行,m_i = %d,this = %p\n", m_i, this);}TA(const TA& ta) : m_i(ta.m_i) {printf("TA拷貝構造函數執行,m_i = %d,this = %p\n", m_i, this);}// 可調用對象,重載圓括號void operator()() { // 不帶參數cout << "TA()" << endl;}int m_i; };int main() {int i = 6;TA ta(i);thread threadObj(ta);threadObj.detach();return 0; }執行結果:
拷貝構造函數執行了一遍,說明ta對象是被復制到子線程中去了。主線程結束后,子線程還沒結束,所以只執行了一次析構函數。子線程跑到后臺運行,后續析構函數的打印也不會顯示到屏幕上。用lambda表達式創建線程:
二、線程傳參、成員函數作為線程函數
實際工作中可能需要創建不止一個線程,例如創建10個線程,0號線程加工前10個零件,1號線程加工第11到第20個零件,以此類推,這說明每個線程都需要知道自己需要加工的編號,這就需要給線程傳遞參數。先看看傳遞臨時對象作為線程參數:
#include <iostream> #include <thread>using namespace std;void myprintf(const int &i, char *pBuf) {cout << i << endl;cout << pBuf << endl; }int main() {int var = 1;int &vary = var;char buf[] = "this is a test.";thread threadObj(myprintf, var, buf);threadObj.join();return 0; }運行結果:
根據觀察(跟蹤調試),函數myprintf中,形參i的地址和原來main函數中var地址不同(雖然是引用類型),也就是說thread類的構造函數實際是復制了這個參數。而myprintf函數中的pBuf指向的內存就是main中buf的內存。C++語言只會為const引用產生臨時對象。
第二個參數使用了const引用,所以實際上發生了對象復制,這個與系統內部工作機理有關。
thread threadObj(myprintf, var, buf);這行代碼的本意是希望系統幫助我們把buf隱式轉換成string,但如果是detach,等main函數都執行完了,才把buf往string轉,就會出現問題,所以把這行代碼寫成以下的形式,可以保證在線程myprintf中所用的pBuf肯定是有效的:
thread threadObj(myprintf, var, string(buf));結論就是:如果傳遞int這種簡單類型參數,建議使用值傳遞,不用引用;如果傳遞類對象作為參數,則避免隱式類型轉換(例如把一個char*轉成string,把一個int轉成類A對象),全部在創建線程這一行就構建出臨時對象來,線程入口函數的形參位置使用引用來作為形參(這里如果不使用引用可能在某種情況下會導致多構造一次臨時類對象);建議使用join,不使用detach,就不存在局部變量失效導致線程對內存非法引用的問題。所以在子線程中通過參數傳遞給線程入口函數的形參(對象)實際是實參對象的復制,這就意味著即便修改了線程入口函數中的對象的內容,也無法反饋到外面,這時就需要用到std::ref了,這是一個函數模板。接下來看看傳遞類對象與智能指針作為線程參數,如下代碼:
#include <iostream> #include <vector> #include <thread>using namespace std;class A {public:A(int a) : m_i(a) {cout << "A::A(int a)構造函數執行,this = " << this << ",threadid = " << std::this_thread::get_id() << endl;}A(const A& a) {cout << "A::A(const A)拷貝構造函數執行,this = " << this << ",threadid = " << std::this_thread::get_id() << endl;}~A() {cout << "~A::A()析構函數執行,this = " << this << ",threadid = " << std::this_thread::get_id() << endl;}public:void operator()(int num) {cout << "子線程()執行,this = " << this << "threadid = " << std::this_thread::get_id() << endl;}int m_i; };void myprint2(const A& pmybuf) {cout << "子線程myprint2的參數pmybuf的地址是:" << &pmybuf << ",threadid = " << std::this_thread::get_id() << endl; }int main() {A myobj(10); // 生成一個類對象std::thread mytobj(myprint2, std::ref(myobj));mytobj.join();return 0; }用std::ref(myobj),這樣傳遞的參數真的是一個引用而不是復制出一個臨時對象作為形參了,所以myprint2形參也可以去掉const修飾了。將智能指針作為形參傳遞到線程入口函數:
#include <iostream> #include <vector> #include <thread>using namespace std;void myprint3(unique_ptr<int> pzn) {return; }int main() {unique_ptr<int> myp(new int(100));std::thread mytobj(myprint3, std::move(myp));mytobj.join();return 0; }用std::move將一個unique_ptr轉移到另一個unique_ptr上,代碼相當于將myp轉移到了形參pzn中,myp變為空。最后看看用成員函數作為線程入口函數,在類A中增加一個成員函數:
public:void thread_work(int num) { // 帶一個參數cout << "子線程thread_work執行,this = " << this << ",threadid = " << std::this_thread::get_id() << endl;}main函數調整為:
A myobj(10); std::thread mytobj(&A::thread_work, myobj, 15); // 1 //std::thread mytobj(&A::thread_work, &myobj, 15); // 2 //std::thread mytobj(&A::thread_work, std::ref(myobj), 15); // 3 mytobj.join();如1那樣,會調用一次A的拷貝構造函數,也可以寫成2、3那樣,不會調用A的拷貝構造函數。A中有對圓括號重載,main函數也可以寫成:
A myobj(10); thread mytobj(myobj, 15); // 1 // thread mytobj(std::ref(myobj), 15); // 2 第二個參數無法修改為&myobj,編譯會報錯 mytobj.join();如1那樣,會調用一次A的拷貝構造函數,也可以如2使用std::ref,不會調用類A的拷貝構造函數,但不可以使用&myobj。
三、數據共享問題
多線程同時操作數據時,需要對數據加鎖,可以使用mutex:
#include <iostream> #include <mutex> #include <list> #include <vector>using namespace std;std::list<int> msgRecvQueue; // 容器 std::mutex my_mutex; // 創建互斥量void myFun() {my_mutex.lock();// 操作容器msgRecvQueuemy_mutex.unlock(); }int main() {vector<thread> mythreads;for (int i = 0; i < 5; i++) {mythreads.push_back(thread(myFun)); // 創建并開始執行線程}for (auto iter = mythreads.begin(); iter != mythreads.end(); ++iter) {iter->join(); // 等待5個線程都返回}return 0; }可以使用std::lock_guard替代lock和unlock:
#include <mutex> #include <thread> #include <list> using namespace std;void outMsgLULProc(int& command) {std::lock_guard<std::mutex> sbguard(my_mutex); // sbguard是隨便起的變量名 if (!msgRecvQueue.empty()) {command = msgRecvQueue.front(); // 返回第一個元素但不檢查元素存在與否msgRecvQueue.pop_front();} }lock_guard<std::mutex>的工作原理是:在lock_guard類模板的構造函數里,調用了mutex的lock函數,而在析構函數里調用了mutex的unlock函數。所以當離開sbguard的作用域時,就自動解鎖了。當有兩個互斥量要上鎖時,可能會造成死鎖:
void inMsgRecvQueue() {for (int i = 0; i < 100000; i++) {my_mutex.lock(); // 兩行lock()代碼不一定緊張挨著,可能它們要保護不同的數據共享塊//......需要保護的一些共享數據my_mutex2.lock();msgRecvQueue.push_back(i); my_mutex2.unlock();my_mutex.unlock();} }void outMsgLULProc(int& command) {my_mutex2.lock();my_mutex.lock();if (!msgRecvQueue.empty()) {command = msgRecvQueue.front();msgRecvQueue.pop_front();}my_mutex2.unlock();my_mutex.unlock(); }第一條線程先對my_mutex進行lock,第二條先對my_mutex2進行lock,造成死鎖,程序無法繼續往下運行。當需要鎖住兩個互斥量時,可以使用std::lock函數模板,它能一次鎖住兩個或以上的互斥量,如果它先鎖住第一個互斥量,但鎖第二個的時候失敗了,它會把第一個解鎖,然后卡在那里不斷地嘗試鎖這兩個互斥量。
void inMsgRecvQueue() {for (int i = 0; i < 100000; i++) {std::lock(my_mutex, my_mutex2); // 相當于每個互斥量都調用了lockmsgRecvQueue.push_back(i);my_mutex2.unlock(); // 前面鎖住2個,后面就得解鎖2個my_mutex.unlock();} }結合std::lock_guard使用,用上std::lock_guard的std::adopt_lock參數:
void inMsgRecvQueue() {for (int i = 0; i < 100000; i++) {std::lock(my_mutex, my_mutex2);std::lock_guard<std::mutex> sbguard1(my_mutex, std::adopt_lock);std::lock_guard<std::mutex> sbguard2(my_mutex2, std::adopt_lock); msgRecvQueue.push_back(i);} }使用std::adopt_lock后,在構造lock_guard<std::mutex>對象時,它的構造函數不會調用my_mutex和my_mutex2的lock函數,析構函數仍然會unlock,這樣就可以自動解鎖了。
四、unique_lock類模版
unique_lock也是用來對mutex(互斥量)進行加鎖和解鎖管理,它比lock_guard更加靈活,代價是執行效率差一點,內存占用的也稍微多一點。
bool outMsgLULProc(int& command) {std::unique_lock<std::mutex> sbguard1(my_mutex);if (!msgRecvQueue.empty()) {// 消息不為空 command = msgRecvQueue.front(); // 返回第一個元素,但不檢查元素是否存在;msgRecvQueue.pop_front(); // 移除第一個元素,但不返回;return true;}return false; }unique_lock也有std::adopt_lock參數,使用這個參數表示互斥量mutex已經被lock過了,不需要再在std::unique_lock<std::mutex>對象的構造函數中lock了,使用這個參數一定要保證互斥量已經lock過了,否則會出現異常。
my_mutex.lock(); std::unique_lock<std::mutex> sbguard1(my_mutex, std::adopt_lock);unique_lock的另一個參數std::try_to_lock,使用這個參數,系統會嘗試去鎖住mutex,如果沒鎖住,也會立即返回,不會阻塞在那里。使用std::try_to_lock的前提是開發者不能自己把互斥量lock上。
void inMsgRecvQueue() {for (int i = 0; i < 100000; ++i) {std::unique_lock<std::mutex> sbguard1(my_mutex, std::try_to_lock);if (sbguard1.owns_lock()) { // 條件成立表示拿到了鎖頭// 拿到了鎖頭,離開sbguard1作用域鎖頭會自動釋放msgRecvQueue.push_back(i); // 假設這個數字就是收到的命令,直接放到消息隊列里來// 其他處理代碼} else {// 沒拿到鎖cout << "inMsgRecvQueue()執行,但沒拿到鎖,只能干點別的事" << i << endl;}} }unique_lock所支持的另一個參數std::defer_lock,用這個參數的前提是開發者不能自己先去把互斥量lock上,否則會報異常。std::defer_lock的意思是初始化這個mutex,但并沒有給這個mutex加鎖。這個參數可以結合unique_lock這個類模板的一些重要的成員函數使用,比如lock函數,像下面的例子,可以隨時加鎖,函數退出的時候,會自動解鎖:
void inMsgRecvQueue() {for (int i = 0; i < 100000; ++i) {std::unique_lock<std::mutex> sbguard1(my_mutex, std::defer_lock);sbguard1.lock(); // 反正unique_lock能自動解鎖,不用自己解,所以這里只管加鎖msgRecvQueue.push_back(i);} }unique_lock有unlock函數,可以給已經加鎖的互斥量解鎖,雖然unique_lock能夠自動解鎖,但是也可以用該函數手工解鎖,這就是它靈活的地方。unique_lock還有try_lock函數,嘗試給互斥量加鎖,如果拿不到鎖,則返回false,拿到則返回true,這個函數不阻塞。
void inMsgRecvQueue() {for (int i = 0; i < 100000; ++i) {std::unique_lock<std::mutex> sbguard1(my_mutex, std::defer_lock);if (sbguard1.try_lock() == true) { // 返回true表示拿到了鎖,自己不用管unlock問題msgRecvQueue.push_back(i);} else {cout << "抱歉,沒拿到鎖,做點別的事情吧!" << endl;}} }unique_lock還有一個release函數,返回它管理的mutex對象指針,并釋放所有權,也就是這個unique_lock和mutex不再有關系。一旦解除unique_lock和所管理的mutex的關聯關系,如果原來的mutex對象處于加鎖狀態,則開發者有責任負責解鎖。
std::unique_lock<std::mutex> sbguard1(my_mutex); // mutex鎖定 std::mutex* p_mtx = sbguard1.release(); // 現在關聯關系解除,程序員有責任自己解鎖了,其實這個就是my_mutex,現在sbguard1已經不和my_mutex關聯了 msgRecvQueue.push_back(i); p_mtx->unlock(); // 因為前面已經加鎖,所以這里要自己解鎖了可以看出unique_lock要發揮作用,需要和一個mutex互斥量綁定到一起,這樣才是一個完整的能發揮作用的unique_lock,也就是說unique_lock需要管理一個mutex指針。一個mutex應該只和一個unique_lock綁定,如下代碼會報異常:
std::unique_lock<std::mutex> sbguard1(my_mutex); std::unique_lock<std::mutex> sbguard10(my_mutex);unique_lock可以把它所擁有的這個mutex傳遞給其他的unique_lock,unique_lock對這個mutex的所有權是屬于可以移動但不可以復制的,這個所有權的傳遞跟unique_ptr智能指針的所有權傳遞非常類似。
std::unique_lock<std::mutex> sbguard1(my_mutex); //std::unique_lock<std::mutex> sbguard10(sbguard1); // 復制所有權,不可以 std::unique_lock<std::mutex> sbguard10(std::move(sbguard1)); // 移動語義,這可以現在my_mutex和sbguard10綁定到一起了。移動后sbguard1指向空,sbguard10指向了該my_mutex msgRecvQueue.push_back(i);另外返回unique_lock類型,也是一種用法(程序寫法):
std::unique_lock<std::mutex> rtn_unique_lock() {std::unique_lock<std::mutex> tmpguard(my_mutex);// 從函數返回一個局部unique_lock對象是可以的,返回這種局部對象tmpguard// 會導致系統生成臨時unique_lock對象,并調用unique_lock的移動構造函數return tmpguard; } void inMsgRecvQueue() {for (int i = 0; i < 100000; ++i) {std::unique_lock<std::mutex> sbguard1 = rtn_unique_lock();msgRecvQueue.push_back(i);} }五、多線程創建單例類對象
單例類對象只能創建一個,如果多線程創建單例類對象,要注意共享數據的保護:
class MyCAS { // 這是一個單例類private:MyCAS() {} // 構造函數是私有的public:static MyCAS* GetInstance() {if (m_instance == NULL) {std::unique_lock<std::mutex> mymutex(resource_mutex); // 自動加鎖if (m_instance == NULL) {m_instance = new MyCAS();static CGarhuishou cl; // 生命周期一直到程序退出} }return m_instance;}class CGarhuishou { // 類中套類,用于釋放對象public:~CGarhuishou() {if (MyCAS::m_instance) {delete MyCAS::m_instance;MyCAS::m_instance = NULL;}}};private:static MyCAS* m_instance; };上面代碼包含了兩句if (m_instance == NULL),這種寫法叫“雙重鎖定”或者“雙重檢查”,當m_instance不為空時,說明m_instance一定被new過了,也就不用加鎖了,可以提高效率。c++11引入了std::call_once函數,能保證函數只被調用一次:
std::once_flag g_flag; // 這是個系統定義的標記static void CreateInstance() {m_instance = new MyCAS();static CGarhuishou cl; }static MyCAS* GetInstance() {if (m_instance == NULL) { // 同樣為提高效率。// 兩個線程同時執到這里時,其中一個線程卡在這行等另外一個線程的該行執行完畢(所以可以把g_flag看成一把鎖)。 std::call_once(g_flag, CreateInstance);}return m_instance; }這樣保證函數CreateInstance()只被調用一次。
六、條件變量
std::condition_variable是一個類,一個和條件相關的類,用于等待一個條件達成,需要和互斥量配合工作:
class A {public:void outMsgRecvQueue() {int command = 0;while (true) {std::unique_lock<std::mutex> sbguard1(my_mutex); // 臨界進去// wait()用于等一個東西// 如果wait()第二個參數的lambda表達式返回的是true,wait就直接返回// 如果wait()第二個參數的lambda表達式返回的是false,那么wait()將解鎖互斥量,并堵塞到這行,堵到其他某個線程調用notify_one()通知為止// 如果wait()不用第二個參數,那跟第二個參數為lambda表達式并且返回false效果一樣(解鎖互斥量,并堵塞到這行,堵到其他某個線程調用notify_one()通知為止)my_cond.wait(sbguard1, [this] {if (!msgRecvQueue.empty())return true;return false;});// 現在互斥量是鎖著的,流程走下來意味著msgRecvQueue隊列里必然有數據command = msgRecvQueue.front(); // 返回第一個元素,但不檢查元素是否存在msgRecvQueue.pop_front(); // 移除第一個元素,但不返回sbguard1.unlock(); // 因為unique_lock的靈活性,可以隨時unlock解鎖,以免鎖住太長時間cout << "outMsgRecvQueue()執行,取出一個元素" << command << " threadid = " << std::this_thread::get_id() << endl;} // end while}void inMsgRecvQueue() {for (int i = 0; i < 100000; ++i) {cout << "inMsgRecvQueue()執行,插入一個元素" << i << endl;std::unique_lock<std::mutex> sbguard1(my_mutex);msgRecvQueue.push_back(i); // 假設這個數字就是我收到的命令,直接放到消息隊列里來my_cond.notify_one(); // 嘗試把卡(堵塞)在wait()的線程喚醒,但光喚醒了還不夠,這里必須把互斥量解鎖,另外一個線程的wait()才會繼續正常工作}return;}private: std::condition_variable my_cond; // 生成一個條件對象 };這樣outMsgRecvQueue()函數只要等待inMsgRecvQueue()喚醒就可以取數據了。notify_one()可以通知一條線程,當有兩條線程在執行outMsgRecvQueue()時,notify_one()只能通知到其中一條,要想通知全部,就要用到notify_all:
void inMsgRecvQueue() {for (int i = 0; i < 100000; ++i) {cout << "inMsgRecvQueue()執行,插入一個元素" << i << endl;std::unique_lock<std::mutex> sbguard1(my_mutex);msgRecvQueue.push_back(i); // 假設這個數字就是我收到的命令,直接放到消息隊列里來my_cond.notify_all();}return; }int main() { A myobja;std::thread myOutnMsgObj(&A::outMsgRecvQueue, &myobja); // 第二個參數是引用,才能保證線程里用的是同一個對象std::thread myOutnMsgObj2(&A::outMsgRecvQueue, &myobja);std::thread myInMsgObj(&A::inMsgRecvQueue, &myobja);myInMsgObj.join();myOutnMsgObj2.join();myOutnMsgObj.join();cout << "main主函數執行結束!" << endl; return 0; }notify_all通知兩個outMsgRecvQueue線程,當這兩個線程都被喚醒后,這兩個線程中的每一個也需要嘗試重新獲取鎖,結果還是只有一個線程能獲取到鎖往下走,另外一個獲取不到鎖會繼續卡在wait那里等待。所以這里用notify_all和notify_one的結果相同。這里提及一個概念,叫作“虛假喚醒”,就是wait代碼行被喚醒了,但是不排除msgRecvQueue(消息隊列)里面沒有數據的情形。虛假喚醒產生的情況很多,例如push_back一條數據,調用多次notify_one,或者是有多個outMsgRecvQueue線程取數據,但是inMsgRecvQueue線程只push_back了一條數據,然后用notify_all把所有的outMsgRecvQueue線程都通知到了,就總有某個outMsgRecvQueue線程被喚醒,但是隊列中并沒有它要處理的數據。代碼里處理虛假喚醒的做法:
my_cond.wait(sbguard1, [this] {if (!msgRecvQueue.empty())return true; // 該lambda表達式返回true,則wait就返回,流程走下來,互斥鎖被本線程拿到return false; // 解鎖并休眠,卡在wait等待被再次喚醒 });if語句來應付虛假喚醒,因為wait被喚醒后,要先拿鎖,拿到鎖后才會執行這個lambda表達式中的判斷語句,所以此時這個lambda表達式里面的判斷是安全的。
七、async、future、packaged_task與promise
std::async是一個函數模板,用來啟動一個異步任務,會返回一個std::future對象(std::future是一個類模板)。就是說std::async會創建一個新線程(有時不會創建新線程,后面舉例)并開始執行對應的線程入口函數,返回的future對象含有線程入口函數的返回結果,可以通過future對象的成員函數get來獲取結果。
#include <future>int mythread(int mypar) {cout << "mythread() start" << " threadid = " << std::this_thread::get_id() << endl;std::chrono::milliseconds dura(5000); // 1秒 = 1000毫秒,所以5000毫秒 = 5秒std::this_thread::sleep_for(dura); // 休息一定的時長cout << "mythread() end" << " threadid = " << std::this_thread::get_id() << endl;return 5; }int main() { cout << "main" << " threadid = " << std::this_thread::get_id() << endl;std::future<int> result = std::async(mythread); // 流程并不會卡在這里,注意如果線程入口函數需要參數,可以把參數放在async的第二個參數的位置cout << result.get() << endl; // 卡在這里等待線程執行完,但是這種get因為一些內部特殊操作,不能get多次,只能get一次,否則執行會報異常 // result.wait(); // 流程卡在這里等待線程返回,但本身不返回結果 }result.get()結果是5。也可以用result.wait(),但只是等待線程返回,不返回結果。std::async可以帶參數:
class A {public:int mythread(int mypar) {cout << mypar << endl;cout << "mythread() start" << " threadid = " << std::this_thread::get_id() << endl; //新的線程idstd::chrono::milliseconds dura(20000); //1秒 = 1000毫秒,所以20000毫秒 = 20秒std::this_thread::sleep_for(dura); //休息一定的時長cout << "mythread() end" << " threadid = " << std::this_thread::get_id() << endl;return 5;} }; int main() {A a;int tmppar = 12;cout << "main" << " threadid = " << std::this_thread::get_id() << endl;std::future<int> result = std::async(&A::mythread, &a, tmppar); // 這里第二個參數是對象地址,才能保證線程里面用的是同一個對象。// 第三個參數是線程入口函數的參數cout << "continue......!" << endl;cout << result.get() << endl; cout << "main主函數執行結束!" << endl;return 0; }可以給std::async提供一個額外的參數,類型是std::launch枚舉類型,看一看這個枚舉類型可以取哪些值:
auto result = std::async(std::launch::deferred, &A::mythread, &a, tmppar);使用std::launch::deferred,表示該線程入口函數的執行被延遲到std::future的wait或者get函數被調用時,如果一直不調用wait或get,則這個線程就不執行了。調用了wait或get后,發現線程入口函數被執行了,但發現是在主線程中調用的mythread線程入口函數,所以這種寫法并沒有創建出新線程。而使用另一個參數std::launch::async,表示調用std::async時就開始創建并執行線程,意味著系統必須要創建出新線程來執行。
auto result = std::async(std::launch::async, &A::mythread, &a, tmppar);如果同時使用std::launch::deferred和std::launch::async,系統會根據一定因素(如硬件資源等)去評估是以std::launch::async方式還是std::launch::deferred方式去執行,兩者選其一:
auto result = std::async(std::launch::async | std::launch::deferred, &A::mythread, &a, tmppar); // “|”符號表示兩個枚舉值一起使用如果不使用任何額外的參數,效果就相當于使用了std::launch::async | std::launch::deferred,由系統決定運行方式。相對于std::thread來說,std::async可以由系統根據當前資源,決定是否創建新線程,當系統資源很緊張的時候,創建新線程會導致程序崩潰。std::packaged_task是打包任務,或者說把任務包裝起來的意思,是一個類模板,模板參數是各種可調用對象,將這些可調用對象包裝起來,方便將來作為線程入口函數來調用:
int main() {cout << "main" << " threadid = " << std::this_thread::get_id() << endl;std::packaged_task<int(int)> mypt(mythread); // 把函數mythread通過packaged_task包裝起來std::thread t1(std::ref(mypt), 1); // 線程直接開始執行,第二個參數作為線程入口函數的參數t1.join(); // 可以調用這個等待線程執行完畢,不調用這個不行,程序會崩潰std::future<int> result = mypt.get_future(); // std::future對象里含有線程入口函數的返回結果,這里用result保存mythread返回的結果cout << result.get() << endl;return 0; }包裝lambda表達式:
std::packaged_task<int(int)> mypt([](int mypar) {cout << mypar << endl;cout << "lambda mythread() start" << " threadid = " << std::this_thread::get_id() << endl;std::chrono::milliseconds dura(5000); // 1秒 = 1000毫秒,所以20000毫秒 = 20秒std::this_thread::sleep_for(dura); //休息一定的時長cout << "lambda mythread() end" << " threadid = " << std::this_thread::get_id() << endl;return 15; });std::thread t1(std::ref(mypt), 1); t1.join(); // 可以調用這個等待線程執行完畢,不調用這個不行,程序會崩潰 std::future<int> result = mypt.get_future(); // std::future對象里含有線程入口函數的返回結果,這里用result保存mythread返回的結果 cout << result.get() << endl;std::packaged_task包裝起來的對象也可以直接調用:
mypt(105); // 可調用對象,直接調用。當然,這樣寫并沒有創建什么新線程 std::future<int> result = mypt.get_future();實際工作中,可能遇到std::packaged_task的各種用途,比如放到容器中,然后需要的時候取出來用:
vector<std::packaged_task<int(int)>> mytasks; int main() {std::packaged_task<int(int)> mypt([](int mypar) { // 創建或者叫包裝一個任務cout << mypar << endl;cout << "lambda mythread() start" << " threadid = " << std::this_thread::get_id() << endl;std::chrono::milliseconds dura(5000); // 1秒 = 1000毫秒,所以20000毫秒 = 20秒std::this_thread::sleep_for(dura); // 休息一定的時長cout << "lambda mythread() end" << " threadid = " << std::this_thread::get_id() << endl;return 15;});// 入容器mytasks.push_back(std::move(mypt)); // 移動語義,這里要注意,入進去后mytp就empty了// 出容器std::packaged_task<int(int)> mypt2;auto iter = mytasks.begin();mypt2 = std::move(*iter); // 用移動語義mytasks.erase(iter); // 刪除第一個元素,迭代器已經失效,不能再用mypt2(123); // 直接調用。當然,這樣寫并沒有創建什么新線程// 要取得結果,則還是要借助這個futurestd::future<int> result = mypt2.get_future();cout << result.get() << endl;return 0; }std::promise是一個類模板,這個類模板的作用是能夠在某個線程中為其賦值,然后就可以在其它線程中把這個值取出來使用:
void mythread(std::promise<int>& tmpp, int calc) { // 注意第一個參數cout << "mythread() start" << " threadid = " << std::this_thread::get_id() << endl;// 做一系列復雜操作calc++;calc *= 10;// 做其他運算,整個花費了5秒std::chrono::milliseconds dura(5000);std::this_thread::sleep_for(dura);// 終于計算出了結果int result = calc; // 保存結果tmpp.set_value(result); // 結果保存到了tmpp這個對象中cout << "mythread() end" << " threadid = " << std::this_thread::get_id() << endl; }int main() {cout << "main" << " threadid = " << std::this_thread::get_id() << endl;std::promise<int> myprom; // 聲明一個std::promise對象myprom,保存的值類型為int// 創建一個線程t1,將函數mythread及對象myprom作為參數放進去std::thread t1(mythread, std::ref(myprom), 180);t1.join(); // 等線程執行完畢,這個必須有,否則報異常,join放在.get后面也可以// 獲取結果值std::future<int> fu1 = myprom.get_future(); // promise和future綁定用于獲取線程返回值auto result = fu1.get(); // 獲取值,但是這種get因為一些內部特殊操作,不能get多次,只能get一次 cout << "result = " << result << endl; return 0; }就是說可以通過promise保存一個值,將來某個時刻通過一個future綁定到這個promise上來得到這個綁定的值。拿到這個值后,再將這個值傳遞到另一個線程:
void mythread2(std::future<int>& tmpf) { // 注意參數auto result = tmpf.get(); // 獲取值,只能get一次否則會報異常cout << "mythread2 result = " << result << endl;return; }std::thread t2(mythread2, std::ref(fu1)); t2.join(); // 等線程執行完畢感覺就像是通過std::promise對象實現兩個線程的數據傳遞。
八、future其他成員函數、shared_future與atomic
std::future還有很多方法,比如判斷線程是否執行完畢,判斷線程是否被延遲執行(而且是通過主線程而非創建子線程來執行):
int main() {cout << "main" << " threadid = " << std::this_thread::get_id() << endl;std::future<int> result = std::async(mythread);//std::future<int> result = std::async(std::launch::deferred,mythread); //流程并不會卡在這里cout << "continue......!" << endl;//cout << result.get() << endl; // 卡在這里等待線程執行完,但是這種get因為一些內部特殊操作(移動操作),不能get多次,只能get一次// future_status看成一個枚舉類型std::future_status status = result.wait_for(std::chrono::seconds(1)); // 等待1秒,注意寫法,但如果async的第一參數用了std::launch::deferred,則這里是不會做任何等待的,因為線程根本沒啟動(延遲)if (status == std::future_status::timeout) {// 超時線程還沒執行完cout << "超時線程沒執行完!" << endl;cout << result.get() << endl; // 沒執行完這里也要求卡在這里等線程返回} else if (status == std::future_status::ready) {// 線程成功返回cout << "線程成功執行完畢并返回!" << endl;cout << result.get() << endl;} else if (status == std::future_status::deferred) {// 如果async的第一個參數被設置為std::launch::deferred,則本條件成立cout << "線程被延遲執行!" << endl;cout << result.get() << endl; // 上節說過,這會導致在主線程中執行了線程入口函數}return 0; }前面講到,std::async不加額外參數或者額外參數是std::launch::async | std::launch::deferred,會讓系統自行決定是否創建新線程從而會產生無法預知的潛在問題,問題的焦點在于如何確定異步任務到底有沒有被推遲運行:
int main() {std::future<int> result = std::async(mythread);std::future_status status = result.wait_for(std::chrono::seconds(0)); // 可以寫成0s,還支持ms(毫秒)寫法if (status == std::future_status::deferred) {cout << "線程被延遲執行!" << endl;cout << result.get() << endl; //可以使用.get,.wait()來調用mythread(同步調用),會卡在這里等待完成} else {// 任務未被推遲,已經開始運行,但是否運行結束,則取決于任務執行時間if (status == std::future_status::ready) {// 線程運行完畢,可以獲取結果cout << result.get() << endl;} else if (status == std::future_status::timeout) {// 線程還沒運行完畢//......}}return 0; }上一節講到packaged_task是將線程入口函數包裝起來,然后創建線程mythread,用join等待線程結束,線程的執行結果其實就保存在result這個future對象中了。然后啟動線程mythread2,在該線程中把future對象(也就是result)作為參數傳遞到線程中,而后在線程中調用future對象的get函數,拿到了線程mythread的返回結果。但需要說明的是,因為future對象的get函數被設計為移動語義,一旦調用get,就相當于把這個線程結果信息移動到result里面去了,所以再次調用get會報異常。如果多個線程都去調用get,程序肯定報異常。所以要用到std::shared_future,它的get函數是把數據進行復制(而不是轉移):
void mythread2(std::shared_future<int>& tmpf) { // 注意參數cout << "mythread2() start" << " threadid = " << std::this_thread::get_id() << endl;auto result = tmpf.get(); // 獲取值,get多次沒關系 cout << "mythread2 result = " << result << endl;return; }int main() {cout << "main" << " threadid = " << std::this_thread::get_id() << endl;std::packaged_task<int(int)> mypt(mythread); // 把函數mythread通過packaged_task包裝起來std::thread t1(std::ref(mypt), 1); // 線程直接開始執行,第二個參數作為線程入口函數的參數t1.join(); // 調用這個等待線程執行完畢,不調用這個不行,程序會崩潰std::future<int> result = mypt.get_future();// valid,判斷future對象里面的值是否有效bool ifcanget = result.valid(); // 沒有被get過表示能通過get獲取,則這里返回true// auto mythreadresult = result.get(); // 獲取值,只能get一次否則會報異常// ifcanget = result.valid(); // future對象get過了,里邊的值就沒了,這個時候就返回falsestd::shared_future<int> result_s(std::move(result)); // std::move(result)也可以替換成result.share(),在沒針對result調用get時,把result的內容弄到shared_future中來,此時future中空了ifcanget = result.valid(); // 因為result中空了,所以ifcanget為false了;這個時候不能再用result內容了ifcanget = result_s.valid(); // 因為result_s里有內容了,所以ifcanget為true了auto mythreadresult = result_s.get();mythreadresult = result_s.get(); // 可以調用多次,沒有問題std::thread t2(mythread2, std::ref(result_s));t2.join(); // 等線程執行完畢 return 0; }可以在main函數中直接構造一個shared_future對象:
int main() {cout << "main" << " threadid = " << std::this_thread::get_id() << endl;std::packaged_task<int(int)> mypt(mythread);std::thread t1(std::ref(mypt), 1);t1.join();std::shared_future<int> result_s(mypt.get_future()); // 通過get_future返回值直接構造了一個shared_future對象auto mythreadresult = result_s.get();mythreadresult = result_s.get(); // 可以調用多次,沒有問題std::thread t2(mythread2, std::ref(result_s));t2.join(); // 等線程執行完畢return 0; }在C++11中引入std::atomic類模板,叫做原子操作,可以理解成一種不需要用到互斥量加鎖的多線程并發編程方式,一般指不可以分割的操作:
std::atomic<int> g_mycout = 0; // 這是個原子整型類型變量;可以向使用整型變量一樣使用 void mythread() {for (int i = 0; i < 10000000; i++) { // 1千萬// g_mycout++; // 對應的操作就是原子操作,不會被打斷// g_mycout+=1; // 對應的操作就是原子操作,不會被打斷g_mycout = g_mycout + 1; // 這樣寫就不是原子操作了}return; }std::atomic并不是所有操作都是原子的,一般來講,包含++、–、+=、-=、&=、|=、^=等簡單運行算符的運算是原子的,其他的比較復雜的運算就不是原子的。實際工作中,原子操作可能比較多用來計數,比如累計發送出去多少個包等。下面這行代碼,編譯會出錯:
atomic<int> atm; atm = 0; auto atm2 = atm; // 不允許,編譯時報語法錯 atomic<int> atm3; atm3 = atm; // 不允許,編譯時報語法錯這一行調用拷貝構造函數,應該是拷貝構造函數被delete了??梢杂孟旅娴姆椒▽崿F:
atomic<int> atm5(atm.load()); // 這是可以的 atm5.store(12);load是以原子的方式讀atomic對象的值,store以原子的方式寫入內容。
九、Windows臨界區與其他各種mutex互斥量
Windows平臺編程里面“臨界區”的用法和互斥量完全相同:
#include <windows.h> #define __WINDOWSLJQ__ // 宏定義class A {public:A() { #ifdef __WINDOWSLJQ__ InitializeCriticalSection(&my_winsec); // 初始化臨界區 #endif}virtual ~A() { #ifdef __WINDOWSLJQ__ DeleteCriticalSection(&my_winsec); // 釋放臨界區 #endif}void inMsgRecvQueue() {for (int i = 0; i < 100000; i++) {cout << "inMsgRecvQueue()執行,插入一個元素" << i << endl; #ifdef __WINDOWSLJQ__ EnterCriticalSection(&my_winsec); // 進入臨界區msgRecvQueue.push_back(i);LeaveCriticalSection(&my_winsec); // 離開臨界區 #elsemy_mutex.lock();msgRecvQueue.push_back(i);my_mutex.unlock(); #endif}} #ifdef __WINDOWSLJQ__ // windows下叫臨界區(類似于互斥量mutex)CRITICAL_SECTION my_winsec; #endif };同一線程,Windows臨界區可以進入多次,就是說可以連續調用多次EnterCriticalSection(如果是不同線程,在上一條線程進入后還未退出時,會卡在進入臨界區這行代碼上),但退出時也要調用對應次數的LeaveCriticalSection,而互斥量則只能lock一次。可以寫一個類,實現類似std::lock_guard<std::mutex>功能:
// 本類用于自動釋放Windows下的臨界區,防止忘記LeaveCriticalSection的情況發生, // 類似于C++11中的std::lock_guard<std::mutex>功能 class CWinLock {public:CWinLock(CRITICAL_SECTION* pCritSect) { // 構造函數m_pCritical = pCritSect;EnterCriticalSection(m_pCritical);}~CWinLock() { // 析構函數LeaveCriticalSection(m_pCritical);}private:CRITICAL_SECTION* m_pCritical; };CWinLock wlock(&my_winsec); CWinLock wlock2(&my_winsec); // 調用多次也沒問題 msgRecvQueue.push_back(i); // 離開作用域時不用手動LeaveCriticalSection有人把CWinLock類相關的對象如上面的wlock、wlock2叫作RAII對象,CWinLock類也叫作RAII類,RAII翻譯成中文是“資源獲取即初始化”,這種技術的關鍵就是在構造函數中初始化資源,在析構函數中釋放資源,典型的如智能指針、容器等都用到了這種技術。上面講到c++11,一個互斥量不管是同一線程還是不同線程,只能lock一次,但有時候會遇到這種情況:
std::mutex my_mutex; void testfunc1() {std::lock_guard<std::mutex> sbguard(my_mutex);testfunc2(); // 悲劇了,程序異常崩潰了 } void testfunc2() {std::lock_guard<std::mutex> sbguard(my_mutex);//.......做另外一些事 }這時候要用到recursive_mutex,叫作“遞歸的獨占互斥量”,之前的std::mutex叫作“獨占互斥量”。recursive_mutex允許同一線程多次調用同一個互斥量的lock成員函數:
std::recursive_mutex my_mutex; void testfunc1() {std::lock_guard<std::recursive_mutex> sbguard(my_mutex);//.......做一些事testfunc2(); } void testfunc2() {std::lock_guard<std::recursive_mutex> sbguard(my_mutex);//.......做另外一些事 }std::timed_mutex是帶超時功能的獨占互斥量,std::recursive_timed_mutex是帶超時功能的遞歸的獨占互斥量。有了超時功能,就算拿不到鎖頭,也不會一直卡在拿鎖那里。std::timed_mutex有兩個獨有的接口專門用來應對超時問題,一個是try_lock_for,一個是try_lock_until。try_lock_for是等待一段時間,如果拿到鎖或者等待時間到了沒拿到鎖,流程都走下來:
std::timed_mutex my_mutex; void inMsgRecvQueue() {for (int i = 0; i < 100000; i++) {std::chrono::milliseconds timeout(100);if (my_mutex.try_lock_for(timeout)) { // 嘗試獲取鎖,這里只等100毫秒// 在這100毫秒之內拿到了鎖cout << "inMsgRecvQueue()執行,插入一個元素" << i << endl; msgRecvQueue.push_back(i);// 用完了,還要解鎖my_mutex.unlock();} else {// 這次沒拿到鎖就休息一下等待下次拿吧std::chrono::milliseconds sleeptime(100);std::this_thread::sleep_for(sleeptime);}} }try_lock_until的參數是一個時間點,代表一個未來的時間,在這個時間點沒到來的時候,卡在那里等待拿鎖,如果拿到了或者沒拿到但到達了未來的那個時間,程序流程都走下來:
if (my_mutex.try_lock_until(chrono::steady_clock::now() + timeout)) // now:當前時間std::recursive_timed_mutex就是允許同一個線程多次獲取這個互斥量,它和std::timed_mutex兩者的關系同上面講解的std::mutex和std::recursive_mutex關系一樣,可以直接把代碼std::timed_mutex my_mutex改成std::recursive_timed_mutex my_mutex。
十、線程池
請閱讀《高級線程管理》。
總結
以上是生活随笔為你收集整理的c++11并发与多线程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [原]海纳百川 有容乃大:SparkR与
- 下一篇: 《C++新经典》第17章 并发与多线程