生产者/消费者模式(一)
生產者消費者問題是一個多線程同步問題的經典案例,大多數多線程編程問題都是以生產者-消費者模式為基礎,擴展衍生來的。在生產者消費者模式中,緩沖區起到了連接兩個模塊的作用:生產者把數據放入緩沖區,而消費者從緩沖區取出數據,如下圖所示:
可以看出Buffer緩沖區作為一個中介,將生產者和消費者分開,使得兩部分相對獨立,生產者消費者不需要知道對方的實現邏輯,對其中一個的修改,不會影響另一個,從設計模式的角度看,降低了耦合度。而對于圖中處在多線程環境中Buffer,需要共享給多個多個生產者和消費者,為了保證讀寫數據和操作的正確性與時序性,程序需要對Buffer結構進行同步處理。通常情況下,生產者-消費者模式中的廣泛使用的Buffer緩沖區結構是阻塞隊列。
阻塞隊列的特點為:“當隊列是空的時,從隊列中獲取元素的操作將會被阻塞,或者當隊列是滿時,往隊列里添加元素的操作會被阻塞。試圖從空的阻塞隊列中獲取元素的線程將會被阻 塞,直到其他的線程往空的隊列插入新的元素。同樣,試圖往已滿的阻塞隊列中添加新元素的線程同樣也會被阻塞,直到其他的線程使隊列重新變得空閑起來,如從 隊列中移除一個或者多個元素,或者完全清空隊列。” 阻塞隊列的實現通常是對普通隊列進行同步控制,封裝起來。一個linux下的通用隊列定義如下,代碼所示的隊列沒有為滿的情況,不設定隊列元素總數的上限:
1 template<class T> 2 class BlockQueue 3 { 4 public: 5 BlockQueue(); 6 ~BlockQueue(); 7 8 void Add(T *pData); 9 T *Get(int timeout=0); 10 int Count(); 11 void SetBlockFlag(); 12 private: 13 typedef struct _Node 14 { 15 T * m_pData; 16 struct _Node * m_pNext; 17 } Node; 18 Node * m_pHead; //隊列頭 19 Node * m_pTail; //隊列尾 20 int m_nCount; //隊列中元素個數 21 22 bool m_bBlockFlag; //是否阻塞 23 CCond m_condQueue; 24 }; 25 26 template<class T> 27 BlockQueue<T>::BlockQueue():m_pHead(NULL), m_pTail(NULL), m_nCount(0), m_bBlockFlag(false) 28 { 29 } 30 31 template<class T> 32 void BlockQueue<T>::SetBlockFlag() 33 { 34 m_bBlockFlag = true; 35 } 36 37 template<class T> 38 BlockQueue<T>::~BlockQueue() 39 { 40 Node *pTmp = NULL; 41 while (m_pHead != NULL) 42 { 43 pTmp = m_pHead; 44 m_pHead = m_pHead->m_pNext; 45 delete pTmp; 46 pTmp = NULL; 47 } 48 m_pTail = NULL; 49 } 50 51 template<class T> 52 void BlockQueue<T>::Add(T *pData) 53 { 54 (m_condQueue.GetMutex()).EnterMutex(); 55 56 Node *pTmp = new Node; 57 pTmp->m_pData = pData; 58 pTmp->m_pNext = NULL; 59 60 if (m_nCount == 0) 61 { 62 m_pHead = pTmp; 63 m_pTail = pTmp; 64 } 65 else 66 { 67 m_pTail->m_pNext = pTmp; 68 m_pTail = pTmp; 69 } 70 m_nCount++; 71 72 if (m_bBlockFlag) 73 { 74 m_condQueue.Signal(); 75 } 76 77 (m_condQueue.GetMutex()).LeaveMutex(); 78 } 79 80 template<class T> 81 T *BlockQueue<T>::Get(int timeout) 82 { 83 (m_condQueue.GetMutex()).EnterMutex(); 84 85 while (m_nCount == 0) 86 { 87 if (!m_bBlockFlag) 88 { 89 (m_condQueue.GetMutex()).LeaveMutex(); 90 return NULL; 91 } 92 else 93 { 94 if (m_condQueue.WaitNoLock(timeout) == 1) 95 { 96 (m_condQueue.GetMutex()).LeaveMutex(); 97 return NULL; 98 } 99 } 100 } 101 102 T * pTmpData = m_pHead->m_pData; 103 Node *pTmp = m_pHead; 104 105 m_pHead = m_pHead->m_pNext; 106 delete pTmp; 107 pTmp = NULL; 108 m_nCount--; 109 if (m_nCount == 0) 110 { 111 m_pTail = NULL; 112 } 113 114 (m_condQueue.GetMutex()).LeaveMutex(); 115 116 return pTmpData; 117 } 118 119 template<class T> 120 int BlockQueue<T>::Count() 121 { 122 (m_condQueue.GetMutex()).EnterMutex(); 123 int nCount = m_nCount; 124 (m_condQueue.GetMutex()).LeaveMutex(); 125 126 return nCount; 127 }隊列中使用了Cond條件變量,實現多線程環境中隊列的同步與阻塞:
- 條件變量自己持有mutex,在向隊列中Add/Get元素時,mutex將隊列鎖住,同一時刻只允許一個線程對隊列進行操作;
- 消費者從隊列中獲取數據時,如果使用SetBlockFlag函數設置了阻塞模式,那么當隊列元素為空時,需要阻塞在條件變量上進行等待,其他線程調用Add函數,向隊列中添加元素后,喚醒阻塞在條件變量上的線程。注意這里用到的等待條件是while(count == 0)而不是if(count == 0),因為在多核處理器環境中,Signal喚醒操作可能會激活多于一個線程(阻塞在條件變量上的線程),使得多個調用等待的線程返回。所以用while循環對condition多次判斷,可以避免這種假喚醒。
- 隊列元素為包含T類型指針的Node類型指針,析構時只負責釋放隊列中的Node元素,而真正指向T類型數據的指針,需要調用者進行分配和釋放,一般由生產者調用new分配,消費者使用后調用delete釋放。這里需要特別注意,因為如果忘記釋放,會造成內存泄露。
這段代碼是我在多線程環境下,較為簡單的生產者-消費者模式中,在通用的隊列模型進行的修改。目前隊列在服務器中穩定運行,還未出現太大的性能問題,當然這與應用模塊邏輯較為簡單、交易量較小有很大關。此版本中,BlockQueue的Add函數每次調用都是指向Signal操作,喚醒阻塞在環境變量上線程,如果系統中有大量的寫線程,而讀線程的操作只有少數,那么很多情況下Signal調用都是無意義的,系統將其忽略。如何使Signal操作變得有意義,即每次Signal操作都會喚醒阻塞線程,最初的想法是在m_nCount == 0時即隊列為空時進行調用,根據這個想法修改Add函數如下:
1 template<class T> 2 void BlockQueue<T>::Add(T *pData) 3 { 4 (m_condQueue.GetMutex()).EnterMutex(); 5 6 Node *pTmp = new Node; 7 pTmp->m_pData = pData; 8 pTmp->m_pNext = NULL; 9 10 bool bEmpty = false; 11 if (m_nCount == 0) 12 { 13 m_pHead = pTmp; 14 m_pTail = pTmp; 15 bEmpty = true; 16 } 17 else 18 { 19 m_pTail->m_pNext = pTmp; 20 m_pTail = pTmp; 21 } 22 m_nCount++; 23 24 if (m_bBlockFlag && bEmpty) 25 { 26 m_condQueue.Signal(); 27 } 28 29 (m_condQueue.GetMutex()).LeaveMutex(); 30 }目前來看這樣處理沒什么邏輯問題,很快發現這樣做還是會有一些Signal操作時無意義的,比如當讀線程讀取到隊列中最后一個元素時,m_nCount--,值為零,那么此時寫線程調用Add函數,因為m_nCount==0,調用了Signal操作,而這時如果沒有讀線程在阻塞,那么這個操作仍然會被系統忽略。仔細想想,這里不能根據m_nCount的值來判斷是否有線程在等待,而應該以是否有隊列阻塞來進行Signal的調用。這里可以用一個計數器來表示當前阻塞在Get操作上的線程個數,只有當它大于零時,才說明需要激活,根據這個條件,繼續修改代碼,下面只展示修改后與之前相比的差異代碼:
1 template<class T> 2 class BlockQueue 3 { 4 public: 5 /*與之前代碼相同*/ 6 private: 7 /*與之前代碼相同*/ 8 int m_nCount; 9 int m_nBlockCnt; //阻塞線程數 10 bool m_bBlockFlag; //是否阻塞 11 CCond m_condQueue; 12 }; 13 14 template<class T> 15 BlockQueue<T>::BlockQueue():m_pHead(NULL), m_pTail(NULL), m_nCount(0), m_nBlockCnt(0), m_bBlockFlag(false) 16 {} 17 18 template<class T> 19 void BlockQueue<T>::Add(T *pData) 20 { 21 (m_condQueue.GetMutex()).EnterMutex(); 22 23 Node *pTmp = new Node; 24 pTmp->m_pData = pData; 25 pTmp->m_pNext = NULL; 26 27 if (m_nCount == 0) 28 { 29 m_pHead = pTmp; 30 m_pTail = pTmp; 31 bEmpty = true; 32 } 33 else 34 { 35 m_pTail->m_pNext = pTmp; 36 m_pTail = pTmp; 37 } 38 m_nCount++; 39 40 if (m_bBlockFlag && m_nBlockCnt>0) //阻塞模式,并且有線程等待時調用Signal進行喚醒 41 { 42 m_condQueue.Signal(); 43 } 44 45 (m_condQueue.GetMutex()).LeaveMutex(); 46 } 47 48 template<class T> 49 T *BlockQueue<T>::Get(int timeout) 50 { 51 (m_condQueue.GetMutex()).EnterMutex(); 52 53 while (m_nCount == 0) 54 { 55 if (!m_bBlockFlag) 56 { 57 (m_condQueue.GetMutex()).LeaveMutex(); 58 return NULL; 59 } 60 else 61 { 62 m_nBlockCnt++; //線程阻塞數增加 63 if (m_condQueue.WaitNoLock(timeout) == 1) 64 { 65 (m_condQueue.GetMutex()).LeaveMutex(); 66 m_nBlockCnt--; 67 return NULL; 68 } 69 m_nBlockCnt--; //喚醒后線程阻塞數減少 70 } 71 } 72 73 T * pTmpData = m_pHead->m_pData; 74 Node *pTmp = m_pHead; 75 76 m_pHead = m_pHead->m_pNext; 77 delete pTmp; 78 pTmp = NULL; 79 m_nCount--; 80 if (m_nCount == 0) 81 { 82 m_pTail = NULL; 83 } 84 85 (m_condQueue.GetMutex()).LeaveMutex(); 86 87 return pTmpData; 88 }? 現在代碼已經修改完成,繼續分析可以看出幾乎對每個函數的操作都進行了加鎖保護,臨界區從始到末,粒度較大;再者,隊列中持有的T對象指針,均是由調用者動態分配和釋放的,而內部也是有鏈表實現,Add和Get操作時,會調用鏈表元素的內存分配和釋放。如此一來,在交易量很大的情況下,頻繁讀寫,不僅會導致加鎖解鎖的性能消耗,也會使系統產生大量內存碎片。
未完待續……
posted on 2014-11-18 16:21 Tourun 閱讀(...) 評論(...) 編輯 收藏轉載于:https://www.cnblogs.com/Tour/p/4106085.html
總結
以上是生活随笔為你收集整理的生产者/消费者模式(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 说一下安卓的touch事件分发机制
- 下一篇: QT中循环显示图片和简单的显示图片