disruptor实现细节及源码分析
disruptor實現細節及源碼分析
一、?????背景介紹
???? Disruptor它是一個開源的并發框架,并獲得?2011 Duke’s?程序框架創新獎,能夠在無鎖的情況下實現網絡的Queue并發操作。
???? 說明:下文所有內容基于disruptor3.34版本。
二、?????應用場景
???? 在消費者--生產者模式中或發布訂閱模式中使用。
????具有以下特點:
無鎖的設計及CAS式的原子訪問。
預分配存儲空間,避免垃圾回收帶來的資源消耗。
三、?????核心對象
????????RingBuffer:環形的一個數據結構,對象初始化時,會使用事件Event進行填充。Buffer的大小必須是2的冪次方,方便移位操作。
?????????
????????Event:無指定具體接口,用戶自己實現,可以攜帶任何業務數據。
?????????
????????EventFactory:產生事件Event的工廠,由用戶自己實現。
?????????
????????EventTranslator:事件發布的回調接口,由用戶實現,負責將業務參數設置到事件中。
?????????
????????Sequencer:序列產生器,也是協調生產者和消費者及實現高并發的核心。有MultiProducerSequencer 和 SingleProducerSequencer兩個實現類。
?????????
????????SequenceBarrier:擁有RingBuffer的發布事件Sequence引用和消費者依賴的Sequence引用。決定消費者消費可消費的Sequence。
?????????
????????EventHandler:事件的處理者,由用戶自己實現。
?????????
????????EventProcessor:事件的處理器,單獨在一個線程中運行。
?????????
????????WorkHandler:事件的處理者,由用戶自己實現。
?????????
????????WorkProcessor:事件的處理器,單獨在一個線程中運行。
?????????
????????WorkerPool:一組WorkProcessor的處理。
?????????
????????WaitStrategy:在消費者比生產者快時,消費者處理器的等待策略。
?
四、?????簡單示例
定義業務數據類:
定義事件類:
定義事件處理類:
定義事件工廠類:
定義事件發布輔助類:
主類:
五、?????實現原理及源碼分析
RingBuffer的實現:
? ? 封裝了一個對象數組,RingBuffer實例化時,用Event填充。生產者和消費者通過對序列(long的原子操作封裝)取模計算獲取對象數組中Event。
?
public?E?get(long?sequence){return?elementAt(sequence); }protected?final?E?elementAt(long?sequence){return?(E)?UNSAFE.getObject(entries,?REF_ARRAY_BASE?+?((sequence?&?indexMask)?<<?REF_ELEMENT_SHIFT));}單個生產者的實現:
? ? 保存有所有消費者當前消費的前一個序列值,在取下一個要發布的序列時,檢查要發布的序列是否覆蓋所有消費者正在處理的最小序列。如果未覆蓋,則獲取可發布的游標值,如果覆蓋(說明緩存已經滿了),則自旋等待,直到可以發布。發布事件時則先發布,后指定當前游標為發布的序列值。
?
public?long?next(int?n){if?(n?<?1){thrownew?IllegalArgumentException("n?must?be?>?0");}//當前生產者發布的的最大序列long?nextValue?=?this.nextValue;long?nextSequence?=?nextValue?+?n;//要發布的最大序列long?wrapPoint?=?nextSequence?-?bufferSize;//覆蓋點long?cachedGatingSequence?=?this.cachedValue;//消費者中處理序列最小的前一個序列//緩存已滿??或者處理器處理異常時if?(wrapPoint?>?cachedGatingSequence?||cachedGatingSequence?>?nextValue){long?minSequence;//等待直到有可用的緩存while?(wrapPoint?>?(minSequence?=?Util.getMinimumSequence(gatingSequences,nextValue))){LockSupport.parkNanos(1L);//?TODO:?Use?waitStrategy?to?spin?}this.cachedValue?=?minSequence;}//更新當前生產者發布的的最大序列this.nextValue?=?nextSequence;return?nextSequence;}多個生產者的實現:
????保存有所有消費者當前消費的前一個序列值,并維護一個和RingBuffer一樣大小的數組,在取下一個要發布的序列時,檢查要發布的序列是否覆蓋所有消費者正在處理的最小序列。如果未覆蓋,則先發布,后指定當前游標為發布的序列值,如果未覆蓋,則獲取可發布的游標值,如果覆蓋(說明緩存已經滿了),則自旋等待,直到可以發布。一個生產者獲取可發布的序列后,立即更新當前游標。發布事件時生產者每發布一個序列,則記錄到數組指定位置。
public?long?next(int?n){if?(n?<?1){thrownew?IllegalArgumentException("n?must?be?>?0");}long?current;long?next;do{//當前游標current?=?cursor.get();//要發布的游標next?=?current?+?n;//覆蓋點long?wrapPoint?=?next?-?bufferSize;//消費者中處理序列最小的前一個序列long?cachedGatingSequence?=?gatingSequenceCache.get();//緩存已滿??或者處理器處理異常時if?(wrapPoint?>?cachedGatingSequence?||cachedGatingSequence?>?current){long?gatingSequence?=?Util.getMinimumSequence(gatingSequences,?current);if?(wrapPoint?>?gatingSequence){LockSupport.parkNanos(1);//?TODO,?should?we?spin?based?on?the?waitstrategy?//緩存滿時,繼續再次嘗試continue;}//更新當前生產者發布的的最大序列gatingSequenceCache.set(gatingSequence);}elseif?(cursor.compareAndSet(current,?next)){//成功獲取到發布序列并設置當前游標成功時跳出循環break;}}while?(true);return?next;}消費者的實現:
????消費者保持一個自己的序列,每次累加后nextSequence,去獲取可訪問的最大序列。對于一個生產者,就是nextSequence到RingBuffer當前游標的序列。對于多個生產者,就是nextSequence到RingBuffer當前游標之間,最大的連續的序列集。
public?long?waitFor(finallong?sequence)throws?AlertException,InterruptedException,?TimeoutException{checkAlert();//獲取最大的可消費的序列,依賴等待策略long?availableSequence?=?waitStrategy.waitFor(sequence,?cursorSequence,?dependentSequence,?this);if?(availableSequence?<?sequence){return?availableSequence;}return?sequencer.getHighestPublishedSequence(sequence,availableSequence);}? ? ?一個生產者:
public?long?getHighestPublishedSequence(long?lowerBound,?long?availableSequence){//?返回最大序列availableSequencereturn?availableSequence; }? ? ?多個生產者:
public?boolean?isAvailable(long?sequence){int?index?=?calculateIndex(sequence);int?flag?=?calculateAvailabilityFlag(sequence);long?bufferAddress?=?(index?*?SCALE)?+?BASE;//相應位置上的值相等,說明已經發布該序列returnUNSAFE.getIntVolatile(availableBuffer,bufferAddress)?==?flag;}@Overridepublic?long?getHighestPublishedSequence(long?lowerBound,?long?availableSequence){//從數組中找出未發布序列,即由小到大連續的發布序列for?(long?sequence?=?lowerBound;?sequence?<=?availableSequence;sequence++){if?(!isAvailable(sequence)){//返回未發布序列的前一個序列return?sequence?-?1;}}return?availableSequence;}等待策略:
????????消費者在緩存中沒有可以消費的事件時,采取的等待策略:
?????????
????????BlockingWaitStrategy:通過線程阻塞的方式,等待生產者喚醒
?????????
????????BusySpinWaitStrategy:線程一直自旋等待,比較耗CPU。
?????????
????????LiteBlockingWaitStrategy:通過線程阻塞的方式,等待生產者喚醒,比BlockingWaitStrategy要輕,某些情況下可以減少阻塞的次數。
?????????
????????PhasedBackoffWaitStrategy:根據指定的時間段參數和指定的等待策略決定采用哪種等待策略。
?????????
????????SleepingWaitStrategy:可通過參數設置,使線程通過Thread.yield()主動放棄執行,通過線程調度器重新調度;或一直自旋等待。
????????TimeoutBlockingWaitStrategy:通過參數設置阻塞時間,如果超時則拋出異常。
?????????
????????YieldingWaitStrategy: 通過Thread.yield()主動放棄執行,通過線程調度器重新調度。
????????轉載于:https://blog.51cto.com/11246272/1745472
總結
以上是生活随笔為你收集整理的disruptor实现细节及源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: PHP 表单处理
- 下一篇: mybatis学习笔记(3)-入门程序一