多进程生产者消费者框架设计
前言
介紹了進程如何基于面向對象的封裝,本章我們基于封裝好的Process類來實現一種無鎖版的生產者和消費者框架,用它實現了高性能文件拷貝功能。讀這篇文章之前大家可以想一下如果是你,你會怎么設計這樣的框架?在這個模型中作為每個生產者,如何把讀取到內容發送給消費者。而作為消費者,如何把接收到的內容寫入文件,并且最終保證文件內容是一致的。
好了,廢話不多說,我們來通過代碼看下是怎么設計的吧?
生產者消費者模型
我們回顧一下什么是生產者消費者模型。這個模型是為了解決在整個程序過程中既要不斷產生數據,又要處理數據的一種方案:生產者專注產生數據,消費者專注處理數據。
但如果模型中生產者產生數據很快,消費者處理數據很慢,那么必然會出現生產者要等待消費者處理完以后,才能繼續生產。或者反之也是類似,消費者則會等到生產者生產完才去處理。這樣兩者就形成了依賴關系,無法做到高性能,需要在兩者之間加入緩沖的區域,讓它們不再有直接相互依賴的情況,如下圖所示:
生產者消費者進程
結合上述對于多進程的封裝以及生產者消費者的模型,該框架的設計關鍵點如下:
1.采用多個生產者和消費者都為進程。將Process作為基類,設計Process的繼承類生產者進程類(ProducerProcess)和消費者進程類(ConsumerProcess)
2.兩者之間的數據緩沖的通道則基于對無名管道的封裝設計為一個緩存通道類,讓生產者進程和消費者進程共用,并且生產者只往通道里放入數據,消費者只從通道里獲取數據。
3.考慮到具體過程是文件處理,需要設計一種數據結構,用于生產者和消費者之間的數據傳遞。
發送數據結構設計
1.需要發送到管道的數據用Producer結構體來存放,并且通過內存拷貝來傳輸和接受數據。因為多進程之間指針是很難共享的,這里采用相對簡單的方式是通過進程間通信的辦法,把指針指向的內容傳遞過去。
2.結構體內要有一個讀/寫數據長度的成員變量。因為無法預設要拷貝的文件會有多大,所以不能認為只通過一兩次讀取文件就能拿到它的所有內容了。那么就得把讀取文件的過程設計為每次不斷讀取固定長度的數據,直到讀到文件末尾為止。
具體代碼如下:
typedef struct Producer{size_t length;size_t readoffset;char *readbuf;}Producer;typedef struct Consumer{size_t length;size_t writeoffset;char *writebuf;}Consumer;緩沖通道設計
1.如果單次傳輸數據量大于了管道的大小,就會出問題,因此對于上訴數據傳遞結構體中的數據長度要小于管道最大存儲值(默認是4k)。
2.需要封裝管道的讀端寫端的打開/關閉,判斷是否已打開等。
具體代碼如下:
class Tunnel{public:Tunnel();~Tunnel();bool isOpen() const;void open();void closeWriteDescriptor();void closeReadDescriptor();ssize_t read(char *buf, size_t count);ssize_t write(char *buf, size_t count);inline int getReadDescriptor(){return m_pipeFd[0];}inline int getWriteDescriptor(){return m_pipeFd[1];}private:int m_pipeFd[2];};讀文件設計
這里有個問題,既然每個生產者進程的業務邏輯都是先讀了源文件的一部分后再向管道發送讀到的數據。那么到下一次讀源文件的時候,要偏移多少呢?在這里,多個生產者進程讀取文件的設計上為了減少同步機制造成的操作系統開銷,采取每個進程從不同初始偏移量開始讀取的方式。初始偏移量=index??m_maxlength。index為進程序列號,比如共有5個進程,index為0、1、2、3、4 ,m_maxlength為最大讀取長度。到下一次再去讀取源文件的時候,偏移量offset又從原有的offset增加了m_maxProducer??m_maxlength。其中的m_maxProducer是類ConfigManager的成員變量,從輸入參數中獲取。
此外,還采用pread接口讀取文件,其實際操作是lseek+read,即先將offset調整到指定值,再調用read讀取數據。它同read的不同之處在于它是原子操作,即pread操作前后的offset是一致的。
讀文件大小超過管道大小遇到的問題
若管道大小不是結構體Producer大小的整數倍,必然會出現生產者進程寫入管道的數據大于管道大小。那么對于消費者進程來說,讀取的數據大小是固定長度的,那么必然會出現它沒有讀完,只讀取到管道大小數據就直接返回。按照Linux公平調度原則,這個消費者沒辦法馬上讀取剩余數據,只能通過read的返回值知道還有多少沒有讀。當管道再次未滿時,未發送完數據的生產者進程從阻塞態轉為運行態,把剩余數據繼續發送到管道并返回。而其它生產者進程會繼續把自己的Producer數據繼續寫管道。這樣又會造成一個問題。消費者按照固定長度讀取,必然會把前一個生產者寫入管道的剩余數據和后面生產者寫入管道的數據一同讀取。從而造成了內容錯亂問題。怎么辦呢?設計中我們把Producer的大小定義為管道大小的1/2。
源碼鏈接
源碼鏈接
總結
以上是生活随笔為你收集整理的多进程生产者消费者框架设计的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: headroom.js_使用Headro
- 下一篇: go语言发送微信小程序模板消息