Spark安装与学习
?摘要:Spark是繼Hadoop之后的新一代大數(shù)據(jù)分布式處理框架,由UC Berkeley的Matei Zaharia主導(dǎo)開(kāi)發(fā)。我只能說(shuō)是神一樣的人物造就的神器,詳情請(qǐng)猛擊http://www.spark-project.org/
??????? Created 2012-05-09
??????? Modified 2012-08-13
1 Scala安裝
?????? 當(dāng)前,Spark最新版本是0.5,由于我寫這篇文檔時(shí),版本還是0.4,因此本文下面的所有描述基于0.4版本。
不過(guò)淘寶的達(dá)人已經(jīng)嘗試了0.5,并寫了相關(guān)安裝文檔在此http://rdc.taobao.com/team/jm/archives/tag/spark。
~~~~~~~~~~~~~~~以下開(kāi)始我的安裝文檔~~~~~~~~~~~~~~
??????? 我使用的Spark的版本是0.4,只存在于github上,該版本使用的Scala版本是0.9.1.final。所以先到http://www.scala-lang.org/node/165下載scala-2.9.1.final.tar.gz。解壓后放到本地 /opt 下面,在 /etc/profile 里添加
export SCALA_HOME=/opt/scala-2.9.1.final
export PATH=$SCALA_HOME/bin:$PATH
2 git安裝
由于下載Spark和編譯Spark需要git,因此先安裝git,安裝方法可以到Ubuntu軟件中心直接裝,也可以apt-get裝。裝好后需要到https://github.com 去注冊(cè)一個(gè)帳號(hào),我注冊(cè)的是JerryLead,注冊(cè)郵箱和密碼,然后根據(jù)網(wǎng)站上的get-start提示生成RSA密碼。
注意:如果本地之前存在rsa_id.pub,authorized_keys等,將其保存或著將原來(lái)的密碼生成為dsa形式,這樣git和原來(lái)的密碼都不沖突。
3 Spark安裝
首先下載最新的源代碼
| git clone git://github.com/mesos/spark.git |
得到目錄spark后,進(jìn)入spark目錄,進(jìn)入conf子目錄,將 spark-env.sh-template 重命名為spark-env.sh,并添加以下代碼行:
| export SCALA_HOME=/opt/scala-2.9.1.final |
回到spark目錄,開(kāi)始編譯,運(yùn)行
| $?sbt/sbt update compile |
這條命令會(huì)聯(lián)網(wǎng)下載很多jar,然后會(huì)對(duì)spark進(jìn)行編譯,編譯完成會(huì)提示success
| [success] Total time: 1228 s, completed May 9, 2012 3:42:11 PM |
可以通過(guò)運(yùn)行spark-shell來(lái)和spark進(jìn)行交互。
也可以先運(yùn)行測(cè)試用例./run <class> <params>
| ./run spark.examples.SparkLR local[2] |
在本地啟動(dòng)兩個(gè)線程運(yùn)行線性回歸。
| ./run spark.examples.SparkPi local |
在本地啟動(dòng)運(yùn)行Pi估計(jì)器。
更多的例子在examples/src/main/scala里面
3 Spark導(dǎo)出
在使用Spark之前,先將編譯好的classes導(dǎo)出為jar比較好,可以
| $ sbt/sbt assembly |
將Spark及其依賴包導(dǎo)出為jar,放在
| core/target/spark-core-assembly-0.4-SNAPSHOT.jar |
可以將該jar添加到CLASSPATH里,開(kāi)發(fā)Spark應(yīng)用了。
一般在開(kāi)發(fā)Spark應(yīng)用時(shí)需要導(dǎo)入Spark一些類和一些隱式的轉(zhuǎn)換,需要再程序開(kāi)頭加入
| import spark.SparkContext import SparkContext._ |
4 使用Spark交互模式
| 1. 運(yùn)行./spark-shell.sh 2. scala> val data = Array(1, 2, 3, 4, 5) //產(chǎn)生data data: Array[Int] = Array(1, 2, 3, 4, 5) 3. scala> val distData = sc.parallelize(data) //將data處理成RDD distData: spark.RDD[Int] =?spark.ParallelCollection@7a0ec850?(顯示出的類型為RDD) 4. scala> distData.reduce(_+_) //在RDD上進(jìn)行運(yùn)算,對(duì)data里面元素進(jìn)行加和 12/05/10 09:36:20 INFO spark.SparkContext: Starting job... 5. 最后運(yùn)行得到 12/05/10 09:36:20 INFO spark.SparkContext: Job finished in 0.076729174 s res2: Int = 15 |
5 使用Spark處理Hadoop Datasets
Spark可以從HDFS/local FS/Amazon S3/Hypertable/HBase等創(chuàng)建分布式數(shù)據(jù)集。Spark支持text files,SequenceFiles和其他Hadoop InputFormat。
比如從HDFS上讀取文本創(chuàng)建RDD
| scala> val distFile = sc.textFile("hdfs://m120:9000/user/LijieXu/Demo/file01.txt") 12/05/10 09:49:01 INFO mapred.FileInputFormat: Total input paths to process : 1 distFile: spark.RDD[String] = spark.MappedRDD@59bf8a16 |
然后可以統(tǒng)計(jì)該文本的字符數(shù),map負(fù)責(zé)處理文本每一行map(_size)得到每一行的字符數(shù),多行組成一個(gè)List,reduce負(fù)責(zé)將List中的所有元素相加。
| scala> distFile.map(_.size).reduce(_+_) 12/05/10 09:50:02 INFO spark.SparkContext: Job finished in 0.139610772 s res3: Int = 79 |
textFile可以通過(guò)設(shè)置第二個(gè)參數(shù)來(lái)指定slice個(gè)數(shù)(slice與Hadoop里的split/block概念對(duì)應(yīng),一個(gè)task處理一個(gè)slice)。Spark默認(rèn)將Hadoop上一個(gè)block對(duì)應(yīng)為一個(gè)slice,但可以調(diào)大slice的個(gè)數(shù),但不能比block的個(gè)數(shù)小,這就需要知道HDFS上一個(gè)文件的block數(shù)目,可以通過(guò)50070的dfs的jsp來(lái)查看。
對(duì)于SequenceFile,可以使用SparkContext的sequenceFile[K,V]方法生成RDD,其中K和V肯定要是SequenceFile存放時(shí)的類型了,也就是必須是Writable的子類。Spark也允許使用native types去讀取,如sequenceFile[Int, String]。
對(duì)于復(fù)雜的SequenceFile,可以使用SparkContext.hadoopRDD方法去讀取,該方法傳入JobConf參數(shù),包含InputFormat,key class,value class等,與Hadoop Java客戶端讀取方式一樣。
6 分布式數(shù)據(jù)集操作
分布式數(shù)據(jù)集支持兩種類型的操作:transformation和action。transformation的意思是從老數(shù)據(jù)集中生成新的數(shù)據(jù)集,action是在數(shù)據(jù)集上進(jìn)行計(jì)算并將結(jié)果返回給driver program。每一個(gè)Spark應(yīng)用包含一個(gè)driver program用來(lái)執(zhí)行用戶的main函數(shù),比如,map就是一個(gè)transformation,將大數(shù)據(jù)集劃分處理為小數(shù)據(jù)集,reduce是action,將數(shù)據(jù)集上內(nèi)容進(jìn)行聚合并返回給driver program。有個(gè)例外是reduceByKey應(yīng)該屬于transformation,返回的是分布式數(shù)據(jù)集。
需要注意的是,Spark的transformation是lazy的,transformation先將操作記錄下來(lái),直到接下來(lái)的action需要將處理結(jié)果返回給driver program的時(shí)候。
另一個(gè)特性是caching,如果用戶指定cache一個(gè)數(shù)據(jù)集RDD,那么該數(shù)據(jù)集中的不同slice會(huì)按照partition被存放到相應(yīng)不同節(jié)點(diǎn)的內(nèi)存中,這樣重用該數(shù)據(jù)集的時(shí)候,效率會(huì)高很多,尤其適用于迭代型和交互式的應(yīng)用。如果cache的RDD丟失,那么重新使用transformation生成。
7 共享變量
與Hadoop的MapReduce不同的是,Spark允許共享變量,但只允許兩種受限的變量:broadcast和accumulators。
Broadcast顧名思義是“廣播”,在每個(gè)節(jié)點(diǎn)上保持一份read-only的變量。比如,Hadoop的map task需要一部只讀詞典來(lái)處理文本時(shí),由于不存在共享變量,每個(gè)task都需要加載一部詞典。當(dāng)然也可以使用DistributedCache來(lái)解決。在Spark中,通過(guò)broadcast,每個(gè)節(jié)點(diǎn)存放一部詞典就夠了,這樣從task粒度上升到node粒度,節(jié)約的資源可想而知。Spark的broadcast路由算法也考慮到了通信開(kāi)銷。
通過(guò)使用SparkContext.broadcast(v)來(lái)實(shí)現(xiàn)對(duì)變量v的包裝和共享。
| scala> val broadcastVar = sc.broadcast(Array(1,2,3)) 12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Asked to add key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0) 12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Estimated size for key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0) is 12 12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Size estimation for key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0) took 0 ms 12/05/10 10:54:21 INFO spark.BoundedMemoryCache: ensureFreeSpace((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d), 12) called with curBytes=12, maxBytes=339585269 12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Adding key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0) 12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Number of entries is now 2 broadcastVar: spark.broadcast.Broadcast[Array[Int]] = spark.Broadcast(a5c2a151-185d-4ea4-aad1-9ec642eebc5d) |
創(chuàng)建broadcast變量后,可以通過(guò).value來(lái)訪問(wèn)只讀原始變量v。
| scala> broadcastVar.value res4: Array[Int] = Array(1, 2, 3) |
另一種共享變量是Accumulators,顧名思義就是可以被“added”的變量,比如MapReduce中的counters就是不斷累加的變量。Spark原生支持Int和Double類型的累加變量。
通過(guò)SparkContext.accumulator(v)來(lái)創(chuàng)建accumulator類型的變量,然后運(yùn)行的task可以使用“+=”操作符來(lái)進(jìn)行累加。但是task不能讀取到該變量,只有driver program能夠讀取(通過(guò).value),這也是為了避免使用太多讀寫鎖吧。
創(chuàng)建0的accumulator版本。
| scala> val accum = sc.accumulator(0) accum: spark.Accumulator[Int] = 0 |
對(duì)生成的RDD進(jìn)行累加,這次不要reduce了。
| scala> sc.parallelize(Array(1,2,3,4)).foreach(x => accum += x) 12/05/10 11:05:48 INFO spark.SparkContext: Starting job... scala> accum.value res7: Int = 20 |
8 安裝Mesos
Spark-0.4推薦的Mesos版本是1205738,不是最新版的Mesos,我想最新版應(yīng)該也可以,這里暫且使用1205738。
首先下載Mesos
| svn checkout –r 1205738?https://svn.apache.org/repos/asf/incubator/mesos/trunkmesos |
得到mesos目錄后,先安裝編譯所需的軟件
| apt-get install python2.6 python2.6-dev 很遺憾,雖然Ubuntu 11.04上有python 2.7,但webui(mesos的web界面)需要python 2.6,因此要裝 apt-get install libcppunit-dev (安裝cppunit) 確保g++版本大于4.1 如果缺automake,那么安裝 apt-get install autoconf automake libtool |
由于系統(tǒng)是Ubuntu 11.04 (GNU/Linux 2.6.38-8-generic x86_64)-natty,可以直接使用./configure.template.ubuntu-natty-64。但我使用的JDK是Sun的,因此修改./configure.template.ubuntu-natty-64里面--with-java-home為/opt/jdk1.6.0_27。
總體如下:
| cp configure.template.ubuntu-natty-64 configure.template.ubuntu-my-natty-64 修改configure.template.ubuntu-my-natty-64得到如下內(nèi)容 1 #!/bin/sh 2 export PYTHON=python2.7 3 4 $(dirname $0)/configure \ 5 --with-python-headers=/usr/include/python2.7 \ 6 --with-java-home=/opt/jdk1.6.0_27 \ 7 --with-webui \ 8 --with-included-zookeeper $@ |
編譯mesos
| root@master:/opt/mesos# ./configure.template.ubuntu-my-natty-64 完了之后 root@master:/opt/mesos# make |
9 配置Mesos和Spark
先在slave1、slave2、slave3和master上安裝mesos,我這里安裝在/opt/mesos。
進(jìn)入conf目錄,修改deploy-env.sh,添加MESOS_HOME
| # This works with a newer version of hostname on Ubuntu. #FULL_IP="hostname --all-ip-addresses" #export LIBPROCESS_IP=`echo $FULL_IP | sed 's/\([^ ]*\) .*/\1/'` export MESOS_HOME=/opt/mesos |
修改mesos.conf,添加
| # mesos-slave with --help. failover_timeout=1 |
進(jìn)入/opt/spark,修改conf/spark-env.sh,添加
| # variables to set are: # - MESOS_HOME, to point to your Mesos installation # - SCALA_HOME, to point to your Scala installation # - SPARK_CLASSPATH, to add elements to Spark's classpath # - SPARK_JAVA_OPTS, to add JVM options # - SPARK_MEM, to change the amount of memory used per node (this should # be in the same format as the JVM's -Xmx option, e.g. 300m or 1g). # - SPARK_LIBRARY_PATH, to add extra search paths for native libraries. export SCALA_HOME=/opt/scala-2.9.1.final export MESOS_HOME=/opt/mesos export PATH=$PATH:/opt/jdk1.6.0_27/bin export SPARK_MEM=10g?(根據(jù)自己機(jī)器的內(nèi)存大小設(shè)置,指示Spark可以使用的最大內(nèi)存量) |
總結(jié)
以上是生活随笔為你收集整理的Spark安装与学习的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: OpenStack云计算快速入门教程
- 下一篇: Hadoop vs Spark性能对比