Flume 1.7 源码分析(三)程序入口
Flume 1.7 源碼分析(一)源碼編譯 
 Flume 1.7 源碼分析(二)整體架構 
 Flume 1.7 源碼分析(三)程序入口 
 Flume 1.7 源碼分析(四)從Source寫數據到Channel 
4 程序入口
啟動Flume的過程可以簡單分為2個步驟: 
 1. 獲取相關配置文件(一般來說就是flume-conf.properties)。 
 2. 啟動各組件。不特別說明,本文中的組件是指實現了LifecycleAware接口的類的對象,一般就是Source、Channel、Sink這3種對象。
4.1 獲取啟動配置
4.1.1 Main函數
啟動Flume的Main函數在flume-ng-node模塊的org.apache.flume.node.Application。該函數的功能可以簡單劃分為以下三個步驟: 
 1. 使用commons.cli類獲取命令行參數(就是啟動時傳入的參數) 
 2. 根據啟動參數確定的讀取配置的方式。讀取配置的方式總共有4種,分別根據配置是保存在zookeeper上還是本地properties文件、以及是否reload(自動重載配置文件)分為4種方式。 
 3. 根據相應的配置啟動程序,并注冊關閉鉤子。 
 接下來以properties文件、不重載的方式為例,主要的代碼如下:
上面的代碼,有兩處比較關鍵:
- configurationProvider.getConfiguration()會返回一個MaterializedConfiguration類型的對象,用于從文件形式的配置轉為物化的配置,即包含實際的channel、sinkRunner等對象的實例,在“物化配置”一節分析。
- handleConfigurationEvent用于停止所有components,并使用新的配置進行啟動,在“使用新配置重啟”一節分析。
4.1.2 物化配置
configurationProvider.getConfiguration()方法主要做了以下兩件事: 
 1. 讀取配置文件(flume-conf.properties),保存在AgentConfiguration對象中。
到這個步驟還僅僅是做好了分類的文本形式的配置項。 
 2. 創建出配置中的各組件實例,并添加到MaterializedConfiguration實例中。
在這個實例中,可以獲取配置文件中配置的所有的source、channel、sink,并且是“物化”的,即可以直接取得相關組件的實例。
4.2 啟動所有組件
4.2.1 使用新配置重啟
有了上面的MaterializedConfiguration實例,我們就可以啟動組件了。 
 在handleConfigurationEvent方法中,首先會停止所有組件,然后再啟動所有組件。
在startAllComponents方法中,會遍歷組件列表(SourceRunners、SinkRunners、Channels),分別調用supervise方法。以Channel為例:
for (Entry<String, Channel> entry :materializedConfiguration.getChannels().entrySet()) {try {logger.info("Starting Channel " + entry.getKey());supervisor.supervise(entry.getValue(),new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);} catch (Exception e) {logger.error("Error while starting {}", entry.getValue(), e);} }這個supervise方法簡單來說,就是將相應組件的狀態轉化為期望的狀態。例如上面代碼中的LifecycleState.START就是期望的狀態。
4.2.2 LifecycleSupervisor
上節的supervisor是一個LifecycleSupervisor對象。前面有說到,在創建Application的時候初始化了一個LifecycleSupervisor對象,就是這里的supervisor。這個對象,我理解為各組件生命周期的管理者,用于實時監控所有組件的狀態,如果不是期望的狀態(desiredState),則進行狀態轉換。
上節的代碼中調用了supervisor.supervise方法,接下來分析一下supervise這個方法:
public synchronized void supervise(LifecycleAware lifecycleAware,SupervisorPolicy policy, LifecycleState desiredState) {//省略狀態檢查的代碼 Supervisoree process = new Supervisoree();process.status = new Status();process.policy = policy;process.status.desiredState = desiredState;process.status.error = false;MonitorRunnable monitorRunnable = new MonitorRunnable();monitorRunnable.lifecycleAware = lifecycleAware;monitorRunnable.supervisoree = process;monitorRunnable.monitorService = monitorService;supervisedProcesses.put(lifecycleAware, process);ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(monitorRunnable, 0, 3, TimeUnit.SECONDS);monitorFutures.put(lifecycleAware, future); }由于所有的組件都實現了LifecycleAware接口,所以這里的supervise方法傳入的是LifecycleAware接口的對象。
可以看到創建了一個Supervisoree對象,顧名思義,就是被監控的的對象,該對象有以下幾種狀態:IDLE, START, STOP, ERROR。 
 scheduleWithFixedDelay每隔3秒觸發一次監控任務(monitorRunnable)。
4.2.3 MonitorRunnable
在MonitorRunnable中主要是檢查組件的狀態,并實現從lifecycleState到desiredState的轉變。
switch (supervisoree.status.desiredState) {case START:try {lifecycleAware.start();} catch (Throwable e) {省略}break;case STOP:try {lifecycleAware.stop();} catch (Throwable e) {省略}break;default:logger.warn("I refuse to acknowledge {} as a desired state", supervisoree.status.desiredState); }到這里為止,可以看到監控的進程,調用了組件自己的start和stop方法來啟動、停止。前面有提到有3種類型的組件,SourceRunner、Channel、SinkRunner,而Channel的start只做了初始化計數器,沒什么實質內容,所以接下來從SourceRunner的啟動(從Source寫數據到Channel)和SinkRunner的啟動(從Channel獲取數據寫入Sink)來展開說明。
總結
以上是生活随笔為你收集整理的Flume 1.7 源码分析(三)程序入口的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: Flume 1.7 源码分析(二)整体架
- 下一篇: Flume 1.7 源码分析(四)从So
