【Apache Spark 】第 6 章Spark SQL 和数据集
?🔎大家好,我是Sonhhxg_柒,希望你看完之后,能對(duì)你有所幫助,不足請(qǐng)指正!共同學(xué)習(xí)交流🔎
📝個(gè)人主頁-Sonhhxg_柒的博客_CSDN博客?📃
🎁歡迎各位→點(diǎn)贊👍 + 收藏?? + 留言📝?
📣系列專欄 - 機(jī)器學(xué)習(xí)【ML】?自然語言處理【NLP】? 深度學(xué)習(xí)【DL】
?
?🖍foreword
?說明?本人講解主要包括Python、機(jī)器學(xué)習(xí)(ML)、深度學(xué)習(xí)(DL)、自然語言處理(NLP)等內(nèi)容。
如果你對(duì)這個(gè)系列感興趣的話,可以關(guān)注訂閱喲👋
文章目錄
Java 和 Scala 的單一 API
用于數(shù)據(jù)集的 Scala 案例類和 JavaBean
使用數(shù)據(jù)集
創(chuàng)建示例數(shù)據(jù)
轉(zhuǎn)換樣本數(shù)據(jù)
高階函數(shù)和函數(shù)式編程
將 DataFrame 轉(zhuǎn)換為數(shù)據(jù)集
數(shù)據(jù)集和數(shù)據(jù)幀的內(nèi)存管理
數(shù)據(jù)集編碼器
Spark 的內(nèi)部格式與 Java 對(duì)象格式
序列化和反序列化 (SerDe)
使用數(shù)據(jù)集的成本
降低成本的策略
概括
在第4章和第5章中,我們介紹了 Spark SQL 和 DataFrame API。我們研究了如何連接到內(nèi)置和外部數(shù)據(jù)源,了解了 Spark SQL 引擎,并探討了諸如 SQL 和 DataFrame 之間的互操作性、創(chuàng)建和管理視圖和表以及高級(jí) DataFrame 和 SQL 轉(zhuǎn)換等主題。
盡管我們?cè)诘?3 章中簡要介紹了 Dataset API?,但我們略讀了 Datasets(強(qiáng)類型分布式集合)在 Spark 中的創(chuàng)建、存儲(chǔ)、序列化和反序列化的主要方面。
在本章中,我們將深入了解數(shù)據(jù)集:我們將探索在 Java 和 Scala 中使用數(shù)據(jù)集,Spark 如何管理內(nèi)存以適應(yīng)作為高級(jí) API 的一部分的數(shù)據(jù)集結(jié)構(gòu),以及與使用數(shù)據(jù)集相關(guān)的成本。
Java 和 Scala 的單一 API
您可能還記得第 3 章(圖 3-1和表 3-6),數(shù)據(jù)集為強(qiáng)類型對(duì)象提供了統(tǒng)一且單一的 API。在 Spark 支持的語言中,只有 Scala 和 Java 是強(qiáng)類型的;因此,Python 和 R 僅支持無類型的 DataFrame API。
數(shù)據(jù)集是特定領(lǐng)域的類型化對(duì)象,可以使用函數(shù)式編程或您熟悉的 DataFrame API 中的 DSL 運(yùn)算符并行操作。
由于這個(gè)單一的 API,Java 開發(fā)人員不再冒險(xiǎn)落后。例如,將來對(duì) Scala 的groupBy()、flatMap()、map()或filter()API 的任何接口或行為更改對(duì)于 Java 也將是相同的,因?yàn)樗莾蓚€(gè)實(shí)現(xiàn)通用的單一接口。
用于數(shù)據(jù)集的 Scala 案例類和 JavaBean
如果您還記得第 3 章(表 3-2),Spark 具有內(nèi)部數(shù)據(jù)類型,例如StringType、BinaryType、IntegerType、BooleanType和MapType,它用于在 Spark 操作期間無縫映射到 Scala 和 Java 中的語言特定數(shù)據(jù)類型。這種映射是通過編碼器完成的,我們將在本章后面討論。
為了在 Scala 中創(chuàng)建類型化對(duì)象Dataset[T],T您需要一個(gè)定義該對(duì)象的案例類。使用第 3 章(表 3-1)中的示例數(shù)據(jù),假設(shè)我們有一個(gè) JSON 文件,其中包含數(shù)百萬關(guān)于博客作者以以下格式撰寫的關(guān)于 Apache Spark 的條目:
{id: 1, first: "Jules", last: "Damji", url: "https://tinyurl.1", date: "1/4/2016", hits: 4535, campaigns: {"twitter", "LinkedIn"}}, ... {id: 87, first: "Brooke", last: "Wenig", url: "https://tinyurl.2", date: "5/5/2018", hits: 8908, campaigns: {"twitter", "LinkedIn"}}要?jiǎng)?chuàng)建分布式Dataset[Bloggers],我們必須首先定義一個(gè) Scala 案例類,該類定義包含 Scala 對(duì)象的每個(gè)單獨(dú)字段。此案例類用作類型化對(duì)象的藍(lán)圖或模式Bloggers:
// In Scala case class Bloggers(id:Int, first:String, last:String, url:String, date:String, hits: Int, campaigns:Array[String])我們現(xiàn)在可以從數(shù)據(jù)源中讀取文件:
val bloggers = "../data/bloggers.json" val bloggersDS = spark.read.format("json").option("path", bloggers).load().as[Bloggers]生成的分布式數(shù)據(jù)集合中的每一行都是類型Bloggers。
同樣,您可以在 Java 中創(chuàng)建一個(gè) JavaBean 類型的類Bloggers,然后使用編碼器創(chuàng)建一個(gè)Dataset<Bloggers>:
// In Java import org.apache.spark.sql.Encoders; import java.io.Serializable;public class Bloggers implements Serializable {private int id;private String first;private String last;private String url;private String date;private int hits;private Array[String] campaigns;// JavaBean getters and setters int getID() { return id; } void setID(int i) { id = i; } String getFirst() { return first; } void setFirst(String f) { first = f; } String getLast() { return last; } void setLast(String l) { last = l; } String getURL() { return url; } void setURL (String u) { url = u; } String getDate() { return date; } Void setDate(String d) { date = d; } int getHits() { return hits; } void setHits(int h) { hits = h; }Array[String] getCampaigns() { return campaigns; } void setCampaigns(Array[String] c) { campaigns = c; } }// Create Encoder Encoder<Bloggers> BloggerEncoder = Encoders.bean(Bloggers.class); String bloggers = "../bloggers.json" Dataset<Bloggers>bloggersDS = spark.read.format("json").option("path", bloggers).load().as(BloggerEncoder);如您所見,在 Scala 和 Java 中創(chuàng)建數(shù)據(jù)集需要一些先見之明,因?yàn)槟仨氈勒谧x取的行的所有單個(gè)列名稱和類型。與 DataFrame 不同,您可以選擇讓 Spark 推斷架構(gòu),Dataset API 要求您提前定義數(shù)據(jù)類型,并且您的案例類或 JavaBean 類與您的架構(gòu)匹配。
筆記
Scala 案例類或 Java 類定義中的字段名稱必須與數(shù)據(jù)源中的順序匹配。數(shù)據(jù)中每一行的列名會(huì)自動(dòng)映射到類中對(duì)應(yīng)的名稱,并自動(dòng)保留類型。
如果字段名稱與您的輸入數(shù)據(jù)匹配,您可以使用現(xiàn)有的 Scala 案例類或 JavaBean 類。使用 Dataset API與使用 DataFrame一樣簡單、簡潔和聲明性。對(duì)于大多數(shù)數(shù)據(jù)集的轉(zhuǎn)換,您可以使用在前幾章中學(xué)習(xí)過的相同的關(guān)系運(yùn)算符。
讓我們檢查使用示例數(shù)據(jù)集的一些方面.
使用數(shù)據(jù)集
創(chuàng)建示例數(shù)據(jù)集的一種簡單而動(dòng)態(tài)的方法是使用實(shí)SparkSession??例。在這個(gè)場景中,為了說明的目的,我們動(dòng)態(tài)地創(chuàng)建一個(gè)包含三個(gè)字段的 Scala 對(duì)象:(uid用戶的唯一 ID)、uname(隨機(jī)生成的用戶名字符串)和usage(服務(wù)器或服務(wù)使用的分鐘數(shù))。
創(chuàng)建示例數(shù)據(jù)
首先,讓我們生成一些示例數(shù)據(jù):
// In Scala import scala.util.Random._ // Our case class for the Dataset case class Usage(uid:Int, uname:String, usage: Int) val r = new scala.util.Random(42) // Create 1000 instances of scala Usage class // This generates data on the fly val data = for (i <- 0 to 1000) yield (Usage(i, "user-" + r.alphanumeric.take(5).mkString(""),r.nextInt(1000))) // Create a Dataset of Usage typed data val dsUsage = spark.createDataset(data) dsUsage.show(10)+---+----------+-----+ |uid| uname|usage| +---+----------+-----+ | 0|user-Gpi2C| 525| | 1|user-DgXDi| 502| | 2|user-M66yO| 170| | 3|user-xTOn6| 913| | 4|user-3xGSz| 246| | 5|user-2aWRN| 727| | 6|user-EzZY1| 65| | 7|user-ZlZMZ| 935| | 8|user-VjxeG| 756| | 9|user-iqf1P| 3| +---+----------+-----+ only showing top 10 rows在 Java 中這個(gè)想法是相似的,但我們必須使用顯式Encoders(在 Scala 中,Spark 隱式處理):
// In Java import org.apache.spark.sql.Encoders; import org.apache.commons.lang3.RandomStringUtils; import java.io.Serializable; import java.util.Random; import java.util.ArrayList; import java.util.List;// Create a Java class as a Bean public class Usage implements Serializable {int uid; // user idString uname; // usernameint usage; // usagepublic Usage(int uid, String uname, int usage) {this.uid = uid;this.uname = uname;this.usage = usage;}// JavaBean getters and setters public int getUid() { return this.uid; }public void setUid(int uid) { this.uid = uid; }public String getUname() { return this.uname; }public void setUname(String uname) { this.uname = uname; }public int getUsage() { return this.usage; }public void setUsage(int usage) { this.usage = usage; }public Usage() {}public String toString() {return "uid: '" + this.uid + "', uame: '" + this.uname + "', usage: '" + this.usage + "'";} }// Create an explicit Encoder Encoder<Usage> usageEncoder = Encoders.bean(Usage.class); Random rand = new Random(); rand.setSeed(42); List<Usage> data = new ArrayList<Usage>()// Create 1000 instances of Java Usage class for (int i = 0; i < 1000; i++) {data.add(new Usage(i, "user" + RandomStringUtils.randomAlphanumeric(5),rand.nextInt(1000));// Create a Dataset of Usage typed data Dataset<Usage> dsUsage = spark.createDataset(data, usageEncoder);筆記
Scala 和 Java 生成的 Dataset 會(huì)有所不同,因?yàn)殡S機(jī)種子算法可能不同。因此,您的 Scala 和 Java 的查詢結(jié)果會(huì)有所不同。
現(xiàn)在我們已經(jīng)生成了數(shù)據(jù)集,dsUsage讓我們執(zhí)行一些我們?cè)谇皫渍轮型瓿傻某R娹D(zhuǎn)換。
轉(zhuǎn)換樣本數(shù)據(jù)
回想一下,數(shù)據(jù)集是特定領(lǐng)域?qū)ο蟮膹?qiáng)類型集合。這些對(duì)象可以使用函數(shù)或關(guān)系操作并行轉(zhuǎn)換。這些轉(zhuǎn)換的示例包括map()、reduce()、filter()、select()和aggregate()。作為高階函數(shù)的示例,這些方法可以將 lambda、閉包或函數(shù)作為參數(shù)并返回結(jié)果。因此,它們非常適合函數(shù)式編程。
Scala 是一種函數(shù)式編程語言,最近 lambda、函數(shù)式參數(shù)和閉包也被添加到 Java 中。讓我們?cè)?Spark 中嘗試幾個(gè)高階函數(shù),并將函數(shù)式編程結(jié)構(gòu)與我們之前創(chuàng)建的示例數(shù)據(jù)一起使用。
高階函數(shù)和函數(shù)式編程
舉個(gè)簡單的例子,讓我們使用filter()返回我們dsUsage數(shù)據(jù)集中所有使用時(shí)間超過 900 分鐘的用戶。一種方法是使用函數(shù)表達(dá)式作為filter()方法的參數(shù):
// In Scala import org.apache.spark.sql.functions._ dsUsage.filter(d => d.usage > 900).orderBy(desc("usage")).show(5, false)另一種方法是定義一個(gè)函數(shù)并將該函數(shù)作為參數(shù)提供給filter():
def filterWithUsage(u: Usage) = u.usage > 900 dsUsage.filter(filterWithUsage(_)).orderBy(desc("usage")).show(5)+---+----------+-----+ |uid| uname|usage| +---+----------+-----+ |561|user-5n2xY| 999| |113|user-nnAXr| 999| |605|user-NL6c4| 999| |634|user-L0wci| 999| |805|user-LX27o| 996| +---+----------+-----+ only showing top 5 rows在第一種情況下,我們使用 lambda 表達(dá)式,{d.usage > 900}作為filter()方法的參數(shù),而在第二種情況下,我們定義了一個(gè) Scala 函數(shù),def filterWithUsage(u: Usage) = u.usage > 900。在這兩種情況下,該filter()方法都會(huì)遍歷Usage分布式數(shù)據(jù)集中對(duì)象的每一行,并應(yīng)用表達(dá)式或執(zhí)行函數(shù),Usage為表達(dá)式或函數(shù)的值為 的行返回類型為的新數(shù)據(jù)集true。(有關(guān)方法簽名的詳細(xì)信息,請(qǐng)參閱Scala 文檔。)
在 Java 中, to 的參數(shù)filter()類型為FilterFunction<T>。這可以匿名內(nèi)聯(lián)或使用命名函數(shù)定義。在本例中,我們將按名稱定義函數(shù)并將其分配給變量f。應(yīng)用此函數(shù)filter()將返回一個(gè)新數(shù)據(jù)集,其中包含我們過濾條件為的所有行true:
// In Java // Define a Java filter function FilterFunction<Usage> f = new FilterFunction<Usage>() {public boolean call(Usage u) {return (u.usage > 900);} };// Use filter with our function and order the results in descending order dsUsage.filter(f).orderBy(col("usage").desc()).show(5);+---+----------+-----+ |uid|uname |usage| +---+----------+-----+ |67 |user-qCGvZ|997 | |878|user-J2HUU|994 | |668|user-pz2Lk|992 | |750|user-0zWqR|991 | |242|user-g0kF6|989 | +---+----------+-----+ only showing top 5 rows并非所有 lambda 或函數(shù)參數(shù)都必須計(jì)算為Boolean值;他們也可以返回計(jì)算值。考慮這個(gè)使用高階函數(shù)的例子map(),我們的目標(biāo)是找出每個(gè)用戶的使用成本,其usage價(jià)值超過某個(gè)閾值,這樣我們就可以為這些用戶提供每分鐘的特價(jià)。
// In Scala // Use an if-then-else lambda expression and compute a value dsUsage.map(u => {if (u.usage > 750) u.usage * .15 else u.usage * .50 }).show(5, false) // Define a function to compute the usage def computeCostUsage(usage: Int): Double = {if (usage > 750) usage * 0.15 else usage * 0.50 } // Use the function as an argument to map() dsUsage.map(u => {computeCostUsage(u.usage)}).show(5, false) +------+ |value | +------+ |262.5 | |251.0 | |85.0 | |136.95| |123.0 | +------+ only showing top 5 rows要map()在 Java 中使用,您必須定義一個(gè)MapFunction<T>.?這可以是匿名類或擴(kuò)展的已定義類MapFunction<T>。對(duì)于這個(gè)例子,我們內(nèi)聯(lián)使用它——也就是說,在方法調(diào)用本身中:
// In Java // Define an inline MapFunction dsUsage.map((MapFunction<Usage, Double>) u -> {if (u.usage > 750)return u.usage * 0.15;elsereturn u.usage * 0.50; }, Encoders.DOUBLE()).show(5); // We need to explicitly specify the Encoder +------+ |value | +------+ |65.0 | |114.45| |124.0 | |132.6 | |145.5 | +------+ only showing top 5 rows盡管我們已經(jīng)計(jì)算了使用成本的值,但我們不知道計(jì)算值與哪些用戶相關(guān)聯(lián)。我們?nèi)绾潍@得這些信息?
步驟很簡單:
創(chuàng)建一個(gè) Scala 案例類或 JavaBean 類UsageCost,帶有一個(gè)名為 的附加字段或列cost。
定義一個(gè)函數(shù)來計(jì)算并在方法cost中使用它。map()
這是 Scala 中的樣子:
// In Scala // Create a new case class with an additional field, cost case class UsageCost(uid: Int, uname:String, usage: Int, cost: Double)// Compute the usage cost with Usage as a parameter // Return a new object, UsageCost def computeUserCostUsage(u: Usage): UsageCost = {val v = if (u.usage > 750) u.usage * 0.15 else u.usage * 0.50UsageCost(u.uid, u.uname, u.usage, v) }// Use map() on our original Dataset dsUsage.map(u => {computeUserCostUsage(u)}).show(5)+---+----------+-----+------+ |uid| uname|usage| cost| +---+----------+-----+------+ | 0|user-Gpi2C| 525| 262.5| | 1|user-DgXDi| 502| 251.0| | 2|user-M66yO| 170| 85.0| | 3|user-xTOn6| 913|136.95| | 4|user-3xGSz| 246| 123.0| +---+----------+-----+------+ only showing top 5 rows現(xiàn)在我們有了一個(gè)轉(zhuǎn)換后的數(shù)據(jù)集,其中包含一個(gè)由轉(zhuǎn)換cost中的函數(shù)計(jì)算的新列,map()以及所有其他列。
同樣,在 Java 中,如果我們想要與每個(gè)用戶關(guān)聯(lián)的成本,我們需要定義一個(gè) JavaBean 類UsageCost和MapFunction<T>.?有關(guān)完整的 JavaBean 示例,請(qǐng)參閱本書的GitHub 存儲(chǔ)庫;為簡潔起見,我們將僅在MapFunction<T>此處顯示內(nèi)聯(lián):
// In Java // Get the Encoder for the JavaBean class Encoder<UsageCost> usageCostEncoder = Encoders.bean(UsageCost.class);// Apply map() function to our data dsUsage.map( (MapFunction<Usage, UsageCost>) u -> {double v = 0.0;if (u.usage > 750) v = u.usage * 0.15; else v = u.usage * 0.50;return new UsageCost(u.uid, u.uname,u.usage, v); },usageCostEncoder).show(5);+------+---+----------+-----+ | cost|uid| uname|usage| +------+---+----------+-----+ | 65.0| 0|user-xSyzf| 130| |114.45| 1|user-iOI72| 763| | 124.0| 2|user-QHRUk| 248| | 132.6| 3|user-8GTjo| 884| | 145.5| 4|user-U4cU1| 970| +------+---+----------+-----+ only showing top 5 rows關(guān)于使用高階函數(shù)和數(shù)據(jù)集,有幾點(diǎn)需要注意:
-
我們使用類型化的 JVM 對(duì)象作為函數(shù)的參數(shù)。
-
我們使用點(diǎn)表示法(來自面向?qū)ο蟮木幊?#xff09;來訪問類型化 JVM 對(duì)象中的各個(gè)字段,使其更易于閱讀。
-
我們的一些函數(shù)和 lambda 簽名可以是類型安全的,確保編譯時(shí)錯(cuò)誤檢測并指示 Spark 處理哪些數(shù)據(jù)類型、執(zhí)行哪些操作等。
-
我們的代碼具有可讀性、表達(dá)性和簡潔性,在 lambda 表達(dá)式中使用 Java 或 Scala 語言特性。
-
Spark 在 Java 和 Scala 中都提供了與高階函數(shù)構(gòu)造等效的map()和filter()沒有的高階函數(shù)構(gòu)造,因此您不必將函數(shù)式編程與 Datasets 或 DataFrames 一起使用。相反,您可以簡單地使用條件 DSL 運(yùn)算符或 SQL 表達(dá)式:例如,dsUsage.filter("usage > 900")或dsUsage($"usage" > 900).?(有關(guān)這方面的更多信息,請(qǐng)參閱“使用數(shù)據(jù)集的成本”。)
-
對(duì)于數(shù)據(jù)集,我們使用編碼器,這是一種在 JVM 和 Spark 的數(shù)據(jù)類型內(nèi)部二進(jìn)制格式之間有效轉(zhuǎn)換數(shù)據(jù)的機(jī)制(更多信息請(qǐng)參見“數(shù)據(jù)集編碼器”)。
筆記
高階函數(shù)和函數(shù)式編程并不是 Spark 數(shù)據(jù)集獨(dú)有的;您也可以將它們與 DataFrame 一起使用。回想一下,DataFrame 是一個(gè)Dataset[Row],其中Row是一個(gè)通用的無類型 JVM 對(duì)象,可以保存不同類型的字段。方法簽名采用對(duì) 進(jìn)行操作的表達(dá)式或函數(shù)Row,這意味著每個(gè)Row的數(shù)據(jù)類型都可以作為表達(dá)式或函數(shù)的輸入值。
將 DataFrame 轉(zhuǎn)換為數(shù)據(jù)集
對(duì)于查詢和構(gòu)造的強(qiáng)類型檢查,您可以將 DataFrames 轉(zhuǎn)換為 Datasets。要將現(xiàn)有 DataFrame 轉(zhuǎn)換df為 Dataset 類型SomeCaseClass,只需使用df.as[SomeCaseClass]符號(hào)。我們之前看到了一個(gè)這樣的例子:
// In Scala val bloggersDS = spark.read.format("json").option("path", "/data/bloggers/bloggers.json").load().as[Bloggers]spark.read.format("json")返回 a?DataFrame<Row>,它在 Scala 中是Dataset[Row].?Using.as[Bloggers]指示 Spark 使用本章后面討論的編碼器,將對(duì)象從 Spark 的內(nèi)部內(nèi)存表示序列化/反序列化為 JVMBloggers對(duì)象.
數(shù)據(jù)集和數(shù)據(jù)幀的內(nèi)存管理
Spark 是一種密集型內(nèi)存分布式大數(shù)據(jù)引擎,因此其內(nèi)存的有效利用對(duì)其執(zhí)行速度至關(guān)重要。1縱觀其發(fā)布?xì)v史,Spark 對(duì)內(nèi)存的使用發(fā)生了顯著變化:
-
Spark 1.0 使用基于 RDD 的 Java 對(duì)象進(jìn)行內(nèi)存存儲(chǔ)、序列化和反序列化,這在資源方面很昂貴且速度很慢。此外,存儲(chǔ)是在 Java 堆上分配的,因此對(duì)于大型數(shù)據(jù)集,您只能受 JVM 垃圾收集 (GC) 的支配。
-
Spark 1.x 引入了Project Tungsten。它的一個(gè)突出特點(diǎn)是一種新的內(nèi)部基于行的格式,使用偏移量和指針在堆外內(nèi)存中布局?jǐn)?shù)據(jù)集和數(shù)據(jù)幀。Spark 使用一種稱為編碼器的高效機(jī)制在 JVM 與其內(nèi)部 Tungsten 格式之間進(jìn)行序列化和反序列化。在堆外分配內(nèi)存意味著 Spark 較少受到 GC 的阻礙。
-
Spark 2.x 引入了第二代 Tungsten 引擎,具有全階段代碼生成和向量化的基于列的內(nèi)存布局。基于現(xiàn)代編譯器的思想和技術(shù),這個(gè)新版本還利用現(xiàn)代 CPU 和緩存架構(gòu),通過“單指令多數(shù)據(jù)”(SIMD) 方法實(shí)現(xiàn)快速并行數(shù)據(jù)訪問。
數(shù)據(jù)集編碼器
編碼器將堆外內(nèi)存中的數(shù)據(jù)從 Spark 的內(nèi)部 Tungsten 格式轉(zhuǎn)換為 JVM Java 對(duì)象。換句話說,它們將數(shù)據(jù)集對(duì)象從 Spark 的內(nèi)部格式序列化和反序列化為 JVM 對(duì)象,包括原始數(shù)據(jù)類型。例如,anEncoder[T]將從 Spark 的內(nèi)部 Tungsten 格式轉(zhuǎn)換為Dataset[T].
Spark 內(nèi)置支持為基本類型(例如,字符串、整數(shù)、長整數(shù))、Scala 案例類和 JavaBeans 自動(dòng)生成編碼器。與 Java 和 Kryo 的序列化和反序列化相比,Spark 編碼器的速度要快得多。
在我們之前的 Java 示例中,我們顯式地創(chuàng)建了一個(gè)編碼器:
Encoder<UsageCost> usageCostEncoder = Encoders.bean(UsageCost.class);然而,對(duì)于 Scala,Spark 會(huì)自動(dòng)為這些高效的轉(zhuǎn)換器生成字節(jié)碼。讓我們來看看 Spark 內(nèi)部基于 Tungsten 行的格式。
Spark 的內(nèi)部格式與 Java 對(duì)象格式
Java 對(duì)象有很大的開銷——標(biāo)頭信息、哈希碼、Unicode 信息等。即使是簡單的 Java 字符串(例如“abcd”)也需要 48 個(gè)字節(jié)的存儲(chǔ)空間,而不是您可能期望的 4 個(gè)字節(jié)。例如,想象一下創(chuàng)建MyClass(Int, String, String)對(duì)象的開銷。
Spark 不是為 Datasets 或 DataFrames 創(chuàng)建基于 JVM 的對(duì)象,而是分配堆外 Java 內(nèi)存來布置它們的數(shù)據(jù),并使用編碼器將數(shù)據(jù)從內(nèi)存表示轉(zhuǎn)換為 JVM 對(duì)象。例如,圖 6-1顯示了 JVM 對(duì)象如何在MyClass(Int, String, String)內(nèi)部存儲(chǔ)。
圖 6-1。JVM 對(duì)象存儲(chǔ)在由 Spark 管理的連續(xù)堆外 Java 內(nèi)存中
當(dāng)數(shù)據(jù)以這種連續(xù)方式存儲(chǔ)并通過指針?biāo)惴ê推屏吭L問時(shí),編碼器可以快速序列化或反序列化該數(shù)據(jù)。這意味著什么?
序列化和反序列化 (SerDe)
分布式計(jì)算中的一個(gè)并不新鮮的概念,其中數(shù)據(jù)經(jīng)常通過網(wǎng)絡(luò)在集群中的計(jì)算機(jī)節(jié)點(diǎn)之間傳輸,序列化和反序列化是發(fā)送方將類型化對(duì)象編碼(序列化)為二進(jìn)制表示或格式并解碼的過程(反序列化)從二進(jìn)制格式到接收器各自的數(shù)據(jù)類型對(duì)象。
例如,如果圖 6-1MyClass中的 JVM 對(duì)象必須在Spark 集群中的節(jié)點(diǎn)之間共享,則發(fā)送方會(huì)將其序列化為字節(jié)數(shù)組,而接收方會(huì)將其反序列化回類型為 的 JVM 對(duì)象。MyClass
JVM 有自己的內(nèi)置 Java 序列化器和反序列化器,但效率低下,因?yàn)?#xff08;正如我們?cè)谏弦还?jié)中看到的)JVM 在堆內(nèi)存中創(chuàng)建的 Java 對(duì)象是臃腫的。因此,該過程是緩慢的。
這就是數(shù)據(jù)集編碼器來救援的地方,原因如下:
-
Spark 的內(nèi)部 Tungsten 二進(jìn)制格式(參見圖6-1和6-2)將對(duì)象存儲(chǔ)在 Java 堆內(nèi)存之外,而且它很緊湊,因此這些對(duì)象占用的空間更少。
-
編碼器可以通過使用帶有內(nèi)存地址和偏移量的簡單指針?biāo)惴ū闅v內(nèi)存來快速序列化(圖 6-2)。
-
在接收端,編碼器可以快速將二進(jìn)制表示反序列化為 Spark 的內(nèi)部表示。編碼器不受 JVM 垃圾收集暫停的阻礙。
圖 6-2。Spark 內(nèi)部基于 Tungsten 行的格式
然而,正如我們接下來要討論的那樣,生活中大多數(shù)美好的事物都是有代價(jià)的。
使用數(shù)據(jù)集的成本
在第 3 章的“DataFrames 與 Datasets”中,我們概述了使用 Datasets 的一些好處——但這些好處是有代價(jià)的。如上一節(jié)所述,當(dāng)數(shù)據(jù)集被傳遞給高階函數(shù)時(shí),例如,或接受 lambdas 和函數(shù)參數(shù)的函數(shù),從 Spark 的內(nèi)部 Tungsten 格式反序列化到 JVM 對(duì)象會(huì)產(chǎn)生相關(guān)成本。filter()map()flatMap()
與在 Spark 中引入編碼器之前使用的其他序列化器相比,此成本很小且可以忍受。但是,在更大的數(shù)據(jù)集和許多查詢中,此成本會(huì)累積并可能影響性能。
降低成本的策略
減輕過度序列化和反序列化的一種策略是在查詢中使用DSL 表達(dá)式,并避免過度使用 lambda 作為匿名函數(shù)作為高階函數(shù)的參數(shù)。因?yàn)?lambda 在運(yùn)行時(shí)之前對(duì) Catalyst 優(yōu)化器是匿名且不透明的,所以當(dāng)您使用它們時(shí),它無法有效地識(shí)別您在做什么(您沒有告訴 Spark要做什么),因此無法優(yōu)化您的查詢(請(qǐng)參閱“Catalyst Optimizer”在第 3 章中)。
第二種策略是以最小化序列化和反序列化的方式將查詢鏈接在一起。將查詢鏈接在一起是 Spark 中的常見做法。
讓我們用一個(gè)簡單的例子來說明。假設(shè)我們有一個(gè)類型為 的數(shù)據(jù)集Person,其中Person定義為 Scala 案例類:
// In Scala Person(id: Integer, firstName: String, middleName: String, lastName: String, gender: String, birthDate: String, ssn: String, salary: String)我們想使用函數(shù)式編程向這個(gè)數(shù)據(jù)集發(fā)出一組查詢。
讓我們來看看我們編寫查詢效率低下的情況,以這種方式我們?cè)诓恢挥X中產(chǎn)生了重復(fù)序列化和反序列化的成本:
import java.util.Calendar val earliestYear = Calendar.getInstance.get(Calendar.YEAR) - 40personDS// Everyone above 40: lambda-1.filter(x => x.birthDate.split("-")(0).toInt > earliestYear)// Everyone earning more than 80K.filter($"salary" > 80000)// Last name starts with J: lambda-2.filter(x => x.lastName.startsWith("J"))// First name starts with D.filter($"firstName".startsWith("D")).count()正如您在圖 6-3中所看到的,每次我們從 lambda 移動(dòng)到 DSL( ) 時(shí),都會(huì)產(chǎn)生序列化和反序列化JVM 對(duì)象filter($"salary" > 8000)的成本。Person
圖 6-3。使用 lambdas 和 DSL 鏈接查詢的低效方式
相比之下,以下查詢僅使用 DSL,不使用 lambda。因此,它的效率要高得多——整個(gè)組合和鏈?zhǔn)讲樵儾恍枰蛄谢?反序列化:
personDS.filter(year($"birthDate") > earliestYear) // Everyone above 40.filter($"salary" > 80000) // Everyone earning more than 80K.filter($"lastName".startsWith("J")) // Last name starts with J.filter($"firstName".startsWith("D")) // First name starts with D.count()概括
在本章中,我們?cè)敿?xì)介紹了如何在 Java 和 Scala 中使用數(shù)據(jù)集。我們探索了 Spark 如何管理內(nèi)存以將 Dataset 構(gòu)造作為其統(tǒng)一和高級(jí) API 的一部分,并且我們考慮了與使用 Datasets 相關(guān)的一些成本以及如何降低這些成本。我們還向您展示了如何在 Spark 中使用 Java 和 Scala 的函數(shù)式編程結(jié)構(gòu)。
最后,我們深入了解了編碼器如何從 Spark 的內(nèi)部 Tungsten 二進(jìn)制格式序列化和反序列化為 JVM 對(duì)象。
總結(jié)
以上是生活随笔為你收集整理的【Apache Spark 】第 6 章Spark SQL 和数据集的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 0x80070057复制从服务器复制文件
- 下一篇: <Zhuuu_ZZ>让我们来康康脚本流程