Linux信号量
文章目錄
- POSIX信號量
- 信號量的原理
- 信號量的概念
- 信號量函數
 
- 二元信號量模擬實現互斥功能
- 基于環形隊列的生產消費模型
- 空間資源和數據資源
- 生產者和消費者申請和釋放資源
- 必須遵守的兩個規則
- 代碼實現
- 信號量保護環形隊列的原理
 
POSIX信號量
信號量的原理
- 我們將可能會被多個執行流同時訪問的資源叫做臨界資源,臨界資源需要進行保護否則會出現數據不一致等問題。
- 當我們僅用一個互斥鎖對臨界資源進行保護時,相當于我們將這塊臨界資源看作一個整體,同一時刻只允許一個執行流對這塊臨界資源進行訪問。
- 但實際我們可以將這塊臨界資源再分割為多個區域,當多個執行流需要訪問臨界資源時,如果這些執行流訪問的是臨界資源的不同區域,那么我們可以讓這些執行流同時訪問臨界資源的不同區域,此時不會出現數據不一致等問題。
信號量的概念
信號量(信號燈)本質是一個計數器,是描述臨界資源中資源數目的計數器,信號量能夠更細粒度的對臨界資源進行管理。
每個執行流在進入臨界區之前都應該先申請信號量,申請成功就有了操作特點的臨界資源的權限,當操作完畢后就應該釋放信號量。
 
 信號量的PV操作:
- P操作:我們將申請信號量稱為P操作,申請信號量的本質就是申請獲得臨界資源中某塊資源的使用權限,當申請成功時臨界資源中資源的數目應該減一,因此P操作的本質就是讓計數器減一。
- V操作:我們將釋放信號量稱為V操作,釋放信號量的本質就是歸還臨界資源中某塊資源的使用權限,當釋放成功時臨界資源中資源的數目就應該加一,因此V操作的本質就是讓計數器加一。
PV操作必須是原子操作
多個執行流為了訪問臨界資源會競爭式的申請信號量,因此信號量是會被多個執行流同時訪問的,也就是說信號量本質也是臨界資源。
但信號量本質就是用于保護臨界資源的,我們不可能再用信號量去保護信號量,所以信號量的PV操作必須是原子操作。
注意: 內存當中變量的++、--操作并不是原子操作,因此信號量不可能只是簡單的對一個全局變量進行++、--操作。
申請信號量失敗被掛起等待
當執行流在申請信號量時,可能此時信號量的值為0,也就是說信號量描述的臨界資源已經全部被申請了,此時該執行流就應該在該信號量的等待隊列當中進行等待,直到有信號量被釋放時再被喚醒。
注意: 信號量的本質是計數器,但不意味著只有計數器,信號量還包括一個等待隊列。
信號量函數
初始化信號量
初始化信號量的函數叫做sem_init,該函數的函數原型如下:
int sem_init(sem_t *sem, int pshared, unsigned int value);參數說明:
- sem:需要初始化的信號量。
- pshared:傳入0值表示線程間共享,傳入非零值表示進程間共享。
- value:信號量的初始值(計數器的初始值)。
返回值說明:
- 初始化信號量成功返回0,失敗返回-1。
注意: POSIX信號量和System V信號量作用相同,都是用于同步操作,達到無沖突的訪問共享資源目的,但POSIX信號量可以用于線程間同步。
銷毀信號量
銷毀信號量的函數叫做sem_destroy,該函數的函數原型如下:
int sem_destroy(sem_t *sem);參數說明:
- sem:需要銷毀的信號量。
返回值說明:
- 銷毀信號量成功返回0,失敗返回-1。
等待信號量(申請信號量)
等待信號量的函數叫做sem_wait,該函數的函數原型如下:
int sem_wait(sem_t *sem);參數說明:
- sem:需要等待的信號量。
返回值說明:
- 等待信號量成功返回0,信號量的值減一。
- 等待信號量失敗返回-1,信號量的值保持不變。
發布信號量(釋放信號量)
發布信號量的函數叫做sem_post,該函數的函數原型如下:
int sem_post(sem_t *sem);參數說明:
- sem:需要發布的信號量。
返回值說明:
- 發布信號量成功返回0,信號量的值加一。
- 發布信號量失敗返回-1,信號量的值保持不變。
二元信號量模擬實現互斥功能
信號量本質是一個計數器,如果將信號量的初始值設置為1,那么此時該信號量叫做二元信號量。
信號量的初始值為1,說明信號量所描述的臨界資源只有一份,此時信號量的作用基本等價于互斥鎖。
例如,下面我們實現一個多線程搶票系統,其中我們用二元信號量模擬實現多線程互斥。
我們在主線程當中創建四個新線程,讓這四個新線程執行搶票邏輯,并且每次搶完票后打印輸出此時剩余的票數,其中我們用全局變量tickets記錄當前剩余的票數,此時tickets是會被多個執行流同時訪問的臨界資源,在下面的代碼中我們并沒有對tickets進行任何保護操作。
#include <iostream> #include <string> #include <unistd.h> #include <pthread.h>int tickets = 2000; void* TicketGrabbing(void* arg) {std::string name = (char*)arg;while (true){if (tickets > 0){usleep(1000);std::cout << name << " get a ticket, tickets left: " << --tickets << std::endl;}else{break;}}std::cout << name << " quit..." << std::endl;pthread_exit((void*)0); }int main() {pthread_t tid1, tid2, tid3, tid4;pthread_create(&tid1, nullptr, TicketGrabbing, (void*)"thread 1");pthread_create(&tid2, nullptr, TicketGrabbing, (void*)"thread 2");pthread_create(&tid3, nullptr, TicketGrabbing, (void*)"thread 3");pthread_create(&tid4, nullptr, TicketGrabbing, (void*)"thread 4");pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr);pthread_join(tid4, nullptr);return 0; }運行代碼后可以看到,線程打印輸出剩余票數時出現了票數剩余為負數的情況,這是不符合我們預期的。
 
 下面我們在搶票邏輯當中加入二元信號量,讓每個線程在訪問全局變量tickets之前先申請信號量,訪問完畢后再釋放信號量,此時二元信號量就達到了互斥的效果。
運行代碼后就不會出現剩余票數為負的情況了,因為此時同一時刻只會有一個執行流對全局變量tickets進行訪問,不會出現數據不一致的問題。
 
基于環形隊列的生產消費模型
空間資源和數據資源
生產者關注的是空間資源,消費者關注的是數據資源
對于生產者和消費者來說,它們關注的資源是不同的:
- 生產者關注的是環形隊列當中是否有空間(blank),只要有空間生產者就可以進行生產。
- 消費者關注的是環形隊列當中是否有數據(data),只要有數據消費者就可以進行消費。
blank_sem和data_sem的初始值設置
現在我們用信號量來描述環形隊列當中的空間資源(blank_sem)和數據資源(data_sem),在我們初始信號量時給它們設置的初始值是不同的:
- blank_sem的初始值我們應該設置為環形隊列的容量,因為剛開始時環形隊列當中全是空間。
- data_sem的初始值我們應該設置為0,因為剛開始時環形隊列當中沒有數據。
生產者和消費者申請和釋放資源
生產者申請空間資源,釋放數據資源
對于生產者來說,生產者每次生產數據前都需要先申請blank_sem:
- 如果blank_sem的值不為0,則信號量申請成功,此時生產者可以進行生產操作。
- 如果blank_sem的值為0,則信號量申請失敗,此時生產者需要在blank_sem的等待隊列下進行阻塞等待,直到環形隊列當中有新的空間后再被喚醒。
當生產者生產完數據后,應該釋放data_sem:
- 雖然生產者在進行生產前是對blank_sem進行的P操作,但是當生產者生產完數據,應該對data_sem進行V操作而不是blank_sem。
- 生產者在生產數據前申請到的是blank位置,當生產者生產完數據后,該位置當中存儲的是生產者生產的數據,在該數據被消費者消費之前,該位置不再是blank位置,而應該是data位置。
- 當生產者生產完數據后,意味著環形隊列當中多了一個data位置,因此我們應該對data_sem進行V操作。
消費者申請數據資源,釋放空間資源
對于消費者來說,消費者每次消費數據前都需要先申請data_sem:
- 如果data_sem的值不為0,則信號量申請成功,此時消費者可以進行消費操作。
- 如果data_sem的值為0,則信號量申請失敗,此時消費者需要在data_sem的等待隊列下進行阻塞等待,直到環形隊列當中有新的數據后再被喚醒。
當消費者消費完數據后,應該釋放blank_sem:
- 雖然消費者在進行消費前是對data_sem進行的P操作,但是當消費者消費完數據,應該對blank_sem進行V操作而不是data_sem。
- 消費者在消費數據前申請到的是data位置,當消費者消費完數據后,該位置當中的數據已經被消費過了,再次被消費就沒有意義了,為了讓生產者后續可以在該位置生產新的數據,我們應該將該位置算作blank位置,而不是data位置。
- 當消費者消費完數據后,意味著環形隊列當中多了一個blank位置,因此我們應該對blank_sem進行V操作。
必須遵守的兩個規則
在基于環形隊列的生產者和消費者模型當中,生產者和消費者必須遵守如下兩個規則。
第一個規則:生產者和消費者不能對同一個位置進行訪問。
生產者和消費者在訪問環形隊列時:
- 如果生產者和消費者訪問的是環形隊列當中的同一個位置,那么此時生產者和消費者就相當于同時對這一塊臨界資源進行了訪問,這當然是不允許的。
- 而如果生產者和消費者訪問的是環形隊列當中的不同位置,那么此時生產者和消費者是可以同時進行生產和消費的,此時不會出現數據不一致等問題。
如下圖:
 
第二個規則:無論是生產者還是消費者,都不應該將對方套一個圈以上。
- 生產者從消費者的位置開始一直按順時針方向進行生產,如果生產者生產的速度比消費者消費的速度快,那么當生產者繞著消費者生產了一圈數據后再次遇到消費者,此時生產者就不應該再繼續生產了,因為再生產就會覆蓋還未被消費者消費的數據。
- 同理,消費者從生產者的位置開始一直按順時針方向進行消費,如果消費者消費的速度比生產者生產的速度快,那么當消費者繞著生產者消費了一圈數據后再次遇到生產者,此時消費者就不應該再繼續消費了,因為再消費就會消費到緩沖區中保存的廢棄數據。
如下圖:
 
代碼實現
其中的RingQueue就是生產者消費者模型當中的交易場所,我們可以用C++STL庫當中的vector進行實現。
#pragma once#include <iostream> #include <unistd.h> #include <pthread.h> #include <semaphore.h> #include <vector>#define NUM 8template<class T> class RingQueue { private://P操作void P(sem_t& s){sem_wait(&s);}//V操作void V(sem_t& s){sem_post(&s);} public:RingQueue(int cap = NUM): _cap(cap), _p_pos(0), _c_pos(0){_q.resize(_cap);sem_init(&_blank_sem, 0, _cap); //blank_sem初始值設置為環形隊列的容量sem_init(&_data_sem, 0, 0); //data_sem初始值設置為0}~RingQueue(){sem_destroy(&_blank_sem);sem_destroy(&_data_sem);}//向環形隊列插入數據(生產者調用)void Push(const T& data){P(_blank_sem); //生產者關注空間資源_q[_p_pos] = data;V(_data_sem); //生產//更新下一次生產的位置_p_pos++;_p_pos %= _cap;}//從環形隊列獲取數據(消費者調用)void Pop(T& data){P(_data_sem); //消費者關注數據資源data = _q[_c_pos];V(_blank_sem);//更新下一次消費的位置_c_pos++;_c_pos %= _cap;} private:std::vector<T> _q; //環形隊列int _cap; //環形隊列的容量上限int _p_pos; //生產位置int _c_pos; //消費位置sem_t _blank_sem; //描述空間資源sem_t _data_sem; //描述數據資源 };相關說明:
- 當不設置環形隊列的大小時,我們默認將環形隊列的容量上限設置為8。
- 代碼中的RingQueue是用vector實現的,生產者每次生產的數據放到vector下標為p_pos的位置,消費者每次消費的數據來源于vector下標為c_pos的位置。
- 生產者每次生產數據后p_pos都會進行++,標記下一次生產數據的存放位置,++后的下標會與環形隊列的容量進行取模運算,實現“環形”的效果。
- 消費者每次消費數據后c_pos都會進行++,標記下一次消費數據的來源位置,++后的下標會與環形隊列的容量進行取模運算,實現“環形”的效果。
- p_pos只會由生產者線程進行更新,c_pos只會由消費者線程進行更新,對這兩個變量訪問時不需要進行保護,因此代碼中將p_pos和c_pos的更新放到了V操作之后,就是為了盡量減少臨界區的代碼。
為了方便理解,我們這里實現單生產者、單消費者的生產者消費者模型。于是在主函數我們就只需要創建一個生產者線程和一個消費者線程,生產者線程不斷生產數據放入環形隊列,消費者線程不斷從環形隊列里取出數據進行消費。
#include "RingQueue.hpp"void* Producer(void* arg) {RingQueue<int>* rq = (RingQueue<int>*)arg;while (true){sleep(1);int data = rand() % 100 + 1;rq->Push(data);std::cout << "Producer: " << data << std::endl;} } void* Consumer(void* arg) {RingQueue<int>* rq = (RingQueue<int>*)arg;while (true){sleep(1);int data = 0;rq->Pop(data);std::cout << "Consumer: " << data << std::endl;} } int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;RingQueue<int>* rq = new RingQueue<int>;pthread_create(&producer, nullptr, Producer, rq);pthread_create(&consumer, nullptr, Consumer, rq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete rq;return 0; }相關說明:
- 環形隊列要讓生產者線程向隊列中Push數據,讓消費者線程從隊列中Pop數據,因此這個環形隊列必須要讓這兩個線程同時看到,所以我們在創建生產者線程和消費者線程時,需要將環形隊列作為線程執行例程的參數進行傳入。
- 代碼中生產者生產數據就是將獲取到的隨機數Push到環形隊列,而消費者就是從環形隊列Pop數據,為了便于觀察,我們可以將生產者生產的數據和消費者消費的數據進行打印輸出。
生產者消費者步調一致
由于代碼中生產者是每隔一秒生產一個數據,而消費者是每隔一秒消費一個數據,因此運行代碼后我們可以看到生產者和消費者的執行步調是一致的。
 
生產者生產的快,消費者消費的慢
我們可以讓生產者不停的進行生產,而消費者每隔一秒進行消費。
void* Producer(void* arg) {RingQueue<int>* rq = (RingQueue<int>*)arg;while (true){int data = rand() % 100 + 1;rq->Push(data);std::cout << "Producer: " << data << std::endl;} } void* Consumer(void* arg) {RingQueue<int>* rq = (RingQueue<int>*)arg;while (true){sleep(1);int data = 0;rq->Pop(data);std::cout << "Consumer: " << data << std::endl;} }此時由于生產者生產的很快,運行代碼后一瞬間生產者就將環形隊列打滿了,此時生產者想要再進行生產,但空間資源已經為0了,于是生產者只能在blank_sem的等待隊列下進行阻塞等待,直到由消費者消費完一個數據后對blank_sem進行了V操作,生產者才會被喚醒進而繼續進行生產。
但由于生產者的生產速度很快,生產者生產完一個數據后又會進行等待,因此后續生產者和消費者的步調又變成一致的了。
 
生產者生產的慢,消費者消費的快
當然我們也可以讓生產者每隔一秒進行生產,而消費者不停的進行消費。
void* Producer(void* arg) {RingQueue<int>* rq = (RingQueue<int>*)arg;while (true){sleep(1);int data = rand() % 100 + 1;rq->Push(data);std::cout << "Producer: " << data << std::endl;} } void* Consumer(void* arg) {RingQueue<int>* rq = (RingQueue<int>*)arg;while (true){int data = 0;rq->Pop(data);std::cout << "Consumer: " << data << std::endl;} }雖然消費者消費的很快,但一開始環形隊列當中的數據資源為0,因此消費者只能在data_sem的等待隊列下進行阻塞等待,直到生產者生產完一個數據后對data_sem進行了V操作,消費者才會被喚醒進而進行消費。
但由于消費者的消費速度很快,消費者消費完一個數據后又會進行等待,因此后續生產者和消費者的步調又變成一致的了。
 
信號量保護環形隊列的原理
在blank_sem和data_sem兩個信號量的保護后,該環形隊列中不可能會出現數據不一致的問題。
因為只有當生產者和消費者指向同一個位置并訪問時,才會導致數據不一致的問題,而此時生產者和消費者在對環形隊列進行寫入或讀取數據時,只有兩種情況會指向同一個位置:
- 環形隊列為空時。
- 環形隊列為滿時。
但是在這兩種情況下,生產者和消費者不會同時對環形隊列進行訪問:
- 當環形隊列為空的時,消費者一定不能進行消費,因為此時數據資源為0。
- 當環形隊列為滿的時,生產者一定不能進行生產,因為此時空間資源為0。
也就是說,當環形隊列為空和滿時,我們已經通過信號量保證了生產者和消費者的串行化過程。而除了這兩種情況之外,生產者和消費者指向的都不是同一個位置,因此該環形隊列當中不可能會出現數據不一致的問題。并且大部分情況下生產者和消費者指向并不是同一個位置,因此大部分情況下該環形隊列可以讓生產者和消費者并發的執行
總結
 
                            
                        - 上一篇: KITTI数据集解析和可视化
- 下一篇: c语言屏蔽一段程序,C语言#if 0阻止
