每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习
1.來源
Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內部的內存隊列的延遲問題,而不是分布式隊列?;贒isruptor開發的系統單線程能支撐每秒600萬訂單,2010年在QCon演講后,獲得了業界關注。
2.應用背景和介紹
據目前資料顯示:應用Disruptor的知名項目有如下的一些:Storm, Camel, Log4j2,還有目前的美團點評技術團隊也有很多不少的應用,或者說有一些借鑒了它的設計機制。
Disruptor是一個高性能的線程間異步通信的框架,即在同一個JVM進程中的多線程間消息傳遞。
?
Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題。與Kafka、RabbitMQ用于服務間的消息隊列不同,disruptor一般用于線程間消息的傳遞?;贒isruptor開發的系統單線程能支撐每秒600萬訂單。
disruptor是用于一個JVM中多個線程之間的消息隊列,作用與ArrayBlockingQueue有相似之處,但是disruptor從功能、性能都遠好于ArrayBlockingQueue,當多個線程之間傳遞大量數據或對性能要求較高時,可以考慮使用disruptor作為ArrayBlockingQueue的替代者。
?官方也對disruptor和ArrayBlockingQueue的性能在不同的應用場景下做了對比,目測性能只有有5~10倍左右的提升。
隊列
隊列是屬于一種數據結構,隊列采用的FIFO(first in firstout),新元素(等待進入隊列的元素)總是被插入到尾部,而讀取的時候總是從頭部開始讀取。在計算中隊列一般用來做排隊(如線程池的等待排隊,鎖的等待排隊),用來做解耦(生產者消費者模式),異步等等
在jdk中的隊列都實現了java.util.Queue接口,在隊列中又分為兩類,一類是線程不安全的,ArrayDeque,LinkedList等等,還有一類都在java.util.concurrent包下屬于線程安全,而在我們真實的環境中,我們的機器都是屬于多線程,當多線程對同一個隊列進行排隊操作的時候,如果使用線程不安全會出現,覆蓋數據,數據丟失等無法預測的事情,所以我們這個時候只能選擇線程安全的隊列。
其次還剩下ArrayBlockingQueue,LinkedBlockingQueue兩個隊列,他們兩個都是用ReentrantLock控制的線程安全,他們兩個的區別一個是數組,一個是鏈表,在隊列中,一般獲取這個隊列元素之后緊接著會獲取下一個元素,或者一次獲取多個隊列元素都有可能,而數組在內存中地址是連續的,在操作系統中會有緩存的優化(下面也會介紹緩存行),所以訪問的速度會略勝一籌,我們也會盡量去選擇ArrayBlockingQueue。而事實證明在很多第三方的框架中,比如早期的log4j異步,都是選擇的ArrayBlockingQueue。
在jdk中提供的線程安全的隊列下面簡單列舉部分隊列:\
?
?
我們可以看見,我們無鎖的隊列是無界的,有鎖的隊列是有界的,這里就會涉及到一個問題,我們在真正的線上環境中,無界的隊列,對我們系統的影響比較大,有可能會導致我們內存直接溢出,所以我們首先得排除無界隊列,當然并不是無界隊列就沒用了,只是在某些場景下得排除。其次還剩下ArrayBlockingQueue,LinkedBlockingQueue兩個隊列,他們兩個都是用ReentrantLock控制的線程安全,他們兩個的區別一個是數組,一個是鏈表。
(LinkedBlockingQueue 其實也是有界隊列,但是不設置大小時就時Integer.MAX_VALUE),ArrayBlockingQueue,LinkedBlockingQueue也有自己的弊端,就是性能比較低,為什么jdk會增加一些無鎖的隊列,其實就是為了增加性能,很苦惱,又需要無鎖,又需要有界,答案就是Disruptor
Disruptor
Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,并且是一個開源的并發框架,并獲得2011Duke’s程序框架創新獎。能夠在無鎖的情況下實現網絡的Queue并發操作,基于Disruptor開發的系統單線程能支撐每秒600萬訂單。目前,包括Apache Storm、Camel、Log4j2等等知名的框架都在內部集成了Disruptor用來替代jdk的隊列,以此來獲得高性能。
為什么這么牛逼?
在Disruptor中有三大殺器:
- CAS
- 消除偽共享
- RingBuffer
?
3.1.1鎖和CAS
我們ArrayBlockingQueue為什么會被拋棄的一點,就是因為用了重量級lock鎖,在我們加鎖過程中我們會把鎖掛起,解鎖后,又會把線程恢復,這一過程會有一定的開銷,并且我們一旦沒有獲取鎖,這個線程就只能一直等待,這個線程什么事也不能做。
CAS(compare and swap),顧名思義先比較在交換,一般是比較是否是老的值,如果是的進行交換設置,大家熟悉樂觀鎖的人都知道CAS可以用來實現樂觀鎖,CAS中沒有線程的上下文切換,減少了不必要的開銷
而我們的Disruptor也是基于CAS。
3.1.2偽共享
到了偽共享就不得不說計算機CPU緩存,緩存大小是CPU的重要指標之一,而且緩存的結構和大小對CPU速度的影響非常大,CPU內緩存的運行頻率極高,一般是和處理器同頻運作,工作效率遠遠大于系統內存和硬盤。實際工作時,CPU往往需要重復讀取同樣的數據塊,而緩存容量的增大,可以大幅度提升CPU內部讀取數據的命中率,而不用再到內存或者硬盤上尋找,以此提高系統性能。但是從CPU芯片面積和成本的因素來考慮,緩存都很小。
CPU緩存可以分為一級緩存,二級緩存,如今主流CPU還有三級緩存,甚至有些CPU還有四級緩存。每一級緩存中所儲存的全部數據都是下一級緩存的一部分,這三種緩存的技術難度和制造成本是相對遞減的,所以其容量也是相對遞增的。
每一次你聽見intel發布新的cpu什么,比如i7-7700k,8700k,都會對cpu緩存大小進行優化,感興趣可以自行下來搜索,這些的發布會或者發布文章。
Martin和Mike的 QConpresentation演講中給出了一些每個緩存時間:
緩存行
在cpu的多級緩存中,并不是以獨立的項來保存的,而是類似一種pageCahe的一種策略,以緩存行來保存,而緩存行的大小通常是64字節,在Java中Long是8個字節,所以可以存儲8個Long,舉個例子,你訪問一個long的變量的時候,他會把幫助再加載7個,我們上面說為什么選擇數組不選擇鏈表,也就是這個原因,在數組中可以依靠緩沖行得到很快的訪問。
?
緩存行是萬能的嗎?NO,因為他依然帶來了一個缺點,我在這里舉個例子說明這個缺點,可以想象有個數組隊列,ArrayQueue,他的數據結構如下:
?
對于maxSize是我們一開始就定義好的,數組的大小,對于currentIndex,是標志我們當前隊列的位置,這個變化比較快,可以想象你訪問maxSize的時候,是不是把currentIndex也加載進來了,這個時候,其他線程更新currentIndex,就會把cpu中的緩存行置位無效,請注意這是CPU規定的,他并不是只吧currentIndex置位無效,如果此時又繼續訪問maxSize他依然得繼續從內存中讀取,但是MaxSize卻是我們一開始定義好的,我們應該訪問緩存即可,但是卻被我們經常改變的currentIndex所影響。
Padding的魔法
為了解決上面緩存行出現的問題,在Disruptor中采用了Padding的方式
?
其中的Value就被其他一些無用的long變量給填充了。這樣你修改Value的時候,就不會影響到其他變量的緩存行。
最后順便一提,在jdk8中提供了@Contended的注解,當然一般來說只允許Jdk中內部,如果你自己使用那就得配置Jvm參數 -RestricContentended = fase,將限制這個注解置位取消。很多文章分析了ConcurrentHashMap,但是都把這個注解給忽略掉了,在ConcurrentHashMap中就使用了這個注解,在ConcurrentHashMap每個桶都是單獨的用計數器去做計算,而這個計數器由于時刻都在變化,所以被用這個注解進行填充緩存行優化,以此來增加性能。
作者:tracy_668
鏈接:https://www.jianshu.com/p/bad7b4b44e48
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
?
下面的例子是測試利用cache line的特性和不利用cache line的特性的效果對比.
?
什么是偽共享
ArrayBlockingQueue有三個成員變量:
這三個變量很容易放到一個緩存行中, 但是之間修改沒有太多的關聯. 所以每次修改, 都會使之前緩存的數據失效, 從而不能完全達到共享的效果.
如上圖所示, 當生產者線程put一個元素到ArrayBlockingQueue時, putIndex會修改, 從而導致消費者線程的緩存中的緩存行無效, 需要從主存中重新讀取.
這種無法充分使用緩存行特性的現象, 稱為偽共享
3.1.3RingBuffer
ringbuffer到底是什么
它是一個環(首尾相接的環),你可以把它用做在不同上下文(線程)間傳遞數據的buffer。
?
基本來說,ringbuffer擁有一個序號,這個序號指向數組中下一個可用的元素。(如下圖右邊的圖片表示序號,這個序號指向數組的索引4的位置
?
?
隨著你不停地填充這個buffer(可能也會有相應的讀取),這個序號會一直增長,直到繞過這個環。
?
要找到數組中當前序號指向的元素,可以通過sequence & (array length-1) = array index,比如一共有8槽,3&(8-1)=3,HashMap就是用這個方式來定位數組元素的,這種方式比取模的速度更快。
常用的隊列之間的區別
- 沒有尾指針。只維護了一個指向下一個可用位置的序號。
- 不刪除buffer中的數據,也就是說這些數據一直存放在buffer中,直到新的數據覆蓋他們
ringbuffer采用這種數據結構原因
- 因為它是數組,所以要比鏈表快,數組內元素的內存地址的連續性存儲的。這是對CPU緩存友好的—也就是說,在硬件級別,數組中的元素是會被預加載的,因此在ringbuffer當中,cpu無需時不時去主存加載數組中的下一個元素。因為只要一個元素被加載到緩存行,其他相鄰的幾個元素也會被加載進同一個緩存行。
- 其次,你可以為數組預先分配內存,使得數組對象一直存在(除非程序終止)。這就意味著不需要花大量的時間用于垃圾回收。此外,不像鏈表那樣,需要為每一個添加到其上面的對象創造節點對象—對應的,當刪除節點時,需要執行相應的內存清理操作。
如何從Ringbuffer讀取
消費者(Consumer)是一個想從Ring Buffer里讀取數據的線程,它可以訪問ConsumerBarrier對象——這個對象由RingBuffer創建并且代表消費者與RingBuffer進行交互。就像Ring Buffer顯然需要一個序號才能找到下一個可用節點一樣,消費者也需要知道它將要處理的序號——每個消費者都需要找到下一個它要訪問的序號。在上面的例子中,消費者處理完了Ring Buffer里序號8之前(包括8)的所有數據,那么它期待訪問的下一個序號是9。
消費者可以調用ConsumerBarrier對象的waitFor()方法,傳遞它所需要的下一個序號.
?
final long availableSeq = consumerBarrier.waitFor(nextSequence);ConsumerBarrier返回RingBuffer的最大可訪問序號——在上面的例子中是12。ConsumerBarrier有一個WaitStrategy方法來決定它如何等待這個序號.
接下來
接下來,消費者會一直逛來逛去,等待更多數據被寫入 Ring Buffer。并且,寫入數據后消費者會收到通知——節點 9,10,11 和 12 已寫入。現在序號 12 到了,消費者可以指示 ConsumerBarrier 去拿這些序號里的數據了。
?
在Disruptor中采用了數組的方式保存了我們的數據,上面我們也介紹了采用數組保存我們訪問時很好的利用緩存,但是在Disruptor中進一步選擇采用了環形數組進行保存數據,也就是RingBuffer。在這里先說明一下環形數組并不是真正的環形數組,在RingBuffer中是采用取余的方式進行訪問的,比如數組大小為 10,0訪問的是數組下標為0這個位置,其實10,20等訪問的也是數組的下標為0的這個位置。
實際上,在這些框架中取余并不是使用%運算,都是使用的&與運算,這就要求你設置的大小一般是2的N次方也就是,10,100,1000等等,這樣減去1的話就是,1,11,111,就能很好的使用index & (size -1),這樣利用位運算就增加了訪問速度。
如果在Disruptor中你不用2的N次方進行大小設置,他會拋出buffersize必須為2的N次方異常。
- Producer會向這個RingBuffer中填充元素,填充元素的流程是首先從RingBuffer讀取下一個Sequence,之后在這個Sequence位置的槽填充數據,之后發布。
- Consumer消費RingBuffer中的數據,通過SequenceBarrier來協調不同的Consumer的消費先后順序,以及獲取下一個消費位置Sequence。
- Producer在RingBuffer寫滿時,會從頭開始繼續寫替換掉以前的數據。但是如果有SequenceBarrier指向下一個位置,則不會覆蓋這個位置,阻塞到這個位置被消費完成。Consumer同理,在所有Barrier被消費完之后,會阻塞到有新的數據進來。
Disruptor的設計方案
Disruptor通過以下設計來解決隊列速度慢的問題:
- 環形數組結構
為了避免垃圾回收, 采用數組而非鏈表. 同時, 數組對處理器的緩存機制更加友好. - 元素位置定位
數組長度2^n, 通過位運算, 加快定位的速度. 下標采取遞增的形式. 不用擔心index溢出的問題. index是long類型, 即使100萬QPS的處理速度, 也需要30萬年才能用完. - 無鎖設計
每個生產者或者消費者線程, 會先申請可以操作的元素在數組中的位置, 申請到之后, 直接在該位置寫入或者讀取數據.
下面忽略數組的環形結構, 介紹一下如何實現無鎖設計. 整個過程通過原子變量CAS, 保證操作的線程安全.
一個生產者
生產者單線程寫數據的流程比較簡單:
若是返回的正確, 則生產者開始寫入元素.
多個生產者
多個生產者的情況下, 會遇到“如何防止多個線程重復寫同一個元素”的問題. Disruptor的解決方法是, 每個線程獲取不同的一段數組空間進行操作. 這個通過CAS很容易達到. 只需要在分配元素的時候, 通過CAS判斷一下這段空間是否已經分配出去即可.
但是會遇到一個新問題: 如何防止讀取的時候, 讀到還未寫的元素. Disruptor在多個生產者的情況下, 引入了一個與Ring Buffer大小相同的buffer: available Buffer. 當某個位置寫入成功的時候, 便把availble Buffer相應的位置置位, 標記為寫入成功. 讀取的時候, 會遍歷available Buffer, 來判斷元素是否已經就緒.
讀數據
生產者多線程寫入的情況會復雜很多:
如下圖所示, 讀線程讀到下標為2的元素, 三個線程Writer1/Writer2/Writer3正在向RingBuffer相應位置寫數據, 寫線程被分配到的最大元素下標是11.
讀線程申請讀取到下標從3到11的元素, 判斷writer cursor>=11. 然后開始讀取availableBuffer, 從3開始, 往后讀取, 發現下標為7的元素沒有生產成功, 于是WaitFor(11)返回6.
然后, 消費者讀取下標從3到6共計4個元素.
?
?
寫數據
多個生產者寫入的時候:
如下圖所示, Writer1和Writer2兩個線程寫入數組, 都申請可寫的數組空間. Writer1被分配了下標3到下表5的空間, Writer2被分配了下標6到下標9的空間.
Writer1寫入下標3位置的元素, 同時把available Buffer相應位置置位, 標記已經寫入成功, 往后移一位, 開始寫下標4位置的元素. Writer2同樣的方式. 最終都寫入完成.
?
防止不同生產者對同一段空間寫入的代碼, 如下所示:
通過do/while循環的條件cursor.compareAndSet(current, next), 來判斷每次申請的空間是否已經被其他生產者占據. 假如已經被占據, 該函數會返回失敗, While循環重新執行, 申請寫入空間.
消費者的流程與生產者非常類似, 這兒就不多描述了. Disruptor通過精巧的無鎖設計實現了在高并發情形下的高性能.
3.2Disruptor怎么使用
?
在Disruptor中有幾個比較關鍵的:
- ThreadFactory:這是一個線程工廠,用于我們Disruptor中生產、消費的時候需要的線程。
- EventFactory:事件工廠,用于產生我們隊列元素的工廠。在Disruptor中,他會在初始化的時候直接填充滿RingBuffer,一次到位。
- EventHandler:用于處理Event的handler,這里一個EventHandler可以看做是一個消費者,但是多個EventHandler他們都是獨立消費的隊列。
- WorkHandler:也是用于處理Event的handler,和上面區別在于,多個消費者都是共享同一個隊列。
- WaitStrategy:等待策略,在Disruptor中有多種策略,來決定消費者在消費時,如果沒有數據采取的策略是什么?下面簡單列舉一下Disruptor中的部分策略
?
總結
以上是生活随笔為你收集整理的每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SQL注入(1)
- 下一篇: 201571030128/2015710