Oozie和Azkaban的技术选型和对比
https://blog.csdn.net/gaoqida/article/details/52165204
一.Azkaban和Oozie的工作流程
1.1 Azkaban工作流程
Azkaban將需要操作的信息打包成zip文件發送給Server端,Server對用戶的信息進行存儲。用戶在Web UI?或者通過HTTP Client發送操作請求后,Server會根據用戶定義的*.job文件(KV?匹配),執行zip包中的Jar文件。
源碼的執行過程:
?
1.從Web頁面提交工作流程:
Method.GET
/executor?projectId=33&project=testSpark&ajax=executeFlow&flow=test1&disabled=%5B%5D&failureEmailsOverride=false&successEmailsOverride=false&failureAction=finishCurrent&failureEmails=&successEmails=?ifyFailureFirst=false?ifyFailureLast=false&concurrentOption=ignore
?
用戶提交任務后,發送任務的詳情到服務器中,Azkaban客戶端會對任務以及用戶的信息進行校驗,封裝后首先將執行的信息(任務,時間,用戶等)存入數據庫中(表active_executing_flows),之后執行dispatch方法,對需要執行的任務流進行調度。
在dispatch方法中,首先會更新executions_flows表,然后將操作的語句發送到指定的ip和端口進行執行。
?
2.服務器接收到了請求:如果是執行操作那么接收到的action的type為execute。接著服務器會從數據庫中獲取相應的工作流flow,服務器將flow封裝成FlowRunner。
FlowRunner的屬性
?
| ExecutorService | 線程池對象 |
| ExecId | 從數據庫中獲取相應的flow |
| numJobThreads | 默認10個線程 |
| JobTypeManager | 定義Job的插件,有以下幾種插件 ? |
| Set<JobRunner> | 將有向無環圖中的node抽象成一個JobRunner進行運行 |
其中任務的執行是使用一個遞歸操作runReadyJob(),循環操作其中的node,也就是每個JobRunner。
JobRunner的主要屬性:
| Job | 執行任務的父類接口。 |
| JobtypeManager | 根據輸入的type類型返回此節點需要執行的任務類型 |
| JobId | 唯一標識符 |
| 配置文件,Job的路徑,監控FlowWatch..... | |
?
其中會根據需要操作的Flow來定義Job的type。返回相應的類型。例如MR ?返回的是JavaProcessJob。
也就是說:每一個節點,是通過新建一個進程去運行。在這個進程中會執行多條command,通過process.run(),運行用戶定義的job。
PS.每條command都需要重新建立一個process。
?
1.2 ?Oozie工作流程
在Oozie中,用戶需要準備以下文件:
| Job.properties | Job文件存儲HDFS,ResourceManager的配置 |
| Workflow.xml | 配置每個節點之間的依賴關系 |
| Lib | 存放著指定運行jar的關聯包 |
| .jar | 運行的jar包 |
?
用戶需要將這些文件放置在一個文件夾下,然后上傳至HDFS中。在客戶端或者終端中發送請求去執行。
源碼執行流程:
使用控制行操作:
1.首先調用:org.apache.oozie.cli.OozieCLI。首先根據不同的command類型調用不同的發送請求,例如使用MRCommand
?
在這個方法中會生成一個Client去Submit指定的Properties(根據Client和Command生成)。提交的對象是HTTPJobSubmit。調用該對象中的call方法和Server進行通信。最終返回一個jobId。METHOD.POST
?
2.服務器端:首先調用相應的Servlet,調用提交作業方法,生成一個DAG圖(DAGEngine,然后所有的操作都是基于DAG來實現的)。
A.如果我們在提交一個作業時生成了jobType那么,此時會選定不同的提交類型(類似于一個工廠模式),返回指定的信息。
B.首先它會調用SubmitXCommand.call()方法,將job的信息加入數據庫中并且返回一個jobId。
C.之后執行start(jobId)的方法,調用Xcommand.call()方法,生意Instrument對任務進行監控,在這個方法中會調用一個SignalXCommand.execute()方法。
在Oozie的后端中會維護一個異步隊列,在上述的execute中會根據job中的每一個action的類型,去生成相應的Command加入異步隊列中。類型如下:
| skipAction | SignXcommand |
| startAction | ActionStartXCommand |
| ForkAction | ActionStartXCommand?和上面的jobType不同 |
類似還有killActionXCommand,workflowNotifyActionXCommand等
PS如果是MR?或者?Spark ?映射ActionStartXCommand?類型。
D.在后端異步隊列CallableQueueService中。(在這個方法中使用Instrument對Java進行進行監控)。會調用這些XCommand的execute方法,不同的類型會實例化不同的executor,例如MR?和?Spark都會實例化JavaActionExecutor(同時還有SubWorkflowActionExecutor執行提交任務)。
E.在上述對象的execute方法中會根據配置生成JobClient,來獲取正在運行的Running Job的信息以及提交Job ?SubmitJob,返回一個jobId。如果獲取正在運行的runningJob在這個對象中還有job.trackerUrl也就是任務的日志。可以供以后展示。
?
?
測試用例提交流程:
?
看測試用例提交Hadoop作業中,首先對連接進行驗證,然后每次提交會生成一個JobClient,該Oozie作為一個Client給Hadoop服務器發送操作job的請求。
?
其中操作hive hadoop spark?作業均是JavaActionExecutor,該執行器中會調用submitLauncher提交Hadoop作業。
1.3 ?小結
Azkaban的工作流運行是依靠操作進程來提交不同的命令的,它操作任務成功和失敗的信息在于進程的相應,但是這并不能有效的管理任務的成功與失敗。
Oozie?執行MR?任務是依靠Hadoop的Jar包,以Server作為Client發送請求至集群進行操作。在此之前需要將任務所依賴執行的jar包上傳至HDFS中才可執行。
通過了解Oozie和Azkaban的執行過程,個人任務使用Oozie作為底層的流程引擎比較合適,因為通過JobClient可以有效的監控正在執行的任務,獲取任務的信息,如果使用Azkaban則只能獲取進程執行的詳情。
?
二.workflow.xml配置工作流流程
在Oozie中每個工作流有不同的狀態,具體如下:
| PERP | 工作流已經被定義但是沒有執行 |
| RUNNING | 當一個工作流開始執行。它不會達到結束的狀態只會出錯結束或者掛起 |
| SUSPENDED | 工作流給掛起狀態從RUNNING狀態過來 |
| SUCCESSED | 工作流到達END節點 |
| KILLED | 工作流處于RUNNING或SUSPENDED狀態被殺死 |
| FAILED | 工作流遇到錯誤停止 |
?
工作流節點有以下幾種類型:
控制流節點:控制工作流開始和結束以及控制執行的路徑
| Start | <start?to="[NODE-NAME]"?/> 第一個執行的節點 |
| End | <end?name="[NODE-NAME]"?/> 執行到該節點任務成功,一個工作流只能有一個end |
| Kill | <kill?name="[NODE-NAME]"> <message>[MESSAGE-TO-LOG]</message> </kill> 被殺死節點的名稱和備注,達到該節點時,任務狀態為KILLED |
| Decision | <decision?name="[NODE-NAME]"> <switch> <case?to="[NODE_NAME]">[PREDICATE]</case> <default?to="[NODE_NAME]"?/> </switch> </decision>?工作流執行到此處時會根據條件進行判斷,滿足條件的路徑將被執行 |
| Fork | <fork?name="[FORK-NODE-NAME]"> <path?start="[NODE-NAME]"?/>... </fork>?多個并發路徑 |
| Join | <join?name="[JOIN-NODE-NAME]"?to="[NODE-NAME]"?/> Fork的多條路徑會在Join處匯合,只有所有路徑都到了,才會執行join. |
?
動作類型節點:能夠觸發一個計算任務或者處理任務執行的節點。該類節點有以下的基本特性:
1.異步:Oozie會啟動一個異步隊列來執行某個工作流job,并通過回調機制以及輪詢來獲取任務的執行狀態.
2.節點要么成功要么失敗。
3.一個任務如果在某個節點失敗了,那么Oozie提供一套恢復運行的策略,如果是狀態轉移失敗,那么自動運行,否則需手動運行。
動作類節點主要有以下幾大類:
| MR | <action?name="[NODE-NAME]"> <map-reduce>...啟動一個MRJOB的執行,并且可以配置其中的其他任務,如streaming,pipes,file,archive |
| Hive | <hive?xmlns="uri:oozie:hive-action:0.2"> <script>[HIVE-SCRIPT]</script> <param>[PARAM-VALUE]</param> 執行hive查詢sql |
| Sqoop | <sqoop?xmlns="uri:oozie:sqoop-action:0.2"> |
| Pig | 啟動腳本實現Job |
| Fs | <fs> <delete?path='[PATH]'?/> <mkdir?path='[PATH]'?/> <move?source='[SOURCE-PATH]'?target='[TARGET-PATH]'?/> .... </fs>??操作HDFS |
| Java | 在Oozie中Java是有main方法執行的程序,他在服務器中以MR Job進行執行,這個Job只有一個Map程序,需要執行 namenode,jobTracker以及JVM和傳輸給主函數的參數 |
| Sub-workflow | 子流程動作,主流程執行過程中,遇到子流程點執行時,會一直等到子流程執行完后才跳轉到下一個要執行的節點。 |
| Shell | <shell?xmlns="uri:oozie:shell-action:0.2"> <exec>[SHELL-COMMAND]</exec> <argument>[ARGUMENT-VALUE]</argument> 執行shell語句 |
?
三.Oozie根據xml執行job
3.1新建workflow
可以根據hue中的方法進行新建,重寫hue中的editor/workflow/new方法,不過得將python轉java。
3.2執行workflow
參考oozie中的提交作業的流程,看下操作的主要對象的屬性信息:
WorkflowJobBean
| startTimestamp | 開始時間 |
| endTimestamp | 結束時間 |
| app_path | jar包位置 |
| Conf | 配置文件的信息BLOB二進制大文件 |
| Actions | List<WorkflowActionBean>?一系列的執行節點 |
| 等等。。 | |
?
這個是一個任務的基本屬性,主要包含了一堆actions節點和conf配置文件。在提交代碼的過程中,以MR?作業為例:
首先,在提交的過程中,會將用戶的任務信息封裝成一個workFlowJob以及workflowInstance(job的狀態,執行路徑等)并判斷job的行為狀態。
然后,對這個Job中的每個action進行遍歷,判斷action屬于哪種類型,然后放入后端的異步隊列中。
異步隊列會執行其中每一個action,執行時生成一個executor,這個執行器在操作的過程中會根據每一個action的xml文件生成org.apache.hadoop.conf.configuration actionConfig對象,循環遍歷每個action的節點xml映射去填充這個對象的屬性。
最終根據actionConf生成一個jobClient,發送用戶的請求。
?
四.如何運行Spark作業
4.1 Oozie
在Oozie中對spark作業的執行有其自定義的一套執行器----sparkActionExecutor,這個執行器繼承了JavaActionExecutor。
在這個執行器中,主要作用是定義好spark作業的配置信息以及在生成Client的時候定義的Configuration actionConfig對象的初始化。
也就是說,spark對象會根據不同的配置初始化相應的JobClient用于發送spark任務jar包,其具體的流程和HadoopActionExecutor相似,都是調用JavaActionExecutor的execute()方法。
4.2 Azkaban
Azkaban的底層是將命令封裝成一個進程進行執行,在這個過程中我們可以自定義相關命令。發送jar包進行執行。
4.3?小結
如果從操作的角度上來說,那么Azkaban直接上傳jar包然后執行,其過程更為簡易,并且用戶操作相對于Oozie來說更為簡單,困難在于,不能直接將所需要操作的shell語句編寫入口提供給用戶。需要根據WEB UI的返回值,生成操作命令。
Oozie的配置相對于復雜,但是它已經提供了一套相對于比較完整的WEB?頁面接口以及HUE中配置workflow.xml的代碼。困難點在于將用戶編寫的操作流程以xml形式形象的展示出來。
?
五.Oozie和Azkaban如何判斷任務是否完成
5.1 Oozie判斷任務是否完成
如果任務正在運行的過程中,那么當前這個任務會被存儲在數據庫中,并且狀態標記為RUNNING。當任務在執行的過程中,如果不出錯且不出現掛起的狀態,則任務狀態不會變化。
當任務操作結束后(無論錯誤還是成功執行完成),Oozie會操作回調接口,具體操作流程如下:
1.生成CompletedActionXCommand,封裝當前action的信息。
2.在這個對象的execute方法中,如果當前action的狀態為PREP,則將繼續輪詢,會將輪詢的命令加入執行的異步隊列中,并設置相應的延時執行。
3.如果任務正處于RUNNING中,那么會在異步隊列中加入ActionCheckXCommand對象,在這其中例如使用MR,則會生成JavaActionExecutor?類型的執行器。
4.執行這個執行器中的check方法。根據jobId生成jobClient獲取HADOOP中正在運行的RUNNING JOB?。
5.如果job.isComplete(),會判斷任務是否結束。結束是否運行成功,有相應接口。(判斷成功與否包括org.apache.hadoop.mapred.Counters)代碼位于:
/oozie/action/hadoop/LauncherMapperHelper/isMainSucessful。成功返回SUCCESSED,失敗FAILED
6.如果任務未結束,則任務設置為RUNNING。
最終每次jobClient查詢結束需要close()。Oozie會將每次運行的狀態信息存儲于數據庫中。
?
5.2Azkaban判斷任務是否完成
當Azkaban在提交任務之后會在Client運行一個Process,不斷的向Server發送查詢請求。發送的請求:
/executor?execid=55&ajax=fetchexecflowupdate&lastUpdateTime=1470735951842
在Server中Azkaban會維護一個ConcurrentHashMap存儲著執行的flow。這個hashmap是放在內存中的。由于Azkaban操作的顆粒度是進程,進程的執行成功或者失敗都會影響這個hashmap。
但是進程的執行結果無法直接反應任務是否執行成功。
六.總結
綜上述的幾點對比Oozie以及Azkaban,個人覺得選擇Oozie作為流程引擎的選型比較好,理由如下:
1.Oozie是基于Hadoop系統進行操作,而Azkaban是基于命令行進行操作。使用hadoop提供的第三方包JobClient比直接在底層跑shell命令開發成本小,可能遇到的坑也少(一個是基于平臺,一個是基于系統)。
2.Oozie的操作是放在Hadoop中,而Azkaban的運行是服務器運行shell命令。為保證服務器的穩定,使用Oozie靠譜點。
3.Ooize提供查詢任務執行狀態,Azkaban查詢的是進程執行的結果,如果某進程執行的shell命令出錯,其進程仍展示位成功,混淆了任務輸出。
4.Oozie將任務執行的狀態持久化到數據庫中,Azkaban將任務的狀態存儲在服務器內存中,如果掉電,則Azkaban會丟失任務信息。
5.Ooize中定義的action類型更為豐富,而Azkaban中的依賴較為簡單,當面對復雜的邏輯時Oozie執行的比較順暢(網上說的,但是沒有實踐的數據。。。)。
?
以Oozie作為流程引擎的難點:
1.定義workflow.xml的過程,需要保證有效的完成用戶的邏輯且運行的過程中job不出錯。
2.部署有點麻煩。
3.學習的成本會略高。
轉載于:https://www.cnblogs.com/davidwang456/articles/9511326.html
總結
以上是生活随笔為你收集整理的Oozie和Azkaban的技术选型和对比的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Solr之搭建Solr6.0服务并从My
- 下一篇: MaxCompute+ Geabase