Spark RDD Usage Guide 3: Value-Type Transformation Operators
Transformation operators that process Value-type data can be classified, by the relationship between the input partitions and the output partitions of the RDD transformation, into the following types:
1) One-to-one: each input partition maps to one output partition
2) Many-to-one: multiple input partitions map to one output partition
3) Many-to-many: input partitions map to output partitions many-to-many
4) Subset: the output partitions are a subset of the input partitions
5) A special one-to-one type: Cache operators, which cache RDD partitions
One-to-One Input/Output Partitions
(1) map
map transforms every data item of the source RDD into a new element by applying the user-defined function f passed to map. In the source, the map operator essentially constructs a new RDD: a MapPartitionsRDD built from this and sc.clean(f).
In the figure, each box represents an RDD partition: the partitions on the left are mapped by the user-defined function f: T -> U into the new RDD partitions on the right. Note, however, that f does not actually run until an action operator is triggered; only then does it execute on the data together with the other functions of the same stage. For example, input V1 is transformed by f into V'1.
Source:
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
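A minimal usage sketch (illustrative data, not from the original article; it assumes a live SparkContext named sc, e.g. in spark-shell):
val nums = sc.parallelize(Seq(1, 2, 3, 4))
val doubled = nums.map(_ * 2)  // f: Int => Int, applied to every element
doubled.collect()              // Array(2, 4, 6, 8)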
(2) flatMap
flatMap transforms each element of the source RDD into a collection of new elements via the function f, and then flattens the generated collections into a single collection. Internally this creates FlatMappedRDD(this, sc.clean(f)) in older Spark versions; in the source shown below it is a MapPartitionsRDD applying iter.flatMap.
In the figure, the outer large box can be regarded as an RDD partition and each small box represents a collection. flatMap is applied with a function f: T -> U, where T and U can be any data types, transforming the data in the partition into new data. V1, V2, V3 sit in one collection that forms a single data item of the RDD; after they are transformed into V'1, V'2, V'3, the collection is flattened apart so that each becomes its own data item of the new RDD.
Source:
/**
 * Return a new RDD by first applying a function to all elements of this
 * RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
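A minimal usage sketch (illustrative data, assuming a SparkContext sc): each line produces a collection of words, which flatMap flattens into a single RDD of words.
val lines = sc.parallelize(Seq("a b", "c d e"))
val words = lines.flatMap(line => line.split(" "))  // f: String => TraversableOnce[String]
words.collect()  // Array(a, b, c, d, e)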
(3) mapPartitions
mapPartitions receives an iterator over each partition and operates on all the elements of the partition through that whole-partition iterator. Internally it creates a MapPartitionsRDD.
In the figure, the user filters all the data in each partition with the function (iter) => iter.filter(_ >= 3), keeping the elements >= 3. Each box represents an RDD partition; after filtering, the partition containing 1, 2, 3 retains only the element 3.
Source:
/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
  val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
  new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
}
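A minimal sketch mirroring the figure's filter example (assuming a SparkContext sc):
val nums = sc.parallelize(1 to 6, 2)  // two partitions
// The function receives each whole partition as an iterator
val filtered = nums.mapPartitions(iter => iter.filter(_ >= 3))
filtered.collect()  // Array(3, 4, 5, 6)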
(4) glom
glom gathers each partition into an array; internally it returns an RDD[Array[T]].
In the figure, each box represents a partition. A partition containing V1, V2, V3 is turned by glom into the array Array(V1, V2, V3).
Source:
/**
 * Return an RDD created by coalescing all elements within each partition into an array.
 */
def glom(): RDD[Array[T]] = {
  new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
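A minimal sketch (assuming a SparkContext sc):
val nums = sc.parallelize(1 to 6, 2)  // two partitions
val arrays = nums.glom()  // RDD[Array[Int]]: one array per partition
arrays.collect()          // Array(Array(1, 2, 3), Array(4, 5, 6))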
Many-to-One Input/Output Partitions
(1) union
When using union, the two RDDs must have the same element type, and the returned RDD has that same element type. union does not deduplicate: all elements are kept. To remove duplicates, use distinct(). The ++ operator is equivalent to union.
In the figure, the large boxes on the left represent the two input RDDs, and the small boxes inside them represent their partitions; the large box on the right represents the merged RDD, with its small boxes as partitions. An RDD containing V1, V2, ..., U4 and an RDD containing V1, V8, ..., U8 are merged, keeping every element, into one RDD: V1, V1, V2, V8 form one partition, and the remaining elements are merged likewise.
Source:
/**
 * Return the union of this RDD and another one. Any identical elements will appear multiple
 * times (use `.distinct()` to eliminate them).
 */
def union(other: RDD[T]): RDD[T] = {
  if (partitioner.isDefined && other.partitioner == partitioner) {
    new PartitionerAwareUnionRDD(sc, Array(this, other))
  } else {
    new UnionRDD(sc, Array(this, other))
  }
}
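A minimal sketch (assuming a SparkContext sc); note that duplicates are kept unless distinct() is applied:
val a = sc.parallelize(Seq(1, 2, 3))
val b = sc.parallelize(Seq(3, 4, 5))
a.union(b).collect()          // Array(1, 2, 3, 3, 4, 5) -- the duplicate 3 is kept
(a ++ b).distinct().collect() // deduplicated; element order is not guaranteed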
(2) cartesian
cartesian computes the Cartesian product of all elements of two RDDs; internally it returns a CartesianRDD.
In the figure, the large boxes represent RDDs and the small boxes inside them represent RDD partitions. For example, V1 is combined with W1, W2, Q5 from the other RDD to produce (V1, W1), (V1, W2), (V1, Q5).
Source:
/**
 * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
 * elements (a, b) where a is in `this` and b is in `other`.
 */
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
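A minimal sketch (assuming a SparkContext sc):
val letters = sc.parallelize(Seq("a", "b"))
val nums = sc.parallelize(Seq(1, 2))
letters.cartesian(nums).collect()  // four pairs: (a,1), (a,2), (b,1), (b,2) (order may vary)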
Many-to-Many Input/Output Partitions
groupBy
groupBy generates a key for each element via a user-supplied function, turning the data into key-value form, and then groups the elements that share the same key.
In val cleanF = sc.clean(f), the sc.clean function preprocesses the user function; this.map(t => (cleanF(t), t)).groupByKey(p) first maps the data into (key, element) pairs and then groups them with groupByKey. The partitioner p determines the number of partitions and the partitioning function, and therefore the degree of parallelism.
In the figure, each box represents an RDD partition; elements with the same key are merged into one group. For example, V1 and V2 are merged into one key-value pair whose key is "V" and whose value is "V1, V2", forming (V, Seq(V1, V2)).
Source:
/**
 * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
  groupBy[K](f, defaultPartitioner(this))

/**
 * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
  groupBy(f, new HashPartitioner(numPartitions))

/**
 * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
    : RDD[(K, Iterable[T])] = {
  val cleanF = sc.clean(f)
  this.map(t => (cleanF(t), t)).groupByKey(p)
}
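A minimal sketch mirroring the figure (assuming a SparkContext sc); keying each element by its first character is just an illustrative choice:
val items = sc.parallelize(Seq("V1", "V2", "U1"))
val grouped = items.groupBy(s => s.head)  // key each element by its first character
grouped.collect()  // e.g. Array((V,CompactBuffer(V1, V2)), (U,CompactBuffer(U1)))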
Output Partitions as a Subset of Input Partitions
(1) filter
filter applies the predicate function f to each element: elements for which f returns true are kept in the RDD, and those for which it returns false are filtered out. Internally this amounts to creating FilteredRDD(this, sc.clean(f)) in older Spark versions; in the source shown below it is a MapPartitionsRDD applying iter.filter.
In the figure, each box represents an RDD partition, and T can be any type. The user-defined predicate f is applied to every data item, keeping the items that satisfy the condition, i.e. those for which f returns true. For example, V2 and V3 are filtered out and V1 is kept; the resulting partition is labeled V1'.
Source:
/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
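A minimal sketch (assuming a SparkContext sc):
val nums = sc.parallelize(1 to 5)
val evens = nums.filter(_ % 2 == 0)  // keep only elements where the predicate is true
evens.collect()  // Array(2, 4)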
(2) distinct
distinct deduplicates the elements of an RDD.
In the figure, each box represents a partition; distinct removes duplicate data. For example, the repeated values V1, V1 are reduced to a single V1.
Source:
/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T] = distinct(partitions.size)
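A minimal sketch (assuming a SparkContext sc); note that distinct shuffles, so output order is not guaranteed:
val dup = sc.parallelize(Seq(1, 1, 2, 3, 3))
dup.distinct().collect()  // the distinct values 1, 2, 3 (order not guaranteed)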
(3) subtract
subtract performs set difference: RDD 1 is stripped of every element in the intersection of RDD 1 and RDD 2.
In the figure, the large boxes on the left represent the two input RDDs, and the small boxes inside them represent their partitions; the large box on the right represents the resulting RDD, with its small boxes as partitions. V1 appears in both RDDs, so by the rules of set difference it is not kept in the new RDD; V2 appears in the first RDD but not in the second, so the new RDD does contain V2.
Source:
/**
 * Return an RDD with the elements from `this` that are not in `other`.
 *
 * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
 * RDD will be <= us.
 */
def subtract(other: RDD[T]): RDD[T] =
  subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))

/**
 * Return an RDD with the elements from `this` that are not in `other`.
 */
def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
  subtract(other, new HashPartitioner(numPartitions))

/**
 * Return an RDD with the elements from `this` that are not in `other`.
 */
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = {
  if (partitioner == Some(p)) {
    // Our partitioner knows how to handle T (which, since we have a partitioner, is
    // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
    val p2 = new Partitioner() {
      override def numPartitions = p.numPartitions
      override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
    }
    // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
    // anyway, and when calling .keys, will not have a partitioner set, even though
    // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
    // partitioned by the right/real keys (e.g. p).
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
  } else {
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
  }
}
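A minimal sketch (assuming a SparkContext sc):
val a = sc.parallelize(Seq(1, 2, 3, 4))
val b = sc.parallelize(Seq(3, 4, 5))
a.subtract(b).collect()  // elements of a not in b: 1 and 2 (order not guaranteed)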
(4) sample
sample draws a sample from the elements of an RDD, returning a subset of all elements. The user can set whether sampling is done with replacement, the sampling fraction, and the random seed, which together determine the sampling behavior.
Parameters:
withReplacement = true means sampling with replacement;
withReplacement = false means sampling without replacement.
In the figure, each box is an RDD partition. sample draws 50% of the data: from V1, V2, U1, U2, U3, U4, the elements V1, U1, and U2 are sampled, forming the new RDD.
Source:
/**
 * Return a sampled subset of this RDD.
 */
def sample(withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = {
  require(fraction >= 0.0, "Negative fraction value: " + fraction)
  if (withReplacement) {
    new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
  } else {
    new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
  }
}
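A minimal sketch (assuming a SparkContext sc). Note that fraction is a per-element sampling probability, not an exact count guarantee:
val nums = sc.parallelize(1 to 100)
val sampled = nums.sample(withReplacement = false, fraction = 0.5, seed = 42L)
sampled.count()  // roughly 50, but not exactly -- the fraction is probabilistic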
(5) takeSample
takeSample works on the same principle as sample above, but instead of sampling by a relative fraction it samples a fixed, user-specified number of elements, and the result is no longer an RDD: it is as if collect() were run on the sampled data, so the result set is a local array on a single machine.
In the figure, the boxes on the left represent partitions on the various distributed nodes, and the box on the right represents the result array returned to a single machine. takeSample is set to sample one data item, and the returned result is V1.
Source:
/**
 * Return a fixed-size sampled subset of this RDD in an array
 *
 * @param withReplacement whether sampling is done with replacement
 * @param num size of the returned sample
 * @param seed seed for the random number generator
 * @return sample of specified size in an array
 */
def takeSample(withReplacement: Boolean,
    num: Int,
    seed: Long = Utils.random.nextLong): Array[T] = {
  val numStDev = 10.0
  if (num < 0) {
    throw new IllegalArgumentException("Negative number of elements requested")
  } else if (num == 0) {
    return new Array[T](0)
  }
  val initialCount = this.count()
  if (initialCount == 0) {
    return new Array[T](0)
  }
  val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
  if (num > maxSampleSize) {
    throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +
      s"$numStDev * math.sqrt(Int.MaxValue)")
  }
  val rand = new Random(seed)
  if (!withReplacement && num >= initialCount) {
    return Utils.randomizeInPlace(this.collect(), rand)
  }
  val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
    withReplacement)
  var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
  // If the first sample didn't turn out large enough, keep trying to take samples;
  // this shouldn't happen often because we use a big multiplier for the initial size
  var numIters = 0
  while (samples.length < num) {
    logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
    samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
    numIters += 1
  }
  Utils.randomizeInPlace(samples, rand).take(num)
}
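A minimal sketch (assuming a SparkContext sc); unlike sample, this returns a local array of exactly num elements on the driver:
val nums = sc.parallelize(1 to 100)
val arr: Array[Int] = nums.takeSample(withReplacement = false, num = 5, seed = 42L)
arr.length  // 5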
Cache Operators
(1) cache
cache caches RDD elements from disk into memory; it is equivalent to persist(MEMORY_ONLY).
In the figure, each box represents an RDD partition: on the left, all data partitions are stored on disk; the cache operator caches the data in memory.
Source:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
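A minimal sketch (assuming a SparkContext sc; the input path is hypothetical). cache only marks the RDD for caching; the first action actually materializes it:
val logs = sc.textFile("hdfs:///path/to/input.txt")  // hypothetical path
logs.cache()   // mark for MEMORY_ONLY caching; nothing is cached yet
logs.count()   // first action computes the RDD and caches its partitions
logs.count()   // subsequent actions read the cached partitions from memory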
(2) persist
persist caches an RDD. Where the data is cached is determined by the StorageLevel enumeration.
Several combinations are available: DISK stands for disk, MEMORY for memory, and SER indicates whether the data is stored serialized. StorageLevel is the enumeration of storage modes; for example, MEMORY_AND_DISK_SER means the data may be stored both in memory and on disk, in serialized form. The other levels follow the same naming scheme.
In the figure, each box represents an RDD partition; disk means stored on disk, mem means stored in memory. The data initially resides entirely on disk; persist(MEMORY_AND_DISK) caches it in memory, but some partitions cannot fit in memory. For example, in Figure 3-18 the partition containing V1, V2, V3 is stored on disk, while the partition containing U1, U2 remains in memory.
Source:
/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet..
 */
def persist(newLevel: StorageLevel): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  sc.persistRDD(this)
  // Register the RDD with the ContextCleaner for automatic GC-based cleanup
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
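A minimal sketch (assuming a SparkContext sc) of the MEMORY_AND_DISK_SER level described above:
import org.apache.spark.storage.StorageLevel

val nums = sc.parallelize(1 to 1000000)
nums.persist(StorageLevel.MEMORY_AND_DISK_SER)  // serialized; spills to disk when memory is full
nums.count()  // first action materializes the cache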
Original article: http://blog.csdn.net/jasonding1354