rw_semaphore 原理与代码分析
代碼基于內核5.10版本
信號量的進階形式,對讀者寫者進行區分,寫者時互斥的,寫者持鎖時其余的寫者和讀者都只能等待。讀者是允許并發的,讀者持鎖時允許其他的讀者持鎖,但是寫者必須等待。
后續的分析不涉及死鎖檢測,也就是lockdep部分。
常用的API如下:
| void init_rwsem(struct rw_semaphore *sem); | 對讀寫信號量sem進行初始化。 |
| void down_read(struct rw_semaphore *sem); | 讀者用來獲取sem,若沒獲得時,則調用者睡眠等待。 |
| void up_read(struct rw_semaphore *sem); | 讀者釋放sem。 |
| int down_read_trylock(struct rw_semaphore *sem); | 讀者嘗試獲取sem,如果獲得返回1,如果沒有獲得返回0。可在中斷上下文使用。 |
| void down_write(struct rw_semaphore *sem); | 寫者用來獲取sem,若沒獲得時,則調用者睡眠等待。 |
| int down_write_trylock(struct rw_semaphore *sem); | 寫者嘗試獲取sem,如果獲得返回1,如果沒有獲得返回0。可在中斷上下文使用 |
| void up_write(struct rw_semaphore *sem); | 寫者釋放sem。 |
| void downgrade_write(struct rw_semaphore *sem); | 把寫者降級為讀者。 |
急用請看這
說實話,這部分機制還是比較難的,并且如果不是需要去深入研究其機制,完全沒必要了解這么多。只需要簡單debug,我這個給出一個快速的總結
struct rw_semaphore {atomic_long_t count; //用于記錄鎖的狀態atomic_long_t owner; //持鎖的寫者或者其中一個讀者struct optimistic_spin_queue osq; //用于樂觀自旋raw_spinlock_t wait_lock; //保護wait_list結構的spinlockstruct list_head wait_list; //等鎖者鏈表void *magic; //魔術數,記錄鎖的地址,當其被改變時可以認為鎖的數據結構被破壞struct lockdep_map dep_map; //死鎖檢測,暫不關注 } count bit* Bit 0 - writer locked bit* Bit 1 - waiters present bit* Bit 2 - lock handoff bit* Bits 3-7 - reserved* Bits 8-62 - 55-bit reader count* Bit 63 - read fail bit遇到讀寫鎖相關問題,那大概率是發生在進程卡在rw_sem上了,這時候需要快速找到持鎖者,可以從棧中拿到sem的地址以及數據,然后尋找真正的持鎖者。注意owner最后幾個bit需要清零才是task結構。
handoff機制
handoff這里應該翻譯為“接力”,它的出現是為了彌補樂觀自旋帶來的漏洞,既并非先到先得。鎖大多被設計為先到先得,是為了實現公平,防止某些進程一直得不到資源被餓死。而樂觀自旋是為了減少進程切換的開銷,降低鎖帶來的延遲,但是樂觀自旋會打破先到先得的規則。
semaphore的handoff是為了防止偷鎖的情況出現。等待隊列的進程都是在睡眠的,嘗試持鎖的進程可以進入樂觀自旋階段,這個階段無視等待隊列一旦有機會就會獲取鎖,造成本該最先拿到鎖的等待隊列隊首被跳過。隊首被喚醒持鎖,發現鎖已經被偷取,這時候就會設置handoff bit,后面所有的持鎖者看到這個bit就不能再偷鎖了。因此隊首進程可以順利獲取到鎖。
設置時機:
對于讀者,當讀者在隊首并且被喚醒嘗試持鎖失敗之后,如果超時設置handoff。
對于寫者,同樣在隊首,當自身是RT進程或者超時時設置handoff。
生效時機:
所有試圖持鎖者都需要檢測是否有handoff標志位,一旦存在均會持鎖失敗。設置者會在持鎖成功之后才清除這個標志位。
樂觀自旋
樂觀自旋是性能和速度的均衡考慮,讓進程在滿足一定的條件下循環等待某些數據,即期待持有鎖的進程盡快釋放鎖所以占用cpu一段時間來死等不退出,避免自身陷入睡眠然后很快被喚醒。一段時間是指調度器沒有標記當前進程需要被調度走。 偽代碼如下:
while(1) {檢測數據是否發生變變化是 退出,返回成功當前cpu時間片是否用盡?是 退出,返回失敗當前需要等待的進程是否還在cpu上運行否 退出,返回失敗 }數據結構
實際生效的定義如下: struct rw_semaphore {atomic_long_t count; //用于記錄鎖的狀態atomic_long_t owner; //持鎖的寫者或者其中一個讀者struct optimistic_spin_queue osq; //用于樂觀自旋raw_spinlock_t wait_lock; //保護wait_list結構的spinlockstruct list_head wait_list; //等鎖者鏈表void *magic; //魔術數,記錄鎖的地址,當其被改變時可以認為鎖的數據結構被破壞struct lockdep_map dep_map; //死鎖檢測,暫不關注 }count
count用來表示當前鎖的狀態較為復雜,其定義如下:
* Bit 0 - writer locked bit* Bit 1 - waiters present bit* Bit 2 - lock handoff bit* Bits 3-7 - reserved* Bits 8-62 - 55-bit reader count* Bit 63 - read fail bitbit 0用來表示是否有寫者持鎖,寫者持鎖成功的標志就是這個位被設置
bit 1表示等待隊列中是否存在等待者,有等待者時讀者持鎖成功需要喚醒等待隊列的其他讀者,讀寫者釋放鎖時需要喚醒等待隊列的隊首進程
bit 2表示hand off bit,這個是為了防止餓死設計的。handoff bit會阻止所有的偷鎖/持鎖行為保證鎖的交接。
bit 8-62 表示讀者的數目,這個數目并不能真正的反應持鎖成功的讀者個數。某些情況下讀者會采取先斬后奏的方式,先加上這個計數,然后再判斷自己能否持鎖,不行的話就減去恢復原來的計數。一般而言,當讀者持鎖時這個計數是真實的,但是寫者持鎖時也可能有讀者計數,這部分就是先斬后奏的讀者。
最高位63表示讀者個數溢出位,幾乎不會被設置,但是在某些路徑還是可能會被檢查
下面是用于判斷count的一些掩碼。
osq
struct optimistic_spin_queue osq 成員用于樂觀自旋,暫不做詳細分析,其本質為mcslock的一種變體,當持鎖者還在cpu上運行時,等鎖者可以期待持鎖者很快會釋放這個鎖,所以不會陷入睡眠而是自旋等待。反之持鎖者已經放棄了cpu,那么預計很長一段時間內持鎖者都不會釋放鎖,此時等鎖者陷入睡眠不再忙等。
owner
owner用來記錄當前持鎖的task,由于task必定是cacheline對齊的目前最小的cacheline也有16bit,所以owner的低位可以用來表示一些flag。同時可以有多個reader持鎖,因此繡著持鎖時owner就是持鎖者,但是讀者持鎖時owner有特殊表示。
#define RWSEM_READER_OWNED (1UL << 0) bit 0 用來表示當前是reader持鎖 #define RWSEM_RD_NONSPINNABLE (1UL << 1) bit 1 表示是否允許讀者自旋等待 #define RWSEM_WR_NONSPINNABLE (1UL << 2) bit 2 表示是否允許寫者等待 #define RWSEM_NONSPINNABLE (RWSEM_RD_NONSPINNABLE | RWSEM_WR_NONSPINNABLE) #define RWSEM_OWNER_FLAGS_MASK (RWSEM_READER_OWNED | RWSEM_NONSPINNABLE)讀者的owner一般由rwsem_set_reader_owned設置,為 current | RWSEM_READER_OWNED | (之前owner & RWSEM_RD_NONSPINNABLE)
寫者的owner由rwsem_set_owner設置,一般為 current
wait_list
等待隊列用來放置等待鎖的繼承,但是由于包含樂觀自旋邏輯,等待隊列不是全部的等待者,部分等待著可能在樂觀自旋等待owner。
作為鏈表頭,連接所有的struct rwsem_waiter.list
初始化
存在兩種方式,一種是臨時定義一個棧上的變量并且初始化。
還可以動態初始化,動態初始化的結果和靜態一致。
#define init_rwsem(sem) \ do { \static struct lock_class_key __key; \\__init_rwsem((sem), #sem, &__key); \ } while (0)void __init_rwsem(struct rw_semaphore *sem, const char *name,struct lock_class_key *key) { #ifdef CONFIG_DEBUG_RWSEMSsem->magic = sem; #endifatomic_long_set(&sem->count, RWSEM_UNLOCKED_VALUE);raw_spin_lock_init(&sem->wait_lock);INIT_LIST_HEAD(&sem->wait_list);atomic_long_set(&sem->owner, 0L); #ifdef CONFIG_RWSEM_SPIN_ON_OWNERosq_lock_init(&sem->osq); #endiftrace_android_vh_rwsem_init(sem); }down_read 讀者持鎖
讀者持鎖的判定條件較為復雜,即使有寫者持鎖時,也可能向count增加RWSEM_READER_BIAS,增加總是能成功的,但是不代表持鎖成功,增加之后一般會檢測是否有writer以及handoff,失敗需要回退,增加只是為了防止寫者搶占。讀者持鎖的條件如下:
note:
__down_read_trylock
讀者嘗試非阻塞的持鎖,僅當count == 0,也就是沒有reader也沒有writer也沒有waiter的情況下才能持鎖成功。成功之后設置自己的task | RWSEM_READER_OWNED 到owner。
返回1表示成功,0表示失敗。
__down_read_trylock {if(sem->count == 0)sem->count += RWSEM_READER_BIAS(1 << 8)sem->owner &= current | RWSEM_READER_OWNED | RWSEM_RD_NONSPINNABLEreturn 1return 0 } static inline int __down_read_trylock(struct rw_semaphore *sem) {long tmp;//檢測magic是否被改寫DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem);//設置tmp為鎖沒有持有者也沒有等待者的狀態的值,為0tmp = RWSEM_UNLOCKED_VALUE;do {//判斷sem->count == tmp//是,當前無人持鎖無人等待,設置sem->count = tmp + RWSEM_READER_BIAS,返回1表示更新成功//否,鎖不是空閑狀態,更新count失敗,返回0if (atomic_long_try_cmpxchg_acquire(&sem->count, &tmp,tmp + RWSEM_READER_BIAS)) {//sem->owner &= current | RWSEM_READER_OWNED | RWSEM_RD_NONSPINNABLErwsem_set_reader_owned(sem);//持鎖成功返回1return 1;}} while (!(tmp & RWSEM_READ_FAILED_MASK));//嘗試拿鎖失敗,返回0return 0; }__down_read
阻塞性的等鎖直到持鎖成功。
__down_read {rwsem_read_trylock{sem->count += RWSEM_READER_BIASreturn !(cnt & (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF|RWSEM_FLAG_READFAIL))}if (!rwsem_read_trylock) {rwsem_down_read_slowpath} else {rwsem_set_reader_owned}} static inline void __down_read(struct rw_semaphore *sem) {//讀者嘗試持鎖,0表示失敗//不管失敗成功與否,都會在count中增加讀者的計數RWSEM_READER_BIAS。if (!rwsem_read_trylock(sem)) {//讀者持鎖失敗,進入慢速路徑rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE);DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);} else {//讀者持鎖成功,設置owner = current | RWSEM_READER_OWNED,并且繼承之前設置的RWSEM_RD_NONSPINNABLErwsem_set_reader_owned(sem);} }rwsem_read_trylock
讀者嘗試持鎖,返回0表示失敗,1表示成功。不管失敗成功與否,都會在count中增加讀者的計數RWSEM_READER_BIAS。
失敗的原因有:
rwsem_down_read_slowpath
讀者直接持鎖失敗,進入慢速路徑。
注意,在此函數之前讀者已經將自己的標志位RWSEM_READER_BIAS加入了count。
rwsem_down_read_slowpath {if(rwsem_can_spin_on_owner) {//判斷自身是否需要調度,持鎖者是否在運行,以及flag中有沒有明確禁止當前身份的自旋減去之前rwsem_read_trylock中加上的RWSEM_READER_BIAS (2)atomic_long_add(-RWSEM_READER_BIAS, &sem->count); //減去之前rwsem_read_trylock中加上的RWSEM_READER_BIASif(rwsem_optimistic_spin){ //樂觀自旋,自旋等待成功會設置當前task為owner,設置count對應bit等待隊列不為空?rwsem_mark_wake 喚醒等待隊列其他讀者return 持鎖成功,退出} else if (rwsem_reader_phase_trylock()) //樂觀自旋失敗,再嘗試一次持鎖return 持鎖成功,退出}樂觀自旋徹底失敗,加入等待隊列初始化waiter結構,設置超時時間如果進程沒有經歷過樂觀自旋(1)處設置的count一直在生效,會阻止寫者持鎖,當前不是寫者持鎖和handoff狀態,那讀者持鎖成功了持鎖成功,退出否則加上RWSEM_FLAG_WAITERS,將自身加入等待隊列加入等待隊列之后鎖被釋放,或者另一個讀者持鎖我們是隊首,那么rwsem_mark_wake喚醒所有讀者持鎖包括我們自身持鎖成功,退出while(1)等待waiter.task被清除} 讀者將自身加入等待隊列時,會創建喚醒隊列和等待隊列結構 struct rwsem_waiter {struct list_head list; //鏈表頭struct task_struct *task; //等待持鎖是task,被喚醒時會清除這個成員enum rwsem_waiter_type type; //表明自身是等待讀鎖還是寫鎖unsigned long timeout; //保存超時時間,只對隊首進程生效,超時之后并不會退出,而是設置handoff標志unsigned long last_rowner; //保存當前的持鎖者 }struct wake_q_head {struct wake_q_node *first; //初始化為WAKE_Q_TAILstruct wake_q_node **lastp; //初始化為自身first成員的地址 } static struct rw_semaphore __sched * rwsem_down_read_slowpath(struct rw_semaphore *sem, int state) {//adjustment用來存儲將要count增加或者減少的值//初始化為非0,在樂觀自旋邏輯中清零,這樣可以通過該值判斷進程有沒有經歷過樂觀自旋的邏輯long count, adjustment = -RWSEM_READER_BIAS;struct rwsem_waiter waiter;//初始化喚醒隊列DEFINE_WAKE_Q(wake_q);bool wake = false;//保存當前owner信息waiter.last_rowner = atomic_long_read(&sem->owner);//如果當前不是讀者持鎖if (!(waiter.last_rowner & RWSEM_READER_OWNED))//在保存的owner中加上禁止讀者自旋等待的flagwaiter.last_rowner &= RWSEM_RD_NONSPINNABLE;//當前是否允許自旋等待,不允許則加入等待隊列if (!rwsem_can_spin_on_owner(sem, RWSEM_RD_NONSPINNABLE))goto queue;//樂觀自旋等待的部分 ---------------------------------------------------------------------- (1)//減去之前rwsem_read_trylock中加上的RWSEM_READER_BIASatomic_long_add(-RWSEM_READER_BIAS, &sem->count);adjustment = 0;if (rwsem_optimistic_spin(sem, false)) {//自旋等待成功,成功設置owner為當前task,持有count//讀者持鎖成功,就可以將其他等待的讀者喚醒了if ((atomic_long_read(&sem->count) & RWSEM_FLAG_WAITERS)) {raw_spin_lock_irq(&sem->wait_lock);if (!list_empty(&sem->wait_list))//喚醒隊列中所有能喚醒的讀者rwsem_mark_wake(sem, RWSEM_WAKE_READ_OWNED, &wake_q);raw_spin_unlock_irq(&sem->wait_lock);wake_up_q(&wake_q);}return sem;//樂觀自旋失敗,進行最后一次嘗試} else if (rwsem_reader_phase_trylock(sem, waiter.last_rowner)) {return sem;}//樂觀自旋徹底失敗,加入等待隊列 ---------------------------------------------------------------------- (2) queue:waiter.task = current;waiter.type = RWSEM_WAITING_FOR_READ;waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;raw_spin_lock_irq(&sem->wait_lock);//等待隊列為空,當前進程為隊首if (list_empty(&sem->wait_list)) {//adjustment!=0意味著沒有經歷過代碼(1)部分的樂觀自旋等待,即之前設置的count中的RWSEM_READER_BIAS沒有被減去//當前不是寫者持鎖和handoff狀態,那讀者持鎖成功了if (adjustment && !(atomic_long_read(&sem->count) &(RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {smp_acquire__after_ctrl_dep();raw_spin_unlock_irq(&sem->wait_lock);//count值前面已經增加過RWSEM_READER_BIAS了,現在不需要調整,設置owner之后就退出rwsem_set_reader_owned(sem);lockevent_inc(rwsem_rlock_fast);return sem;}//否則添加RWSEM_FLAG_WAITERS,表明等待隊列非空,當前進程不是隊首adjustment += RWSEM_FLAG_WAITERS;}//將自身加入等待隊列list_add_tail(&waiter.list, &sem->wait_list);if (adjustment)count = atomic_long_add_return(adjustment, &sem->count);else count = atomic_long_read(&sem->count);//加入等待隊列完畢 ----------------------------------------------------//加入等待隊列之后情況如果發生改變,鎖被釋放了if (!(count & RWSEM_LOCK_MASK)) {clear_wr_nonspinnable(sem);wake = true;}//或者沒有寫者持鎖并且我們是隊首//那么喚醒并且協助隊列中所有的讀者持鎖,包括我們自身//rwsem_mark_wake會清除所有被喚醒者的waiter.taskif (wake || (!(count & RWSEM_WRITER_MASK) &&(adjustment & RWSEM_FLAG_WAITERS))) //adjustment被設置了RWSEM_FLAG_WAITERS意味著我們是隊首rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);raw_spin_unlock_irq(&sem->wait_lock);wake_up_q(&wake_q);//在隊列中等待waiter.task被清除 ----------------------------------------------------------for (;;) {set_current_state(state);//當waiter.task被清除時,說明我們持鎖成功if (!smp_load_acquire(&waiter.task)) {/* Matches rwsem_mark_wake()'s smp_store_release(). */break;}//處理信號 ...schedule();lockevent_inc(rwsem_sleep_reader); }__set_current_state(TASK_RUNNING);lockevent_inc(rwsem_rlock);return sem;out_nolock:list_del(&waiter.list);if (list_empty(&sem->wait_list)) {atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF,&sem->count);}raw_spin_unlock_irq(&sem->wait_lock);__set_current_state(TASK_RUNNING);lockevent_inc(rwsem_rlock_fail);return ERR_PTR(-EINTR); }rwsem_can_spin_on_owner
檢測是否能夠樂觀自旋。
static inline bool rwsem_can_spin_on_owner(struct rw_semaphore *sem,unsigned long nonspinnable) {struct task_struct *owner;unsigned long flags;bool ret = true;//當前任務被調度器標記為需要調度,那么不能自旋等待if (need_resched()) {return false;} ...//獲取當前owner以及flagowner = rwsem_owner_flags(sem, &flags);//如果flag中禁止了當前身份的自旋或者當前是寫者持鎖并且寫者放棄了cpu,自旋失敗if ((flags & nonspinnable) ||(owner && !(flags & RWSEM_READER_OWNED) && !owner_on_cpu(owner)))ret = false; ...return ret; }rwsem_optimistic_spin
在owner上樂觀自旋,第一步通過樂觀自旋獲取sem->osq,只有持有這個鎖才能在sem->owner上自旋等待。返回自旋等待是否成功,如果是被迫退出自旋返回0.
自旋等待成功會設置當前task為owner。
rwsem_optimistic_spin {osq_lock 獲取樂觀自旋所osq失敗,退出while(1) {owner_state = rwsem_spin_on_owner 更新當前owner狀態OWNER_SPINNABLE? -> breakrwsem_try_xxx_lock_unqueued 調用不加入隊列嘗試獲取鎖的函數,此函數能夠搶占等待隊列的持鎖機會reader只檢測是否存在writer持鎖和handoffwriter只檢測鎖是否被人持有和handoff,兩者均不關心waiter成功 -> break寫者等待讀者持鎖的情況,讀者太多不能確定每個讀者是否都在線每16次循環檢測超時時間超時 -> breakRT任務不能被阻塞,讀者持鎖情況向讀者太多不能確定每個讀者是否都在線兩次機會獲取鎖或者帶寫者持鎖開始spin}} @sem 指向的指針鎖 @wlock 決定是否是寫者函數中使用owner_state表示當前owner的狀態 enum owner_state {OWNER_NULL = 1 owner為NULLOWNER_WRITER = 2 owner為寫者OWNER_READER = 4 owner為讀者OWNER_NONSPINNABLE = 8 當前不允許在owner上spin } #define OWNER_SPINNABLE (OWNER_NULL | OWNER_WRITER | OWNER_READER)static bool rwsem_optimistic_spin(struct rw_semaphore *sem, bool wlock) {bool taken = false;int prev_owner_state = OWNER_NULL;int loop = 0;u64 rspin_threshold = 0; unsigned long nonspinnable = wlock ? RWSEM_WR_NONSPINNABLE: RWSEM_RD_NONSPINNABLE;preempt_disable();/* 先獲取osqlock,osqlock也是一種樂觀自旋機制,在soqlock上自旋失敗,則不嘗試在owner上自旋等待,直接退出 */if (!osq_lock(&sem->osq))goto done;//由于到達此步之前需要持有osq,因此即使有多個讀寫者搶鎖也只能有一個自旋等待owner//樂觀自旋等待owner發生變化時持鎖for (;;) {enum owner_state owner_state;//自旋等待owner發生改變,返回改變之后的stateowner_state = rwsem_spin_on_owner(sem, nonspinnable);//除OWNER_NONSPINNABLE之外都是允許繼續spin的if (!(owner_state & OWNER_SPINNABLE))break;//根據身份調用不加入隊列嘗試獲取鎖的函數,不加入等待隊列意味著有可能搶占等待隊列進程的持鎖機會//對于寫者而言,等待count中既沒有讀者持鎖也沒有寫者持鎖,也不是handoff狀態//對于讀者而言,等待count中既沒有沒有寫者持鎖,也不是handoff狀態//成功返回1,失敗返回0taken = wlock ? rwsem_try_write_lock_unqueued(sem): rwsem_try_read_lock_unqueued(sem);//持鎖成功,退出循環if (taken)break;//其他的讀者可以直接持鎖,寫者需要等待所有讀者釋放鎖才能持鎖,寫者無法弄清楚是否所有讀者都在線//所以如果當前是寫者等鎖,并且讀者持有鎖,給自旋等待的時間加個限制if (wlock && (owner_state == OWNER_READER)) {//prev_owner_state初始化為OWNER_NULL,第一次此條件必定成立if (prev_owner_state != OWNER_READER) {//不允許寫者在owner上spin,那么退出if (rwsem_test_oflags(sem, nonspinnable))break;//計算寫者的最大等待時間rspin_threshold = rwsem_rspin_threshold(sem);//循環次數清零loop = 0;并且已經超過了該讀者的預定等}//每16次循環計算當前等待是否超時,如果超時設置RWSEM_NONSPINNABLE禁止所有的在owner上的spinelse if (!(++loop & 0xf) && (sched_clock() > rspin_threshold)) {rwsem_set_nonspinnable(sem);lockevent_inc(rwsem_opt_nospin);break;}}//讀者可能有很多,RT任務同樣不能弄清楚每個讀者的狀態//上次不是寫者持鎖,則RT任務不再等待,讀者一直持鎖RT任務將有兩次等待機會if (owner_state != OWNER_WRITER) {//調度器認為我們需要讓出cpu,那么退出等待if (need_resched())break;//讀者持鎖的情況向RT任務有兩次機會spinif (rt_task(current) &&(prev_owner_state != OWNER_WRITER))break;}//記錄這次循環的持鎖者,下次循環時會用到prev_owner_state = owner_state;cpu_relax();}osq_unlock(&sem->osq); done:preempt_enable();lockevent_cond_inc(rwsem_opt_fail, !taken);return taken; }rwsem_spin_on_owner
? 自旋等待owner或者flag發生改變,返回新的state
static noinline enum owner_state rwsem_spin_on_owner(struct rw_semaphore *sem, unsigned long nonspinnable) {struct task_struct *new, *owner;unsigned long flags, new_flags;enum owner_state state; //獲取ownerowner = rwsem_owner_flags(sem, &flags);/* 獲取onwer的狀態,這里有4種情況* 1.owner中存在禁止當前身份spin的flag,比如當前是讀者owner中存在RWSEM_RD_NONSPINNABLE,返回 OWNER_NONSPINNABLE* 2.當前是讀者持鎖,返回 OWNER_READER* 3.在等待的過程中所已經被釋放了,返回 OWNER_NULL* 4.寫者持鎖,OWNER_WRITER */state = rwsem_owner_state(owner, flags, nonspinnable);//當前不是寫者持鎖,停止等待owner//因為讀者持鎖時間可能很長,OWNER_NONSPINNABLE意味著禁止自旋等待if (state != OWNER_WRITER)return state;rcu_read_lock();for (;;) {//讀取新的flag和ownernew = rwsem_owner_flags(sem, &new_flags);//如果owner或者flag發生了改變if ((new != owner) || (new_flags != flags)) {//重新獲取stat,終止循環state = rwsem_owner_state(new, new_flags, nonspinnable);break;} ...//如果自選條件不滿足if (need_resched() || !owner_on_cpu(owner)) {state = OWNER_NONSPINNABLE;break;}cpu_relax();}rcu_read_unlock();return state; }rwsem_try_xxx_lock_unqueued
不加入等待隊列的嘗試持鎖,這此持鎖沒有判斷等待隊列是否有等待者,可能會搶占等待隊列中進程的持鎖機會,所以需要進行handoff的判斷,避免等待著被強占次數過多餓死。
static inline bool rwsem_try_write_lock_unqueued(struct rw_semaphore *sem) {long count = atomic_long_read(&sem->count);//寫者等待無人持鎖以及非handoff狀態,之后嘗試持鎖//使用循環是為了給寫者更多的持鎖機會,讀者持鎖就只有一次判斷while (!(count & (RWSEM_LOCK_MASK|RWSEM_FLAG_HANDOFF))) {if (atomic_long_try_cmpxchg_acquire(&sem->count, &count,count | RWSEM_WRITER_LOCKED)) {rwsem_set_owner(sem);lockevent_inc(rwsem_opt_wlock);return true;}}return false; }前面讀者實際上是有非阻塞持鎖的函數的,rwsem_read_trylock,與此函數區別在于是否判斷waiter。
static inline bool rwsem_try_read_lock_unqueued(struct rw_semaphore *sem) {long count = atomic_long_read(&sem->count);//判斷是否有寫者持鎖以及是否在handoff狀態,讀者持鎖不影響另一個讀者if (count & (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))return false;//不在的話嘗試持鎖,將count + RWSEM_READER_BIAScount = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, &sem->count);//成功則開始設置ownerif (!(count & (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {rwsem_set_reader_owned(sem);lockevent_inc(rwsem_opt_rlock);return true;}//失敗則回退自己設置的RWSEM_READER_BIASatomic_long_add(-RWSEM_READER_BIAS, &sem->count);return false; }rwsem_mark_wake
喚醒等待隊列的進程。
如果隊首是寫者但是喚醒類型不是RWSEM_WAKE_ANY,那么喚醒不了任何進程。
隊首是讀者,當前不是讀者持鎖喚醒其他讀者,那么幫助這個讀者提前占據鎖。搶鎖失敗并且超時,就設置handoff位。
@wake_type 需要喚醒對象的類型 enum rwsem_wake_type {RWSEM_WAKE_ANY, /* 喚醒隊列的第一個等待者 */RWSEM_WAKE_READERS, /* 只喚醒讀者 */RWSEM_WAKE_READ_OWNED /* 讀者持鎖喚醒其他讀者 */ }; @wake_q 喚醒隊列static void rwsem_mark_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type,struct wake_q_head *wake_q) {struct rwsem_waiter *waiter, *tmp;long oldcount, woken = 0, adjustment = 0;struct list_head wlist;lockdep_assert_held(&sem->wait_lock);//如果第一個等待者是寫者,并且喚醒對象是任意隊首等待者,那么不喚醒所有進程進行競爭,只喚醒這個寫者//如果第一個等待者是寫者,但是喚醒對象是其他的,比如讀者持鎖喚醒其他讀者或者僅限于喚醒讀者的情況//直接返回不進行喚醒動作,因為第一個等待的寫者應該最先被喚醒waiter = rwsem_first_waiter(sem);if (waiter->type == RWSEM_WAITING_FOR_WRITE) {if (wake_type == RWSEM_WAKE_ANY) {wake_q_add(wake_q, waiter->task);lockevent_inc(rwsem_wake_writer);}return;}//此處之后處理隊首是讀者的情況 -------------------------------------------------------------------------//讀者計數溢出處理if (unlikely(atomic_long_read(&sem->count) < 0))return;//不是讀者持鎖喚醒其他讀者的情況,說明當前鎖是未被人持有的狀態,需要搶鎖if (wake_type != RWSEM_WAKE_READ_OWNED) {struct task_struct *owner;//先構造一個虛假的讀者持鎖,幫助這個隊首的讀者提前獲取鎖adjustment = RWSEM_READER_BIAS;oldcount = atomic_long_fetch_add(adjustment, &sem->count);//搶鎖失敗了,寫者偷到了鎖,隊首的讀者錯過了一次持鎖機會,并且已經超過了該讀者的預定等待時間//現在設置handoff,屏蔽所有競爭鎖以及竊取鎖的動作,讓下個持鎖者直接把鎖交給隊首的讀者,退出if (unlikely(oldcount & RWSEM_WRITER_MASK)) {if (!(oldcount & RWSEM_FLAG_HANDOFF) &&time_after(jiffies, waiter->timeout)) {adjustment -= RWSEM_FLAG_HANDOFF;lockevent_inc(rwsem_rlock_handoff);}atomic_long_add(-adjustment, &sem->count); return;}//幫助隊首的讀者搶鎖成功了,設置隊首進程為新的ownerowner = waiter->task;if (waiter->last_rowner & RWSEM_RD_NONSPINNABLE) {owner = (void *)((unsigned long)owner | RWSEM_RD_NONSPINNABLE);lockevent_inc(rwsem_opt_norspin);}__rwsem_set_reader_owned(sem, owner);}/* 這里之后,隊首是讀者,持鎖者也是讀者,需要做的是喚醒隊列中的其他讀者 1.讀者持鎖喚醒其他讀者, 2.幫助隊首的讀者獲取了鎖,現在需要繼續喚醒剩余的讀者*/ -------------------------------------------------------------------//現在,持鎖者是讀者了,我們可以喚醒隊列中至多MAX_READERS_WAKEUP個其他的讀者,一般是1600個INIT_LIST_HEAD(&wlist);list_for_each_entry_safe(waiter, tmp, &sem->wait_list, list) {if (waiter->type == RWSEM_WAITING_FOR_WRITE)continue;woken++;list_move_tail(&waiter->list, &wlist);if (woken >= MAX_READERS_WAKEUP)break;}//增加需要喚醒的讀者的計數,如果等待隊列為空需要移除掉waiter標志adjustment = woken * RWSEM_READER_BIAS - adjustment;lockevent_cond_inc(rwsem_wake_reader, woken);if (list_empty(&sem->wait_list)) {adjustment -= RWSEM_FLAG_WAITERS;}//handoff只作用于隊首進程,現在我們已經協助隊首拿到鎖了,清除它if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF))adjustment -= RWSEM_FLAG_HANDOFF;//將增加的讀者數目寫入countif (adjustment)atomic_long_add(adjustment, &sem->count);/* 將需要喚醒的進程放入wake_q */list_for_each_entry_safe(waiter, tmp, &wlist, list) {struct task_struct *tsk;tsk = waiter->task;get_task_struct(tsk);//喚醒之前清除waiter->tasksmp_store_release(&waiter->task, NULL);wake_q_add_safe(wake_q, tsk);} }up_read 讀者釋放鎖
如果owner是當前task,清除掉owner中task_struct部分,保留flag部分。
將count減去RWSEM_READER_BIAS,即減少一名讀者計數。
若無人持鎖并且等待隊列不為空,調用rwsem_wake喚醒等待隊列進程,最終盜用的是rwsem_mark_wake,喚醒類型為RWSEM_WAKE_ANY。
void up_read(struct rw_semaphore *sem) {rwsem_release(&sem->dep_map, _RET_IP_);__up_read(sem); }static inline void __up_read(struct rw_semaphore *sem) {long tmp;DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem);DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);//onwer是當前進程,將flag中的task_struct清除,保留flagrwsem_clear_reader_owned(sem);//減少讀者數目tmp = atomic_long_add_return_release(-RWSEM_READER_BIAS, &sem->count);DEBUG_RWSEMS_WARN_ON(tmp < 0, sem);//當前無人持鎖if (unlikely((tmp & (RWSEM_LOCK_MASK|RWSEM_FLAG_WAITERS)) ==RWSEM_FLAG_WAITERS)) {clear_wr_nonspinnable(sem);//喚醒等待隊列rwsem_wake(sem, tmp);} }down_write 寫者持鎖
寫者和其他寫著以及讀者互斥,因此需要沒有任何人持鎖的情況下才能獲取鎖。
寫者持鎖的條件比讀者簡單,只需要設置count的RWSEM_WRITER_LOCKED bit就算持鎖成功。
讀者會有其他人協助幫忙持鎖,自身被喚醒只用檢查waiter.task是否被清空,寫者無人協助,被喚醒之后會自己去檢查鎖的狀態,然后拿鎖。
挾制持鎖,owner必定是自身。
down_write 和讀者持鎖流程一樣,都是先嘗試__down_write_trylock失敗則調用__down_write。
偽代碼為: if !__down_write_trylock__down_writevoid __sched down_write(struct rw_semaphore *sem) {might_sleep();rwsem_acquire(&sem->dep_map, 0, 0, _RET_IP_);LOCK_CONTENDED(sem, __down_write_trylock, __down_write); }__down_write_trylock
寫者嘗試非阻塞持鎖。僅當count == 0時,將其設置為RWSEM_WRITER_LOCKED,并且設置owner為當前task。
static inline int __down_write_trylock(struct rw_semaphore *sem) {long tmp;DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem);tmp = RWSEM_UNLOCKED_VALUE;//如果count == 0,那么設置count = RWSEM_WRITER_LOCKED,即表明寫者持鎖if (atomic_long_try_cmpxchg_acquire(&sem->count, &tmp,RWSEM_WRITER_LOCKED)) { //設置owner = current rwsem_set_owner(sem);return true;}return false; }__down_write
static inline void __down_write(struct rw_semaphore *sem) {long tmp = RWSEM_UNLOCKED_VALUE;//再嘗試一次__down_write_trylock的流程if (unlikely(!atomic_long_try_cmpxchg_acquire(&sem->count, &tmp, RWSEM_WRITER_LOCKED)))//失敗則進入慢速路徑rwsem_down_write_slowpath(sem, TASK_UNINTERRUPTIBLE);elserwsem_set_owner(sem); }rwsem_down_write_slowpath
樂觀自旋持鎖以及加入等待隊列死等的慢速路徑。
rwsem_down_write_slowpath {和讀者類似的樂觀自旋嘗試加入等待隊列不是隊首,寫者持鎖什么都不干,讀者持鎖喚醒隊列其他讀者,無人持鎖喚醒隊列隊首while(1) {rwsem_try_write_lock 嘗試持鎖,當前隊首進程等待超時設置handoff當前進程設置了handoff,那么嘗試在owner上樂觀自旋} } wstate 用來表明當前寫著的狀態 WRITER_FIRST 當前是等待隊列的隊首 WRITER_NOT_FIRST 當前不是隊首 WRITER_HANDOFF 當前進程已經等待超時或者當前時RT進程,允許在count中設置handoffstatic struct rw_semaphore * rwsem_down_write_slowpath(struct rw_semaphore *sem, int state) {long count;bool disable_rspin;enum writer_wait_state wstate;struct rwsem_waiter waiter;struct rw_semaphore *ret = sem;DEFINE_WAKE_Q(wake_q);//嘗試樂觀自旋偷鎖//和前面讀者流程一致,不過是以寫者身份spin,等待count中沒有寫者持鎖,讀者持鎖,handoff標記if (rwsem_can_spin_on_owner(sem, RWSEM_WR_NONSPINNABLE) &&rwsem_optimistic_spin(sem, true)) {/* rwsem_optimistic_spin() implies ACQUIRE on success */return sem;}//獲取禁止spin的flagdisable_rspin = atomic_long_read(&sem->owner) & RWSEM_NONSPINNABLE;//初始化waiter,準備加入等待隊列waiter.task = current;waiter.type = RWSEM_WAITING_FOR_WRITE;waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;raw_spin_lock_irq(&sem->wait_lock); //當前進程是等待隊列的隊首嗎?wstate = list_empty(&sem->wait_list) ? WRITER_FIRST : WRITER_NOT_FIRST;list_add_tail(&waiter.list, &sem->wait_list);//如果我們不是隊首,需要處理下等待隊列if (wstate == WRITER_NOT_FIRST) {count = atomic_long_read(&sem->count);//當前是寫者持鎖,沒什么好做的慢慢等待即可if (count & RWSEM_WRITER_MASK)goto wait;//當前是讀者持鎖,嘗試使用RWSEM_WAKE_READERS喚醒隊列內的其他讀者持鎖//當前無人持鎖,RWSEM_WAKE_READERS喚醒隊列的隊首rwsem_mark_wake(sem, (count & RWSEM_READER_MASK)? RWSEM_WAKE_READERS: RWSEM_WAKE_ANY, &wake_q);//喚醒進程if (!wake_q_empty(&wake_q)) { raw_spin_unlock_irq(&sem->wait_lock);wake_up_q(&wake_q);wake_q_init(&wake_q); /* Used again, reinit */raw_spin_lock_irq(&sem->wait_lock);}} else {//我們是隊首,增加RWSEM_FLAG_WAITERS標志atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);}wait:/* 死循環等鎖 */set_current_state(state);for (;;) {//以寫者的身份嘗試獲取鎖,成功返回1if (rwsem_try_write_lock(sem, wstate)) {break;}raw_spin_unlock_irq(&sem->wait_lock);//如果當前進程允許設置handoff還是持鎖失敗,那么嘗試在owner上自旋等待owner為NULL,避免睡眠//因為設置了handoff之后應該不會再有競爭者if (wstate == WRITER_HANDOFF &&rwsem_spin_on_owner(sem, RWSEM_NONSPINNABLE) == OWNER_NULL)goto trylock_again;/* Block until there are no active lockers. */for (;;) {...//讓出cpu,等待喚醒schedule();//喚醒之后的節點,更新自身以及鎖的狀態,發生改變就進入外層循環再次嘗試持鎖,否則回來繼續睡眠 ------------------------------------------------------//當前進程能夠設置handoff狀態,馬上退出內存循環到外層循環rwsem_try_write_lock給count設置handoffif (wstate == WRITER_HANDOFF)break;//當前不是隊首進程,更新下自身狀態看下是否已經成為新的隊首if ((wstate == WRITER_NOT_FIRST) &&(rwsem_first_waiter(sem) == &waiter))wstate = WRITER_FIRST;//更新鎖的狀態,如果無人持鎖馬上去外層循環嘗試持鎖count = atomic_long_read(&sem->count);if (!(count & RWSEM_LOCK_MASK))break;//當前是隊首并且是RT進程或者等待超時,那么允許設置handoff狀態if ((wstate == WRITER_FIRST) && (rt_task(current) ||time_after(jiffies, waiter.timeout))) {wstate = WRITER_HANDOFF;lockevent_inc(rwsem_wlock_handoff);break;}} trylock_again:raw_spin_lock_irq(&sem->wait_lock);}//持鎖成功,恢復進程狀態,將自身從等待隊列刪除__set_current_state(TASK_RUNNING);list_del(&waiter.list);rwsem_disable_reader_optspin(sem, disable_rspin);raw_spin_unlock_irq(&sem->wait_lock);lockevent_inc(rwsem_wlock);return ret;out_nolock:__set_current_state(TASK_RUNNING);raw_spin_lock_irq(&sem->wait_lock); list_del(&waiter.list);if (unlikely(wstate == WRITER_HANDOFF))atomic_long_add(-RWSEM_FLAG_HANDOFF, &sem->count);if (list_empty(&sem->wait_list))atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);elserwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);raw_spin_unlock_irq(&sem->wait_lock);wake_up_q(&wake_q);lockevent_inc(rwsem_wlock_fail);return ERR_PTR(-EINTR); }rwsem_try_write_lock
寫者嘗試獲取鎖,主要用來處理寫者的handoff的標記。
總結
以上是生活随笔為你收集整理的rw_semaphore 原理与代码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 二分查找与时间复杂度计算分析
- 下一篇: 双月数据生成及其常见算法(一)