Spark 常用算子详解(转换算子、行动算子、控制算子)
Spark簡介
Spark是專為大規模數據處理而設計的快速通用的計算引擎;
Spark擁有Hadoop MapReduce所具有的優點,但是運行速度卻比MapReduce有很大的提升,特別是在數據挖掘、機器學習等需要迭代的領域可提升100x倍的速度:
Spark流程
-
- 可以設置Application name;
- 可以設置運行模式及資源需求;
-
- SparkContext向資源管理器申請運行Executor資源,并啟動StandaloneExecutorbackend;
- Executor向SparkContext申請Task;
- SparkContext將程序分發給Executor;
- SparkContext構建成DAG圖,將DAG圖分解成Stage、將Taskset發送給Task Scheduler,最后由Task Scheduler將Task發送給Executor運行;
- Task在Excutor上運行,運行完釋放所有的資源;
value 類型
| 輸入分區與輸出分區一對一型 | map flatMap mapPartitions glom |
| 輸入分區與輸出分區多對一型 | union cartesain |
| 輸入分區與輸出分區多對多型 | groupBy |
| 輸出分區為輸入分區子集型 | filter distinct substract sample takeSample |
| Cache型 | cache persist |
key-value類型
| 輸入分區與輸出分區一對一 | mapValues |
| 對單個RDD或兩個RDD聚集 | 單個RDD聚集: combineByKey reduceByKey partitionBy; 兩個RDD聚集: Cogroup |
| 連接 | join leftOutJoin和 rightOutJoin |
Action算子
| 無輸出 | foreach |
| HDFS | saveAsTextFile saveAsObjectFile |
| Scala集合和數據類型 | collect collectAsMap reduceByKeyLocally lookup count top reduce fold aggregate |
轉換算子(Transformations)
不觸發提交作業,只是完成作業中間過程處理;Transformation 操作是延遲計算的,也就是說從一個RDD 轉換生成另一個 RDD 的轉換操作不是馬上執行,需要等到有 Action 操作的時候才會真正觸發運算。Transformation參數類型為value或者key-value的形式;
轉換算子是延遲執行的,也叫懶加載執行
map
將原來RDD的每個數據通過map中的用戶自定義函數映射為一個新的元素,源碼中map算子相當于初始化一個RDD --------- f(x)=x -> y
- scala源碼 def map[U: ClassTag](f: T => U): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))}
flatMap
將原來 RDD 中的每個元素通過函數 f 轉換為新的元素,并將生成的 RDD 的每個集合中的元素合并為一個集合,內部創建 FlatMappedRDD(this,sc.clean(f))。 ------ f: T => TraversableOnce[U]
-
scala源碼
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))} -
例子
lines.flatMap{lines => {lines.split(" ")
mapPartitions
mapPartitions 函 數 獲 取 到 每 個 分 區 的 迭 代器,在 函 數 中 通 過 這 個 分 區 整 體 的 迭 代 器 對整 個 分 區 的 元 素 進 行 操 作。 內 部 實 現 是 生 成 -------f (iter)=>iter.f ilter(_>=3)
-
scala源碼
def filter(f: T => Boolean): RDD[T] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[T, T](this,(context, pid, iter) => iter.filter(cleanF),preservesPartitioning = true)}
glom
glom函數將每個分區形成一個數組,內部實現是返回的GlommedRDD。 圖4中的每個方框代表一個RDD分區。圖4中的方框代表一個分區。 該圖表示含有V1、 V2、 V3的分區通過函數glom形成一數組Array[(V1),(V2),(V3)]
-
scala源碼
def glom(): RDD[Array[T]] = withScope {new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))}
union
使用 union 函數時需要保證兩個 RDD 元素的數據類型相同,返回的 RDD 數據類型和被合并的 RDD 元素數據類型相同,并不進行去重操作,保存所有元素。如果想去重可以使用 distinct()(并集)
-
scala源碼
def union(other: RDD[T]): RDD[T] = withScope {sc.union(this, other)}
cartesian
對 兩 個 RDD 內 的 所 有 元 素 進 行 笛 卡 爾 積 操 作。 操 作 后, 內 部 實 現 返 回CartesianRDD。圖6中左側大方框代表兩個 RDD,大方框內的小方框代表 RDD 的分區。右側大方框代表合并后的 RDD,大方框內的小方框代表分區。圖6中的大方框代表RDD,大方框中的小方框代表RDD分區。
-
scala源碼
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {new CartesianRDD(sc, this, other)}
groupBy
將元素通過函數生成相應的 Key,數據就轉化為 Key-Value 格式,之后將 Key 相同的元素分為一組。
函數實現如下:
1)將用戶函數預處理:
val cleanF = sc.clean(f)2)對數據 map 進行函數操作,最后再進行 groupByKey 分組操作。
this.map(t => (cleanF(t), t)).groupByKey(p)-
scala源碼
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {groupBy[K](f, defaultPartitioner(this))}
distinct
對數據進行去重
-
scala源碼
/*** Return a new RDD containing the distinct elements in this RDD.*/def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)}/*** Return a new RDD containing the distinct elements in this RDD.*/def distinct(): RDD[T] = withScope {distinct(partitions.length)}
subtract
subtract相當于進行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素;
-
scala源碼
/*** Return an RDD with the elements from `this` that are not in `other`.** Uses `this` partitioner/partition size, because even if `other` is huge, the resulting* RDD will be <= us.*/def subtract(other: RDD[T]): RDD[T] = withScope {subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))}/*** Return an RDD with the elements from `this` that are not in `other`.*/def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {subtract(other, new HashPartitioner(numPartitions))}/*** Return an RDD with the elements from `this` that are not in `other`.*/def subtract(other: RDD[T],p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {if (partitioner == Some(p)) {// Our partitioner knows how to handle T (which, since we have a partitioner, is// really (K, V)) so make a new Partitioner that will de-tuple our fake tuplesval p2 = new Partitioner() {override def numPartitions: Int = p.numPartitionsoverride def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)}
sample
sample 將 RDD 這個集合內的元素進行采樣,獲取所有元素的子集。用戶可以設定是否有放回的抽樣、百分比、隨機種子,進而決定采樣方式。內部實現是生成 SampledRDD(withReplacement, fraction, seed)。
函數參數設置:
withReplacement=true,表示有放回的抽樣。 withReplacement=false,表示無放回的抽樣。- scala源碼
takeSample
takeSample()函數和上面的sample函數是一個原理,但是不使用相對比例采樣,而是按設定的采樣個數進行采樣,同時返回結果不再是RDD,而是相當于對采樣后的數據進行Collect(),返回結果集為單機的數組
- scala源碼
mapValues
針對(key, Value)型數據中的value進行map操作,而不對key進行處理
- scala源碼
combineByKey
- scala源碼
reduceByKey
reduceByKey是比combineByKey更簡單的一種情況,只是兩個值合并成一個值 -------- *(Int,Int V) >> (Int, IntC)*- scala源碼
partitionBy
partitionBy函數對RDD進行分區操作;------ partitionBy(partitioner:Partitioner)
- scala源碼
cogroup
cogroup函數將兩個RDD進行協同劃分,對兩個RDD中的key-valuel類型的元素,每個RDD相同key的元素風別聚合為一個集合,并且返回兩個RDD中對應Key的元素集合的迭代器
- scala源碼
join
join 對兩個需要連接的 RDD 進行 cogroup函數操作,將相同 key 的數據能夠放到一個分區,在 cogroup 操作之后形成的新 RDD 對每個key 下的元素進行笛卡爾積的操作,返回的結果再展平,對應 key 下的所有元組形成一個集合。最后返回 RDD[(K, (V, W))]。
- scala源碼
sortyByKey(sortBy)
作用再(Key, Value)格式的數據上,根據Key進行升序或降序排序
- scala源碼
Action算子(行動算子)
本質上在Action算子中通過SparkContext觸發SparkContext提交job作業。Action 算子會觸發 Spark 提交作業(Job),并將數據輸出 Spark系統。
foreach
foreach對RDD中的每個元素都應用f函數操作,不返回RDD和Array,而返回Uint;(遍歷)
- scala源碼
saveAsTextFile
函數將數據輸出,存儲到HDFS的制定目錄下
saveAsObjectFile
將風趣中的每10個元素組成一個Array,然后將這個Array序列化,映射為Null,BytesWritable(Y)的元素,寫入HDFS為SequenceFile的格式;
map(x=>(NullWritable.get(),new BytesWritable(Utils.serialize(x))))colloect
相當于toArray,collect將分布式的RDD返回為一個單機的scala Array數組,在這個數組上運用scala的函數式操作;
左側方框代表 RDD 分區,右側方框代表單機內存中的數組。通過函數操作,將結果返回到 Driver 程序所在的節點,以數組形式存儲。
collectAsMap
collectAsMap對(K,V)型的RDD數據返回一個單機HashMap;
對于重復K的RDD元素,后面的元素覆蓋前面的元素
lookup
Lookup函數對(Key,Value)型的RDD操作,返回指定Key對應的元素形成的Seq。 這個函數處理優化的部分在于,如果這個RDD包含分區器,則只會對應處理K所在的分區,然后返回由(K,V)形成的Seq。 如果RDD不包含分區器,則需要對全RDD元素進行暴力掃描處理,搜索指定K對應的元素。 ------- lookup(key:K):Seq[V]
count
count(計數器)返回整個RDD的元素個數
defcount():Long=sc.runJob(this,Utils.getIteratorSize_).sumtop
top可返回最大的k個元素
top(num:Int)(implicit ord:Ordering[T]):Array[T]- top返回最大的k個元素。
- take返回最小的k個元素。
- takeOrdered返回最小的k個元素,并且在返回的數組中保持元素的順序。
- first相當于top(1)返回整個RDD中的前k個元素,可以定義排序的方式Ordering[T],返回的是一個含前k個元素的數組.
reduce
reduce函數相當于對RDD中的元素進行reduceLeft函數的操作; ---- Some(iter.reduceLeft(cleanF))
reduceLeft先對兩個元素<K,V>進行reduce函數操作,然后將結果和迭代器取出的下一個元素<k,V>進行reduce函數操作,直到迭代器遍歷完所有元素,得到最后結果。在RDD中,先對每個分區中的所有元素<K,V>的集合分別進行reduceLeft。 每個分區形成的結果相當于一個元素<K,V>,再對這個結果集合進行reduceleft操作;
fold
fold和reduce的原理相同,但是與reduce不同,相當于每個reduce時,迭代器取的第一個元素是zeroValue;
控制算子
控制算子有三種,cache,persist,checkpoint,以上算子都可以將RDD持久化,持久化的單位是partition。cache和persist都是懶執行的。必須有一個action類算子觸發執行。checkpoint算子不僅能將RDD持久化到磁盤,還能切斷RDD之間的依賴關系;
cache
cache默認將RDD的數據持久化到內存中,相當與persist(MEMORY_ONLY)函數的功能;
persist
可以指定持久話的級別,最常用的是MEMORY_ONLTY和MEMORY_AND_DISK;"_2"表示副本數;
持久化有如下級別:
cache和persist的注意事項
- cache和persist都是懶執行,必須有一個action類算子觸發執行。
- cache和persist算子的返回值可以賦值給一個變量,在其他job中直接使用這個變量就是使用持久化的數據了。持久化的單位是partition
- cache和persist算子后不能立即緊跟action算子。
checkpoint
checkpoint將RDD持久化到磁盤,還可以切斷RDD之間的依賴關系
-
checkpoint 的執行原理:
- 當RDD的job執行完畢后,會從finalRDD從后往前回溯。
- 當回溯到某一個RDD調用了checkpoint方法,會對當前的RDD做一個標記。
- Spark框架會自動啟動一個新的job,重新計算這個RDD的數據,將數據持久化到HDFS上。
-
優化:對RDD執行checkpoint之前,最好對這個RDD先執行cache,這樣新啟動的job只需要將內存中的數據拷貝導HDFS上就可以,省去重新計算這一步;
例如:
SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("checkpoint");JavaSparkContext sc = new JavaSparkContext(conf);sc.setCheckpointDir("./checkpoint");JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));parallelize.checkpoint();parallelize.count();sc.stop();總結
以上是生活随笔為你收集整理的Spark 常用算子详解(转换算子、行动算子、控制算子)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 利用java输出一个月的日历表
- 下一篇: 汇编 跳转指令: JMP、JCXZ、JE