一周一论文(翻译)——[SIGMOD 19] Elasticutor:Rapid Elasticity for Realtime Stateful Stream Processing
Abstract
彈性非常適用于流系統,以保證針對工作負載動態的低延遲,例如到達率的激增和數據分布的波動。現有系統使用以resource-centric的方法實現彈性,該方法在并行實例(即執行程序)之間重新分配Key,以平衡工作負載和擴展Operator。然而,這種Operator級別的重新分區需要全局同步并且禁止快速彈性。我們提出了一種以executor-centric的方法,它避免了Operator級別的Key數據重新分區,并將執行程序作為彈性的構建塊。通過這種新方法,我們設計了具有兩級優化的Elasticutor框架:i)執行器的新穎實現,即elastic executors,通過有效的executror內負載均衡和executor擴展來執行彈性多核執行,以及ii)a基于全局模型的調度程序,可根據瞬時工作負載為執行程序動態分配CPU內核。我們實施了一個原型Elasticutor并進行了大量實驗。我們表明,與實際應用程序的動態工作負載方法相比,Elasticutor的吞吐量增加了一倍,延遲降低了兩個數量級。
1. Introduction
?????? 分布式流系統[8,12,40,43,45,50,51]實現了對連續流的實時數據處理,并已廣泛用于欺詐檢測,監控分析和定量融資等應用。 在這樣的系統中,應用程序邏輯被建模為計算圖,其中每個頂點表示與用戶定義的處理邏輯相關聯的Operator,并且每個邊指定運算符之間的數據流的輸入-輸出關系。 為了實現大規模數據處理,輸入數據流通常被定義為通過key 空間分區到下游子分區中。并行執行實例(即執行程序)以將每個key的子空間靜態綁定到一定量的計算資源,通常是CPU核心。 結果,每個執行器可以獨立地進行與其key SubSpace相關聯的計算。
?????? 然而,在股票交易和視頻分析等實際應用中,工作量隨時間波動很大,導致嚴重的性能下降[15,39]。 從時間角度來看,發送到Operator的總工作量可能會在短時間內顯著激增,例如10秒,這使得Operator成為整個處理流程的瓶頸。 從空間角度來看,Key空間上的工作負載分布可能不穩定,導致執行程序中的工作負載偏差,其中一些CPU利用率較低,而另一些則過載。 為了適應工作負載波動,先前的工作[14,15,39,41]提出了實現彈性的解決方案,即Operator擴展和負載平衡。 所有這些解決方案都以資源為中心,因為執行程序受特定資源的約束,并且通過跨執行程序動態地重新分配Key來實現彈性。
?????? 圖1(a)說明了由于工作負載分配不平衡而導致執行程序過載的情況。為了減輕性能瓶頸,重新分區Key空間,以便將重載執行程序中的一定數量的工作負載與相應的Key一起遷移到負載較輕的執行程序。但是,這個過程需要一個耗時的協議[15,39]來維持狀態一致性。特別是,系統需要執行以下操作:(a)阻止上游執行程序向下游發送元組; (b)等待所有飛行中的元組進行處理; (c)根據新的Key空間劃分,將狀態遷移到新的key space分區匯總; (d)更新上游執行者的路由表;最后(e)恢復向下游發送元組的上游執行者。由于Operator間路由更新和執行器間狀態遷移都需要昂貴的全局同步,因此Key Space重新分配可能持續數秒,在此期間無法處理新的傳入元組并導致嚴重延遲。
?????? 為了實現快速彈性,我們提出了一種以執行者為中心的范式。 核心思想是在執行程序之間靜態劃分Operator的Key Space,但根據其瞬時工作負載動態地為每個執行程序分配CPU核心。 圖1(b)說明了新方法不是對Key Space進行重新分區,而是通過將CPU內核從較輕負載的執行程序重新分配給過載的執行程序來平衡工作負載。 由于每個執行器擁有固定Key SubSpace,新方法實現了Operator間獨立性,即上游Operator不需要與下游Operator同步,并且執行器間獨立性,即與Key SubSpace相關聯的狀態不需要遷移跨執行者。 換句話說,這種新方法優雅地解耦了Operator Key Space重新分區和計算資源的動態供應之間的綁定。
?????? 基于以執行器為中心的方法,我們設計了具有兩個優化級別的Elasticutor框架。 在執行程序級別,作為輕量級分布式子系統實現,每個彈性執行程序在其分配的CPU核心上均勻分配其工作負載,并在調度程序分配/取消分配CPU核心時快速擴展。 在整體層面,基于模型的動態調度程序設計用于根據測量的性能指標Metrics優化核心到執行程序的分配,以便以最小的狀態遷移開銷和最大的計算局部性來適應工作負載動態。 我們實現了Elasticutor的原型,并使用合成和真實數據集進行了大量的實驗。 結果表明,Elasticutor使吞吐量翻倍,并且比現有方法實現了更低的延遲。
?????? 本文的其余部分安排如下。 第2節介紹了以執行者為中心的范例,并概述了Elasticutor框架。 第3節和第4節分別介紹了彈性執行器和動態調度器的設計。 第5節討論了實驗結果。 第6節回顧了相關工作。 第7節總結了論文。
2. PARADIGM AND FRAMEWORK
2.1 Basic Concepts
?????? 我們考慮在由快速網絡設備連接的稱為節點的機器群集上的實時有狀態流處理系統。流是一個無限的序列。來自輸入流的元組連續到達系統并立即處理。用戶應用程序被建模為計算的有向圖,稱為Topology,其中頂點是具有用戶定義的處理邏輯的運算符,并且邊表示運算符之間的處理序列。對于每對相鄰Operator,流的元組由上游Operator生成并由下游Operator消費。在有狀態計算中,Operator維護內部狀態,該狀態用于計算并將在輸入元組的處理期間更新。為了分配和并行化計算,操作符的狀態被實現為在key space上定義的可分割數據結構。系統將key space劃分為sub space,并創建一個稱為執行程序的并行實例,每個實例具有相同的數據處理邏輯。為了保證在這樣的分布式系統上維護的狀態的一致性,需要將元組正確路由到下游執行器。因為以不同順序處理相同的輸入元組序列可能導致不同的輸出元組和狀態,所以有狀態計算中的另一個基本要求是按到達順序處理相同key的元組。
?????? 流處理工作負載通常是動態的,因為對Operator的輸入速率和元組的key分配隨時間波動。為了保證動態工作負載下的性能,應該向Operator適當地提供計算資源,即CPU核心,以便確保1)Operator擴展,即CPU核心根據其工作負載動態地分配給Operator; 2)負載平衡,即每個Operator的工作負載均勻分布在分配的CPU核心上。如果不實現前者,一些Operator可能會過載或過度配置,分別成為性能瓶頸或浪費計算資源。如果不實現后者,一些CPU內核將會過載,而其他CPU內核將未得到充分利用,從而導致性能下降。我們將Operator縮放和負載平衡的機制稱為彈性。為了在動態工作負載下保持高性能,快速彈性是一項至關重要的要求。
2.2 The Executor-Centric Paradigm
?????? 表1總結了現有兩種彈性范式的主要特征:靜態和以資源為中心的方法。 靜態方法使用固定數量的執行程序實現每個Operator,并使用靜態Operator Key Space在執行程序之間分配工作負載。 每個執行程序由綁定到指定CPU核心的單個數據處理線程組成。 由于靜態Key分區和CPU內核與執行器的一對一綁定,靜態方法簡化了系統實現,并在大多數最先進的系統中采用[30,43]。 但是,由于既不能平衡分配的CPU內核的工作負載,也不能調整分配給特定Operator的CPU內核數量,因此這種方法對分區模式非常敏感,并且由于缺乏彈性而在動態工作負載下效率低下。
?????? 以資源為中心的方法通過支持動態Operator級Key分區來解決靜態方法的限制,同時遵循與靜態方法相同的執行程序實現。 憑借Operator級Key重新分區的功能,以資源為中心的方法實現了彈性,因為它可以將一些Key及其相應的工作負載從重載執行程序遷移到負載較輕的執行程序,以平衡工作負載,或從現有執行程序遷移到新創建的執行者以擴展Operator。 但是,如引言部分所述,此Operator級Key重新分區是一個耗時的過程,在此過程中需要昂貴的全局同步來遷移狀態并更新所有上游執行程序的路由表。 因此,以資源為中心的方法不能實現快速彈性,只能解決非常有限的工作量動態。
?????? 為了實現快速彈性,我們提出了一種新的執行范式:以executor-centric的方法。我們的想法來自觀察到Operator級別的Key重新分區太昂貴而無法實現快速彈性。因此,以執行器為中心的方法使用靜態Operator級別的Key分區,但將每個執行程序實現為彈性的構建塊以處理工作負載波動。特別是,每個執行程序都旨在通過動態創建或刪除數據處理線程來利用各種計算資源。因此,為了實現負載平衡和Operator擴展,系統可以為每個彈性執行器動態分配適當數量的CPU內核,而不是執行昂貴的Operator Key重新分區。與Operator Key重新分區相比,可以有效地實現CPU內核的重新分配和內部執行器負載平衡,因為它們不需要任何Operator間或執行器間同步。有趣的是,我們的新方法通過避免全局同步實現了快速彈性。
2.3 Overview of Elasticutor Framework
?????? 遵循以執行者為中心的方法,我們設計了專注于支持有狀態流處理的Elasticutor。 為了處理流系統上的大規模數據,我們假設數據和狀態是在Key Space下定義的,基于哪些分區數據流和狀態可以由分布式計算單元并行處理和維護。 我們假設Key Space足夠細粒度,以便甚至可以在越來越多的計算資源(即CPU核心)上分配和平衡扭曲的工作負載。 對于像Heron,Flink和Samza這樣的其他最先進的流處理系統,還需要這種假設來實現高度并行化的有狀態流處理。
?????? 我們的設計目標是實現實時響應,歸結為保證低延遲。 然而,過度延遲可能是由于較高的數據到達率導致的系統資源不足,或導致工作負載不平衡的低效資源分配和調度。 前者需要資源縮放,而后者則不需要。 基于關注點分離原則,我們將Elasticutor設計為兩級架構,如圖2所示。
?????? 高級調度程序(在第4節中描述)處理可能在一段時間內激增的動態工作負載,在此期間現有系統容量不足并需要擴展。不需要過度配置,但我們假設可以從基于云的平臺按需獲取資源。我們假設工作負載的總體激增不會太頻繁發生,例如,在幾分鐘到幾小時的時間范圍內。動態調度程序確定每個彈性執行程序應在瞬時工作負載下提供的所需CPU核心數。它采用基于排隊網絡的性能模型,并使用彈性執行器的收集性能Metrics指標作為輸入來生成資源分配決策。根據現有的核心到執行器分配和集群中CPU核心的可用性,調度程序改進了分配以適應新的資源分配計劃,同時考慮了CPU重新分配開銷和計算資源的位置。
?????? 每個low-level執行程序(在第3節中描述)被設計為一個輕量級,自包含的分布式子系統,稱為彈性執行程序,負責處理固定key subspace下的輸入。 為適應工作負載波動,彈性執行器可以使用動態數量的CPU核心,可能來自多個節點,由動態調度程序決定。 為了在工作負載波動的情況下充分利用其分配的CPU內核,彈性執行器具有高效的內部負載平衡機制,可以在更短的時間范圍內將分配的CPU內核的輸入流的計算均勻分布。
?????? 以有狀態處理為目標的流處理系統的設計空間還包括諸如狀態大小和數據流特征之類的維度,即每元組的計算和大小,以及key space下數據流的傾斜程度和動態性。我們將討論權衡Elasticutor與第5節中的變更方法進行比較。
3. ELASTIC EXECUTOR
?????? 為了有效地利用CPU資源,彈性執行器被設計為適應兩種動態:1)key分配的變化和2)CPU core重新分配,如圖3所示。前者來自輸入流的波動,而后者由調度程序確定全局優化。 為了在其計算資源上分配工作負載,彈性執行程序為每個分配的CPU核心創建任務,并在其上分配輸入數據元組。 在CPU重新分配時,將創建新任務或刪除現有任務。 這兩種動態都會在任務之間引入不平衡的工作負載,從而導致資源利用不足或性能下降。 因此,一個中心設計問題是如何在存在這種動態的情況下在任務之間保持平衡的工作負載分配。
3.1 Components and Working Mechanism
?????? 如圖4所示,彈性執行器被實現為輕量級,自包含的分布式子系統,其可以利用多個物理節點上的計算資源。 每個彈性執行程序主要駐留在一個稱為其本地節點的物理節點中,在該節點中,它運行本地主進程以接收輸入元組并發送輸出元組。 對于每個分配的CPU核心,在該過程中創建作為數據處理線程實現的任務。 要在遠程節點上使用CPU核心,可以創建遠程進程來托管遠程數據處理的遠程任務。
Intra-Executor Routing: 我們采用雙層設計,在圖4中央結構中顯示的路由表中實現,根據瞬時工作負載分配動態地將輸入元組映射到任務。第一層使用靜態散列函數將key Sub Space靜態劃分為Shard;第二層顯式維護動態Shard到任務映射,該映射在Shard重新分配時更新。我們在粗粒度而非每個Key的基礎上平衡工作負載,主要是因為細粒度方法需要維護每個Key的工作負載,因此遭受高內存消耗。Shard數量的選擇提供了負載平衡質量和維護開銷之間的權衡。然而,在實踐中,合理數量的Shard(例如,任務數量的4或8倍)實現了良好的平衡質量,同時保持低維護開銷。我們將在5.3節討論Shard數如何影響某些極端設置中的系統性能。
Executor-Level Fault Tolerance:在流處理系統中已經廣泛研究了容錯[8,11,14,35,49],因此既不是本文的重點也不是本文的貢獻。 在這里,我們只討論如何從故障中恢復每個彈性執行器的遠程任務,以便Elasticutor可以利用最先進的狀態檢查點技術,如流水線快照協議[11],實現容錯。
?????? 彈性執行程序的主要過程在邏輯上維護其任務狀態的主副本。默認情況下,狀態在主進程的內存中維護以進行高效訪問,但是當狀態太大而無法容納在內存中時,也可以將狀態存儲在外部存儲中。主進程還用增加的元組ID標記每個輸入數據元組。對于每個遠程任務T,彈性執行程序維護一個待處理的元組隊列以備份發送到T的數據元組,并且值ts表示已經處理了ID小于ts的所有數據元組,并且處理這些元組產生的狀態更新已被沖回主副本。每個遠程任務周期性地,例如每10秒,將其本地狀態的更新與tmax(即,已經處理的元組的最大ID)一起發送到彈性執行器的主進程。在從遠程任務T接收到狀態更新時,彈性執行器更新狀態的主副本,在T的待處理隊列中移除ID不大于tmax的元組,并更新ts = tmax。當遠程任務T失敗時,彈性執行程序使用狀態的主副本創建新任務,并通過在待處理隊列T中重放大于ts的ID的元組來開始執行新任務。
3.2 Consistent Workload Redistribution
?????? 雖然狀態共享提高了Shard重新分配的效率,但需要注意保證一致性。 一般而言,盡管使用與以資源為中心的方法的Key重新分區類似的過程,我們通過利用以執行器為中心的方法啟用的Operator間和執行者間的獨立性來實現具有狀態一致性的有效分片重新分配。
?????? 考慮圖4中的情況,其中元組t1處于任務T2的待處理隊列中,元組t2剛剛到達執行器的主進程,并且元組t3將由上游執行器發出。 假設所有三個元組都屬于shard r4。 如果在處理t1之前或在更新t2和t3的路由之前將shard r4從源任務T2重新分配給新的目標任務,則狀態將變得不一致。 特別地,如果目的地任務是本地的,例如T1,那么t2可能在t1之前被處理,違反了順序處理的要求。 如果目的地任務是遠程的,例如T0,則由t1對狀態的修改將丟失。
Inter-Operator Consistent Routing:為了保證從上游Operator到分配的任務所在的正確進程的一致路由(例如t3),彈性執行器在其本地主進程中實現接收器守護進程,作為來自上游Operator的所有元組的單一入口。接收器根據內部路由表將元組路由到適當的本地或遠程任務。類似地,發射器守護程序在主進程中實現為執行程序的單個退出,以將任務生成的輸出元組轉發給下游Operator。遠程過程僅與彈性執行器的主過程上的接收器和發射器通信。因此,無論在彈性執行器中的任務之間如何動態重新分配分片,上游和下游Operator總是通過其接收器和發送器向執行器發送元組或從接收器接收元組,從而避免由分片重新分配引起的任何Operator間同步。相反,以資源為中心的方法通過Operator級key space重新分區來重新分配工作負載,從而導致與所有上游執行程序同步。
?????? 請注意,與以資源為中心的方法相比,來自上游執行器的元組直接路由到下游Operator,Elasticutor可能涉及接收器/發射器和遠程任務之間的額外遠程數據傳輸。 這是我們為實現快速彈性而做出的權衡。 在典型的工作負載中,遠程數據傳輸不是性能瓶頸,如圖13所示。在5.3節中,我們通過正確配置Operator的執行器數量,討論如何避免/減少某些極端工作負載中的遠程數據傳輸。
Intra-Executor State Consistency: 為了保證在重新分區Shard期間的狀態一致性,彈性檢測器采用類似于以資源為中心的方法中使用的Operator級重新分區的Key重新分區過程,但不涉及任何全局同步。關鍵是要確保a)在將shard狀態遷移到目標任務之前必須處理掛起的元組,即SOurce任務中排隊的分片的未處理元組。 b)具有相同Key的元組不會同時在任何兩個任務中處理。在圖4中Shard r4的重新分配期間,暫停了元組r4的路由,并將標簽元組發送到其Source taskT2。由于任務以先來先服務的方式處理其輸入元組,因此當T2從其待處理隊列中拉出標簽元組時,保證已經發送到T2的任何待處理元組唄處理。之后,r4的狀態將遷移到目標任務。如果將分片重新分配給其源任務的本地任務,則省略狀態遷移。在狀態遷移之后,在恢復r4的元組的路由之前,在路由表中更新Shard到任務映射。
Discussions:值得注意的是,我們提出的以執行為中心的范例適用于其他現有的分布式流系統,例如Apache Flink,Apache Heron和Apache Samza,其中有狀態處理可以通過在key space下劃分狀態和數據來并行化。 對于無狀態應用程序,我們的方法仍然可以應用,但可能不一定是最佳選擇,因為通過簡單地以循環方式發送元組或者向負載最少的執行程序發送元組可以輕松實現負載平衡。
雖然我們的方法不適用于基于批處理的系統,但我們的雙層負載平衡設計與小批量定向的Spark Streaming [50]采用的方法有一些相似之處。 兩個主要的區別是:1)我們設計一個額外的中間層碎片提供了維護成本和平衡負載之間的權衡,2)我們的測量和平衡設計對流量系統更自然,其中操作過程輸入元組 而不是基于小批量。
4. DYNAMIC SCHEDULER
?????? 動態調度程序的目標是通過在不斷變化的工作負載下自適應地將CPU核心分配給彈性執行程序來滿足用戶定義的延遲要求。 通過使用由系統測量的瞬時性能指標,調度程序首先根據排隊網絡模型估計每個執行程序所需的核心數量,并進一步(重新)將物理核心分配給執行者,以便最小化重新分配開銷 并最大化執行程序內的計算位置。
4.1 Model-Based Resource Allocation
?????? 我們將m個彈性執行器的拓撲E = {1,··,m}建模為Jackson網絡,其中每個執行器j∈E被視為M / M / kj系統[42],其中kj表示分配給j的CPU核心數。 輸入流的平均處理等待時間,表示為E [T],可以作為資源分配決策k的函數來計算。
?????? 其中λ0表示輸入流的到達率,Tj和λj分別表示執行器j的平均處理時間和到達率。 當kj>λj/μj時,每個E [Tj](kj)是有界的,其中μj表示彈性執行器j的處理速率,并且可以作為由系統測量的參數λ0,{λj}和{μj}的函數來計算。基于等式(1),調度器嘗試找到分配k以確保E [T]不大于用戶指定的等待時間目標Tmax,同時最小化CPU核心的總數。 特別地,每個kj被初始化為λj/μj+ 1,這是使系統穩定的最低要求。 我們重復向向量k中的值加1,導致E [T]的最顯著減少,直到E [T]≤Tmax或íkj超過可用CPU資源的數量。 這個貪心算法在找到解k時已經證明是最優的[22]。
4.2 CPU-to-Executor Assignment
?????? 性能模型僅建議新的分配,即每個執行者需要的CPU核心數量,這是由工作負載波動引起的; 調度程序仍然需要通過更新現有的核心到執行程序分配來適應新的分配計劃。 新分配計劃的CPU重新分配是系統性能的關鍵,因為它可能會引入1)轉換期間的狀態遷移成本,以及2)之后的遠程數據傳輸成本。 例如,在重新分配CPU內核時,彈性檢測器會創建一個新任務,如果CPU內核遠離彈性執行程序,則涉及狀態遷移和將來的遠程數據傳輸。 為了優化執行效率,我們搜索最小化遷移成本的CPU到執行器分配,同時限制計算局部性以限制未來的遠程數據傳輸成本。
?????? 為了模擬遷移成本,我們考慮一個n個節點的集群,其中每個節點都有ci CPU核心。對于任何執行器j∈E,我們用I(j)表示其主進程所在的節點,并且通過列向量xj =(x1j,...,xnj)T表示在所有節點上分配給它的核的數量。我們將jj = i = 1 xi jas定義為j的指定核心總數,并用矩陣X =(x1,···,xm)表示CPU到執行器的分配。給定任何新的分配k,從現有分配X到新分配X的轉換需要執行一組CPU分配/解除分配。核心重新分配的開銷由狀態遷移成本決定,狀態遷移成本與跨網絡移動的狀態大小成比例。我們用sj表示任何執行者j的聚合狀態大小。為簡單起見,我們假設彈性執行器的分片均勻分布在分配的CPU核心上;因此,與每個CPU核心相關的狀態數據量大約是sj / Xj。給定任何分配k,可用核心c和現有分配X~,我們按如下方式制定CPU分配問題。
上述優化問題最小化了從現有賦值X到新賦值X的遷移成本C(X | X?),其中求和中的每個項都測量執行者j將其狀態遷移出節點i的成本。約束包括(a)CPU核心的數量,(b)分配要求和(c)計算局部性,即,要求分配給執行器的集合E(φ)的所有核心都在其本地節點上。系統通過其總輸入和輸出數據速率除以核心數kj來測量任何執行器j的瞬時每核數據強度,并且E(φ)表示數據強度高于閾值的執行器集合。 。因為如果分配的核心是遠程的,數據密集型執行程序將產生更高的網絡成本,我們通過避免將遠程核心分配給E(φ)的成員來強制執行計算局部性。這個整數規劃問題可以簡化為NP-hard多處理器調度問題[23]。因此,我們設計了一個有效的貪婪算法1來找到近似解。對于任何賦值X,我們將E + = {j∈E| Xj <kj}定義為欠配置執行器的集合,E +Δ= {j∈E+∩E(φ)}作為數據密集型執行器的子集,并且E- = {j∈E| Xj> kj}是一組過度供應的執行者。我們使用C + i j(X)和
C-ij(X)分別表示在節點i上分配/取消分配CPU核心到執行器j的開銷,可以導出為C + ij(X)= sj(Xj-xi j)/(Xj( Xj + 1))和C-ij(X)= sj(Xj-xi j)/(Xj(Xj-1))。
算法1按數據強度按降序對E +中的執行程序進行排序,并嘗試通過從其他執行程序解除分配核心,逐個為每個執行程序j分配目標CPU核心數。具體來說,如果彈性執行器j是數據密集型的,即j∈E(φ),它只接受節點i = I(j)上的CPU核心,以避免創建遠程任務。因此,在所有非數據密集型執行程序中,算法在節點I(j)上找到一個CPU核心,可以用最小的釋放開銷重新分配給j(第7行)。相反,如果j不是數據密集型,則它接受任何節點上的CPU核心。該算法搜索E-中的所有執行程序,以獲得具有CPU內核的執行程序,該內核可以通過最小的釋放和分配開銷重新分配給j(第9行)。在任何一種情況下,如果找到這樣的有效核心重新分配,則算法將其添加到新的賦值X中;否則,它返回FAIL,這表示沒有找到可行的解決方案,并且暗示需要更高的數據不敏感閾值φ來獲得可行的解決方案。
?????? 算法1按數據強度按降序對E +中的執行程序進行排序,并嘗試通過從其他執行程序解除分配核心,逐個為每個執行程序j分配目標CPU核心數。具體來說,如果彈性執行器j是數據密集型的,即j∈E(φ),它只接受節點i = I(j)上的CPU核心,以避免創建遠程任務。因此,在所有非數據密集型執行程序中,算法在節點I(j)上找到一個CPU核心,可以用最小的釋放開銷重新分配給j(第7行)。相反,如果j不是數據密集型,則它接受任何節點上的CPU核心。該算法搜索E-中的所有執行程序,以獲得具有CPU內核的執行程序,該內核可以通過最小的釋放和分配開銷重新分配給j(第9行)。在任何一種情況下,如果找到這樣的有效核心重新分配,則算法將其添加到新的賦值X中;否則,它返回FAIL,這表示沒有找到可行的解決方案,并且暗示需要更高的數據不敏感閾值φ來獲得可行的解決方案。
?????? φ的選擇提供了公式4.2的可行性與彈性執行器的計算局部性之間的權衡。由于動態分配算法非常有效,我們使用低默認值φ=?φ運行算法。如果沒有找到可行的解決方案,我們將φ加倍并重新運行算法,直到我們找到一個。在我們的實驗中,我們將φ~設置為512 KB / s,低于該值時,計算局部性的好處可以忽略不計。
Discussions:我們的動態調度程序設計適用于使用連續運算符的流處理,并遵循數據流模型[5]。調度程序確定每個執行程序滿足延遲要求所需的資源,并計算資源分配以最大限度地降低狀態遷移成本。在這個級別工作的其他調度程序包括Flink的DS2 [28],Heron的Dhalion [21],Storm的RAS [36]等等。相比之下,基于云的資源管理系統(如YARN [44]和Mesos [25])更加以集群為中心[26,27],即它們主要旨在管理不同應用程序之間的集群資源。他們通常會收到應用程序經理的資源需求,并根據效率和公平等標準來決定如何配置資源。通常開發協商器/協調器模塊用于協助不同級別的調度器之間的交互。典型的例子包括Storm-on-Yarn [3]和Flink-on-Yarn [2]。
5. PERFORMANCE EVALUATION
?????? 我們在Apache Storm上大約10,000行Java中實現了Elasticutor的原型[43]。 Elasticutor的源代碼可在[4]獲得。 Storm是一種流行的分布式流處理系統,它暴露了低級API,例如Bolt API。 這對于原型研究思想來說相對容易一些。 Storm遵循靜態方法,其操作符由用戶通過抽象類Bolt實現。 我們添加了一個新的抽象類ElasticBolt,它提供了與Bolt相同的編程接口,但是向用戶空間公開了一個新的狀態訪問接口。 對于任何定義為ElasticBolt的運算符,Elasticutor創建了許多具有內置狀態管理,度量標準Metrics測量和彈性功能的彈性執行器。 動態調度程序實現為在Storm的主節點(nimbus)上運行的守護程序進程。
?????? 我們的實驗在Amazon EC2上進行,具有32個t2.2x大型實例(節點),每個實例具有8個CPU核心和32GB RAM,運行Ubuntu 16.04。網絡是1Gbps以太網。在所有方法下,執行器以循環方式分配給節點。除非另有說明,否則Elasticutor每個運算符使用32個彈性執行程序,每個執行程序使用256個分片(每個運算符8192個分片)。為了公平比較,我們為靜態方法中的Operator創建了足夠的執行程序,以充分利用集群中的所有CPU核心;并將RC方法中的key spcae分區的粒度設置為每個運算符8192個分片,與Elasticutor中的相同。為確保系統穩定性,Storm,Heron和Flink等現有流系統實施反壓機制,以控制Operator的輸入速率。為了關注系統性能,我們評估壓力情況,其中足夠高的到達率使輸入隊列保持非空并且可能觸發Storm的背壓機制。
?????? 我們在2.3節中詳細討論了有狀態流處理設計空間中的Elasticutor。在本節中,我們將Elasticutor的性能與靜態方法(默認Storm)和以資源為中心(RC)方法的性能進行比較。第2.2節總結了這三種方法的主要區別。我們通過啟用創建/刪除執行程序和Operator級Key重新分區來實現基于Storm的RC。為了公平比較,RC使用與Elasticutor相同的性能模型,負載平衡算法和進程內狀態共享機制。我們將評估Elasticutor在設計空間中的不同維度所做出的性能和權衡,包括狀態大小,每元組計算和大小,以及數據流的偏度和動態性。通常,只要有足夠的計算資源可用于系統中的擴展,Elasticutor amis的設計就可以容納計算密集型工作負載。但是,由于遠程任務的引入可能會導致數據傳輸和狀態遷移開銷和延遲,我們的設計假定工作負載在元組大小和狀態大小方面不會過于數據密集,并且網絡帶寬容量確實如此不成為瓶頸。我們假設在關鍵領域的數據分布中表現出的傾斜度是一種規范,我們關注的是更具挑戰性的情況,其中數據傾斜度也會突然變化,這可以在股票交易數據集和評估中顯示出來。
5.1 Micro-Benchmarking
?????? 在本小節中,我們使用一個簡單而有代表性的拓撲結構,如圖5所示,它允許輕松控制工作負載特性,例如輸入速率,計算成本和數據分布。拓撲由生成器和計算器組成,輸入數據流由生成器饋送到計算器進行處理。我們確保數據生成速率使計算器的輸入隊列飽和。計算器運算符中每個元組的處理時間遵循正態分布N(μ,δ2=0.5μ)。通過在執行時間內循環運行數據加密來實現計算,以耗盡CPU周期并模擬計算密集型工作負載。除非另有說明,否則每個元組由一個整數鍵和一個128字節的有效負載組成,并且處理的平均CPU成本為1 ms。密鑰空間包含10K個不同的值,其頻率遵循zipf分布[37],偏差因子為0.5。默認狀態大小為256MB,每個分片為32KB。為了模擬工作負載動態,我們通過應用每分鐘ω次隨機排列來改變元組Key的頻率。
工作負載動態的穩健性:圖6描繪了隨著ω沿x軸變化的三種方法下的吞吐量和平均處理延遲。我們觀察到,當工作負載是動態的時,Elasicutor在兩個指標方面始終優于其他方法,即ω > 0。特別是,由于密鑰分配偏差導致工作負載不平衡,靜態方法的性能較差,但由于沒有執行彈性操作,因此在所有情況下都相對穩定。 由于RC和Elasticutor都能夠適應偏斜的鍵分布,因此當ω很小時,它們會大大優于靜態。 然而,隨著ω的增加,盡管由于彈性操作成本較高而導致RC和Elasticutor的性能下降,但Elasticutor的性能下降是微不足道的,而RC的性能下降變大了2-3個數量級,使RC無用為ω 達到16。
?????? 為了更好地解釋ω變化時三種方法的性能,我們關注ω= 2的情景,即每30秒進行一次混洗,并繪制在圖7中1秒的滑動時間窗口內測量的瞬時吞吐量。我們觀察到 靜態方法的吞吐量始終遠低于RC和Elasticutor的吞吐量,盡管變化不大。 由于關鍵混洗觸發彈性操作的執行,RC和Elasticutor每30秒就會出現一次瞬態吞吐量降低。 然而,RC的退化要差得多,其瞬態持續時間為10到20秒,而Elasticutor的衰減僅持續1到3秒。 這解釋了隨著工作負載變得更加動態,兩種方法中性能差距擴大的原因。
?????? 在不同數據強度下的性能:為了評估工作負載的數據強度如何影響三種方法的性能,我們改變元組大小(表示為s)和每個元組的計算成本(表示為c),并將其性能進行比較。結果表明,在數據強度較高的情況下,例如,元組大小較大或每個元組的計算成本較低,由于數據傳輸開銷較高,三種方法的吞吐量會下降。例如,當c = 0.01ms且s = 2KB時,一個CPU內核上的全速元組處理的數據傳輸要求是2Gbps,超過了網絡帶寬,即1Gbps,因此導致性能顯著下降。所有的方法。但是,Elasticutor通常對元組大小比競爭對手更敏感,特別是當計算成本極低時,例如,每個元組c = 0.01ms,因為它具有獨特的兩級元組路由機制,從而帶來更高的數據傳輸開銷數據強度。
?????? 不同狀態大小下的性能:圖9比較了三種方法在吞吐量和延遲方面的性能,因為狀態大小沿x軸變化。請注意,由于每個運算符有8192個分片,因此當每個分片的狀態大小為32MB時,運算符的狀態大小將為256GB,這相當大。結果表明,隨著狀態大小的增加,RC和Elasticutor的性能下降,這是由于較大的狀態大小導致的狀態遷移開銷增加。當狀態大小接近32MB時,作為一種極端情況,由于執行彈性的巨大運營成本,Elasticutor和RC都比靜態方法表現更差。我們還觀察到,在相同的州規模下,Elasicutor的表現優于RC方法。這表明Elasticutor中使用的技術,如狀態共享機制和動態調度,可以有效地減少彈性操作中的狀態遷移開銷。
對執行工作負載分配傾斜的魯棒性:實際上,由于Key分配偏差或者由于Operator級Key分區功能的不正確配置,工作負載可能無法在執行程序之間平均分配。為了評估三種方法對傾斜執行器工作負載分布的穩健性,我們使用由圖10中的偏度因子α控制的變化的密鑰分布偏度來評估它們的性能。注意,α越大,密鑰分布具有越大的偏斜。例如,當α= 0時,密鑰遵循均勻分布,而當α≥0.8時,大多數工作負荷落入幾個密鑰。結果表明,靜態方法受到負載不平衡的影響很大,而α<0.8時,Elasticutor和RC對執行器負載不平衡的抵抗力更強。主要觀察結果是,當α≤0.6時,Elasticutor始終優于RC,但其性能急劇下降,并且在極度偏斜的工作負荷分布下比RC差,例如α≥0.7。這表明盡管依賴于創建更多遠程任務來處理傾斜的執行程序工作負載分配,但Elasticutor中的執行程序能夠處理高達α= 0.5的工作負載不平衡,而不會在運行遠程任務時引入明顯的延遲增加和吞吐量降低。但是,當0.6≤α≤0.8時,大多數過載執行器無法通過有效利用更多遠程任務來進一步卸載其工作負載,這主要是由于擁塞的網絡帶寬,因此成為性能瓶頸,導致系統吞吐量和延遲較差。
Shard reassignment cost:因為RC方法和Elasticutor都使用碎片重新分配來平衡工作負載,我們會比較它們的成本以更好地理解產生的不同延遲。 圖11顯示了每個分片的平均節點內和節點間重新分配時間,分為同步時間和狀態遷移時間。 我們觀察到RC中的碎片重新分配時間遠遠長于Elasticutor,這主要是由于RC方法中的同步時間極長。 我們還可以看到Elasticutor在狀態遷移中花費的時間比RC短,但與同步時間相比,狀態遷移中兩種方法之間的差異較小。
?????? 為了深入了解兩種方法之間的同步時間差異,我們改變了上游執行器的數量,并發現RC比Elasticutor需要2-3個更大的時間來同步,并且它們的差異隨著更多的上游執行器而變寬, 如圖12(a)所示。 Elasticutor遵循以執行者為中心的標準,從而避免在分片重新分配期間與上游檢查員同步。 因此,無論上游執行器的數量如何,其同步時間約為2 ms。 相反,在RC方法中,需要更新上游執行器的路由表,并且需要全局同步來清除執行器和上游執行器之間的飛行中元組。 因此,RC中的同步時間要高得多,并且隨著上游執行器的數量而大大增加。
圖12(b)描繪了狀態大小變化時的狀態遷移時間。 我們觀察到,由于進程內狀態共享機制,兩種方法中的節點內狀態遷移的延遲可忽略不計。 當狀態大小達到32 MB時,節點間狀態遷移的時間顯著增加,其中狀態的網絡數據傳輸是狀態遷移過程中的主要開銷。 該圖還顯示,在給定相同狀態大小的情況下,由于執行器為中心的范例啟用了執行器間獨立性,因此Elasticutor遷移狀態所需的時間比RC短一些。
5.2 Scalability of a Single Elastic Executor
?????? Elasticutor的主要優點是它通過分配更多CPU內核而不是通過Operator級Key Space重新分區來處理工作負載動態。 盡管在一個合理的設置中,Operator通常有足夠的執行器來分攤單個執行器上的工作負載,但由于Key分配偏差,操作員不正確,執行人員可能負載過重而需要許多遠程任務。 級別分區或不必要的執行程序。 因此,為了Elasticutor的健壯性,彈性執行器具有良好的可伸縮性是至關重要的,即能夠有效地擴展到許多CPU核心,并且在運行遠程任務時不會引入明顯的延遲。
?????? 為了評估彈性執行器可以有效擴展的范圍,我們只為計算Operaotr設置了一個彈性執行器,但逐漸分配更多的CPU核心并測量其吞吐量和處理延遲。 由于每個節點有8個CPU核心,因此分配的前8個核心是本地核心,后續核心是遠程核心。 在我們的評估中,我們改變了彈性的數據強度和運營成本,這是影響可擴展性的主要因素。 前者決定了遠程數據傳輸在運行遠程任務時的長期成本,并且與元組大小成正比,與每個元組的計算成本成反比。 后者影響執行彈性操作的短期運輸開銷,并與規模和工作量動態(ω)呈正相關。
?????? 圖13描繪了執行器在不同計算成本(左)和元組大小(右)下的可伸縮性。我們觀察到單個彈性執行器通常可以有效地擴展到整個集群(256個CPU核心),這表明遠程數據傳輸的成本可以忽略不計。我們還觀察到彈性執行器無法有效地利用超過16個具有非常大的元組大小的CPU核心,例如8KB,或者非常低的計算成本,例如每個元組0.01ms,這表明巨大的遠程數據傳輸鏈接到高數據強度可防止執行程序擴展。圖14顯示了彈性執行器向外擴展時的99%延遲。我們可以看到,在大多數情況下,由于Netty [1]啟用了有效的網絡數據傳輸,處理延遲不會隨著彈性執行器的擴展而顯著增加。然而,在數據密集型工作負載中,例如,計算成本≤0.1ms或元組大小≥2KB,隨著分配的CPU核心數超過遠程數據傳輸成為性能瓶頸的點,等待時間大大增加。請注意,由于我們在任何一對輸入輸出執行器之間實現了反壓機制,因此延遲不會無限增長。
?????? 圖15顯示了彈性執行器在各種碎片狀態大小下的可擴展性,ω= 2(左)和16(右)。 結果表明,彈性執行器在所有狀態尺寸下均可有效擴展,但是32MB。 狀態較大時,狀態遷移會成為性能瓶頸,從而阻止執行程序有效地使用遠程CPU核心。 通過比較兩個子圖,我們觀察到當ω增加到16時,由于與更高工作負載動態相關的狀態遷移需求增加,大狀態下的可擴展性顯著降低。
5.3 Choosing Appropriate Parameters
?????? Elasticutor中有兩個重要參數:每個執行程序的硬數,表示為z,每個運算符的執行數,表示為asy。作為一個規則,將z設置在256和1024之間可以實現良好的內部執行器負載平衡,并且將y設置為計算密集型運營商的節點數可以為那些運營商提供足夠的潛力來擴展工作負載陣陣。然而,在下文中,我們在各種工作負載下評估系統性能的大范圍(y,z),以便了解這兩個參數影響系統性能的原因和方式以及如何在極端情況下選擇合適的參數工作負載。為了進行全面觀察,我們使用三種代表性工作負載,即默認工作負載,數據密集型工作負載和高度動態工作負載。設s和ω分別表示以字節為單位的元組大小和每分鐘的密鑰重組。在默認工作負載中,(s,ω)=(128B,2)。我們分別通過將s增加到8K和ω增加到16來獲得數據密集型工作負載和高動態工作負載。因此,(s,ω)=(8K,2)用于數據密集型工作負載,(s,ω)=(128B,16)用于高動態工作負載。圖16顯示了在三個工作負載下具有各種y和z的系統吞吐量。為了比較,我們還在圖中顯示了靜態和RC方法的吞吐量。
?????? Number of shards:從圖16中,我們觀察到隨著z增加,吞吐量通常會增加,盡管邊際增長正在減少。 這表明當使用太少的分片時,例如,z≤64,執行器內負載平衡質量差,妨礙彈性執行器有效地利用多個核心; 然而,太精細的分片(例如,z≥1024)不會進一步提高吞吐量,因為執行器內的負載平衡已經有效。 基于那些z = 16個觀測值,我們驗證每個執行器256到1024個分片實現了良好的性能。
?????? Number of executors:如圖16(a)所示,對于一個足夠大的z,除了y = 256之外,Elasticutor實現了有希望的性能。當y = 256時,即集群中的CPU核心數量,每個彈性執行器只能分配一個CPU核心。因此,執行者失去彈性,Elasticutor被降級為靜態方法。通過比較圖16(a)和圖16(b),我們可以看到,當元組大小增加到8K時,靜態和RC的性能變化不大,而在y = 1的情況下Elasticutor的性能嚴重下降。與默認工作負載相比,在數據密集型工作負載中運行遠程任務時遠程數據傳輸的成本高出64倍。這限制了單個執行程序的可伸縮性,因此導致單個執行程序需要擴展到許多遠程CPU核心的小y的性能較差。通過比較圖16(a)和圖16(c),我們觀察到隨著洗牌頻率從2增加到16,雖然吞吐量一般會減少,但當y很小時,減少幅度要大得多,即1或者8.在頻繁混洗的動態工作負載下,例如ω= 16,需要重新分配更多分片以進行負載平衡,從而導致高遷移成本。相反,當y足夠大時,大多數執行器可以使用本地CPU內核進行擴展,從而避免由于內部處理狀態共享機制導致的狀態遷移;因此,吞吐量不會降低太多。總之,為每個節點設置一個或兩個執行程序對各種工作負載都很穩健。
5.4 Evaluation of Realtime Application
?????? 為了評估Elasticutor在實際應用中的表現,我們使用上海證券交易所(SSE)交易的股票的匿名訂單數據集,收集時間超過三個月,每個交易時間約有800萬條記錄。該應用程序執行證券交易所的市場清算機制,并提供實時分析。應用程序的拓撲結構如圖17所示。輸入流由買方和賣方的限價訂單組成,這些限價訂單指定了特定庫存的特定交易量的出價和要價。順序元組的大小為96字節。在新訂單到達時,交易員操作員針對未完成的訂單執行它,并確定交易數量和現金轉移。一旦進行了這樣的交易,就會向下游運營商發送160字節的交易記錄,包括時間,股票數量和交易價格以及賣方,買方和股票的ID,包括6個運營商的統計數據和5個事件處理的運營商。分析運算符生成統計數據,例如移動平均值和綜合指數,并觸發用戶定義的事件,例如當特定股票的交易價格超過預定義閾值時的警報。每個統計運營商的狀態大小約為200MB到400MB,而事件處理運營商的狀態相對較小,低于10MB。由于交易和分析涉及個股,我們將庫存ID的空間劃分為并行處理。由于股票交易具有不可預測的性質,股票的到達率和分布均隨著時間的推移而波動很大,從而導致高度動態的工作量。為了說明工作負荷動態,圖18顯示了5種最受歡迎??的股票的即時到達率。
?????? 除了靜態,RC和Elasticutor之外,我們還測試了一個天真的以執行器為中心(naive-EC)的實現,它與Elasticutor相同,只是在調度程序中禁用了遷移成本和計算局部性的優化。 圖19繪制了在32個節點上運行的四種方法下的瞬時吞吐量和第99百分位處理延遲。 我們觀察到,naive-EC和Elasicutor都優于靜態和RC方法,大約使吞吐量翻倍,并將延遲降低1-2個數量級。 盡管naive-EC和Elasticutor之間的性能差距是可識別的,但與執行者為中心的方法和其他兩種方法之間的差距相比,它們之間的差距很小。 這一觀察結果表明,盡管動態調度程序中的優化能夠顯著提高性能,但Elasticutor的更好性能主要歸功于采用的以執行器為中心的有利范式。
?????? 為了進一步說明naive-EC和Elasticutor之間性能差距背后的原因,我們在表2中顯示了它們的狀態遷移率和遠程數據傳輸率。前者的速率是整個系統在網絡中遷移的狀態的聚合大小。 單位時間。 后一種速率是在所有彈性執行器和它們的遠程任務之間以單位時間傳輸的數據的總量。 我們觀察到,naive-EC下的狀態遷移率和遠程數據傳輸率分別比Elasticutor下的5倍和10倍高。 通過較少的狀態遷移,彈性執行器轉換到新的資源分配計劃將更有效,從而實現更高的性能。 同樣,通過較少的遠程數據傳輸,Operator間數據傳輸可以使用更多的網絡帶寬,從而進一步提高性能。
?????? 最后,我們在SSE工作負載下評估Elasticutor的可伸縮性。 我們改變計算集群的大小,即節點的數量,并測量Elasticutor的吞吐量和調度成本,即動態調度器計算新的CPU到執行器分配所需的平均時間。 保持較低的調度成本對于系統適應動態工作負載非常重要。 表3顯示了隨著規模增加的吞吐量和調度成本。 我們觀察到隨著集群的增長,吞吐量幾乎呈線性增長; 并且調度成本大約是幾毫秒,并且隨著節點數量的增加而略有增長.
6. RELATEDWORK
?????? Stream Processing System 早期的流處理系統,如Aurora [7],Borealis [6],TelegraphCQ [17]和STREAM [10],旨在通過利用分布式但靜態的計算資源來處理海量數據更新。 借助云計算技術,出現了新一代流系統,重點是并行數據處理,可用性和容錯,以充分利用基于云的平臺上的靈活資源管理方案。 Spark Streaming [50],Storm [43],Samza [35],Heron [31],Flink [12]和Waterwheel [46]是最流行的開源系統,提供分布式流處理和分析。 大型工業企業也在開發內部分布式流系統,如Muppet [32],MillWheel [8],Trill [16],Dataflow [9]和StreamScope [33]。
Elasticity.大量的工作探索了實現彈性的可能性。卡斯特羅等人。 [15]將資源重新擴展操作與分布式流系統中的容錯功能相結合,以便在遷移到新計算節點之前將與處理邏輯綁定的中間狀態寫入持久性存儲。王等人。 [47]提出了彈性流水線技術,以便為分布式SQL查詢啟用動態,工作負載感知的運行時重新配置。在Flux [39]中提出了一種自適應分區算子,以實現節點之間的分區移動以實現負載平衡。但是,由于其工作負載遷移基于每個分區,因此當單個分區超出群集中任何節點的處理能力時,此方法將面臨困難。 ChronoStream [48]將計算狀態劃分為一個集合的粒度切片單元,并在節點之間動態分配它們以支持彈性。 Gedik等人。 [24]提出了在不違反狀態一致性的情況下擴展有狀態運算符的機制。 Chi [34]是一個具有監控和動態重新配置功能的控制面板。然而,這些方法在以資源為中心的標準之后實現了彈性,這導致了昂貴的同步并且妨礙了快速彈性。它們適用于以粗糙時間粒度使用彈性功能的情況,即每5分鐘;實現的彈性太慢,無法應用于具有高動態工作負載的應用中。 Elasticutor采用新的以執行器為中心的方法來避免這個問題。此方法大大降低了執行工作負載重新同步的同步開銷,因此可在幾毫秒內實現工作負載重新分配。
Workload Distribution。分布式流系統的通用工作負載分配是一個具有挑戰性的問題,因為隨著時間的推移,輸入數據流的偏差很大并且差異很大。 Shah [39]等。為傳統流處理框架中的單個操作設計了動態工作負載再分配機制,例如Borealis [6]。 [24]和[20]研究了混合路由策略,通過其密鑰對工作負載進行分組,以便根據CPU,內存和帶寬資源動態平衡負載。 TimeStream [38]采用圖形重組策略,直接用全新的處理拓撲替換原始處理拓撲。然而,系統在所有可應用的圖形結構的巨大搜索空間中監視和優化拓撲結構具有挑戰性。 Cardellini等。 [13]研究了Storm之上的有狀態任務遷移。丁等人。 [18]討論了基于馬爾可夫決策過程(MDP)制定任務遷移計劃的長期優化,以提高分布式流引擎的資源利用率。但是,Elasticutor不僅可以實現工作負載分配中的負載平衡,還可以考慮遷移成本最小化和計算局部性。
7. Conclusion
我們設計并實現了Elasticutor,它為流處理系統提供了快速彈性。 彈性器遵循一種新的以執行器為中心的方法,該方法將執行程序靜態綁定到運算符,但允許執行程序獨立擴展。 這種方法將操作員的擴展與有狀態處理所需的全局同步分離開來。 Elasticutor框架有兩個構建塊:彈性執行器,執行動態負載平衡,以及優化計算資源使用的調度程序。 實驗表明,與傳統的以資源為中心的提供彈性的方法相比,彈性器使吞吐量增加一倍,平均延遲降低了幾個數量級。
總結
以上是生活随笔為你收集整理的一周一论文(翻译)——[SIGMOD 19] Elasticutor:Rapid Elasticity for Realtime Stateful Stream Processing的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一周一论文(翻译)——[IEEE 14]
- 下一篇: 一周一论文(翻译)——[VLDB 19]