RDD -- Transformation Operator Analysis
RDD
RDD (Resilient Distributed Dataset) is an abstraction over distributed memory. RDDs provide a highly restricted shared-memory model: an RDD is a read-only, partitioned collection of records that can only be created through deterministic transformations (such as map, join, and groupBy) applied to other RDDs. These restrictions, however, make fault tolerance cheap to implement. From a developer's point of view, an RDD can be seen as a Spark object that lives in memory: reading a file yields an RDD, a computation over that file is an RDD, and the result set is another RDD; partitions, dependencies between datasets, and key-value map data can all be viewed as RDDs. (Note: adapted from Baidu Baike.)
RDD Operation Categories
RDD operations fall into two kinds of operators: Transformations and Actions. The essential difference between them is whether they trigger job submission.
Transformation: only records the dependency and conversion relationships in the lineage; it does not trigger job submission.
Action: when such an operator is encountered, a job is submitted and the result is returned (see the sketch below).
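As a quick illustration of this difference, here is a minimal spark-shell sketch (the data and variable names are made up for the example): map only records lineage, while collect submits a job.

val nums = sc.parallelize(1 to 4)
// Transformation: returns a new RDD and records the dependency; nothing runs on the cluster yet
val squared = nums.map(n => n * n)
// Action: only now is a job submitted and the result brought back to the driver
squared.collect().foreach(println)
// 1
// 4
// 9
// 16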
Transformation:
map(func)
filter(func)
flatMap(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numPartitions]))
groupByKey([numPartitions])
reduceByKey(func, [numPartitions])
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
sortByKey([ascending], [numPartitions])
join(otherDataset, [numPartitions])
cogroup(otherDataset, [numPartitions])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)
RDD Inheritance Hierarchy
map
Official API description
map(func) Return a new distributed dataset formed by passing each element of the source through a function func.
Source code
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

mapPartitions
Official API description
mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
Source code
/**
 * Return a new RDD by applying a function to each partition of this RDD.
 */
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}

All Transformation operators follow the same pattern: they create a new RDD and do not submit a compute job.
Examples
map
map: applies a function to every element of the dataset
def map[U: ClassTag](f: T => U): RDD[U]
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)val b = a.map(x => (x.length, x))b.collect.foreach(println)// (3,dog)// (5,tiger)// (4,lion)// (3,cat)// (7,panther)// (5,eagle)filter
filter: keeps only the elements for which the predicate returns true
def filter(f: T => Boolean): RDD[T]
val a = sc.parallelize(1 to 10, 3)
val b = a.filter(_ % 2 == 0)
b.collect.foreach(println)
// 2
// 4
// 6
// 8
// 10

flatMap
flatMap is similar to map, with an extra flattening step: each input element can map to zero or more output elements.
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
val a = sc.parallelize(1 to 10, 5)
a.flatMap(1 to _).collect.foreach(println)
// 1
// 1
// 2
// 1
// 2
// 3
// ... (55 values in total: the ranges 1..1, 1..2, 1..3, ..., 1..10 flattened into one sequence)

mapPartitions
mapPartitions: runs the map operation once per partition. Where map works on individual elements, mapPartitions works on a whole partition at a time; when the per-element work involves an expensive resource such as a database connection, use mapPartitions as an optimization so the resource is set up once per partition.
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
val a = sc.parallelize(1 to 9, 3)

// Pairs each element of a partition with its successor inside that partition.
def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res = (pre, cur) :: res
    pre = cur
  }
  res.iterator
}

a.mapPartitions(myfunc).collect.foreach(println)
// (2,3)
// (1,2)
// (5,6)
// (4,5)
// (8,9)
// (7,8)

mapPartitionsWithIndex
mapPartitionsWithIndex: works like mapPartitions, except the function receives two parameters: the first is the partition index and the second is the iterator over that partition's elements.
def main(args: Array[String]): Unit = {
  // first()
  // second()
  third()
}

def first(): Unit = {
  val x = sc.parallelize(List(1, 2, 3, 4, 5, 7, 8, 9, 10), 3)
  def myfunc1(index: Int, iter: Iterator[Int]): Iterator[String] = {
    iter.map(x => index + ", " + x)
  }
  x.mapPartitionsWithIndex(myfunc1).collect().foreach(println)
  // 0, 1
  // 0, 2
  // 0, 3
  // 1, 4
  // 1, 5
  // 1, 7
  // 2, 8
  // 2, 9
  // 2, 10
}

def second(): Unit = {
  val randRDD = sc.parallelize(List((2, "cat"), (6, "mouse"), (7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3)
  val rPartitioner = new RangePartitioner(3, randRDD)
  val partitioned = randRDD.partitionBy(rPartitioner)
  def myfunc2(index: Int, iter: Iterator[(Int, String)]): Iterator[String] = {
    iter.map(x => "[partID: " + index + ", val:" + x + "]")
  }
  partitioned.mapPartitionsWithIndex(myfunc2).collect().foreach(println)
  // [partID: 0, val:(2,cat)]
  // [partID: 0, val:(3,book)]
  // [partID: 0, val:(1,screen)]
  // [partID: 1, val:(4,tv)]
  // [partID: 1, val:(5,heater)]
  // [partID: 2, val:(6,mouse)]
  // [partID: 2, val:(7,cup)]
}

def third(): Unit = {
  val z = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
  def myfunc3(index: Int, iter: Iterator[Int]): Iterator[String] = {
    iter.map(x => "[partID:" + index + ", val:" + x + "]")
  }
  z.mapPartitionsWithIndex(myfunc3).collect().foreach(println)
  // [partID:0, val:1]
  // [partID:0, val:2]
  // [partID:0, val:3]
  // [partID:1, val:4]
  // [partID:1, val:5]
  // [partID:1, val:6]
}

sample
sample: randomly samples a fraction of the elements of the original RDD to form a new RDD.
def sample(withReplacement: Boolean, fraction: Double, seed: Long): RDD[T]
def main(args: Array[String]): Unit = {
  first()
}

def first(): Unit = {
  val a = sc.parallelize(1 to 10000, 3)
  a.sample(false, 0.001, 444).collect().foreach(println)
}
// 120
// 424
// 477
// 2349
// 2691
// 2773
// 2988
// 5143
// 6449
// 6659
// 9820

union, ++
union: merges two datasets; duplicate elements are not removed.
def ++(other: RDD[T]): RDD[T]
def union(other: RDD[T]): RDD[T]
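A small illustrative sketch (values chosen arbitrarily):

val x = sc.parallelize(1 to 3)
val y = sc.parallelize(3 to 5)
// union / ++ simply concatenates the two datasets, so the duplicate 3 is kept
x.union(y).collect.foreach(println)
// 1
// 2
// 3
// 3
// 4
// 5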
intersection
intersection: returns the intersection of the two datasets; duplicate elements are removed.
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T]): RDD[T]
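A small sketch with made-up data (the sorted call is only there to make the printed order deterministic):

val x = sc.parallelize(1 to 5)
val y = sc.parallelize(3 to 7)
// only the elements present in both RDDs survive, with duplicates removed
x.intersection(y).collect.sorted.foreach(println)
// 3
// 4
// 5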
distinct
distinct: removes duplicate elements.
def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]
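A minimal sketch with made-up data:

val a = sc.parallelize(List(1, 2, 2, 3, 3, 3), 2)
a.distinct().collect.sorted.foreach(println)
// 1
// 2
// 3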
groupByKey
Although groupByKey and reduceByKey both produce the correct result, reduceByKey is much better suited to large datasets, because Spark knows it can combine the output values that share a key within each partition before shuffling the data.
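A word-count style sketch with made-up data (output order may vary): with groupByKey every (word, 1) pair is shuffled to the reducer side before the values are summed.

val words = sc.parallelize(List("cat", "dog", "cat", "cat", "dog"), 2).map(w => (w, 1))
// groupByKey ships all values for a key across the network; we sum them afterwards
words.groupByKey().map { case (w, counts) => (w, counts.sum) }.collect.foreach(println)
// (dog,2)
// (cat,3)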
reduceByKey
reduceByKey: similar to the reduce phase in MapReduce; values with the same key are combined using the given function.
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
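The same word count as above, written with reduceByKey (a sketch with made-up data; output order may vary):

val words = sc.parallelize(List("cat", "dog", "cat", "cat", "dog"), 2).map(w => (w, 1))
// values sharing a key are pre-aggregated inside each partition before the shuffle
words.reduceByKey(_ + _).collect.foreach(println)
// (dog,2)
// (cat,3)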
aggregateByKey
aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): aggregates locally within each partition first, then globally across partitions (see the example after the signatures).
zeroValue: the initial value for the per-partition aggregation
seqOp: how values are folded into the accumulator within a partition
combOp: how the per-partition results are combined globally
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
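A sketch with made-up data: the per-partition step keeps the maximum value for each key, and the global step adds the per-partition maxima (output order may vary).

val pairs = sc.parallelize(List(("a", 1), ("a", 3), ("b", 2), ("a", 5), ("b", 4)), 2)
// zeroValue = 0; seqOp keeps the max within a partition; combOp sums the partition results
pairs.aggregateByKey(0)((acc, v) => math.max(acc, v), (m1, m2) => m1 + m2)
  .collect.foreach(println)
// (a,8)   partition 0 contributes max 3, partition 1 contributes max 5, combined 3 + 5 = 8
// (b,4)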
join
join: joins two pair RDDs on matching keys; only keys present in both RDDs appear in the result.
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
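A small sketch with made-up data (output order may vary):

val left = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "cherry")))
val right = sc.parallelize(List((1, "red"), (2, "yellow"), (4, "purple")))
// only keys present in both RDDs appear in the result
left.join(right).collect.foreach(println)
// (1,(apple,red))
// (2,(banana,yellow))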
cogroup, groupWith
cogroup / groupWith: for up to three RDDs, groups the values that share a key into per-RDD collections (see the example after the signatures).
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
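A sketch with made-up data (output order may vary); note that a key missing from one side still appears, paired with an empty collection:

val a = sc.parallelize(List((1, "a1"), (2, "a2"), (1, "a3")))
val b = sc.parallelize(List((1, "b1"), (3, "b3")))
a.cogroup(b).collect.foreach(println)
// (1,(CompactBuffer(a1, a3),CompactBuffer(b1)))
// (2,(CompactBuffer(a2),CompactBuffer()))
// (3,(CompactBuffer(),CompactBuffer(b3)))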
repartitionAndSortWithinPartitions
repartitionAndSortWithinPartitions: repartitions the RDD according to the given partitioner and, within each resulting partition, sorts the records by their keys.
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
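A sketch with made-up data, using a HashPartitioner with two partitions; mapPartitionsWithIndex is only used here to show which partition each record ended up in.

import org.apache.spark.HashPartitioner

val data = sc.parallelize(List((3, "c"), (1, "a"), (2, "b"), (5, "e"), (4, "d")))
// repartition by key hash and sort by key inside each partition in a single shuffle,
// which is more efficient than calling repartition and then sorting within each partition
val result = data.repartitionAndSortWithinPartitions(new HashPartitioner(2))
result.mapPartitionsWithIndex((idx, it) => it.map(kv => (idx, kv))).collect.foreach(println)
// (0,(2,b))
// (0,(4,d))
// (1,(1,a))
// (1,(3,c))
// (1,(5,e))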
Summary
All of the Transformation operators above only build new RDDs and record lineage and dependency information; no computation is submitted to the cluster until an Action operator is invoked.