Why Airflow is a great fit for Rapido
Back in 2019, we built our data platform on Hadoop 2.8 and Apache Hive, managing our own HDFS. Managing workflows, whether data pipelines (i.e. ETLs), machine learning and predictive pipelines, or generic pipelines, was a core requirement, and each task is different.
Some tasks simply run on a schedule on GCP, like creating a Cloud Run instance or a Dataproc Spark cluster at a given time, while others can only be scheduled based on data availability or Hive partition availability.
To sum up, the requirements in the data platform that called for a scheduling system at the time were:
- ETL pipelines
- Machine learning workflows
- Maintenance: database backups
- API calls: for example, managing Kafka Connect connectors
- Naive cron-based jobs whose task is to spin up some infra in a public cloud, for example to spin up a new Dataproc cluster or scale up an existing one, stop instances, scale Cloud Run, etc.
- Deployments: Git -> code deployments
ETL pipelines consist of a complex network of dependencies that are not just data dependencies, and these dependencies can vary with the use case: metrics, creating canonical forms of datasets, model training, and so on.
To create immutable datasets (no update, upsert, or delete), we started with a stack of Apache Hive, Apache Hadoop (HDFS), Apache Druid, Apache Kafka, and Apache Spark, with the following requirements and goals:
- Reproducible pipelines: pipeline output needs to be deterministic, as in functional programming; if a task is retried or re-triggered, the outcome should be the same, i.e. pipelines and tasks need to be idempotent (see the sketch after this list).
- Backfilling is a must, since data can evolve.
- Robustness: easy changes to the configuration.
- Versioning of configuration, data, and tasks, i.e. easily add or remove tasks over time or update existing DAG code.
- Transparency in the data flow: we felt that something similar to Jaeger tracing for the data platform would be tremendously useful, and looked at options like Apache Atlas and Apache Falcon.
Journey with Cloud Scheduler (managed cron)
We started with Cloud Scheduler driving shell and Python scripts, since it is fully managed by Google Cloud and setting up cron jobs takes just a few clicks. Cloud Scheduler is a fully managed cron job scheduler.
The main reason to go with Cloud Scheduler was that, unlike a self-managed cron instance, there is no single point of failure, and it is designed to provide "at least once" delivery of jobs, from plain cron tasks to automating resource creation, such as the jobs we ran to create virtual machines, Cloud Run services, etc.
(Image: Cloud Scheduler web UI; creating a job via the UI)
Cloud Scheduler, like cron, doesn't offer dependency management, so we had to hope that dependent pipelines finished in the correct order. Each pipeline or task had to be built from scratch, which doesn't scale. Although Cloud Scheduler supports time zones, we still hit a few time-zone problems in Druid ingestions and their dependent downstream tasks. Jobs were submitted manually via the UI, which can introduce human errors into pipelines. Cloud Scheduler can retry on failure, which reduces manual toil and intervention, but there is no task dependency management, and complex workflows and backfilling are not available out of the box. So within a few weeks we decided it may not be suitable for running data and ML pipelines, since these involve a lot of backfilling, cross-DAG dependencies, and sometimes data sharing between tasks.
We then started looking into the popular open-source workflow management platforms that can handle hundreds of tasks with failure handling and callback strategies, coded a few tasks, and deployed them on GCP to complete a POC. The projects we considered were Apache Airflow, Apache Oozie, Luigi, Argo, and Azkaban.
Both Apache Oozie and Azkaban were top projects with stable releases at the time. Oozie is a reliable workflow scheduler system for managing Apache Hadoop jobs. It is integrated with the rest of the Hadoop stack and supports several types of Hadoop jobs out of the box (such as Java map-reduce, streaming map-reduce, Pig, Hive, Sqoop, and DistCp) as well as system-specific tasks (such as Java programs and shell scripts).
(Image: Apache Oozie job browser)
Still, with Oozie we had to deal with XML definitions or zip up a directory containing the task-related dependencies, and the development workflow wasn't as convincing as Airflow's. Instead of managing multiple directories of XML configs and worrying about the dependencies between them, Airflow lets us write Python code that can be tested, and since it is code, all the usual software best practices can be applied to it.
(Image: Azkaban flow)
Azkaban has a distributed multiple-executor mode, beautiful UI visualizations, the option to retry failed jobs, good alerting options, and user-action tracking, i.e. auditing. But since its workflows are ultimately defined using property files, we didn't consider this option.
Luigi was promising since deploying it on Kubernetes is simple, and it handles dependency resolution, workflow management, visualization, failure handling, command-line integration, and much more.
(Image: Luigi task status)
But re-running old tasks or pipelines was not straightforward, i.e. there is no clean option to re-trigger tasks. Since Luigi relies on cron for scheduling, after updating the code we have to wait for the next cron run, whereas Airflow has its own scheduler and we can re-trigger a DAG via the Airflow CLI or UI. Scaling Luigi as a cron-driven scheduler also seemed difficult, whereas Airflow's Kubernetes executor looked very promising. Creating a task was not as simple as in Airflow, and maintaining DAGs is a bit harder since there was no good story for DAG versioning.
(Image: feature comparison between Luigi, Airflow, and Oozie)
Finally, it came down to Airflow and Argo:
- Both are designed for batch workflows involving a directed acyclic graph (DAG) of tasks.
- Both provide flow control for error handling and conditional logic based on the output of upstream steps.
- Both have great communities and are actively maintained by their contributors.
為什么選擇Airflow而不是Argo? (Why choose Airflow over Argo?)
The main points at decision time were that Airflow is tightly coupled to the Python ecosystem, and everything revolves around DAG code. Argo, meanwhile, offers the flexibility of scheduling steps as containers, which is very useful because anything that can run in a container can be used with Argo. The downside is a longer development time, since we would have to prepare a Docker container for each task and push it via CI/CD to Google Container Registry, our private Docker repository.
(Image: Argo workflow UI)
Argo natively schedules steps to run in a Kubernetes cluster, potentially across several hosts or nodes. Airflow also has the KubernetesPodOperator and the Kubernetes executor, which sounded exciting since the executor creates a new pod for every task instance, so there is no need to worry about scaling Celery worker pods.
The Airflow and Argo CLIs are equally good. Airflow DAGs are expressed in a Python-based DSL, while Argo DAGs are expressed in Kubernetes YAML syntax, with Docker containers packaging all the task code.
(Image: an Airflow DAG for an ETL job that runs a Spark job and updates a Hive table)
Airflow has colossal adoption, so there is a massive list of operators and hooks with support for runtimes like Bash, Spark, Hive, Druid, Pinot, etc. Hence, Airflow was the clear winner.
To sum up, Airflow provides:
綜上所述,Airflow提供:
- Reliability: Airflow provides retries, i.e. it can handle task failures by retrying them; if upstream dependencies succeed, downstream tasks can retry when they fail.
- Alerting: Airflow can report if a DAG failed or missed an SLA, and notify us of any failure.
- Priority-based queue management, which ensures the most critical tasks are completed first.
- Resource pools, which can be used to limit parallelism across arbitrary sets of tasks (the retry, SLA, priority, and pool settings are sketched in the example after this list).
- Centralized configuration.
- Centralized task metrics.
- Centralized performance views: with views like the Gantt chart we can look at the actual performance of a DAG and check, for example, whether a specific DAG spent five or ten minutes waiting for data to land before triggering the Spark job that aggregates it. These views help us analyze performance over time.
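A minimal sketch of how these knobs appear in a DAG definition, assuming a 1.10-style setup; the pool name, alert address, and callback are hypothetical, and the pool itself would need to be created via the Airflow UI or CLI first:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def notify_on_failure(context):
    # Hypothetical callback: push failure details to a chat/alerting channel.
    print("Task failed:", context["task_instance"].task_id)


default_args = {
    "owner": "data-platform",
    "retries": 3,                              # reliability: retry failed tasks
    "retry_delay": timedelta(minutes=5),
    "email": ["data-alerts@example.com"],      # hypothetical alert address
    "email_on_failure": True,
    "sla": timedelta(hours=2),                 # alert if a task runs past its SLA
    "on_failure_callback": notify_on_failure,
}

with DAG(
    dag_id="example_prioritized_dag",          # hypothetical DAG id
    default_args=default_args,
    start_date=datetime(2019, 1, 1),
    schedule_interval="@hourly",
) as dag:
    critical_task = BashOperator(
        task_id="critical_aggregation",
        bash_command="echo 'run the critical job'",
        priority_weight=100,                   # scheduled ahead of lower-weight tasks
        pool="spark_pool",                     # hypothetical pool limiting parallel Spark jobs
    )
```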
Future of Airflow:
氣流的未來:
We already have hundreds of DAGs running in production, and with Fluidity (our in-house Airflow DAG generator) we expect that number to double or triple over the next few months. One of the most-watched features of Airflow 2.0 for us is therefore the separation of DAG parsing from DAG scheduling, which can cut the time wasted waiting (time where no tasks are running) and reduce task latency through faster pickup of Airflow tasks by the workers.
Improved DAG versioning, to avoid manually creating versioned DAGs (e.g. to add a new task we currently go from DAG_SCALE_UP to DAG_SCALE_UP_V1).
High availability of the scheduler, ideally with an active-active model, for performance, scalability, and resiliency.
Development on Airflow is generally slow and involves a steep learning curve, which is why we built Fluidity (full blog coming soon): it spawns a dev replica that mirrors the production environment exactly, using Docker and Minikube.
Work on data evaluation, reports, and data lineage with Airflow.
If you enjoyed this blog post, check out what we've posted so far over here, and keep an eye on this space for some really cool upcoming blogs. If you have any questions about the problems we face as the Data Platform team at Rapido, or about anything else, please reach out to chethan@rapido.bike; we look forward to answering them!
*Special Thanks to the Rapido Data Team for making this blog possible.
*特別感謝Rapido數(shù)據(jù)團(tuán)隊(duì)使此博客成為可能。
翻譯自: https://medium.com/rapido-labs/why-is-airflow-a-great-fit-for-rapido-d8438ca0d1ab