Spark性能优化 -- Spark SQL、DataFrame、Dataset
本文將詳細(xì)分析和總結(jié)Spark SQL及其DataFrame、Dataset的相關(guān)原理和優(yōu)化過程。
Spark SQL簡介
Spark SQL是Spark中 具有 大規(guī)模關(guān)系查詢的結(jié)構(gòu)化數(shù)據(jù)處理 模塊(spark核心組件:spark sql,spark streaming,spark mllib,spark GraphX)。spark sql支持大規(guī)模的分布式內(nèi)存計(jì)算,并且模糊了RDD與 relational table 之間的界限,DataFrame API和Datasets API是與Spark SQL進(jìn)行交互的方式。
Spark SQL在Spark Core之上運(yùn)行。它允許開發(fā)人員從Hive表和Parquet文件導(dǎo)入關(guān)系數(shù)據(jù),對(duì)導(dǎo)入的數(shù)據(jù)和現(xiàn)有RDD運(yùn)行SQL查詢,并輕松將RDD寫出到Hive表或Parquet文件。正如Spark SQL提供的DataFrame API一樣,它可以在外部數(shù)據(jù)源和Sparks內(nèi)置分布式集合上執(zhí)行關(guān)系操作。Spark SQL引入了稱為Catalyst的可擴(kuò)展優(yōu)化器。
Spark SQL使用結(jié)構(gòu)化和半結(jié)構(gòu)化數(shù)據(jù)的3種主要功能,如:a) 它在Scala,Java和Python中均可使用DataFrame。同樣,簡化了結(jié)構(gòu)化數(shù)據(jù)集的工作,DataFrames與關(guān)系數(shù)據(jù)庫中的表相似。b) Spark SQL以各種結(jié)構(gòu)化格式可以讀取和寫入數(shù)據(jù)。例如,Hive Table,JSON和Parquet。c) 我們可以使用Spark SQL語句 查詢數(shù)據(jù),在Spark程序內(nèi)部以及從外部工具連接到Spark SQL。
Spark SQL特點(diǎn)如下:a) 數(shù)據(jù)兼容:可從Hive表、外部數(shù)據(jù)庫(JDBC)、RDD、Parquet文件、JSON文件獲取數(shù)據(jù),可通過Scala方法或SQL方式操作這些數(shù)據(jù),并把結(jié)果轉(zhuǎn)回RDD。b) 組件擴(kuò)展:SQL語法解析器、分析器、優(yōu)化器均可重新定義。c) 性能優(yōu)化:內(nèi)存列存儲(chǔ)、動(dòng)態(tài)字節(jié)碼生成等優(yōu)化技術(shù),內(nèi)存緩存數(shù)據(jù)。d) 多語言支持:Scala、Java、Python、R。
Spark SQL執(zhí)行計(jì)劃
Catalyst Optimizer
Catalyst Optimizer是基于scala中的函數(shù)編寫的。Catalyst Optimizer支持基于規(guī)則和基于成本優(yōu)化。
基于規(guī)則的優(yōu)化中,基于規(guī)則的優(yōu)化器使用規(guī)則集來確定如何執(zhí)行查詢。
而基于成本的優(yōu)化旨在找到執(zhí)行SQL語句的最合適方法。基于成本的優(yōu)化中,使用規(guī)則生成多個(gè)計(jì)劃,然后計(jì)算其成本。
Unresolved logical Plan:與編譯器非常類似,spark的優(yōu)化是多階段的,在執(zhí)行任何優(yōu)化之前,需要解析表達(dá)式的引用和類型。
Logical Plan:在Unresolved logical Plan基礎(chǔ)上加上Schema信息,spark直接對(duì)Logical Plan進(jìn)行簡化和優(yōu)化,生成一個(gè)優(yōu)化后的logical Plan。這些簡化可以用匹配模式等規(guī)則來編寫。優(yōu)化器不僅限于模式匹配,而且規(guī)則還可以包含任意的Scala代碼。
一旦logical plan得到優(yōu)化,spark將生成一個(gè)物理計(jì)劃。物理計(jì)劃階段既有基于規(guī)則的優(yōu)化,也有基于成本的優(yōu)化,以生成最佳的物理計(jì)劃。
Spark SQL執(zhí)行計(jì)劃
Spark SQL優(yōu)化可提高開發(fā)人員的生產(chǎn)力以及他們編寫的查詢的性能。一個(gè)好的查詢優(yōu)化器會(huì)自動(dòng)重寫關(guān)系查詢,以使用諸如早期過濾數(shù)據(jù),利用可用索引,甚至確保以最有效的順序連接不同的數(shù)據(jù)源之類的技術(shù)來更有效地執(zhí)行。
通過執(zhí)行這些轉(zhuǎn)換,優(yōu)化器改善了關(guān)系查詢的執(zhí)行時(shí)間,并使開發(fā)人員從專注于其應(yīng)用程序的語義而非性能上解放出來。
Catalyst利用Scala的強(qiáng)大功能(例如模式匹配和運(yùn)行時(shí)元編程),使開發(fā)人員可以簡明地指定復(fù)雜的關(guān)系優(yōu)化。
SparkSession
在spark2.0版本之前
sparkContext:在2.0之前,SparkContext作為spark應(yīng)用程序的入口,spark驅(qū)動(dòng)程序利用spark context連接集群。
sparkConf :用來指定配置參數(shù),例如APPName、或指定spark驅(qū)動(dòng),應(yīng)用,core數(shù)量等等。
為了使用SQL、HIVE和Streaming的api,需要?jiǎng)?chuàng)建單獨(dú)的Context
在spark2.0及其后續(xù)版本
SparkSession提供了與底層Spark功能交互的單一入口點(diǎn),并允許使用Dataframe和DataSet API編寫Spark程序。sparkContext提供的所有功能也都可以在sparkSession中使用。
為了使用SQL、HIVE和Streaming的api,不需要?jiǎng)?chuàng)建單獨(dú)的Context,因?yàn)閟parkSession包含所有api。
一旦SparkSession被實(shí)例化,我們就可以配置Spark的運(yùn)行時(shí)配置屬性
Schemas
和RDD一樣,Spark SQL下的DataFrame和Dataset也是一個(gè)分布式的集合。但是相對(duì)于RDD,DataFrame和Dataset多了一個(gè)額外的Schema信息。如上面所述,Schemas可在Catalyst優(yōu)化器中使用。
通過case class指定schema
case class RawPanda(id: Long, zip: String, pt: String, happy: Boolean, attributes: Array[Double])case class PandaPlace(name: String, pandas: Array[RawPanda]) def createAndPrintSchema() = {val damao = RawPanda(1, "M1B 5K7", "giant", true, Array(0.1, 0.1))val pandaPlace = PandaPlace("toronto", Array(damao))val df = session.createDataFrame(Seq(pandaPlace))df.printSchema() }通過StructType直接指定Schema
import org.apache.spark.{SparkContext, SparkConf}val personRDD = sc.textFile(args(0)).map(_.split(" ")) //通過StructType直接指定每個(gè)字段的schema val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("age", IntegerType, true))) //將RDD映射到rowRDD val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt)) //將schema信息應(yīng)用到rowRDD上 val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)DataFrame
DataFrame與RDD類似,同樣擁有 不變性,彈性,分布式計(jì)算的特性,也有惰性設(shè)計(jì),有transform(轉(zhuǎn)換)與action(執(zhí)行)操作之分。相對(duì)于RDD,它能處理大量結(jié)構(gòu)化數(shù)據(jù),DataFrame包含帶有Schema的行,類似于pandas的DataFrame的 header行。
注意:相對(duì)于RDD的lazy設(shè)計(jì),DataFrame只是部分的lazy,例如schema是立即執(zhí)行的。為什么使用DataFrame
相對(duì)于RDD,DataFrame提供了內(nèi)存管理和優(yōu)化的執(zhí)行計(jì)劃。
自定義內(nèi)存管理:這也被稱為Tungsten,Spark是由scala開發(fā)的,JVM的實(shí)現(xiàn)帶來了一些性能上的限制和弊端(例如GC上的overhead,java序列化耗時(shí)),使得Spark在性能上無法和一些更加底層的語言(例如c,可以對(duì)memory進(jìn)行高效管理,從而利用hardware的特性)相媲美,Tungsten設(shè)計(jì)了一套內(nèi)存管理機(jī)制,而不再是交給JVM托管,Spark的operation直接使用分配的binary data而不是JVM objects。
優(yōu)化執(zhí)行計(jì)劃:這也被稱為query optimizer(例如Catalyst Optimizer)。使用此選項(xiàng),將為查詢的執(zhí)行創(chuàng)建一個(gè)優(yōu)化的執(zhí)行計(jì)劃。一旦優(yōu)化的計(jì)劃被創(chuàng)建,最終將在Spark的rdd上執(zhí)行。
關(guān)于jvm內(nèi)存,可查看 JVM中的堆外內(nèi)存(off-heap memory)與堆內(nèi)內(nèi)存(on-heap memory)[1]更多關(guān)于 Tungsten,可查看 Tungsten-github[2]
DataFrame 經(jīng)驗(yàn) tips:
$ 可以用來隱式的指定DataFrame中的列。
Spark DataFrame 中 === 用 =!= 來過濾特定列的行數(shù)據(jù)。
相對(duì)filter操作,distinct和dropDuplicates可能會(huì)引起shuffle過程,因此可能會(huì)比較慢。
與RDD中g(shù)roupby效率低下不同,DataFrame中Groupby已經(jīng)經(jīng)過本地聚合再全局聚合(DataFrame / Dataset groupBy behaviour/optimization[3])
如果你需要計(jì)算各種復(fù)雜的統(tǒng)計(jì)運(yùn)算,建議在GroupData(groupby后)執(zhí)行:
在hive Data上,有時(shí)使用sql表達(dá)式操作比直接在DataFrame上操作,效率更高
Dataset
Dataset是SparkSQL中的一種數(shù)據(jù)結(jié)構(gòu),它是強(qiáng)類型的,包含指定的schema(指定了變量的類型)。Dataset是對(duì)DataFrame API的擴(kuò)展。Spark Dataset 提供了類型安全和面向?qū)ο蟮木幊探涌凇jP(guān)于強(qiáng)類型和類型安全的定義可參考 Magic lies here - Statically vs Dynamically Typed Languages[4]
Dataset有以下特點(diǎn):
在編寫代碼時(shí),有如RDD般的方便和靈活。
有如DataFrame優(yōu)化表現(xiàn)(同樣使用Tungsten 和 Catalyst Query Optimizer)
具有scala語言的靜態(tài)、類型安全的特點(diǎn)
使用spark Dataset,可以在編譯時(shí)檢查語法和分析,而Dataframe、rdd或常規(guī)SQL查詢不能做到。
RDD,DataFrame,Dataset區(qū)別
數(shù)據(jù)格式上差別
RDD:它可以方便有效地處理結(jié)構(gòu)化和非結(jié)構(gòu)化數(shù)據(jù)。但和DataFrame和DataSets不一樣,RDD并不能推斷schema信息,而是要求用戶指定它。
DataFrame:它只適用于結(jié)構(gòu)化和半結(jié)構(gòu)化數(shù)據(jù),可以推斷schema信息。DataFrames允許Spark管理schema。
Dataset:它還可以有效地處理結(jié)構(gòu)化和非結(jié)構(gòu)化數(shù)據(jù),它以行的JVM對(duì)象或行對(duì)象的集合的形式表示數(shù)據(jù),一行就是一個(gè)通用的無類型的 JVM 對(duì)象。
三者互轉(zhuǎn)
在這里插入圖片描述RDD轉(zhuǎn)DataFrame(行動(dòng)操作,立即執(zhí)行)時(shí),需要指定schema信息,有如下三種方法:
def createFromCaseClassRDD(input: RDD[PandaPlace]) = {// Create DataFrame explicitly using session and schema inferenceval df1 = session.createDataFrame(input)// Create DataFrame using session implicits and schema inferenceval df2 = input.toDF()// Create a Row RDD from our RDD of case classesval rowRDD = input.map(pm => Row(pm.name,pm.pandas.map(pi => Row(pi.id, pi.zip, pi.happy, pi.attributes))))val pandasType = ArrayType(StructType(List(StructField("id", LongType, true),StructField("zip", StringType, true),StructField("happy", BooleanType, true),StructField("attributes", ArrayType(FloatType), true))))// Create DataFrame explicitly with specified schemaval schema = StructType(List(StructField("name", StringType, true),StructField("pandas", pandasType)))val df3 = session.createDataFrame(rowRDD, schema)}DataFrame轉(zhuǎn)RDD(轉(zhuǎn)換操作,行動(dòng)操作再執(zhí)行),簡單的df.rdd得到的是個(gè)Row Object,因?yàn)槊啃锌梢园我鈨?nèi)容,你需要指定特別的類型,這樣你才能獲取每列的內(nèi)容:
def toRDD(input: DataFrame): RDD[RawPanda] = {val rdd: RDD[Row] = input.rddrdd.map(row => RawPanda(row.getAs[Long](0 "Long"), row.getAs[String](1 "String"),row.getAs[String](2 "String"), row.getAs[Boolean](3 "Boolean"), row.getAs[Array[Double]](4 "Array[Double]")))}轉(zhuǎn)Dataset
def fromDF(df: DataFrame): Dataset[RawPanda] = {df.as[RawPanda]//RawPanda為一個(gè)case class } // rdd轉(zhuǎn) Dataset,可以先轉(zhuǎn) DataFrame再轉(zhuǎn)Dataset /** * Illustrate converting a Dataset to an RDD */ def toRDD(ds: Dataset[RawPanda]): RDD[RawPanda] = {ds.rdd } /** * Illustrate converting a Dataset to a DataFrame */ def toDF(ds: Dataset[RawPanda]): DataFrame = {ds.toDF() }
靜態(tài)類型與運(yùn)行時(shí)類型安全
如果你用的是 Spark SQL 的查詢語句,要直到運(yùn)行時(shí)你才會(huì)發(fā)現(xiàn)有語法錯(cuò)誤(這樣做代價(jià)很大),而如果你用的是 DataFrame 和 Dataset,你在編譯時(shí)就可以捕獲syntax errors(這樣就節(jié)省了開發(fā)者的時(shí)間和整體代價(jià))。也就是說,當(dāng)你在 DataFrame 中調(diào)用了 API 之外的函數(shù)時(shí),編譯器就可以發(fā)現(xiàn)這個(gè)錯(cuò)。不過,如果你使用了一個(gè)不存在的字段名字,那就要到運(yùn)行時(shí)才能發(fā)現(xiàn)錯(cuò)誤了。
Dataset API 都是用 lambda 函數(shù)和 JVM 類型對(duì)象表示的,所有不匹配的類型參數(shù)都可以在編譯時(shí)發(fā)現(xiàn)。而且在使用 Dataset 時(shí),你的Analysis errors 也會(huì)在編譯時(shí)被發(fā)現(xiàn),這樣就節(jié)省了開發(fā)者的時(shí)間和代價(jià)。例如DataFrame編譯時(shí)不檢查列信息(例如無論你寫df.select("name") 還是 df.select("naame") 編譯時(shí)均不會(huì)報(bào)錯(cuò),而實(shí)際運(yùn)行時(shí)才會(huì)報(bào)錯(cuò)),而Dataset在編譯時(shí)就會(huì)檢查到該類錯(cuò)誤。
分區(qū)方式
如何針對(duì)數(shù)據(jù)分布自定義分區(qū)方式,這對(duì)于避免令人頭痛的數(shù)據(jù)傾斜非常重要。
分階段分區(qū)聚合
使用map-side預(yù)聚合的shuffle操作。所謂的map-side預(yù)聚合,說的是在每個(gè)節(jié)點(diǎn)本地對(duì)相同的key進(jìn)行一次聚合操作,類似于MapReduce中的本地combiner。map-side預(yù)聚合之后,每個(gè)節(jié)點(diǎn)本地就只會(huì)有一條相同的key,因?yàn)槎鄺l相同的key都被聚合起來了。其他節(jié)點(diǎn)在拉取所有節(jié)點(diǎn)上的相同key時(shí),就會(huì)大大減少需要拉取的數(shù)據(jù)數(shù)量,從而也就減少了磁盤IO以及網(wǎng)絡(luò)傳輸開銷。通常來說,在可能的情況下,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。因?yàn)閞educeByKey和aggregateByKey算子都會(huì)使用用戶自定義的函數(shù)對(duì)每個(gè)節(jié)點(diǎn)本地的相同key進(jìn)行預(yù)聚合。而groupByKey算子是不會(huì)進(jìn)行預(yù)聚合的,全量的數(shù)據(jù)會(huì)在集群的各個(gè)節(jié)點(diǎn)之間分發(fā)和傳輸,性能相對(duì)來說比較差。與RDD中g(shù)roupby效率低下不同,DataFrame中Groupby已經(jīng)經(jīng)過本地聚合再全局聚合
加隨機(jī)數(shù)分區(qū)分階段聚合。這個(gè)方案的核心實(shí)現(xiàn)思路就是進(jìn)行兩階段聚合。第一次是局部聚合,先給每個(gè)key都打上一個(gè)隨機(jī)數(shù),比如10以內(nèi)的隨機(jī)數(shù),此時(shí)原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會(huì)變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對(duì)打上隨機(jī)數(shù)后的數(shù)據(jù),執(zhí)行reduceByKey等聚合操作,進(jìn)行局部聚合,那么局部聚合結(jié)果,就會(huì)變成了(1_hello, 2) (2_hello, 2)。然后將各個(gè)key的前綴給去掉,就會(huì)變成(hello,2)(hello,2),再次進(jìn)行全局聚合操作,就可以得到最終結(jié)果了,比如(hello, 4)。--- 將原本相同的key通過附加隨機(jī)前綴的方式,變成多個(gè)不同的key,就可以讓原本被一個(gè)task處理的數(shù)據(jù)分散到多個(gè)task上去做局部聚合,進(jìn)而解決單個(gè)task處理數(shù)據(jù)量過多的問題。接著去除掉隨機(jī)前綴,再次進(jìn)行全局聚合,就可以得到最終的結(jié)果。加隨機(jī)數(shù)參考代碼如下:
Hash Partitioning in Spark 與 Range Partitioning in Spark
可參考 Spark中的分區(qū)方法詳解(https://www.cnblogs.com/tongxupeng/p/10435976.html),個(gè)人感覺這篇博客已經(jīng)寫得非常詳細(xì)完整。
DataFrame.repartition(col):可由 指定的列 表達(dá)式來進(jìn)行分區(qū),默認(rèn)hash分區(qū) (隨機(jī)key) 方式
DataFrame.repartitionByRange(col): 可由 指定的列 表達(dá)式來進(jìn)行分區(qū),默認(rèn)Range分區(qū) (隨機(jī)key) 方式
Spark.DataFrame 與 DataSet 無自定義分區(qū)方式,可先將rdd自定分區(qū)完成,再轉(zhuǎn)成DataFrame。
sqlContext.createDataFrame(df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,df.schema )UDFs & UDAFs 使用
User-defined functions(udfs) 和 user-defined aggregate functions(udafs) 提供了使用自己的自定義代碼擴(kuò)展DataFrame和SQL API的方法,同時(shí)保留了Catalyst優(yōu)化器。這對(duì)性能的提高非常有用,否則您需要將數(shù)據(jù)轉(zhuǎn)換為RDD(并可能再次轉(zhuǎn)換)來執(zhí)行任意函數(shù),這非常昂貴。udf和udaf也可以使用SQL查詢表達(dá)式進(jìn)行 內(nèi)部訪問。
注:使用python編寫udf和udaf函數(shù),會(huì)丟失性能優(yōu)勢。UDFs
spark 2.x:
def get_max(x: Double, y: Double): Double={if ( x > y )xelsey}val udf_get_max = udf(get_max _) df = df.withColumn("max_fea", udf_get_max(df("fea1"), df("fea2")))UDAFs
相對(duì)于udfs,udafs編寫較為復(fù)雜,需要繼承 UserDefinedAggregateFunction 并實(shí)現(xiàn)里面的函數(shù),但UDAFs的性能相當(dāng)好。可以直接在列上使用UDAF,也可以像對(duì)非聚合UDF那樣將其添加到函數(shù)注冊(cè)表中。
計(jì)算平均值的UDAF例子代碼:
import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._object AverageUserDefinedAggregateFunction extends UserDefinedAggregateFunction {// 聚合函數(shù)的輸入數(shù)據(jù)結(jié)構(gòu)override def inputSchema: StructType = StructType(StructField("input", LongType) :: Nil)// 緩存區(qū)數(shù)據(jù)結(jié)構(gòu)override def bufferSchema: StructType = StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)// 聚合函數(shù)返回值數(shù)據(jù)結(jié)構(gòu)override def dataType: DataType = DoubleType// 聚合函數(shù)是否是冪等的,即相同輸入是否總是能得到相同輸出override def deterministic: Boolean = true// 初始化緩沖區(qū)override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0Lbuffer(1) = 0L}// 給聚合函數(shù)傳入一條新數(shù)據(jù)進(jìn)行處理override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {if (input.isNullAt(0)) returnbuffer(0) = buffer.getLong(0) + input.getLong(0)buffer(1) = buffer.getLong(1) + 1}// 合并聚合函數(shù)緩沖區(qū)override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}// 計(jì)算最終結(jié)果override def evaluate(buffer: Row): Any = buffer.getLong(0).toDouble / buffer.getLong(1)}然后在主函數(shù)里注冊(cè)并使用該函數(shù):
spark.read.json("data/user").createOrReplaceTempView("v_user") spark.udf.register("u_avg", AverageUserDefinedAggregateFunction)// 將整張表看做是一個(gè)分組對(duì)求所有人的平均年齡 spark.sql("select count(1) as count, u_avg(age) as avg_age from v_user").show()// 按照性別分組求平均年齡 spark.sql("select sex, count(1) as count, u_avg(age) as avg_age from v_user group by sex").show()參考
High Performance Spark
https://data-flair.training/blogs/spark-sql-optimization/
https://www.quora.com/What-is-the-difference-between-spark-context-and-spark-session
https://www.cnblogs.com/netoxi/p/7223413.html
https://www.infoq.cn/article/three-apache-spark-apis-rdds-dataframes-and-datasets/
https://tech.meituan.com/2016/05/12/spark-tuning-pro.html
https://www.cnblogs.com/tongxupeng/p/10435976.html
參考資料
[1]
JVM中的堆外內(nèi)存(off-heap memory)與堆內(nèi)內(nèi)存(on-heap memory): https://blog.csdn.net/khxu666/article/details/80775635
[2]Tungsten-github: https://github.com/hustnn/TungstenSecret
[3]DataFrame / Dataset groupBy behaviour/optimization: https://stackoverflow.com/questions/32902982/dataframe-dataset-groupby-behaviour-optimization
[4]Magic lies here - Statically vs Dynamically Typed Languages: https://android.jlelse.eu/magic-lies-here-statically-typed-vs-dynamically-typed-languages-d151c7f95e2b
往期精彩回顧適合初學(xué)者入門人工智能的路線及資料下載機(jī)器學(xué)習(xí)在線手冊(cè)深度學(xué)習(xí)在線手冊(cè)AI基礎(chǔ)下載(pdf更新到25集)備注:加入本站微信群或者qq群,請(qǐng)回復(fù)“加群”獲取一折本站知識(shí)星球優(yōu)惠券,請(qǐng)回復(fù)“知識(shí)星球”
喜歡文章,點(diǎn)個(gè)在看
總結(jié)
以上是生活随笔為你收集整理的Spark性能优化 -- Spark SQL、DataFrame、Dataset的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深度学习到底有多难?掌握方法很重要!
- 下一篇: 公子龙:我读研期间通过实习和比赛收入五十