Spark整合Ray思路漫谈
什么是Ray
之前花了大概兩到三天把Ray相關(guān)的論文,官網(wǎng)文檔看了一遍,同時(shí)特意去找了一些中文資料看Ray當(dāng)前在國(guó)內(nèi)的發(fā)展情況(以及目前國(guó)內(nèi)大部分人對(duì)Ray的認(rèn)知程度)。
先來(lái)簡(jiǎn)單介紹下我對(duì)Ray的認(rèn)知。
首先基因很重要,所以我們先需要探查下Ray最初是為了解決什么問題而產(chǎn)生的。Ray的論文顯示,它最早是為了解決增強(qiáng)學(xué)習(xí)的挑戰(zhàn)而設(shè)計(jì)的。增強(qiáng)學(xué)習(xí)的難點(diǎn)在于它是一個(gè)需要邊學(xué)習(xí),邊做實(shí)時(shí)做預(yù)測(cè)的應(yīng)用場(chǎng)景,這意味會(huì)有不同類型的tasks同時(shí)運(yùn)行,并且他們之間存在復(fù)雜的依賴關(guān)系,tasks會(huì)在運(yùn)行時(shí)動(dòng)態(tài)產(chǎn)生產(chǎn)生新的tasks,現(xiàn)有的一些計(jì)算模型肯定是沒辦法解決的。如果Ray只是為了解決RL事情可能沒有那么復(fù)雜,但是作者希望它不僅僅能跑增強(qiáng)學(xué)習(xí)相關(guān)的,希望是一個(gè)通用的分布式機(jī)器學(xué)習(xí)框架,這就意味著Ray必然要進(jìn)行分層抽象了,也就是至少要分成系統(tǒng)層和應(yīng)用層。
系統(tǒng)層面,既然是分布式的應(yīng)用,那么肯定需要有一個(gè)應(yīng)用內(nèi)的resource/task調(diào)度和管理。首先是Yarn,K8s等資源調(diào)度框架是應(yīng)用程序級(jí)別的的調(diào)度,Ray作為一個(gè)為了解決具體業(yè)務(wù)問題的應(yīng)用,應(yīng)該要跑在他們上面而不是取代他們,而像Spark/Flink雖然也是基于task級(jí)別的資源調(diào)度框架,但是因?yàn)樗麄冊(cè)谠O(shè)計(jì)的時(shí)候是為了解決一個(gè)比較具體的抽象問題,所以系統(tǒng)對(duì)task/資源都做了比較高的封裝,一般用戶是面向業(yè)務(wù)編程,很難直接操控task以及對(duì)應(yīng)的resource。我們以Spark為例,用戶定義好了數(shù)據(jù)處理邏輯,至于如何將這些邏輯分成多少個(gè)Job,Stage,Task,最后占用多少Resource (CPU,GPU,Memory,Disk)等等,都是由框架自行決定,而用戶無(wú)法染指。這也是我一直詬病Spark的地方。所以Ray在系統(tǒng)層面,是一個(gè)通用的以task為調(diào)度級(jí)別的,同時(shí)可以針對(duì)每個(gè)task控制資源粒度的一個(gè)通用的分布式task執(zhí)行系統(tǒng)。記住,在Ray里,你需要明確定義Task以及Task的依賴,并且為每個(gè)task指定合適(數(shù)量,資源類型)的資源。比如你需要用三個(gè)task處理一份數(shù)據(jù),那么你就需要自己?jiǎn)?dòng)三個(gè)task,并且指定這些task需要的資源(GPU,CPU)以及數(shù)量(可以是小數(shù)或者整數(shù))。而在Spark,Flink里這是不大可能的。Ray為了讓我們做這些事情,默認(rèn)提供了Python的語(yǔ)言接口,你可以像使用Numpy那樣去使用Ray。實(shí)際上,也已經(jīng)有基于Ray做Backend的numpy實(shí)現(xiàn)了,當(dāng)然它屬于應(yīng)用層面的東西了。Ray系統(tǒng)層面很簡(jiǎn)單,也是典型的master-worker模式。類似spark的driver-executor模式,不同的是,Ray的worker類似yarn的worker,是負(fù)責(zé)Resource管理的,具體任務(wù)它會(huì)啟動(dòng)Python worker去執(zhí)行你的代碼,而spark的executor雖然也會(huì)啟動(dòng)Python worker執(zhí)行python代碼,但是對(duì)應(yīng)的executor也執(zhí)行業(yè)務(wù)邏輯,和python worker有數(shù)據(jù)交換和傳輸。
應(yīng)用層面,你可以基于Ray的系統(tǒng)進(jìn)行編程,因?yàn)镽ay默認(rèn)提供了Python的編程接口,所以你可以自己實(shí)現(xiàn)增強(qiáng)學(xué)習(xí)庫(kù)(RLLib),也可以整合已有的算法框架,比如tensorflow,讓tensorflow成為Ray上的一個(gè)應(yīng)用,并且輕松實(shí)現(xiàn)分布式。我記得知乎上有人說(shuō)Ray其實(shí)就是一個(gè)Python的分布式RPC框架,這么說(shuō)是對(duì)的,但是顯然會(huì)有誤導(dǎo),因?yàn)檫@很可能讓人以為他只是“Python分布式RPC框架”。
如何和Spark協(xié)作
根據(jù)前面我講述的,我們是可以完全基于Ray實(shí)現(xiàn)Spark的大部分API的,只是是Ray backend而非Spark core backend。實(shí)際上Ray目前正在做流相關(guān)的功能,他們現(xiàn)在要做的就是要兼容Flink的API。雖然官方宣稱Ray是一個(gè)新一代的機(jī)器學(xué)習(xí)分布式框架,但是他完全可以cover住當(dāng)前大數(shù)據(jù)和AI領(lǐng)域的大部分事情,但是任重道遠(yuǎn),還需要大量的事情。所以對(duì)我而言,我看中的是它良好的Python支持,以及系統(tǒng)層面對(duì)資源和task的控制,這使得:
1.我們可以輕易的把我們的單機(jī)Python算法庫(kù)在Ray里跑起來(lái)(雖然算法自身不是分布式的),但是我們可以很好的利用Ray的資源管理和調(diào)度功能,從而解決AI平臺(tái)的資源管理問題。
2.Ray官方提供了大量的機(jī)器學(xué)習(xí)算法的實(shí)現(xiàn),以及對(duì)當(dāng)前機(jī)器學(xué)習(xí)框架如Tensorflow,Pytorch的整合,而分布式能力則比這些庫(kù)原生提供的模式更靠譜和易用。畢竟對(duì)于這些框架而言,支持他們分布式運(yùn)行的那些輔助庫(kù)(比如TensorFlow提供parameter servers)相當(dāng)簡(jiǎn)陋。
但是,我們知道,數(shù)據(jù)處理它自身有一個(gè)很大的生態(tài),比如你的用戶畫像數(shù)據(jù)都在數(shù)據(jù)湖里,你需要把這些數(shù)據(jù)進(jìn)行非常復(fù)雜的計(jì)算才能作為特征喂給你的機(jī)器學(xué)習(xí)算法。而如果這個(gè)時(shí)候,你還要面向資源編程(或者使用一個(gè)還不夠成熟的上層應(yīng)用)而不是面向“業(yè)務(wù)”編程,這就顯得很難受了,比如我就想用SQL處理數(shù)據(jù),我只關(guān)注處理的業(yè)務(wù)邏輯,這個(gè)當(dāng)前Ray以及之上的應(yīng)用顯然還是做不到如Spark那么便利的(畢竟Spark就是為了數(shù)據(jù)處理而生的),所以最好的方式是,數(shù)據(jù)的獲取和加工依然是在Spark之上,但是數(shù)據(jù)準(zhǔn)備好了就應(yīng)該丟給用戶基于Ray寫的代碼處理了。Ray可以通過Arrow項(xiàng)目讀取HDFS上Spark已經(jīng)處理好的數(shù)據(jù),然后進(jìn)行訓(xùn)練,然后將模型保存為HDFS。當(dāng)然對(duì)于預(yù)測(cè),Ray可以自己消化掉或者丟給其他系統(tǒng)完成。我們知道Spark 在整合Python生態(tài)方面做出了非常多的努力,比如他和Ray一樣,也提供了python 編程接口,所以spark也較為容易的整合譬如Tensorflow等框架,但是沒辦法很好的管控資源(比如GPU),而且,spark 的executor 會(huì)在他所在的服務(wù)器上啟動(dòng)python worker,而spark一般而言是跑在yarn上的,這就對(duì)yarn造成了很大的管理麻煩,而且通常yarn 和hdfs之類的都是在一起的,python環(huán)境還有資源(CPU/GPU)除了管理難度大以外,還有一個(gè)很大的問題是可能會(huì)對(duì)yarn的集群造成比較大的穩(wěn)定性風(fēng)險(xiǎn)。
所以最好的模式是按如下步驟開發(fā)一個(gè)機(jī)器學(xué)習(xí)應(yīng)用:
寫一個(gè)python腳本, 在數(shù)據(jù)處理部分,使用pyspark, 在程序的算法訓(xùn)練部分,使用ray, spark 運(yùn)行在yarn(k8s)上, ray運(yùn)行在k8s里好處顯而易見:用戶完全無(wú)感知他的應(yīng)用其實(shí)是跑在兩個(gè)集群里的,對(duì)他來(lái)說(shuō)就是一個(gè)普通python腳本。
從架構(gòu)角度來(lái)講,復(fù)雜的python環(huán)境管理問題都可以丟給ray集群來(lái)完成,spark只要能跑基本的pyspark相關(guān)功能即可,數(shù)據(jù)銜接通過數(shù)據(jù)湖里的表(其實(shí)就是一堆parquet文件)即可。當(dāng)然,如果最后結(jié)果數(shù)據(jù)不大,也可以直接通過client完成pyspark到ray的交互。
Spark和Ray的架構(gòu)和部署
現(xiàn)在我們來(lái)思考一個(gè)比較好的部署模式,架構(gòu)圖大概類似這樣:
首先,大家可以理解為k8s已經(jīng)解決一切了,我們spark,ray都跑在K8s上。但是,如果我們希望一個(gè)spark 是實(shí)例多進(jìn)程跑的時(shí)候,我們并不希望是像傳統(tǒng)的那種方式,所有的節(jié)點(diǎn)都跑在K8s上,而是將executor部分放到y(tǒng)arn cluster. 在我們的架構(gòu)里,spark driver 是一個(gè)應(yīng)用,我們可以啟動(dòng)多個(gè)pod從而獲得多個(gè)spark driver實(shí)例,對(duì)外提供負(fù)載均衡,roll upgrade/restart 等功能。也就是k8s應(yīng)該是面向應(yīng)用的。但是復(fù)雜的計(jì)算,我們依然希望留給Yarn,尤其是還涉及到數(shù)據(jù)本地性,計(jì)算和存儲(chǔ)放到一起(yarn和HDFS通常是在一起的),避免k8s和HDFS有大量數(shù)據(jù)交換。
因?yàn)閅arn對(duì)Java/Scala友好,但是對(duì)Python并不友好,尤其是在yarn里涉及到Python環(huán)境問題會(huì)非常難搞(主要是Yarn對(duì)docker的支持還是不夠優(yōu)秀,對(duì)GPU支持也不好),而機(jī)器學(xué)習(xí)其實(shí)一定重度依賴Python以及非常復(fù)雜的本地庫(kù)以及Python環(huán)境,并且對(duì)資源調(diào)度也有比較高的依賴,因?yàn)樗惴ㄊ呛芟臋C(jī)器資源的,必須也有資源池,所以我們希望機(jī)器學(xué)習(xí)部分能跑在K8s里。但是我們希望整個(gè)數(shù)據(jù)處理和訓(xùn)練過程是一體的,算法的同學(xué)應(yīng)該無(wú)法感知到k8s/yarn的區(qū)別。為了達(dá)到這個(gè)目標(biāo),用戶依然使用pyspark來(lái)完成計(jì)算,然后在pyspark里使用ray的API做模型訓(xùn)練和預(yù)測(cè),數(shù)據(jù)處理部分自動(dòng)在yarn中完成,而模型訓(xùn)練部分則自動(dòng)被分發(fā)到k8s中完成。并且因?yàn)閞ay自身的優(yōu)勢(shì),算法可以很好的控制自己需要的資源,比如這次訓(xùn)練需要多少GPU/CPU/內(nèi)存,支持所有的算法庫(kù),在做到對(duì)算法最少干擾的情況下,算法的同學(xué)們有最好的資源調(diào)度可以用。
下面展示一段MLSQL代碼片段展示如何利用上面的架構(gòu):
-- python 訓(xùn)練模型的代碼 set py_train=''' import ray ray.init() @ray.remote(num_cpus=2, num_gpus=1) def f(x):return x * x futures = [f.remote(i) for i in range(4)] print(ray.get(futures)) '''; load script.`py_train` as py_train;-- 設(shè)置需要的python環(huán)境描述 set py_env=''' '''; load script.`py_env` as py_env;-- 加載hive的表 load hive.`db1.table1` as table1;-- 對(duì)Hive做處理,比如做一些特征工程 select features,label from table1 as data;-- 提交Python代碼到Ray里,此時(shí)是運(yùn)行在k8s里的 train data as PythonAlg.`/tmp/tf/model` where scripts="py_train" and entryPoint="py_train" and condaFile="py_env" and keepVersion="true" and fitParam.0.fileFormat="json" -- 還可以是parquet and `fitParam.0.psNum`="1";下面是PySpark的示例代碼:
from pyspark.ml.linalg import Vectors, SparseVector from pyspark.sql import SparkSession import logging import rayfrom pyspark.sql.types import StructField, StructType, BinaryType, StringType, ArrayType, ByteType from sklearn.naive_bayes import GaussianNB import os from sklearn.externals import joblib import pickle import scipy.sparse as sp from sklearn.svm import SVC import io import codecsos.environ["PYSPARK_PYTHON"] = "/Users/allwefantasy/deepavlovpy3/bin/python3" logger = logging.getLogger(__name__)base_dir = "/Users/allwefantasy/CSDNWorkSpace/spark-deep-learning_latest" spark = SparkSession.builder.master("local[*]").appName("example").getOrCreate()data = spark.read.format("libsvm").load(base_dir + "/data/mllib/sample_libsvm_data.txt")## 廣播數(shù)據(jù) dataBr = spark.sparkContext.broadcast(data.collect())## 訓(xùn)練模型 這部分代碼會(huì)在spark executor里的python worker執(zhí)行 def train(row):import rayray.init()train_data_id = ray.put(dataBr.value)## 這個(gè)函數(shù)的python代碼會(huì)在K8s里的Ray里執(zhí)行@ray.remotedef ray_train(x):X = []y = []for i in ray.get(train_data_id):X.append(i["features"])y.append(i["label"])if row["model"] == "SVC":gnb = GaussianNB()model = gnb.fit(X, y)# 為什么還需要encode一下?pickled = codecs.encode(pickle.dumps(model), "base64").decode()return [row["model"], pickled]if row["model"] == "BAYES":svc = SVC()model = svc.fit(X, y)pickled = codecs.encode(pickle.dumps(model), "base64").decode()return [row["model"], pickled]result = ray_train.remote(row)ray.get(result)##訓(xùn)練模型 將模型結(jié)果保存到HDFS上 rdd = spark.createDataFrame([["SVC"], ["BAYES"]], ["model"]).rdd.map(train) spark.createDataFrame(rdd, schema=StructType([StructField(name="modelType", dataType=StringType()),StructField(name="modelBinary", dataType=StringType())])).write. \format("parquet"). \mode("overwrite").save("/tmp/wow")這是一個(gè)標(biāo)準(zhǔn)的Python程序,只是使用了pyspark/ray的API,我們就完成了上面所有的工作,同時(shí)訓(xùn)練兩個(gè)模型,并且數(shù)據(jù)處理的工作在spark中,模型訓(xùn)練的在ray中。
完美結(jié)合!最重要的是解決了資源管理的問題!
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載
總結(jié)
以上是生活随笔為你收集整理的Spark整合Ray思路漫谈的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: OpenYurt开箱测评|一键让原生K8
- 下一篇: 社区首款 OAM 可视化平台发布!