Apache Spark中实现的MapReduce设计模式
該博客是該系列文章的第一篇,討論了MapReduce設(shè)計模式一書中的一些設(shè)計模式,并展示了如何在Apache Spark(R)中實現(xiàn)這些模式。
在編寫MapReduce或Spark程序時,考慮執(zhí)行作業(yè)的數(shù)據(jù)流很有用。 即使Pig,Hive,Apache Drill和Spark數(shù)據(jù)框使分析數(shù)據(jù)變得更加容易,在較低級別理解流還是很有用的,就像使用Explain理解查詢計劃一樣有價值。 考慮這一點(diǎn)的一種方法是對模式類型進(jìn)行分組,這些模式是用于解決常見和常規(guī)數(shù)據(jù)處理問題的模板。 以下是MapReduce書籍中MapReduce模式的類型列表:
- 匯總模式
- 過濾模式
- 數(shù)據(jù)組織模式
- 聯(lián)接模式
- 元模式
- 輸入和輸出模式
在這篇文章中,我們將介紹一種匯總模式,即數(shù)值匯總。
數(shù)值總結(jié)
數(shù)值匯總是一種用于計算數(shù)據(jù)匯總統(tǒng)計值的模式。 目的是按關(guān)鍵字段對記錄進(jìn)行分組,并計算每組的匯總,例如最小值,最大值,中位數(shù)。 MapReduce設(shè)計模式手冊中的下圖顯示了該模式在MapReduce中的一般執(zhí)行。
此聚合模式對應(yīng)于在SQL中使用GROUP BY ,例如:
SELECT MIN(numericalcol1), MAX(numericalcol1),COUNT(*) FROM table GROUP BY groupcol2;在Pig中,這對應(yīng)于:
b = GROUP a BY groupcol2; c = FOREACH b GENERATE group, MIN(a.numericalcol1),MAX(a.numericalcol1), COUNT_STAR(a);在Spark中,鍵值對RDD通常用于按鍵分組以執(zhí)行聚合,如MapReduce圖所示,但是,使用Spark Pair RDDS,您不僅具有Map和Reduce 功能 ,還具有更多功能 。
我們將使用以前在Spark Dataframes上的博客中的數(shù)據(jù)集介紹一些匯總示例。 數(shù)據(jù)集是一個.csv文件,由在線拍賣數(shù)據(jù)組成。 每個拍賣都有一個與其關(guān)聯(lián)的拍賣ID,并且可以有多個出價。 每行代表一個出價。 對于每個出價,我們都有以下信息:
(在代碼框中,注釋為綠色,輸出為藍(lán)色)
下面,我們從ebay.csv文件加載數(shù)據(jù),然后使用Scala案例類定義與ebay.csv文件相對應(yīng)的Auction模式。 然后,將map()轉(zhuǎn)換應(yīng)用于每個元素以創(chuàng)建Auction對象的AuctionRDD。
// SQLContext entry point for working with structured data val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ // Import Spark SQL data types and Row. import org.apache.spark.sql._ //define the schema using a case class case class Auction(auctionid: String, bid: Double, bidtime: Double, bidder: String, bidderrate: Integer, openbid: Double, price: Double, item: String, daystolive: Integer) // create an RDD of Auction objects val auctionRDD= sc.textFile("ebay.csv").map(_.split(",")).map(p => Auction(p(0),p(1).toDouble,p(2).toDouble,p(3),p(4).toInt,p(5).toDouble,p(6).toDouble,p(7),p(8).toInt ))下圖顯示了Spark的一般執(zhí)行情況,用于計算項目每次競價的平均出價。
相應(yīng)的代碼如下所示。 首先,創(chuàng)建一個鍵值對,其中拍賣ID和商品為鍵,出價金額為1,例如(((id,item),bid amount,1))。 接下來,reduceBykey執(zhí)行投標(biāo)金額的總和和投標(biāo)金額的總和,以獲得總投標(biāo)金額和計數(shù)。 mapValues計算平均值,即總出價金額/出價計數(shù)。
// create key value pairs of ( (auctionid, item) , (bid, 1)) val apair = auctionRDD.map(auction=>((auction.auctionid,auction.item), (auction.bid, 1))) // reducebyKey to get the sum of bids and count sum val atotalcount = apair.reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2)) // get a couple results atotalcount.take(2) // Array(((1641062012,cartier),(4723.99,3)), ((2920322392,palm),(3677.96,32))) // avg = total/count val avgs = atotalcount.mapValues{ case (total, count) => total.toDouble / count } // get a couple results avgs.take(2) // Array(((1641062012,cartier),1574.6633333333332), ((2920322392,palm),114.93625))// This could also be written like this val avgs =auctionRDD.map(auction=>((auction.auctionid,auction.item), (auction.bid, 1))).reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2)).mapValues{ case (total, count) => total.toDouble / count }也可以使用java Math類或spark StatCounter類來計算統(tǒng)計信息,如下所示
import java.lang.Math// Calculate the minimum bid per auction val amax = apair.reduceByKey(Math.min) // get a couple results amax.take(2) // Array(((1641062012,cartier),1524.99), ((2920322392,palm),1.0))import org.apache.spark.util.StatCounter // Calculate statistics on the bid amount per auction val astats = apair.groupByKey().mapValues(list => StatCounter(list)) // get a result astats.take(1) // Array(((1641062012,cartier),(count: 3, mean: 1574.663333, stdev: 35.126723, max: 1600.000000, min: 1524.990000)))Spark DataFrames提供了一種特定于域的語言來進(jìn)行分布式數(shù)據(jù)操作,從而使執(zhí)行聚合更加容易。 此外,DataFrame查詢的性能要優(yōu)于使用PairRDD進(jìn)行編碼,因為它們的執(zhí)行是由查詢優(yōu)化器自動優(yōu)化的。 這是一個使用DataFrames來按Auctionid和item獲取最低,最高和平均出價的示例:
val auctionDF = auctionRDD.toDF() // get the max, min, average bid by auctionid and item auctionDF.groupBy("auctionid", "item").agg($"auctionid",$"item", max("bid"), min("bid"), avg("bid")).show auctionid item MAX(bid) MIN(bid) AVG(bid) 3016429446 palm 193.0 120.0 167.54900054931642 8211851222 xbox 161.0 51.0 95.98892879486084您還可以使用Spark SQL在使用DataFrames時使用SQL。 本示例按Auctionid和Item獲取最高,最低,平均出價。
// register as a temp table inorder to use sql auctionDF .registerTempTable("auction") // get the max, min, average bid by auctionid and item val aStatDF = sqlContext.sql("SELECT auctionid, item, MAX(bid) as maxbid, min(bid) as minbid, avg(bid) as avgbid FROM auction GROUP BY auctionid, item")// show some results aStatDF.show auctionid item maxbid minbid avgbid 3016429446 palm 193.0 120.0 167.549 8211851222 xbox 161.0 51.0 95.98892857142857摘要
這是本系列文章的第一部分,該系列文章將討論使用Spark實現(xiàn)的一些MapReduce設(shè)計模式。 討論非常緊湊,有關(guān)模式的更多信息,請參閱MapReduce設(shè)計模式手冊,有關(guān)Spark Pair RDD的更多信息,請參閱“ 學(xué)習(xí)Spark Key值對”一章。
參考和更多信息
- 免費(fèi)的交互式電子書– Apache Spark入門:從開始到生產(chǎn)
- MapReduce設(shè)計模式書
- DataFrame上聚合的方法
- 免費(fèi)的Spark on Demand培訓(xùn)
翻譯自: https://www.javacodegeeks.com/2015/11/mapreduce-design-patterns-implemented-in-apache-spark.html
總結(jié)
以上是生活随笔為你收集整理的Apache Spark中实现的MapReduce设计模式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 网络游戏备案查询(网游备案查询)
- 下一篇: 使用Hibernate在CQRS读取模型