Michael-Scott非阻塞队列(lock-free)算法的C实现
Michael-Scott非阻塞隊(duì)列算法,即MS-queue算法,是1 9 9 6 年由Maged . M .Michael and M. L. Scott提出的,是最為經(jīng)典的并發(fā)FIFO隊(duì)列上的算法,目前很多對(duì)并發(fā)FIFO隊(duì)列的研究都是基于這個(gè)算法來加以改進(jìn)的。在共享內(nèi)存的多核處理器上,這種基于Compare-and-swap(CAS)的算法在性能上要遠(yuǎn)遠(yuǎn)優(yōu)于以前基于鎖的算法,并且已經(jīng)被Java并發(fā)包所采用。它的主要特點(diǎn)在于允許多線程并發(fā)的、無干擾的訪問隊(duì)列的頭和尾。
MS-queue算法依賴于CAS原子操作,CAS操作是與處理器體系結(jié)構(gòu)有關(guān)的,GCC中已經(jīng)提供了內(nèi)建的CAS相關(guān)的API,具體參見這里。
bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...); type __sync_val_compare_and_swap (type *ptr, type oldval, type newval, ...); /* 對(duì)應(yīng)的偽代碼 */ { if (*ptr == oldval) { *ptr = newval; return true; } else { return false; } } { if (*ptr == oldval) { *ptr = newval; } return oldval; }?
與CAS API一起的,還包括另一組自增、自減、與、或、非、異或原子操作的API。
type __sync_fetch_and_add(type *ptr, type value, ...); // m+n type __sync_fetch_and_sub(type *ptr, type value, ...); // m-n type __sync_fetch_and_or(type *ptr, type value, ...); // m|n type __sync_fetch_and_and(type *ptr, type value, ...); // m&n type __sync_fetch_and_xor(type *ptr, type value, ...); // m^n type __sync_fetch_and_nand(type *ptr, type value, ...); // (~m)&n /* 對(duì)應(yīng)的偽代碼 */ { tmp = *ptr; *ptr op= value; return tmp; } { tmp = *ptr; *ptr = (~tmp) & value; return tmp; } // nand?
使用這組API有很多好處,比如C/C++中自增自減及賦值操作都不是原子操作,如果是多線程程序需要使用全局計(jì)數(shù)器,程序就需要使用鎖或者互斥量,對(duì)于較高并發(fā)的程序,會(huì)造成一定的性能瓶頸。而通過使用這組API,GCC通過在匯編級(jí)別的代碼來保證賦值類操作的原子性,相對(duì)于涉及到操作系統(tǒng)系統(tǒng)調(diào)用和應(yīng)用層同步的鎖和互斥量,這組api的效率要高很多。
言歸正傳,回到MS-queue無鎖(lock-free)隊(duì)列上來。雖說MS-queue已經(jīng)是大名鼎鼎了,不過找一個(gè)現(xiàn)成的C實(shí)現(xiàn)貌似還真不容易,C++的實(shí)現(xiàn)這里已經(jīng)有了,是基于Boost的。另一個(gè)是復(fù)旦大學(xué)一個(gè)研究組的實(shí)現(xiàn)(這里),不過主要是針對(duì)64位機(jī),CAS原語直接用匯編指令搞定的,覺得直接在32位下用或arm的GCC編譯下會(huì)有問題。由于平時(shí)的項(xiàng)目開發(fā)用的基本是GCC編譯器或arm的GCC,因此,自己實(shí)現(xiàn)了一個(gè)適用于32位機(jī)的、采用GCC內(nèi)置CAS API的MS-queue。
ms_queue.h:
if (CAS((uint64_t*)&q->Tail, *(uint64_t*)&tail, *(uint64_t*)&old_tail))this is correct:if (CAS((uint64_t*)&q->Tail, *(uint64_t*)&tail, *(uint64_t*)&tail))*/if (CAS((uint64_t*)&q->Tail, *(const uint64_t*)&tail, *(const uint64_t*)&tail)){if (next.ptr == NULL){tmp.ptr = node;tmp.count = next.count+1;if (CAS((uint64_t*)&tail.ptr->next, *(const uint64_t*)&next, *(const uint64_t*)&tmp)){break;}}else{tmp.ptr = next.ptr;tmp.count = tail.count+1;CAS((uint64_t*)&q->Tail, *(const uint64_t*)&tail, *(const uint64_t*)&tmp);}}}tmp.ptr = node;tmp.count = tail.count+1;CAS((uint64_t*)&q->Tail, *(const uint64_t*)&tail, *(const uint64_t*)&tmp); }int dequeue(queue_t *q, data_type* pvalue) {pointer_t old_head, head, tail, next, tmp;while(1){head = q->Head;old_head = head;tail = q->Tail;next = head.ptr->next;/* head may be changed in CAS after compare but before assign to q->Head,* so this is incorrect:if (CAS((uint64_t*)&q->Head, *(uint64_t*)&head, *(uint64_t*)&old_head))this is correct:if (CAS((uint64_t*)&q->Head, *(uint64_t*)&head, *(uint64_t*)&head))*/if (CAS((uint64_t*)&q->Head, *(const uint64_t*)&head, *(const uint64_t*)&head)){if (head.ptr == tail.ptr){if (next.ptr == NULL){return 0;}tmp.ptr = next.ptr;tmp.count = tail.count+1;CAS((uint64_t*)&q->Tail, *(const uint64_t*)&tail, *(const uint64_t*)&tmp);}else{if (pvalue){*pvalue = next.ptr->value;}tmp.ptr = next.ptr;tmp.count = head.count+1;if (CAS((uint64_t*)&q->Head, *(const uint64_t*)&head, *(const uint64_t*)&tmp)){break;}}}}free(head.ptr);return 1; }
?
test_queue.c:
#include <stdio.h> #include <assert.h> #include "ms_queue.h"pthread_t a_id[10]; pthread_t b_id[10];queue_t queue; void* put(void* a) {int i = 0, j;int n = (int)a;for(j = n*10000000; j<(n+1)*10000000; j++){enqueue(&queue, j);}printf("put thread: %d exit\n", n); }void* get(void* a) {int v;int n = (int)a;int cnt = 10000000;while(cnt--){while(0 == dequeue(&queue, &v)){usleep(100);}}printf("get thread: %d exit\n", n); }int main() {int i, j;initialize(&queue);assert(NULL != queue.Head.ptr);assert(NULL != queue.Tail.ptr);for ( i = 0; i < 10; i++ ){pthread_create(&a_id[i], NULL, put, i);pthread_create(&b_id[i], NULL, get, i);}for ( i = 0; i < 10; i++ ){pthread_join(a_id[i], NULL);pthread_join(b_id[i], NULL);}assert(0 == dequeue(&queue, &j)); }?
轉(zhuǎn)載于:https://www.cnblogs.com/lijingcheng/p/4454848.html
與50位技術(shù)專家面對(duì)面20年技術(shù)見證,附贈(zèng)技術(shù)全景圖總結(jié)
以上是生活随笔為你收集整理的Michael-Scott非阻塞队列(lock-free)算法的C实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 算法导论笔记:06堆排序
- 下一篇: Xamarin Studio支持Type