新手福利:Apache Spark 入门攻略
本文聚焦 Apache Spark 入門,了解其在大數據領域的地位,覆蓋 Apache Spark 的安裝及應用程序的建立,并解釋一些常見的行為和操作。
一、 為什么要使用 Apache Spark
時下,我們正處在一個“大數據”的時代,每時每刻,都有各種類型的數據被生產。而在此紫外,數據增幅的速度也在顯著增加。從廣義上看,這些數據包含交易數據、社交媒體內容(比如文本、圖像和視頻)以及傳感器數據。那么,為什么要在這些內容上投入如此多精力,其原因無非就是從海量數據中提取洞見可以對生活和生產實踐進行很好的指導。
在幾年前,只有少部分公司擁有足夠的技術力量和資金去儲存和挖掘大量數據,并對其挖掘從而獲得洞見。然而,被雅虎 2009 年開源的 Apache Hadoop 對這一狀況產生了顛覆性的沖擊——通過使用商用服務器組成的集群大幅度地降低了海量數據處理的門檻。因此,許多行業(比如 Health care、Infrastructure、Finance、Insurance、Telematics、Consumer、Retail、Marketing、E-commerce、Media、 Manufacturing 和 Entertainment)開始了 Hadoop 的征程,走上了海量數據提取價值的道路。著眼 Hadoop ,其主要提供了兩個方面的功能:
- 通過水平擴展商用主機,HDFS提供了一個廉價的方式對海量數據進行容錯存儲。
- MapReduce 計算范例,提供了一個簡單的編程模型來挖掘數據并獲得洞見。
下圖展示了 MapReduce 的數據處理流程,其中一個 Map-Reduce step 的輸出將作為下一個典型 Hadoop job 的輸入結果。
在整個過程中,中間結果會借助磁盤傳遞,因此對比計算,大量的 Map-Reduced 作業都受限于 IO 。然而對于 ETL 、數據整合和清理這樣的用例來說,IO 約束并不會產生很大的影響,因為這些場景對數據處理時間往往不會有較高的需求。然而,在現實世界中,同樣存在許多對延時要求較為苛刻的用例,比如:
毫無疑問,歷經數年發展,Hadoop 生態圈中的豐富工具已深受用戶喜愛,然而這里仍然存在眾多問題給使用帶來了挑戰:
每個用例都需要多個不同的技術堆棧來支撐,在不同使用場景下,大量的解決方案往往捉襟見肘。
在生產環境中機構往往需要精通數門技術。
許多技術存在版本兼容性問題。
無法在并行 job 中更快地共享數據。
而通過 Apache Spark,上述問題迎刃而解!Apache Spark 是一個輕量級的內存集群計算平臺,通過不同的組件來支撐批、流和交互式用例,如下圖。
二、 關于 Apache Spark
Apache Spark 是個開源和兼容 Hadoop 的集群計算平臺。由加州大學伯克利分校的 AMPLabs 開發,作為 Berkeley Data Analytics Stack(BDAS) 的一部分,當下由大數據公司 Databricks 保駕護航,更是 Apache 旗下的頂級項目,下圖顯示了 Apache Spark 堆棧中的不同組件。
Apache Spark 的5大優勢:
更高的性能,因為數據被加載到集群主機的分布式內存中。數據可以被快速的轉換迭代,并緩存用以后續的頻繁訪問需求。很多對 Spark 感興趣的朋友可能也會聽過這樣一句話——在數據全部加載到內存的情況下, Spark 可以比 Hadoop 快 100 倍,在內存不夠存放所有數據的情況下快 Hadoop 10 倍。
通過建立在 Java、Scala、Python、SQL (應對交互式查詢)的標準 API 以方便各行各業使用,同時還含有大量開箱即用的機器學習庫。
與現有 Hadoop v1 ( SIMR ) 和 2.x (YARN) 生態兼容,因此機構可以進行無縫遷移。
方便下載和安裝。方便的 shell(REPL: Read-Eval-Print-Loop)可以對 API 進行交互式的學習。
借助高等級的架構提高生產力,從而可以講精力放到計算上。
同時, Apache Spark 由 Scala 實現,代碼非常簡潔。
三、安裝Apache Spark
下表列出了一些重要鏈接和先決條件:[+]查看原圖
如上圖所示,Apache Spark 的部署方式包括 standalone、Hadoop V1 SIMR、Hadoop 2 YARN/Mesos 。Apache Spark 需求一定的 Java、Scala 或 Python 知識。這里,我們將專注 standalone 配置下的安裝和運行。
安裝 JDK 1.6+、Scala 2.10+、Python [2.6,3] 和 sbt
下載 Apache Spark 1.0.1 Release
在指定目錄下 Untar 和 Unzip spark-1.0.1.tgz
akuntamukkala@localhost~/Downloads$ pwd /Users/akuntamukkala/Downloads akuntamukkala@localhost~/Downloads$ tar -zxvf spark- 1.0.1.tgz -C /Users/akuntamukkala/spark運行 sbt 建立 Apache Spark
akuntamukkala@localhost~/spark/spark-1.0.1$ pwd /Users/akuntamukkala/spark/spark-1.0.1 akuntamukkala@localhost~/spark/spark-1.0.1$ sbt/sbt assembly發布 Scala 的 Apache Spark standalone REPL
/Users/akuntamukkala/spark/spark-1.0.1/bin/spark-shell如果是 Python
/Users/akuntamukkala/spark/spark-1.0.1/bin/ pyspark查看 SparkUI @ http://localhost:4040
四、Apache Spark 的工作模式
Spark 引擎提供了在集群中所有主機上進行分布式內存數據處理的能力,下圖顯示了一個典型 Spark job 的處理流程。
下圖顯示了 Apache Spark 如何在集群中執行一個作業。
Master 控制數據如何被分割,利用了數據本地性,并在 Slaves 上跟蹤所有分布式計算。在某個Slave不可用時,其存儲的數據會分配給其他可用的 Slaves 。雖然當下( 1.0.1 版本) Master 還存在單點故障,但后期必然會被修復。
五、彈性分布式數據集(Resilient Distributed Dataset,RDD)
彈性分布式數據集(RDD,從 Spark 1.3 版本開始已被 DataFrame 替代)是 Apache Spark 的核心理念。它是由數據組成的不可變分布式集合,其主要進行兩個操作:transformation 和 action 。Transformation 是類似在 RDD 上做 filter()、map() 或 union() 以生成另一個 RDD 的操作,而 action 則是 count()、first()、take(n)、collect() 等促發一個計算并返回值到 Master 或者穩定存儲系統的操作。Transformations 一般都是 lazy 的,直到 action 執行后才會被執行。Spark Master/Driver 會保存 RDD 上的 Transformations 。這樣一來,如果某個 RDD 丟失(也就是 salves 宕掉),它可以快速和便捷地轉換到集群中存活的主機上。這也就是 RDD 的彈性所在。
下圖展示了 Transformation 的 lazy :
我們可以通過下面示例來理解這個概念:從文本中發現 5 個最常用的 word 。下圖顯示了一個可能的解決方案。
在上面命令中,我們對文本進行讀取并且建立字符串的 RDD 。每個條目代表了文本中的 1 行。
scala> val hamlet = sc.textFile(“/Users/akuntamukkala/temp/gutenburg.txt”) hamlet: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12scala> val topWordCount = hamlet.flatMap(str=>str.split(“ “)). filter(!_.isEmpty).map(word=>(word,1)).reduceByKey(_+_).map{case (word, count) => (count, word)}.sortByKey(false) topWordCount: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[10] at sortByKey at <console>:14
通過上述命令我們可以發現這個操作非常簡單——通過簡單的 Scala API 來連接 transformations 和 actions 。
可能存在某些 words 被 1 個以上空格分隔的情況,導致有些 words 是空字符串,因此需要使用 filter(!_.isEmpty) 將它們過濾掉。
每個 word 都被映射成一個鍵值對:map(word=>(word,1))。
為了合計所有計數,這里需要調用一個 reduce 步驟—— reduceByKey(+) 。 + 可以非常便捷地為每個 key 賦值。
我們得到了 words 以及各自的 counts,下一步需要做的是根據 counts 排序。在 Apache Spark ,用戶只能根據 key 排序,而不是值。因此,這里需要使用 map{case (word, count) => (count, word)} 將 (word, count) 流轉到 (count, word)。
需要計算最常用的 5 個 words ,因此需要使用 sortByKey(false) 做一個計數的遞減排序。
上述命令包含了一個 .take(5) (an action operation, which triggers computation) 和在 /Users/akuntamukkala/temp/gutenburg.txt 文本中輸出 10 個最常用的 words 。在 Python shell 中用戶可以實現同樣的功能。
RDD lineage 可以通過 toDebugString (一個值得記住的操作)來跟蹤。
scala> topWordCount.take(5).foreach(x=>println(x)) (1044,the) (730,and) (679,of) (648,to) (511,I)
常用的 Transformations:
[+]查看原圖[+]查看原圖
常見集合操作[+]查看原圖[+]查看原圖
更多 transformations 信息,請查看 http://spark.apache.org/docs/latest/programming-guide.html#transformations
常用的 actions[+]查看原圖
更多 actions 參見 http://spark.apache.org/docs/latest/programming-guide.html#actions
六、RDD持久性
Apache Spark 中一個主要的能力就是在集群內存中持久化/緩存 RDD 。這將顯著地提升交互速度。下表顯示了 Spark 中各種選項。
[+]查看原圖
上面的存儲等級可以通過 RDD. cache() 操作上的 persist () 操作訪問,可以方便地指定 MEMORY_ONLY 選項。關于持久化等級的更多信息,可以訪問這里 http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence。
Spark 使用 Least Recently Used (LRU) 算法來移除緩存中舊的、不常用的 RDD ,從而釋放出更多可用內存。同樣還提供了一個 unpersist() 操作來強制移除緩存/持久化的 RDD 。
七、變量共享
Accumulators。Spark 提供了一個非常便捷地途徑來避免可變的計數器和計數器同步問題—— Accumulators 。Accumulators 在一個 Spark context 中通過默認值初始化,這些計數器在 Slaves 節點上可用,但是 Slaves 節點不能對其進行讀取。它們的作用就是來獲取原子更新,并將其轉發到 Master 。 Master 是唯一可以讀取和計算所有更新合集的節點。舉個例子:
akuntamukkala@localhost~/temp$ cat output.log error warning info trace error info info scala> val nErrors=sc.accumulator(0.0) scala> val logs = sc.textFile(“/Users/akuntamukkala/temp/output.log”) scala> logs.filter(_.contains(“error”)).foreach(x=>nErrors+=1) scala> nErrors.value Result:Int = 2
Broadcast Variables。實際生產中,通過指定 key 在 RDDs 上對數據進行合并的場景非常常見。在這種情況下,很可能會出現給 slave nodes 發送大體積數據集的情況,讓其負責托管需要做 join 的數據。因此,這里很可能存在巨大的性能瓶頸,因為網絡 IO 比內存訪問速度慢 100 倍。為了解決這個問題,Spark 提供了 Broadcast Variables,如其名稱一樣,它會向 slave nodes 進行廣播。因此,節點上的 RDD 操作可以快速訪問 Broadcast Variables 值。舉個例子,期望計算一個文件中所有路線項的運輸成本。通過一個 look-up table指定每種運輸類型的成本,這個look-up table 就可以作為 Broadcast Variables 。
akuntamukkala@localhost~/temp$ cat packagesToShip.txt ground express media priority priority ground express media scala> val map = sc.parallelize(Seq((“ground”,1),(“med”,2), (“priority”,5),(“express”,10))).collect().toMap map: scala.collection.immutable.Map[String,Int] = Map(ground -> 1, media -> 2, priority -> 5, express -> 10) scala> val bcMailRates = sc.broadcast(map)
上述命令中,我們建立了一個 broadcast variable,基于服務類別成本的 map 。
scala> val pts = sc.textFile(“/Users/akuntamukkala/temp/packagesToShip.txt”)
在上述命令中,我們通過 broadcast variable 的 mailing rates 來計算運輸成本。
scala> pts.map(shipType=>(shipType,1)).reduceByKey(+). map{case (shipType,nPackages)=>(shipType,nPackages*bcMailRates. value(shipType))}.collect()
通過上述命令,我們使用 accumulator 來累加所有運輸的成本。詳細信息可通過下面的 PDF 查看 http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf。
八、Spark SQL
通過 Spark Engine,Spark SQL 提供了一個便捷的途徑來進行交互式分析,使用一個被稱為 SchemaRDD 類型的 RDD 。SchemaRDD 可以通過已有 RDDs 建立,或者其他外部數據格式,比如 Parquet files、JSON 數據,或者在 Hive 上運行 HQL。SchemaRDD 非常類似于 RDBMS 中的表格。一旦數據被導入 SchemaRDD,Spark 引擎就可以對它進行批或流處理。Spark SQL 提供了兩種類型的 Contexts——SQLContext 和 HiveContext,擴展了 SparkContext 的功能。
SparkContext 提供了到簡單 SQL parser 的訪問,而 HiveContext 則提供了到 HiveQL parser 的訪問。HiveContext 允許企業利用已有的 Hive 基礎設施。
這里看一個簡單的 SQLContext 示例。
下面文本中的用戶數據通過 “ | ” 來分割。
John Smith|38|M|201 East Heading Way #2203,Irving, TX,75063 Liana Dole|22|F|1023 West Feeder Rd, Plano,TX,75093 Craig Wolf|34|M|75942 Border Trail,Fort Worth,TX,75108 John Ledger|28|M|203 Galaxy Way,Paris, TX,75461 Joe Graham|40|M|5023 Silicon Rd,London,TX,76854
定義 Scala case class 來表示每一行:
case class Customer(name:String,age:Int,gender:String,address: String)下面的代碼片段體現了如何使用 SparkContext 來建立 SQLContext ,讀取輸入文件,將每一行都轉換成 SparkContext 中的一條記錄,并通過簡單的 SQL 語句來查詢 30 歲以下的男性用戶。
val sparkConf = new SparkConf().setAppName(“Customers”) val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) val r = sc.textFile(“/Users/akuntamukkala/temp/customers.txt”) val records = r.map(_.split(‘|’)) val c = records.map(r=>Customer(r(0),r(1).trim.toInt,r(2),r(3))) c.registerAsTable(“customers”) sqlContext.sql(“select * from customers where gender=’M’ and age <30”).collect().foreach(println) Result:[John Ledger,28,M,203 Galaxy Way,Paris,TX,75461]更多使用 SQL 和 HiveQL 的示例請訪問下面鏈接 https://spark.apache.org/docs/latest/sql-programming-guide.html、https://databricks-training.s3.amazonaws.com/data-exploration-using-spark-sql.html。
九、Spark Streaming
Spark Streaming 提供了一個可擴展、容錯、高效的途徑來處理流數據,同時還利用了 Spark 的簡易編程模型。從真正意義上講,Spark Streaming 會將流數據轉換成 micro batches,從而將 Spark 批處理編程模型應用到流用例中。這種統一的編程模型讓 Spark 可以很好地整合批量處理和交互式流分析。下圖顯示了 Spark Streaming 可以從不同數據源中讀取數據進行分析。
Spark Streaming 中的核心抽象是 Discretized Stream(DStream)。DStream 由一組 RDD 組成,每個 RDD 都包含了規定時間(可配置)流入的數據。上圖很好地展示了 Spark Streaming 如何通過將流入數據轉換成一系列的 RDDs,再轉換成 DStream 。每個 RDD 都包含兩秒(設定的區間長度)的數據。在 Spark Streaming 中,最小長度可以設置為 0.5 秒,因此處理延時可以達到 1 秒以下。
Spark Streaming 同樣提供了 window operators ,它有助于更有效率在一組 RDD ( a rolling window of time)上進行計算。同時,DStream 還提供了一個 API ,其操作符(transformations 和 output operators)可以幫助用戶直接操作 RDD 。下面不妨看向包含在 Spark Streaming 下載中的一個簡單示例。示例是在 Twitter 流中找出趨勢 hashtags ,詳見下面代碼。
spark- 1.0.1/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala val sparkConf = new SparkConf().setAppName(“TwitterPopularTags”) val ssc = new StreamingContext(sparkConf, Seconds(2)) val stream = TwitterUtils.createStream(ssc, None, filters)上述代碼用于建立 Spark Streaming Context 。Spark Streaming 將在 DStream 中建立一個 RDD ,包含了每 2 秒流入的 tweets 。
val hashTags = stream.flatMap(status => status.getText.split(“ “).filter(_.startsWith(“#”)))上述代碼片段將 Tweet 轉換成一組 words ,并過濾出所有以 a# 開頭的。
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map{case (topic, count) => (count, topic)}. transform(_.sortByKey(false))上述代碼展示了如何整合計算 60 秒內一個 hashtag 流入的總次數。
topCounts60.foreachRDD(rdd => { val topList = rdd.take(10) println(“\nPopular topics in last 60 seconds (%s total):”.format(rdd.count())) topList.foreach{case (count, tag) => println(“%s (%s tweets)”.format(tag, count))} })上面代碼將找出 top 10 趨勢 tweets ,然后將其打印。
ssc.start()上述代碼讓 Spark Streaming Context 開始檢索 tweets 。一起聚焦一些常用操作,假設我們正在從一個 socket 中讀入流文本。
al lines = ssc.socketTextStream(“localhost”, 9999, StorageLevel.MEMORY_AND_DISK_SER)更多 operators 請訪問 http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations
Spark Streaming 擁有大量強大的 output operators ,比如上文提到的 foreachRDD(),了解更多可訪問 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations。
十、附加學習資源
- Wikipedia article (good): http://en.wikipedia.org/wiki/Apache_Spark
- Launching a Spark cluster on EC2: http://ampcamp.berkeley.edu/exercises-strata-conf-2013/launching-a-cluster.html
- Quick start: https://spark.apache.org/docs/1.0.1/quick-start.html
- The Spark platform provides MLLib(machine learning) and GraphX(graph algorithms). The following links provide more information:https://spark.apache.org/docs/latest/mllib-guide.html、https://spark.apache.org/docs/1.0.1/graphx-programming-guide.html、https://dzone.com/refcardz/apache-spark
原文鏈接:Apache Spark:An Engine for Large-Scale Data Processing
本文系 OneAPM 工程師編譯整理。
總結
以上是生活随笔為你收集整理的新手福利:Apache Spark 入门攻略的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 涉足计算机视觉领域要知道的
- 下一篇: 数字图像处理:第六章 几何运算