基于 C# 的 ETL 大数据并行编程
作者:James Spinella
譯者:精致碼農
原文:https://bit.ly/3nGQu4J
并行編程在歷史上一直是軟件開發中比較小眾和復雜的環節,往往不值得頭疼。但編寫并行化應用只會越來越簡單,一個應用同時利用設備 CPU 上的多個內核,來實現效率最大化也是很常見的。
如今,隨著數據工程作為一個專業領域的興起,并行編程比以往任何時候都更受歡迎。Apache Spark 是一個用于Extract(提取), Transform(轉換) 和 Load(加載)——ETL 大型數據集的軟件庫,可能是當今最流行的并行編程的方式。雖然 Apache 的 Spark、Hadoop 和 AirFlow 是對數據工程師來說最常見的技術,但它們的使用要求精通的不是 C#,而是 Python、Scala 或 Java(盡管不是很理想)。
對于 ETL 工作來說,“正確”的工具應該是 Spark 或 Hadoop 這樣的工具。它們是專門為 ETL 設計的,與 C# 或其他語言相比,需要你編寫的代碼更少。如果你想在數據工程這個領域深耕,你最好的選擇學習一下 Python 和 Spark(以及其他技術)。盡管如此,但有時正確的工具是你已經知道如何使用的工具。事實上,我已經發現 C# 和 .NET 可以勝任并行化 ETL 操作的任務。
微軟也一直在開發?.NET for Apache Spark[1],它允許我們即使只會 C# 也能吃上 Spark 這塊蛋糕。很大程度上仍在開發中,但目前你已經可以開始嘗試了。(譯注:想了更多關于 .NET for Apache Spark 的內容,可以閱讀我的另一篇譯文:使用 .NET 5 體驗大數據和機器學習)
多年來,C# 的發展使并行編程變得越來越簡單。因為 C# 與以前的版本保持 100% 的向后兼容,所以很難知道在眾多并行運行代碼的方法中哪種是最好的方法。事實上,在 .NET 中,有好幾種方法可以啟動多個線程,而真正的問題在于,你希望 .NET 在“背后”為你處理多少和你希望自己手動處理多少。
一般來說,我們確實希望 .NET 盡可能多的為我們處理,特別是在線程管理方面,因為并行運行代碼是(編程時)非常復雜的,而且非常容易出現意外的運行時錯誤。事實上,微軟在這里[2]有一篇專門的文檔涵蓋了并行編程的潛在陷阱。
我建議大家讀一讀,但就 ETL 和其他“數據處理”任務而言,我們真正需要擔心的只有兩件事:并行化是否真的會更快,如果是,確保我們的代碼是線程安全的。
CPU密集型 vs IO密集型
在確定“并行化”代碼是否值得時,重要的是要了解應用程序的哪些部分是?CPU 密集型(CPU-bound)而不是?IO 密集型(IO-bound)的。正如你可能已經猜到的那樣,并行化增強了 CPU 密集型代碼的性能,它不僅不會對任何 IO 瓶頸(bottlenecks)產生改善,而且可能會加劇應用程序的 IO 瓶頸(相比之下,異步編程的目的是減少 IO 密集)。
CPU 密集型的代碼通常是對程序中的對象進行的運算或其他操作。解析 CSV 文件、映射對象和計算平均值都依賴于 CPU。在處理數據時,效率來自于根據可用的 CPU 數量拆分數據集,基本上在每個 CPU 上同時運行程序--只是處理的是整個數據集不同的分組。
當代碼的執行依賴于通過“網線”發送或接收數據時,即通過互聯網或內部網絡連接到另一個服務器時,代碼就是 IO 密集型的。這種情況在調用 API 或數據庫的存儲庫方法中最為常見。如果代碼向持久性存儲(如硬盤或固態硬盤)寫入或讀取,那么它也可以是 IO 密集型的。如果我們使用的第三方 API 需要 10 秒才能將數據返回給我們的應用程序,我們也沒有什么辦法,然而通過異步編程,我們至少可以讓我們的程序在等待 API 調用的時候繼續使用 CPU 運行其他部分,而不是閑置。
在處理每個數據集都依賴于 IO 密集型調用的情況下,比如調用數據庫對處理后的數據進行 INSERT,那么我們運行程序的 CPU 線程數與調用外部源(如 API 或數據庫)的次數之間的平衡就顯得非常重要。如果 API 有使用限制(例如每秒 10 個請求),或者數據庫服務器沒有足夠的線程來處理我們的程序比如說 20 個線程都試圖同時向同一個數據庫 insert,那么這一點就尤其重要。
在不使程序的 IO 密集型過載的情況下最大化效率和最小化程序運行時間,可能需要對程序在 ETL 過程的各個步驟中使用的線程數量進行一些試錯式的調整。
Parallel.ForEach vs PLINQ
當使用 .NET 時,現在可以通過一個簡單的 Parallel.ForEach 循環或?Parallel LINQ[3](PLINQ)來實現并行化應用程序所需的一切。對于 .NET 來說,這些并不是特別新的東西,但與它們的前身相比,它們更容易使用,因為它們需要創建和管理線程,并手動分割集合。Parallel.ForEach 和 PLINQ 兩者都為我們處理了這一切,根據我的經驗,兩者之間的性能沒有明顯的差異。我猜測它們在底層調用的代碼大致相同。
下面是分別使用 Parallel.ForEach 和 PLINQ 讀取 CSV 文件的示例:
// PLINQ using all CPU cores public?static?void?PLINQAll(string filePath) {var sw = new Stopwatch();sw.Start();var results = System.IO.File.ReadAllLines(filePath).AsParallel().Select(line => Regex.Split(line, ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")).ToList();sw.Stop();Console.WriteLine($"PLINQ using all cores: completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds"); }// PLINQ using all CPU threads (2x cores) public?static?void?PLINQAllThreads(string filePath) {var threads = Environment.ProcessorCount * 2;var sw = new Stopwatch();sw.Start();var results = System.IO.File.ReadAllLines(filePath).AsParallel().WithDegreeOfParallelism(threads).Select(line => Regex.Split(line, ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")).ToList();sw.Stop();Console.WriteLine($"PLINQ using all THREADS {(threads)}: completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds"); }// PLINQ using 2 CPU cores public?static?void?PLINQ2(string filePath) {var sw = new Stopwatch();sw.Start();var results = System.IO.File.ReadAllLines(filePath).AsParallel().WithDegreeOfParallelism(2).Select(line => Regex.Split(line, ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")).ToList();sw.Stop();Console.WriteLine($"PLINQ using 2 cores: completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds"); }// PLINQ using user-defined thread count public?static?void?PLINQUser(string filePath, int numThreads) {var sw = new Stopwatch();sw.Start();var results = System.IO.File.ReadAllLines(filePath).AsParallel().WithDegreeOfParallelism(numThreads).Select(line => Regex.Split(line, ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")).ToList();sw.Stop();Console.WriteLine($"PLINQ using {numThreads} threads: completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds"); }// PLINQ using all CPU cores without Regex parsing - May yield bad data due to commas within fields public?static?void?PLINQNoRegex(string filePath) {var sw = new Stopwatch();sw.Start();var results = System.IO.File.ReadAllLines(filePath).AsParallel().Select(x => x.Split(',')).ToList();sw.Stop();Console.WriteLine($"PLINQ using all cores (no Regex): completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds"); }// Parallel.ForEach using all CPU cores - May yield bad data public?static?void?ParallelForEach(string filePath) {var sw = new Stopwatch();sw.Start();var rows = new List<string[]>();Parallel.ForEach(File.ReadLines(filePath), line =>{rows.Add(line.Split(','));});sw.Stop();Console.WriteLine($"Parallel.ForEach using all cores (no Regex): completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds"); }// Parallel.ForEach using all CPU cores with Regex parsing - takes roughly the same amount of time as PLINQ public?static?void?ParallelForEachRegex(string filePath) {var sw = new Stopwatch();sw.Start();var rows = new List<string[]>();Parallel.ForEach(File.ReadLines(filePath), line =>{rows.Add(Regex.Split(line, ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"));});sw.Stop();Console.WriteLine($"Parallel.ForEach w/Regex using all cores: completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds"); }那么應該選擇哪一種方式呢?我建議根據你在非并行情況下使用哪種方法來決定--你會使用?foreach?循環還是?LINQ?查詢?也就是說,這兩者之間最大的、不明顯的區別是 Parallel.ForEach 允許你指定線程數,最多不超過計算機或服務器上可用的線程數(你可以指定更多的線程數,但它最多只能啟動 CPU 所擁有的線程數)。然而,根據你的指定,PLINQ 可以使用超過計算機上 CPU 線程數的線程(譯注:不是所有的線程都是工作中的,所以理論上可以創建無數個線程)。一般情況下,你不會希望啟動超出比 CPU 線程更多的線程,但在某些情況下,這樣做會更有效。例如,如果你要寫一個網絡爬蟲,那么啟動雙倍的線程數量可能是有意義的,因為每個線程大概都要等待網站的加載。其他這樣的網絡密集型(IO 密集型的一個子集)任務可能會在更多線程的情況下運行得更快。
Parallel.ForEach 和 PLINQ 之間還有一些其他的區別,在這里[4]進行了討論。但是對于 ETL 來說,除非在非常特殊的情況下,這些區別不太適用。例如,如果必須保留數據的順序,你應該使用 PLINQ,因為它提供了一種保留順序的方法。
示例程序
我們的 ELT 示例程序(倉庫地址[5])將做以下四件事件:
讀取 CSV 文件;
將這些 CSV 文件中的字段映射到 C# 對象;
對數據(對象列表)執行一些轉換操作;
將這些數據插入到數據庫中。
我將在接下來即將發布的第二篇文章中繼續介紹該示例。
文中鏈接:
[1].?https://dotnet.microsoft.com/apps/data/spark
[2].?https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/potential-pitfalls-in-data-and-task-parallelism
[3].?https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/introduction-to-plinq
[4].?https://devblogs.microsoft.com/pfxteam/when-to-use-parallel-foreach-and-when-to-use-plinq/
[5].?https://gitlab.com/jspinella/parallel-etl-examples
總結
以上是生活随笔為你收集整理的基于 C# 的 ETL 大数据并行编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用 .NET Core 中的 Even
- 下一篇: 一个小技巧助您减少if语句的状态判断