SparkR:数据科学家的新利器
from:http://www.csdn.net/article/2015-10-23/2826010
摘要:R是數(shù)據(jù)科學(xué)家中最流行的編程語言和環(huán)境之一,在Spark中加入對R的支持是社區(qū)中較受關(guān)注的話題。作為增強Spark對數(shù)據(jù)科學(xué)家群體吸引力的最新舉措,最近發(fā)布的Spark?1.4版本在現(xiàn)有的Scala/Java/Python?API之外增加了R?API(SparkR)。SparkR使得熟悉R的用戶可以在Spark的分布式計算平臺基礎(chǔ)上結(jié)合R本身強大的統(tǒng)計分析功能和豐富的第三方擴展包,對大規(guī)模數(shù)據(jù)集進行分析和處理。本文將回顧SparkR項目的背景,對其當前的特性作總體的概覽,闡述其架構(gòu)和若干技術(shù)關(guān)鍵點,最后進行展望和總結(jié)。
項目背景
R是非常流行的數(shù)據(jù)統(tǒng)計分析和制圖的語言及環(huán)境,有一項調(diào)查顯示,R語言在數(shù)據(jù)科學(xué)家中使用的程度僅次于SQL。但目前R語言的核心運行環(huán)境是單線程的,能處理的數(shù)據(jù)量受限于單機的內(nèi)存容量,大數(shù)據(jù)時代的海量數(shù)據(jù)處理對R構(gòu)成了挑戰(zhàn)。
為了解決R的可伸縮性問題,R社區(qū)已經(jīng)有一些方案,比如parallel和snow包,可以在計算機集群上并行運行R代碼。但它們的缺陷在于沒有解決數(shù)據(jù)分布式存儲,數(shù)據(jù)仍然需要在主節(jié)點集中表示,分片后再傳輸給工作節(jié)點,不適用于大數(shù)據(jù)處理的場景。另外,數(shù)據(jù)處理模型過于簡單,即數(shù)據(jù)分片在工作節(jié)點處理后,結(jié)果收集回主節(jié)點,缺少一個象MapReduce那樣通用的分布式數(shù)據(jù)編程模型。
Hadoop是流行的大數(shù)據(jù)處理平臺,它的HDFS分布式文件系統(tǒng)和之上的MapReduce編程模型比較好地解決了大數(shù)據(jù)分布式存儲和處理的問題。RHadoop項目的出現(xiàn)使得用戶具備了在R中使用Hadoop處理大數(shù)據(jù)的能力。
Apache頂級開源項目Spark是Hadoop之后備受關(guān)注的新一代分布式計算平臺。和Hadoop相比,Spark提供了分布式數(shù)據(jù)集的抽象,編程模型更靈活和高效,能夠充分利用內(nèi)存來提升性能。為了方便數(shù)據(jù)科學(xué)家使用Spark進行數(shù)據(jù)挖掘,社區(qū)持續(xù)往Spark中加入吸引數(shù)據(jù)科學(xué)家的各種特性,例如0.7.0版本中加入的python?API?(PySpark);1.3版本中加入的DataFrame等。
R和Spark的強強結(jié)合應(yīng)運而生。2013年9月SparkR作為一個獨立項目啟動于加州大學(xué)伯克利分校的大名鼎鼎的AMPLAB實驗室,與Spark源出同門。2014年1月,SparkR項目在github上開源(https://github.com/amplab-extras/SparkR-pkg)。隨后,來自工業(yè)界的Alteryx、Databricks、Intel等公司和來自學(xué)術(shù)界的普渡大學(xué),以及其它開發(fā)者積極參與到開發(fā)中來,最終在2015年4月成功地合并進Spark代碼庫的主干分支,并在Spark?1.4版本中作為重要的新特性之一正式宣布。
當前特性
SparkR往Spark中增加了R語言API和運行時支持。Spark的?API由Spark?Core的API以及各個內(nèi)置的高層組件(Spark?Streaming,Spark?SQL,ML?Pipelines和MLlib,Graphx)的API組成,目前SparkR只提供了Spark的兩組API的R語言封裝,即Spark?Core的RDD?API和Spark?SQL的DataFrame?API。
需要指出的是,在Spark?1.4版本中,SparkR的RDD?API被隱藏起來沒有開放,主要是出于兩點考慮:
目前社區(qū)正在討論是否開放RDD?API的部分子集,以及如何在RDD?API的基礎(chǔ)上構(gòu)建一個更符合R用戶習(xí)慣的高層API。
RDD?API
用戶使用SparkR?RDD?API在R中創(chuàng)建RDD,并在RDD上執(zhí)行各種操作。
目前SparkR?RDD實現(xiàn)了Scala?RDD?API中的大部分方法,可以滿足大多數(shù)情況下的使用需求:
SparkR支持的創(chuàng)建RDD的方式有:
- 從R?list或vector創(chuàng)建RDD(parallelize())
- 從文本文件創(chuàng)建RDD(textFile())
- 從object文件載入RDD(objectFile())
SparkR支持的RDD的操作有:
- 數(shù)據(jù)緩存,持久化控制:cache(),persist(),unpersist()
- 數(shù)據(jù)保存:saveAsTextFile(),saveAsObjectFile()
- 常用的數(shù)據(jù)轉(zhuǎn)換操作,如map(),flatMap(),mapPartitions()等
- 數(shù)據(jù)分組、聚合操作,如partitionBy(),groupByKey(),reduceByKey()等
- RDD間join操作,如join(),?fullOuterJoin(),?leftOuterJoin()等
- 排序操作,如sortBy(),?sortByKey(),?top()等
- Zip操作,如zip(),?zipWithIndex(),?zipWithUniqueId()
- 重分區(qū)操作,如coalesce(),?repartition()
- 其它雜項方法
和Scala?RDD?API相比,SparkR?RDD?API有一些適合R的特點:
- SparkR?RDD中存儲的元素是R的數(shù)據(jù)類型。
- SparkR?RDD?transformation操作應(yīng)用的是R函數(shù)。
- RDD是一組分布式存儲的元素,而R是用list來表示一組元素的有序集合,因此SparkR將RDD整體上視為一個分布式的list。Scala?API?中RDD的每個分區(qū)的數(shù)據(jù)由iterator來表示和訪問,而在SparkR?RDD中,每個分區(qū)的數(shù)據(jù)用一個list來表示,應(yīng)用到分區(qū)的轉(zhuǎn)換操作,如mapPartitions(),接收到的分區(qū)數(shù)據(jù)是一個list而不是iterator。
- 為了符合R用戶經(jīng)常使用lapply()對一個list中的每一個元素應(yīng)用某個指定的函數(shù)的習(xí)慣,SparkR在RDD類上提供了SparkR專有的transformation方法:lapply()、lapplyPartition()、lapplyPartitionsWithIndex(),分別對應(yīng)于Scala?API的map()、mapPartitions()、mapPartitionsWithIndex()。
DataFrame?API
Spark?1.3版本引入了DataFrame?API。相較于RDD?API,DataFrame?API更受社區(qū)的推崇,這是因為:
Spark的DataFrame?API是從R的?Data?Frame數(shù)據(jù)類型和Python的pandas庫借鑒而來,因而對于R用戶而言,SparkR的DataFrame?API是很自然的。更重要的是,SparkR?DataFrame?API性能和Scala?DataFrame?API幾乎相同,所以推薦盡量用SparkR?DataFrame來編程。
目前SparkR的DataFrame?API已經(jīng)比較完善,支持的創(chuàng)建DataFrame的方式有:
- 從R原生data.frame和list創(chuàng)建
- 從SparkR?RDD創(chuàng)建
- 從特定的數(shù)據(jù)源(JSON和Parquet格式的文件)創(chuàng)建
- 從通用的數(shù)據(jù)源創(chuàng)建
- 將指定位置的數(shù)據(jù)源保存為外部SQL表,并返回相應(yīng)的DataFrame
- 從Spark?SQL表創(chuàng)建
- 從一個SQL查詢的結(jié)果創(chuàng)建
支持的主要的DataFrame操作有:
·數(shù)據(jù)緩存,持久化控制:cache(),persist(),unpersist()
- 數(shù)據(jù)保存:saveAsParquetFile(),?saveDF()?(將DataFrame的內(nèi)容保存到一個數(shù)據(jù)源),saveAsTable()?(將DataFrame的內(nèi)容保存存為數(shù)據(jù)源的一張表)
- 集合運算:unionAll(),intersect(),?except()
- Join操作:join(),支持inner、full?outer、left/right?outer和semi?join。
- 數(shù)據(jù)過濾:filter(),?where()
- 排序:sortDF(),?orderBy()
- 列操作:增加列-?withColumn(),列名更改-?withColumnRenamed(),選擇若干列?-select()、selectExpr()。為了更符合R用戶的習(xí)慣,SparkR還支持用$、[]、[[]]操作符選擇列,可以用$<列名>?<-?的語法來增加、修改和刪除列
- RDD?map類操作:lapply()/map(),flatMap(),lapplyPartition()/mapPartitions(),foreach(),foreachPartition()
- 數(shù)據(jù)聚合:groupBy(),agg()
- 轉(zhuǎn)換為RDD:toRDD(),toJSON()
- 轉(zhuǎn)換為表:registerTempTable(),insertInto()
- 取部分數(shù)據(jù):limit(),take(),first(),head()
編程示例
總體上看,SparkR程序和Spark程序結(jié)構(gòu)很相似。
基于RDD?API的示例
要基于RDD?API編寫SparkR程序,首先調(diào)用sparkR.init()函數(shù)來創(chuàng)建SparkContext。然后用SparkContext作為參數(shù),調(diào)用parallelize()或者textFile()來創(chuàng)建RDD。有了RDD對象之后,就可以對它們進行各種transformation和action操作。下面的代碼是用SparkR編寫的Word?Count示例:
library(SparkR) #初始化SparkContext sc <- sparkR.init("local", "RWordCount") #從HDFS上的一個文本文件創(chuàng)建RDD lines <- textFile(sc, "hdfs://localhost:9000/my_text_file") #調(diào)用RDD的transformation和action方法來計算word count #transformation用的函數(shù)是R代碼 words <- flatMap(lines, function(line) { strsplit(line, " ")[[1]] }) wordCount <- lapply(words, function(word) { list(word, 1L) }) counts <- reduceByKey(wordCount, "+", 2L) output <- collect(counts)基于DataFrame API的示例
基于DataFrame?API的SparkR程序首先創(chuàng)建SparkContext,然后創(chuàng)建SQLContext,用SQLContext來創(chuàng)建DataFrame,再操作DataFrame里的數(shù)據(jù)。下面是用SparkR?DataFrame?API計算平均年齡的示例:
library(SparkR) #初始化SparkContext和SQLContext sc <- sparkR.init("local", "AverageAge") sqlCtx <- sparkRSQL.init(sc) #從當前目錄的一個JSON文件創(chuàng)建DataFrame df <- jsonFile(sqlCtx, "person.json") #調(diào)用DataFrame的操作來計算平均年齡 df2 <- agg(df, age="avg") averageAge <- collect(df2)[1, 1]對于上面兩個示例要注意的一點是SparkR?RDD和DataFrame?API的調(diào)用形式和Java/Scala?API有些不同。假設(shè)rdd為一個RDD對象,在Java/Scala?API中,調(diào)用rdd的map()方法的形式為:rdd.map(…),而在SparkR中,調(diào)用的形式為:map(rdd,?…)。這是因為SparkR使用了R的S4對象系統(tǒng)來實現(xiàn)RDD和DataFrame類。
架構(gòu)
SparkR主要由兩部分組成:SparkR包和JVM后端。SparkR包是一個R擴展包,安裝到R中之后,在R的運行時環(huán)境里提供了RDD和DataFrame?API。
圖1 ?SparkR軟件棧
SparkR的整體架構(gòu)如圖2所示。
圖2 SparkR架構(gòu)
R?JVM后端
SparkR?API運行在R解釋器中,而Spark?Core運行在JVM中,因此必須有一種機制能讓SparkR?API調(diào)用Spark?Core的服務(wù)。R?JVM后端是Spark?Core中的一個組件,提供了R解釋器和JVM虛擬機之間的橋接功能,能夠讓R代碼創(chuàng)建Java類的實例、調(diào)用Java對象的實例方法或者Java類的靜態(tài)方法。JVM后端基于Netty實現(xiàn),和R解釋器之間用TCP?socket連接,用自定義的簡單高效的二進制協(xié)議通信。
R?Worker
SparkR?RDD?API和Scala?RDD?API相比有兩大不同:SparkR?RDD是R對象的分布式數(shù)據(jù)集,SparkR?RDD?transformation操作應(yīng)用的是R函數(shù)。SparkR?RDD?API的執(zhí)行依賴于Spark?Core但運行在JVM上的Spark?Core既無法識別R對象的類型和格式,又不能執(zhí)行R的函數(shù),因此如何在Spark的分布式計算核心的基礎(chǔ)上實現(xiàn)SparkR?RDD?API是SparkR架構(gòu)設(shè)計的關(guān)鍵。
SparkR設(shè)計了Scala?RRDD類,除了從數(shù)據(jù)源創(chuàng)建的SparkR?RDD外,每個SparkR?RDD對象概念上在JVM端有一個對應(yīng)的RRDD對象。RRDD派生自RDD類,改寫了RDD的compute()方法,在執(zhí)行時會啟動一個R?worker進程,通過socket連接將父RDD的分區(qū)數(shù)據(jù)、序列化后的R函數(shù)以及其它信息傳給R?worker進程。R?worker進程反序列化接收到的分區(qū)數(shù)據(jù)和R函數(shù),將R函數(shù)應(yīng)到到分區(qū)數(shù)據(jù)上,再把結(jié)果數(shù)據(jù)序列化成字節(jié)數(shù)組傳回JVM端。
從這里可以看出,與Scala?RDD?API相比,SparkR?RDD?API的實現(xiàn)多了幾項開銷:啟動R?worker進程,將分區(qū)數(shù)據(jù)傳給R?worker和R?worker將結(jié)果返回,分區(qū)數(shù)據(jù)的序列化和反序列化。這也是SparkR?RDD?API相比Scala?RDD?API有較大性能差距的原因。
DataFrame?API的實現(xiàn)
由于SparkR?DataFrame?API不需要傳入R語言的函數(shù)(UDF()方法和RDD相關(guān)方法除外),而且DataFrame中的數(shù)據(jù)全部是以JVM的數(shù)據(jù)類型存儲,所以和SparkR?RDD?API的實現(xiàn)相比,SparkR?DataFrame?API的實現(xiàn)簡單很多。R端的DataFrame對象就是對應(yīng)的JVM端DataFrame對象的wrapper,一個DataFrame方法的實現(xiàn)基本上就是簡單地調(diào)用JVM端DataFrame的相應(yīng)方法。這種情況下,R?Worker就不需要了。這是使用SparkR?DataFrame?API能獲得和ScalaAPI近乎相同的性能的原因。
當然,DataFrame?API還包含了一些RDD?API,這些RDD?API方法的實現(xiàn)是先將DataFrame轉(zhuǎn)換成RDD,然后調(diào)用RDD?的相關(guān)方法。
展望
SparkR目前來說還不是非常成熟,一方面RDD?API在對復(fù)雜的R數(shù)據(jù)類型的支持、穩(wěn)定性和性能方面還有較大的提升空間,另一方面DataFrame?API在功能完備性上還有一些缺失,比如對用R代碼編寫UDF的支持、序列化/反序列化對嵌套類型的支持,這些問題相信會在后續(xù)的開發(fā)中得到改善和解決。如何讓DataFrame?API對熟悉R原生Data?Frame和流行的R?package如dplyr的用戶更友好是一個有意思的方向。此外,下一步的開發(fā)計劃包含幾個大的特性,比如普渡大學(xué)正在做的在SparkR中支持Spark?Streaming,還有Databricks正在做的在SparkR中支持ML?pipeline等。SparkR已經(jīng)成為Spark的一部分,相信社區(qū)中會有越來越多的人關(guān)注并使用SparkR,也會有更多的開發(fā)者參與對SparkR的貢獻,其功能和使用性將會越來越強。
總結(jié)
Spark將正式支持R?API對熟悉R語言的數(shù)據(jù)科學(xué)家是一個福音,他們可以在R中無縫地使用RDD和Data?Frame?API,借助Spark內(nèi)存計算、統(tǒng)一軟件棧上支持多種計算模型的優(yōu)勢,高效地進行分布式數(shù)據(jù)計算和分析,解決大規(guī)模數(shù)據(jù)集帶來的挑戰(zhàn)。工欲善其事,必先利其器,SparkR必將成為數(shù)據(jù)科學(xué)家在大數(shù)據(jù)時代的又一門新利器。
(責(zé)編/仲浩)
作者:孫銳,英特爾大數(shù)據(jù)團隊工程師,HIVE和Shark項目貢獻者,SparkR主力貢獻者之一。
總結(jié)
以上是生活随笔為你收集整理的SparkR:数据科学家的新利器的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【MDCC 2015】开源选型之Andr
- 下一篇: s-plus