Flume 1.7 源码分析(四)从Source写数据到Channel
Flume 1.7 源碼分析(一)源碼編譯 
 Flume 1.7 源碼分析(二)整體架構 
 Flume 1.7 源碼分析(三)程序入口 
 Flume 1.7 源碼分析(四)從Source寫數據到Channel 
 Flume 1.7 源碼分析(五)從Channel獲取數據寫入Sink 
5 從Source寫數據到Channel
5.1 Source部分
5.1.1 SourceRunner
SourceRunner就是專門用于運行Source的一個類。 
 在”物化配置”一節獲取配置信息后,會根據Source去獲取具體的SourceRunner,調用的是SourceRunner的forSource方法。
可以看到source分為了2種類型,并有對應的sourceRunner(PollableSourceRunner、EventDrivenSourceRunner)。這2種source區別在于是否需要外部的驅動去獲取數據,不需要外部驅動(采用自身的事件驅動機制)的稱為EventDrivenSource,需要外部驅動的稱為PollableSource。
- 常見的EventDrivenSource:AvroSource、ExecSource、SpoolDirectorySource。
- 常見的PollableSource:TaildirSource、kafkaSource、JMSSource。
以EventDrivenSourceRunner為例,由MonitorRunnable調用其start方法:
public void start() {Source source = getSource();ChannelProcessor cp = source.getChannelProcessor();cp.initialize();//用于初始化Interceptorsource.start();lifecycleState = LifecycleState.START; }這里的ChannelProcessor是比較重要的一個類,后面會具體說。接下來調用了Source的start方法。可以對照一下之前的整體架構的圖,start方法實現的就是這個部分:
5.1.2 ExecSource
以ExecSource的start方法為例:
public void start() {executor = Executors.newSingleThreadExecutor();runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);runnerFuture = executor.submit(runner);sourceCounter.start();super.start(); }主要啟動了一個線程runner,初始化了一下計數器。具體實現還是要看ExecRunable類的run方法:
public void run() {do {timedFlushService = Executors.newSingleThreadScheduledExecutor(…); //使用配置的參數啟動Shell命令String[] commandArgs = command.split("\\s+");process = new ProcessBuilder(commandArgs).start(); //設置標準輸入流reader = new BufferedReader(new InputStreamReader(process.getInputStream()…));//設置錯誤流 StderrReader stderrReader = new StderrReader(…);stderrReader.start(); //啟動定時任務,將eventList中數據批量寫入到Channelfuture = timedFlushService.scheduleWithFixedDelay(new Runnable() {public void run() {synchronized (eventList) {if (!eventList.isEmpty() && timeout()) {flushEventBatch(eventList);}}}},batchTimeout, batchTimeout, TimeUnit.MILLISECONDS); //按行讀取標準輸出流的內容,并寫入eventListwhile ((line = reader.readLine()) != null) {synchronized (eventList) {sourceCounter.incrementEventReceivedCount();eventList.add(EventBuilder.withBody(line.getBytes(charset))) //超出配置的大小或者超時后,將eventList寫到Channelif (eventList.size() >= bufferCount || timeout()) {flushEventBatch(eventList);} } }synchronized (eventList) {if (!eventList.isEmpty()){flushEventBatch(eventList);}}} while (restart);//如果配置了自動重啟,當Shell命令的進程結束時,自動重啟命令。 }在該方法中啟動了2個reader,分別取讀取標準輸入流和錯誤流,將標準輸入流中的內容寫入eventList。
與此同時啟動另外一個線程,調用flushEventBatch方法,定期將eventList中的數據寫入到Channel。
private void flushEventBatch(List<Event> eventList) {channelProcessor.processEventBatch(eventList);//假如這里異常的話,eventList還沒有清空sourceCounter.addToEventAcceptedCount(eventList.size());eventList.clear();lastPushToChannel = systemClock.currentTimeMillis(); }可以看到這里調用了channelProcessor.processEventBatch()來寫入Channel。
5.2 Channel部分
5.2.1 ChannelProcessor
ChannelProcessor的作用是執行所有interceptor,并將eventList中的數據,發送到各個reqChannel、optChannel。ReqChannel和optChannel是通過channelSelector來獲取的。
public interface ChannelSelector extends NamedComponent, Configurable {public void setChannels(List<Channel> channels);public List<Channel> getRequiredChannels(Event event);public List<Channel> getOptionalChannels(Event event);public List<Channel> getAllChannels();//獲取在當前Source中配置的全部Channel }如果要自定義一個ChannelSelector,只需要繼承AbstractChannelSelector后,實現getRequiredChannels和getOptionalChannels即可。
ReqChannel代表一定保證存儲的Channel(失敗會不斷重試),optChannel代表可能存儲的Channel(即失敗后不重試)。
ReqChannel與optChannel的區別從代碼上來看,前者在出現異常時,會在執行完回滾后往上層拋,而optChannel則只執行回滾。注意到回滾操作只清空putList(5.2.4節會說明),而這一層如果沒有拋出異常的話,調用方(也就是上節的flushEventBatch)會清空eventList,也就是異常之后的數據丟失了。
發送其中一條數據的代碼如下:
try {tx.begin();reqChannel.put(event);tx.commit(); } catch (Throwable t) {tx.rollback();//省略部分代碼 }其中put調用Channel的doPut方法,commit調用Channel的doCommit方法。 
 Channel主要包含4個主要方法:doPut、doTake、doCommit、doRollback。下面以MemoryChannel為例說明。
5.2.2 doPut方法
在這個方法中,只包含了遞增計數器和將事件添加到putList。
protected void doPut(Event event) throws InterruptedException {channelCounter.incrementEventPutAttemptCount();int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);if (!putList.offer(event)) {throw new ChannelException("");}putByteCounter += eventByteSize; }假如這個方法中出現了異常,則會拋到ChannelProcessor中執行回滾操作。
5.2.3 doCommit方法
這個方法是比較復雜的方法之一,原因在于put和take操作的commit都是通過這個方法來進行的,所以代碼里面其實混合了2個功能(即put和take操作)所需的提交代碼。
單純從Source寫數據到Channel這件事情,流程為eventList->putList->queue。
由于前面已經完成了把數據放到putList中,那接下來要做的事情就是將putList中數據放入queue中就可以了。這個部分先說明到這里,下一個章節結合take操作一起看這個方法。
5.2.4 doRollback方法
與doCommit方法類似,這里的回滾,也分為2種情況:由take操作引起的和由put方法引起的。
這里先說由put發起的,該transaction的流程如下: 
 eventList->putList->queue
由于doPut和doCommit執行出現異常就直接跳出了,還沒執行清空語句(這里可以參考“ExecSource“章節的最后一段代碼的注釋部分),也就是eventList還沒有清空,所以可以直接清空putList,這樣下次循環還會重新讀取該eventList中的數據。
附注:在put操作commit的時候,如果部分數據已經放進queue的話,這個時候回滾,那是否存在數據重復問題呢?根據代碼,由于在放隊列這個操作之前已經做過很多判斷(容量等等),這個操作只是取出放進隊列的操作,而這個代碼之后,也只是一些設置計數器的操作,理論上不會出現異常導致回滾了。
總結
以上是生活随笔為你收集整理的Flume 1.7 源码分析(四)从Source写数据到Channel的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: Flume 1.7 源码分析(三)程序入
- 下一篇: Flume 1.7 源码分析(五)从Ch
