PySpark 数据分析基础:PySpark 原理详解
?一、基礎原理
我們知道 spark 是用 scala 開發(fā)的,而 scala 又是基于 Java 語言開發(fā)的,那么 spark 的底層架構就是 Java 語言開發(fā)的。如果要使用 python 來進行與 java 之間通信轉換,那必然需要通過 JVM 來轉換。我們先看原理構建圖:
從圖中我們發(fā)現(xiàn)在 python 環(huán)境中我們編寫的程序將以 SparkContext 的形式存在,Pythpn 通過于 Py4j 建立 Socket 通信,通過 Py4j 實現(xiàn)在 Python 中調(diào)用 Java 的方法,將我們編寫成 python 的 SpakrContext 對象通過 Py4j,最終在 JVM Driver 中實例化為 Scala 的 SparkContext。
那么我們再從 Spark 集群運行機制來看:
主節(jié)點運行 Spark 任務是通過 SparkContext 傳遞任務分發(fā)到各個從節(jié)點,標橙色的方框就為 JVM。通過 JVM 中間語言與其他從節(jié)點的 JVM 進行通信。之后 Executor 通信結束之后下發(fā) Task 進行執(zhí)行。
此時我們再把 python 在每個主從節(jié)點展示出來:
這樣就一目了然了:主節(jié)點的 Python 通過 Py4j 通信傳遞 SparkContext,最后在 JVM Driver 上面生成 SparkContxt。主節(jié)點 JVM Driver 與其他從節(jié)點的 JVM Executor 通信傳輸 SparkContext,JVM Executor 通過分解 SparkContext 為許多 Task,給 pyspark.daemon 調(diào)用 pyspark.work 從 socket 中讀取要執(zhí)行的 python 函數(shù)和數(shù)據(jù),開始真正的數(shù)據(jù)處理邏輯。數(shù)據(jù)處理完成之后將處理結果寫回 socket,jvm 中通過 PythonRDD 的 read 方法讀取,并返回結果。最終 executor 將 PythonRDD 的執(zhí)行結果上報到 drive 上,返回給用戶。
完整了解 PySpark 在集群上運行的原理之后,再看上圖就很容易理解了。
Executor 端運行的 Task 邏輯是由 Driver 發(fā)過來的,那是序列化后的字節(jié)碼,雖然里面可能包含有用戶定義的 Python 函數(shù)或 Lambda 表達式,Py4j 并不能實現(xiàn)在 Java 里調(diào)用 Python 的方法,為了能在 Executor 端運行用戶定義的 Python 函數(shù)或 Lambda 表達式,則需要為每個 Task 單獨啟一個 Python 進程,通過 socket 通信方式將 Python 函數(shù)或 Lambda 表達式發(fā)給 Python 進程執(zhí)行。
二、程序運行原理
1.主節(jié)點 JVM 運行過程
當我們提交 pyspark 的任務時,會先上傳 python 腳本以及依賴并申請資源,申請到資源后會通過 PythonRunner 拉起 JVM。
首先 PythonRunner 開啟 Pyj4 GatewayServer,通過 Java Process 方式運行用戶上傳的 Python 腳本。
? 用戶 Python 腳本起來后,首先會實例化 Python 版的 SparkContext 對象,并且實例化 Py4j GatewayClient,連接 JVM 中的 Py4j GatewayServer,后續(xù)在 Python 中調(diào)用 Java 的方法都是借助這個 Py4j Gateway。然后通過 Py4j Gateway 在 JVM 中實例化 SparkContext 對象。
? 過上面兩步后,SparkContext 對象初始化完畢,與其他從節(jié)點通信。開始申請 Executor 資源,同時開始調(diào)度任務。用戶 Python 腳本中定義的一系列處理邏輯最終遇到 action 方法后會觸發(fā) Job 的提交,提交 Job 時是直接通過 Py4j 調(diào)用 Java 的 PythonRDD.runJob 方法完成,映射到 JVM 中,會轉給 sparkContext.runJob 方法,Job 運行完成后,JVM 中會開啟一個本地 Socket 等待 Python 進程拉取,對應地,Python 進程在調(diào)用 PythonRDD.runJob 后就會通過 Socket 去拉取結果。
2.從節(jié)點 JVM 運行過程
當 Driver 得到 Executor 資源時,通過 CoarseGrainedExecutorBackend(其中有 main 方法)通信 JVM,啟動一些必要的服務后等待 Driver 的 Task 下發(fā),在還沒有 Task 下發(fā)過來時,Executor 端是沒有 Python 進程的。當收到 Driver 下發(fā)過來的 Task 后,Executor 的內(nèi)部運行過程如下圖所示。
Executor 端收到 Task 后,會通過 launchTask 運行 Task,最后會調(diào)用到 PythonRDD 的 compute 方法,來處理一個分區(qū)的數(shù)據(jù),PythonRDD 的 compute 方法的計算流程大致分三步走:
-
如果不存在 pyspark.deamon 后臺 Python 進程,那么通過 Java Process 的方式啟動 pyspark.deamon 后臺進程,注意每個 Executor 上只會有一個 pyspark.deamon 后臺進程,否則,直接通過 Socket 連接 pyspark.deamon,請求開啟一個 pyspark.worker 進程運行用戶定義的
-
Python 函數(shù)或 Lambda 表達式。pyspark.deamon 是一個典型的多進程服務器,來一個 Socket 請求,fork 一個 pyspark.worker 進程處理,一個 Executor 上同時運行多少個 Task,就會有多少個對應的 pyspark.worker 進程。
-
緊接著會單獨開一個線程,給 pyspark.worker 進程輸入數(shù)據(jù),pyspark.worker 則會調(diào)用用戶定義的 Python 函數(shù)或 Lambda 表達式處理計算。在一邊輸入數(shù)據(jù)的過程中,另一邊則通過 Socket 去拉取 pyspark.worker 的計算結果。
把前面運行時架構圖中 Executor 部分單獨拉出來,如下圖所示,橙色部分為 JVM 進程,白色部分為 Python 進程,每個 Executor 上有一個公共的 pyspark.deamon 進程,負責接收 Task 請求,并 fork pyspark.worker 進程單獨處理每個 Task,實際數(shù)據(jù)處理過程中,pyspark.worker 進程和 JVM Task 會較頻繁地進行本地 Socket 數(shù)據(jù)通信。
三、總結
總體而言,PySpark 是借助 Py4j 實現(xiàn) Python 調(diào)用 Java,來驅動 Spark 應用程序,本質上主要還是 JVM runtime,Java 到 Python 的結果返回是通過本地 Socket 完成。雖然這種架構保證了 Spark 核心代碼的獨立性,但是在大數(shù)據(jù)場景下,JVM 和 Python 進程間頻繁的數(shù)據(jù)通信導致其性能損耗較多,惡劣時還可能會直接卡死,所以建議對于大規(guī)模機器學習或者 Streaming 應用場景還是慎用 PySpark,盡量使用原生的 Scala/Java 編寫應用程序,對于中小規(guī)模數(shù)據(jù)量下的簡單離線任務,可以使用 PySpark 快速部署提交
總結
以上是生活随笔為你收集整理的PySpark 数据分析基础:PySpark 原理详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: bat自动清理(girl的电脑桌面)
- 下一篇: WorkNC刀柄轮廓导入方法