操作系统:线程同步
操作系統:線程同步
使用Linux無名信號量實現了讀寫者線程的互斥和同步。
實驗環境
- 環境:Linux
- 語言:C
- CMake:3.17.1
- GCC:7.5.0
- IDE:Clion 2020.3.1
實驗目標
實驗內容
以“生產者-消費者模型”為基礎,在Linux環境下創建一個控制進程,在該進程中創建讀者寫者線程各一個,模擬生產者和消費者,它們使用N個不同的緩沖區(N為一個確定的數值,本實驗中取N=16)。寫者線程寫入數據,然后將數據放置在空緩沖區中供讀者線程讀取。讀者線程從緩沖區中獲得數據,然后釋放緩沖區。當寫者線程寫入數據時,如果沒有空緩沖區可用,那么寫者線程必須等待讀者線程釋放出一個空緩沖區。當讀者線程讀取數據時,如果沒有滿的緩沖區,那么讀者線程將被阻塞,直到緩沖區被寫者線程寫滿。
可以作如下的實驗嘗試,并觀察和記錄進程同步效果:
實驗過程和結果
函數介紹
pthread.h
#include <pthread.h>/* Create a new thread, starting with execution of START-ROUTINEgetting passed ARG. Creation attributed come from ATTR. The newhandle is stored in *NEWTHREAD. */ extern int pthread_create (pthread_t *__restrict __newthread,const pthread_attr_t *__restrict __attr,void *(*__start_routine) (void *),void *__restrict __arg) __THROWNL __nonnull ((1, 3));/* Terminate calling thread.The registered cleanup handlers are called via exception handlingso we cannot mark this function with __THROW.*/ extern void pthread_exit (void *__retval) __attribute__ ((__noreturn__));/* Make calling thread wait for termination of the thread TH. Theexit status of the thread is stored in *THREAD_RETURN, if THREAD_RETURNis not NULL.This function is a cancellation point and therefore not marked with__THROW. */ extern int pthread_join (pthread_t __th, void **__thread_return);/* Obtain the identifier of the current thread. */ extern pthread_t pthread_self (void) __THROW __attribute__ ((__const__));/* Thread identifiers. The structure of the attribute type is notexposed on purpose. */ typedef unsigned long int pthread_t;semaphore.h
在 POSIX 標準中,信號量分兩種,一種是無名信號量,一種是有名信號量。無名信號量一般用于線程間同步或互斥,而有名信號量一般用于進程間同步或互斥。它們的區別和管道及命名管道的區別類似,無名信號量則直接保存在內存中,而有名信號量要求創建一個文件。
無名信號量需要semaphore.h頭文件,有名信號量則需要sys/sem.h頭文件。本文實現線程間同步和互斥,故使用更為簡便的無名信號量。
#include <semaphore.h>typedef union {char __size[__SIZEOF_SEM_T];long int __align; } sem_t;/* Initialize semaphore object SEM to VALUE. If PSHARED then share itwith other processes. */ extern int sem_init (sem_t *__sem, int __pshared, unsigned int __value)__THROW;/* Free resources associated with semaphore object SEM. */ extern int sem_destroy (sem_t *__sem) __THROW;This function is a cancellation point and therefore not marked with__THROW. */ extern int sem_wait (sem_t *__sem);/* Post SEM. */ extern int sem_post (sem_t *__sem) __THROWNL;CMake
# based on clion cmake_minimum_required(VERSION 3.17)set(CMAKE_CXX_STANDARD 14)find_package(Threads REQUIRED) add_executable(thread main.c ${SOURCE_FILES}) target_link_libraries(thread Threads::Threads)信號量用于互斥與同步
一、讀、寫者互斥
創建讀者寫者void *Read(void *arg)和void *Write(void *arg),由于只能有一個讀者、一個寫者,可知讀者之間是互斥的、寫者之間也是互斥的,故設置信號量sem_t read_sem和sem_t write_sem. 設置并初始化全局變量int buffer = 0;模擬緩沖區的寫入和讀出。設置并初始化int read_write_flag[2] = {1, 1};判斷是否發生讀寫沖突。
// // Created by Sylvan Ding on 2022/5/10. //#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <semaphore.h>#define N 16 #define MAX_PROCESS_NUM 100int buffer = 0; int read_write_flag[2] = {1, 1};// sem sem_t read_sem; sem_t write_sem;void *Read(void *arg) {sem_wait(&read_sem);read_write_flag[0] = 0;int buf_temp;int duration = rand() % 4;pthread_t tid = pthread_self();printf("\033[32m[R] tid: %lu starts reading...\033[0m\n", tid);if (!read_write_flag[1])printf("\033[31m[R] tid: %lu conflicts with writing process!\033[0m\n", tid);// readbuf_temp = buffer;sleep(duration);buffer = 0;printf("[R] tid: %lu has read all %d buffers!\n", tid, buf_temp);if (buf_temp < N)printf("\033[33m[R] tid: %lu did not read entire 16 buffers!\033[0m\n", tid);read_write_flag[0] = 1;sem_post(&read_sem);return NULL; }void *Write(void *arg) {sem_wait(&write_sem);read_write_flag[1] = 0;int res;int buf_temp;int size = rand() % 16 + 1;int duration = rand() % 5;pthread_t tid = pthread_self();printf("\033[32m[W] tid: %lu starts writing...\033[0m\n", tid);// writebuf_temp = buffer;res = size + buf_temp;sleep(duration);if (res > N)buffer = 16;elsebuffer = res;printf("[W] tid: %lu has written %d buffers, with %d buffers occupied!\n", tid, res > N ? N - buf_temp : size,buffer);read_write_flag[1] = 1;sem_post(&write_sem);return NULL; }int main(int argc, char *argv[]) {srand(888);int res = 0;int randm = 0;void *(*target)(void *) =NULL;pthread_t myThreads[MAX_PROCESS_NUM];// init semif (sem_init(&read_sem, 0, 1) < 0) {printf("failed to init read_sem");exit(0);}if (sem_init(&write_sem, 0, 1) < 0) {printf("failed to init write_sem");exit(0);}for (int i = 0; i < MAX_PROCESS_NUM; ++i) {randm = rand() % 4;if (i % 2)target = Read;elsetarget = Write;res = pthread_create(&myThreads[i], NULL, target, NULL);if (res < 0) {printf("failed to create the thread!");exit(0);}sleep(randm);}void *thread_result = NULL;for (int i = 0; i < MAX_PROCESS_NUM; ++i)res = pthread_join(myThreads[i], &thread_result);// destroy semsem_destroy(&read_sem);sem_destroy(&write_sem);return 0; }二、讀寫者互斥
為實現讀、寫互斥,設置mutex信號量,編寫void *Read1(void *arg)和void *Write1(void *arg).
sem_t mutex;void *Read1(void *arg) {sem_wait(&mutex);Read(arg);sem_post(&mutex);return NULL; }void *Write1(void *arg) {sem_wait(&mutex);Write(arg);sem_post(&mutex);return NULL; }void all_sem_init() {// init semint res = 0;if (sem_init(&read_sem, 0, 1) < 0) {printf("failed to init read_sem");res = -1;}if (sem_init(&write_sem, 0, 1) < 0) {printf("failed to init write_sem");res = -1;}if (sem_init(&mutex, 0, 1) < 0) {printf("failed to init mutex");res = -1;}if (res)exit(0); }void all_sem_des() {// destroy semsem_destroy(&read_sem);sem_destroy(&write_sem);sem_destroy(&mutex); }int main(int argc, char *argv[]) {srand(888);int res = 0;int randm = 0;void *(*target)(void *) =NULL;pthread_t myThreads[MAX_PROCESS_NUM];all_sem_init();for (int i = 0; i < MAX_PROCESS_NUM; ++i) {randm = rand() % 4;if (i % 2) // target = Read;target = Read1;else // target = Write;target = Write1;res = pthread_create(&myThreads[i], NULL, target, NULL);if (res < 0) {printf("failed to create the thread!");exit(0);}sleep(randm);}void *thread_result = NULL;for (int i = 0; i < MAX_PROCESS_NUM; ++i)res = pthread_join(myThreads[i], &thread_result);all_sem_des();return 0; }三、讀寫者同步
為了解決在緩沖區未滿時讀進程就讀出緩沖區的問題,設置full信號量。同時,在緩沖區寫滿時,阻塞寫進程,設置empty信號量。二者均初始化為0.
sem_t full; sem_t empty;void all_sem_init() {// init semint res = 0;// ...if (sem_init(&full, 0, 0) < 0) {printf("failed to init full");res = -1;}if (sem_init(&empty, 0, 0) < 0) {printf("failed to init empty");res = -1;}if (res)exit(0); }void all_sem_des() {// destroy sem// ...sem_destroy(&full);sem_destroy(&empty); }void *Read2(void *arg) {sem_wait(&full);Read(arg);sem_post(&empty);return NULL; }void *Write2(void *arg) {Write(arg);if (buffer >= N) {sem_post(&full);sem_wait(&empty);}return NULL; }int main(int argc, char *argv[]) {// ...for (int i = 0; i < MAX_PROCESS_NUM; ++i) {randm = rand() % 4;if (i % 2) { // target = Read; // target = Read1;target = Read2;} else { // target = Write; // target = Write1;target = Write2;}// ... }四、讀寫者同步且互斥
結合二、三的信號量,實現讀寫同步且互斥。
void *Read3(void *arg) {sem_wait(&full);sem_wait(&mutex);Read(arg);sem_post(&mutex);sem_post(&empty);return NULL; }void *Write3(void *arg) {sem_wait(&mutex);Write(arg);sem_post(&mutex);if (buffer >= N) {sem_post(&full);sem_wait(&empty);}return NULL; }改進
因為只有一個讀者、一個寫者進程,并非多讀寫者,所以可以不用在main()中開多個線程,可以只創建read()和write()兩線程,然后在線程內部使用while循環去訪問buffer. 這個工作就留給讀者自己去實現啦~😄
附錄(完整實驗代碼)
// // Created by Sylvan Ding on 2022/5/10. //#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <semaphore.h>#define N 16 #define MAX_PROCESS_NUM 100int buffer = 0; int read_write_flag[2] = {1, 1};// sem sem_t read_sem; sem_t write_sem; sem_t mutex; sem_t full; sem_t empty;void *Read(void *arg) {sem_wait(&read_sem);read_write_flag[0] = 0;int buf_temp;int duration = rand() % 4;pthread_t tid = pthread_self();printf("\033[32m[R] tid: %lu starts reading...\033[0m\n", tid);if (!read_write_flag[1])printf("\033[31m[R] tid: %lu conflicts with writing process!\033[0m\n", tid);// readbuf_temp = buffer;sleep(duration);buffer = 0;printf("[R] tid: %lu has read all %d buffers!\n", tid, buf_temp);if (buf_temp < N)printf("\033[33m[R] tid: %lu did not read entire 16 buffers!\033[0m\n", tid);read_write_flag[0] = 1;sem_post(&read_sem);return NULL; }void *Write(void *arg) {sem_wait(&write_sem);read_write_flag[1] = 0;int res;int buf_temp;int size = rand() % 16 + 1;int duration = rand() % 5;pthread_t tid = pthread_self();printf("\033[32m[W] tid: %lu starts writing...\033[0m\n", tid);// writebuf_temp = buffer;res = size + buf_temp;sleep(duration);if (res > N)buffer = 16;elsebuffer = res;printf("[W] tid: %lu has written %d buffers, with %d buffers occupied!\n", tid, res > N ? N - buf_temp : size,buffer);read_write_flag[1] = 1;sem_post(&write_sem);return NULL; }void *Read1(void *arg) {sem_wait(&mutex);Read(arg);sem_post(&mutex);return NULL; }void *Write1(void *arg) {sem_wait(&mutex);Write(arg);sem_post(&mutex);return NULL; }void *Read2(void *arg) {sem_wait(&full);Read(arg);sem_post(&empty);return NULL; }void *Write2(void *arg) {Write(arg);if (buffer >= N) {sem_post(&full);sem_wait(&empty);}return NULL; }void *Read3(void *arg) {sem_wait(&full);sem_wait(&mutex);Read(arg);sem_post(&mutex);sem_post(&empty);return NULL; }void *Write3(void *arg) {sem_wait(&mutex);Write(arg);sem_post(&mutex);if (buffer >= N) {sem_post(&full);sem_wait(&empty);}return NULL; }void all_sem_init() {// init semint res = 0;if (sem_init(&read_sem, 0, 1) < 0) {printf("failed to init read_sem");res = -1;}if (sem_init(&write_sem, 0, 1) < 0) {printf("failed to init write_sem");res = -1;}if (sem_init(&mutex, 0, 1) < 0) {printf("failed to init mutex");res = -1;}if (sem_init(&full, 0, 0) < 0) {printf("failed to init full");res = -1;}if (sem_init(&empty, 0, 0) < 0) {printf("failed to init empty");res = -1;}if (res)exit(0); }void all_sem_des() {// destroy semsem_destroy(&read_sem);sem_destroy(&write_sem);sem_destroy(&mutex);sem_destroy(&full);sem_destroy(&empty); }int main(int argc, char *argv[]) {srand(888);int res = 0;int randm = 0;void *(*target)(void *) =NULL;pthread_t myThreads[MAX_PROCESS_NUM];all_sem_init();for (int i = 0; i < MAX_PROCESS_NUM; ++i) {randm = rand() % 4;if (i % 2) { // target = Read; // target = Read1; // target = Read2;target = Read3;} else { // target = Write; // target = Write1; // target = Write2;target = Write3;}res = pthread_create(&myThreads[i], NULL, target, NULL);if (res < 0) {printf("failed to create the thread!");exit(0);}sleep(randm);}void *thread_result = NULL;for (int i = 0; i < MAX_PROCESS_NUM; ++i)res = pthread_join(myThreads[i], &thread_result);all_sem_des();return 0; }版權聲明:原創文章,轉載請注明出處 ?? Sylvan Ding
參考文獻
總結
- 上一篇: 红旗linux集采,计算机操作系统教程(
- 下一篇: Linux系统安装Zookeeper