Disruptor 源码阅读笔记--转
原文地址:http://coderbee.net/index.php/open-source/20130812/400
一、Disruptor 是什么?
Disruptor 是一個高性能異步處理框架,也可以認為是一個消息框架,它實現了觀察者模式。
Disruptor 比傳統的基于鎖的消息框架的優勢在于:它是無鎖的、CPU友好;它不會清除緩存中的數據,只會覆蓋,降低了垃圾回收機制啟動的頻率。
這個解讀是在最新版 3.1.1 的源碼上進行。
關于Disruptor的更多介紹可見:?http://ifeve.com/disruptor/
二、Disruptor 為什么快
- 不使用鎖。通過內存屏障和原子性的CAS操作替換鎖。
?
-
緩存基于數組而不是鏈表,用位運算替代求模。緩存的長度總是2的n次方,這樣可以用位運算?i & (length - 1)?替代?i % length。
-
去除偽共享。CPU的緩存一般是以緩存行為最小單位的,對應主存的一塊相應大小的單元;當前的緩存行大小一般是64字節,每個緩存行一次只能被一個CPU核訪問,如果一個緩存行被多個CPU核訪問,就會造成競爭,導致某個核必須等其他核處理完了才能繼續處理,響應性能。去除偽共享就是確保CPU核訪問某個緩存行時不會出現爭用。
-
預分配緩存對象,通過更新緩存里對象的屬性而不是刪除對象來減少垃圾回收。
?
三、Disruptor 架構
核心類和接口
- EventHandler:用戶提供具體的實現,在里面實現事件的處理邏輯。
- Sequence:代表事件序號或一個指向緩存某個位置的序號。
- WaitStrategy:功能包括:當沒有可消費的事件時,根據特定的實現進行等待,有可消費事件時返回可事件序號;有新事件發布時通知等待的?SequenceBarrier。
- Sequencer:生產者用于訪問緩存的控制器,它持有消費者序號的引用;新事件發布后通過WaitStrategy?通知正在等待的SequenceBarrier。
- SequenceBarrier:消費者關卡。消費者用于訪問緩存的控制器,每個訪問控制器還持有前置訪問控制器的引用,用于維持正確的事件處理順序;通過WaitStrategy獲取可消費事件序號。
- EventProcessor:事件處理器,是可執行單元,運行在指定的Executor里;它會不斷地通過SequenceBarrier獲取可消費事件,當有可消費事件時調用用戶提供的?EventHandler實現處理事件。
- EventTranslator:事件轉換器,由于Disruptor只會覆蓋緩存,需要通過此接口的實現來更新緩存里的事件來覆蓋舊事件。
- RingBuffer:基于數組的緩存實現,它內部持有對Executor、WaitStrategy、生產者和消費者訪問控制器的引用。
- Disruptor:提供了對?RingBuffer?的封裝,并提供了一些DSL風格的方法,方便使用。
每個事件處理器EventProcessor都持有一個表示它最后處理的事件的序號的Sequence,所以可以用Sequence來代表事件處理器;
下面是Disruptor里事件處理的一個示例圖:
四、實現
Sequence 類
Sequence類表示一個序號,是對long型字段的線程安全的封裝,用于跟蹤ringBuffer的進度和事件處理器的進度。
支持一些并發操作,包括CAS和有序寫。
嘗試在volatile字段周圍填充內容來避免偽共享,變得更高效。
實現
public class Sequence {static final long INITIAL_VALUE = -1L;private static final Unsafe UNSAFE;private static final long VALUE_OFFSET;static {UNSAFE = Util.getUnsafe();final int base = UNSAFE.arrayBaseOffset( long[].class );final int scale = UNSAFE.arrayIndexScale( long[].class );VALUE_OFFSET = base + (scale * 7);}// 15個元素,從0開始,有效值處于第7個,這樣前后各有7個long字段填充,// 8個long型占共用64字節,而當前CPU的緩存行大小也是64字節,這樣可以避免對Sequence的讀寫出現偽共享。private final long [] paddedValue = new long [15];// 原子地讀public long get() {return UNSAFE .getLongVolatile(paddedValue, VALUE_OFFSET);}// 原子地寫public void set(final long value) {UNSAFE.putOrderedLong(paddedValue , VALUE_OFFSET, value);}// CASpublic boolean compareAndSet(final long expectedValue, final long newValue) {return UNSAFE .compareAndSwapLong(paddedValue, VALUE_OFFSET, expectedValue, newValue);}public long addAndGet(final long increment) {long currentValue;long newValue;do {currentValue = get();newValue = currentValue + increment;} while (!compareAndSet(currentValue, newValue));return newValue;}// 還有其他一些方法,都是借助 sun.misc.Unsafe 類來實現的。偽共享
關于偽共享可參考:
- 《剖析Disruptor:為什么會這么快?(二)神奇的緩存行填充》?http://ifeve.com/disruptor-cacheline-padding/
?
-
《偽共享(False Sharing)》-?http://ifeve.com/falsesharing/
Disruptor類
Disruptor類是這個類庫的門面,用DSL的形式直觀地提供了組裝事件回調處理的關系鏈的功能,并提供獲取事件、發布事件的方法,緩存容器生命周期管理。
屬性
private final RingBuffer ringBuffer ; // 核心,絕大多數功能都委托給ringBuffer處理 private final Executor executor ; // 用于執行事件處理器的線程池 private final ConsumerRepository consumerRepository = new ConsumerRepository(); // 事件處理器倉庫,就是事件處理器的集合 private final AtomicBoolean started = new AtomicBoolean( false); // 啟動時檢查,只能啟動一次 private ExceptionHandler exceptionHandler; // 異常處理器設置EventHandler事件處理器
/** barrierSequences是eventHandlers的前置事件處理關卡,是用來保證事件處理的時序性的關鍵;* */ EventHandlerGroup createEventProcessors( final Sequence[] barrierSequences,final EventHandler[] eventHandlers) {checkNotStarted(); // 確保在容器啟動前設置final Sequence[] processorSequences = new Sequence[eventHandlers.length ]; // 存放游標的數組final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); // 獲取前置的序號關卡for ( int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {final EventHandler eventHandler = eventHandlers[i];// 封裝為批量事件處理器BatchEventProcessor,其實現了Runnable接口,所以可以放到executor去執行處理邏輯;處理器還會自動建立一個序號Sequence。final BatchEventProcessor batchEventProcessor = new BatchEventProcessor(ringBuffer , barrier, eventHandler);if (exceptionHandler != null) { // 如果有則設置異常處理器batchEventProcessor.setExceptionHandler( exceptionHandler);}// 添加到消費者倉庫,會先封裝為EventProcessorInfo對象(表示事件處理的一個階段),consumerRepository.add(batchEventProcessor, eventHandler, barrier);processorSequences[i] = batchEventProcessor.getSequence();}if (processorSequences. length > 0) {// 如果有前置關卡,則取消之前的前置關卡對應的EventProcessor 的 鏈的終點標記。consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);}// EventHandlerGroup是一組EventProcessor,作為disruptor的一部分,提供DSL形式的方法,作為方法鏈的起點,用于設置事件處理器。return new EventHandlerGroup(this, consumerRepository, processorSequences); }這里要注意的是,EventHandler只能在啟動前添加。
從代碼來看,EventHandler是用戶提供的,單純的的事件處理邏輯的實現,在被添加到消費者倉庫之前,它會被封裝為一個EventProcessor對象。
RingBuffer 類
屬性
首先來看下RingBuffer類的屬性:
// 屬性的初始化聲明public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE ;private final int indexMask ;private final Object[] entries ;private final int bufferSize ;private final Sequencer sequencer ;// 屬性的初始化代碼RingBuffer(EventFactory eventFactory,Sequencer sequencer) {this.sequencer = sequencer;this.bufferSize = sequencer.getBufferSize();if (bufferSize < 1) {throw new IllegalArgumentException("bufferSize must not be less than 1");}if (Integer.bitCount( bufferSize) != 1) {throw new IllegalArgumentException("bufferSize must be a power of 2");}this.indexMask = bufferSize - 1;this.entries = new Object[sequencer.getBufferSize()];fill(eventFactory);}上這些代碼可以看出:
?
緩存的大小必須是2的X次方,這是為了用位運算提高性能;由于數組緩存的容量總是有限,當緩存填滿后,又要從 下標0 開始填充,如果緩存大小不是2的X次方,那只能用求模運算來獲得新下標,所以還有個indexMask 來保存下標掩碼;通過與indexMask 進行按位與可以得到一個安全的下標,不再需要進行下標檢查,如:(E)entries[( int)sequence & indexMask ]。
從緩存獲取事件
// 從緩存獲取指定序號的事件 public E get(long sequence) {// 這個按位與操作說明了為什么ringBuffer的大小必須是2的n次方:用高效的 按位與 代替 低效的求模操作。return (E) entries[(int ) sequence & indexMask]; }發布事件
發布事件有3步:獲取新事件的序號,覆蓋舊事件,通知等待著。最簡單的發布事件形式:
public void publishEvent(EventTranslator translator) {final long sequence = sequencer .next(); // 通過生產者序號控制器獲取可用序號translateAndPublish(translator, sequence); // 轉換事件到隊列緩存并發布事件 }private void translateAndPublish(EventTranslator translator, long sequence) {try {// 發布事件前要先獲取對應位置上的舊事件,再用translator把新事件的屬性轉換到舊事件的屬性,從而達到發布的目的。// 這就是說,Disruptor對于已消費的事件是不刪除的,有新事件時只是用新事件的屬性去替換舊事件的屬性。// 這帶來的一個問題就是內存占用translator.translateTo(get(sequence), sequence);} finally {sequencer.publish(sequence); // 原子性地更新生產者的序號,并通知在等待的消費者關卡。} }需要注意的是,生產者序號控制器與消費者關卡是共用同一個等待策略的,一個Disruptor容器只有一個等待策略實例。
EventProcessor
事件處理器的執行單元。有兩個實現:NoOpEventProcessor?和?BatchEventProcessor,其中NoOpEventProcessor?是不處理事件的,就不關注了。
BatchEventProcessor
Disruptor提供的唯一有用的?EventProcessor?實現類。
Disruptor容器啟動時,會調用?ConsumerInfo?的?start方法,如果?ConsumerInfo?封裝的是用戶提交的EventHandler?實例,那么會在線程池里運行?EventProcessor,也就是?BatchEventProcessor?實例的?run方法。
核心run方法
該方法的文檔說明提到調用?halt?方法后是可以重新執行這個方法的。
public void run() {// 確保一次只有一個線程執行此方法,這樣訪問自身的序號就不要加鎖if (! running.compareAndSet(false, true)) {throw new IllegalStateException("Thread is already running");}sequenceBarrier.clearAlert(); // 清除前置序號關卡的通知狀態notifyStart(); // 聲明周期通知,開始前回調T event = null;long nextSequence = sequence.get() + 1L; // sequence指向上一個已處理的事件,默認是-1.try {while (true ) {try {// 從它的前置序號關卡獲取下一個可處理的事件序號。// 如果這個事件處理器不依賴于其他的事件處理器,則前置關卡就是生產者序號;// 如果這個事件處理器依賴于1個或多個事件處理器,那么這個前置關卡就是這些前置事件處理器中最慢的一個。// 通過這樣,可以確保事件處理器不會超前處理地事件。final long availableSequence = sequenceBarrier.waitFor(nextSequence);// 處理一批事件while (nextSequence <= availableSequence) {event = dataProvider.get(nextSequence);eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);nextSequence++;}// 設置它自己最后處理的事件序號,這樣依賴于它的處理器可以它處理剛處理過的事件。sequence.set(availableSequence);} catch (final TimeoutException e) {// 獲取事件序號超時處理notifyTimeout( sequence.get());} catch (final AlertException ex) {// 處理通知事件;檢測是否要停止,如果非則繼續處理事件if (!running .get()) {break;}} catch (final Throwable ex) {// 其他異常,用事件處理器處理;然后繼續處理下一個事件exceptionHandler.handleEventException(ex, nextSequence, event);sequence.set(nextSequence);nextSequence++;}}} finally {// 聲明周期通知,停止事件回調;復位運行狀態標志,確保可以再次運行此方法。notifyShutdown();running.set(false );} }從?while?循環可以看出,事件處理可以分為三步:
sequence .set?和?sequence .get?方法都是原子性地讀取、更新序號的,這樣就避免了加鎖,從而提供性能。
sequenceBarrier .waitFor?最終也會調用?sequence .get?方法。
SequenceBarrier
協作式關卡,用于跟蹤生產者游標和依賴的事件處理器的序號的數據結構。有兩個實現DummySequenceBarrier?和?ProcessingSequenceBarrier,類如其名,前者是虛擬的,只有空方法;后者是實用的。
SequenceBarrier接口定義
public interface SequenceBarrier {// 等待指定的序號變得可消費long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;// 返回當前可讀的游標(一個序號)long getCursor();// 當前是否有通知狀態給此關卡boolean isAlerted();// 通知事件處理器狀態發生改變,并保持這個狀態直到被清除void alert();// 清除當前通知狀態void clearAlert();// 檢查通知狀態,如果有異常則拋出void checkAlert() throws AlertException;ProcessingSequenceBarrier
生成
ProcessingSequenceBarrier的實例是由框架控制的。
首先在Disruptor類的createEventProcessors方法內: final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); // 獲取前置的序號關卡RingBufferd 的newBarrier方法: public SequenceBarrier newBarrier(Sequence... sequencesToTrack) {return sequencer.newBarrier(sequencesToTrack); // 是通過生產者序號控制器生成的。 }AbstractSequencer的newBarrier方法。 public SequenceBarrier newBarrier(Sequence... sequencesToTrack) {return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack); }構造函數
/*** @param sequencer 生產者序號控制器* @param waitStrategy 等待策略* @param cursorSequence 生產者序號* @param dependentSequences 依賴的Sequence*/ public ProcessingSequenceBarrier(final Sequencer sequencer, final WaitStrategy waitStrategy,final Sequence cursorSequence, final Sequence[] dependentSequences) {this. sequencer = sequencer;this. waitStrategy = waitStrategy;this. cursorSequence = cursorSequence;// 如果事件處理器不依賴于任何前置處理器,那么dependentSequence也指向生產者的序號。if (0 == dependentSequences. length) {dependentSequence = cursorSequence;} else { // 如果有多個前置處理器,則對其進行封裝,實現了組合模式。dependentSequence = new FixedSequenceGroup(dependentSequences);} }獲取序號的方法
/*** 該方法不保證總是返回未處理的序號;如果有更多的可處理序號時,返回的序號也可能是超過指定序號的。*/ public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException {// 首先檢查有無通知checkAlert();// 通過等待策略來獲取可處理事件序號,long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence , this);// 這個方法不保證總是返回可處理的序號return availableSequence;if (availableSequence < sequence) {}// 再通過生產者序號控制器返回最大的可處理序號return sequencer.getHighestPublishedSequence(sequence, availableSequence); }WaitStrategy
WaitStrategy定義了一個EventProcessor在Sequence沒有可消費事件時的等待策略。
接口定義
public interface WaitStrategy {/*** 如果事件處理器不依賴于任何前置處理器,那么cursor與dependentSequence都將指向生產者的序號。* * sequence:要獲取的序號* cursor:指向了生產者的Sequence* dependentSequence:調用者依賴的前置關卡* barrier:調用者自身,通過調用barrier.checkAlert可以及時響應通知*/long waitFor( long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)throws AlertException, InterruptedException, TimeoutException;// 當游標前進的時候(有可處理的事件)通知EventProcessorvoid signalAllWhenBlocking(); }實現
BlockingWaitStrategy:沒有可消費事件時阻塞等待生產者喚醒。
BusySpinWaitStrategy:忙等策略。
PhasedBackoffWaitStrategy:
TimeoutBlockingWaitStrategy:
YieldingWaitStrategy:通過調用Thread.yield方法來讓出CPU,達到等待的目的,等待時長沒保證,取決于線程的調度系統。
小結
通過緩沖行填充和適當的封裝,Disruptor提供了一個CPU友好、線程安全的序號表示。
通過每個消費者持有自己的事件序號,沒有相互依賴的消費者可以并行地處理事件。在消費者之間引入消費者關卡,輕易地實現了消費者之間的前后依賴關系。
對于生產者,即使有多個線程同時訪問,由于他們都通過序號器Sequencer訪問ringBuffer,Disruptor框架通過CAS取代了加鎖和同步塊,這也是并發編程的一個指導原則:把同步塊最小化到一個變量上。
通過原子讀寫序號、CAS操作消除了對鎖的使用,提高了性能。
Distuptor的一個缺點是內存占用:因為它不清除舊事件數據。
附錄:
http://developer.51cto.com/art/201306/399370.htm
轉載于:https://www.cnblogs.com/davidwang456/p/4579865.html
總結
以上是生活随笔為你收集整理的Disruptor 源码阅读笔记--转的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: jodd-StringTemplateP
- 下一篇: java中如何忽略字符串中的转义字符--