Spark RDD Creation Operations
Creating an RDD from a Collection
def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
Creates an RDD from a Seq collection.
Parameter 1: the Seq collection (required).
Parameter 2: the number of partitions; defaults to the number of CPU cores allocated to the application.
scala> var rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21

scala> rdd.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> rdd.partitions.size
res4: Int = 15

// create the RDD with 3 partitions
scala> var rdd2 = sc.parallelize(1 to 10, 3)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:21

scala> rdd2.collect
res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> rdd2.partitions.size
res6: Int = 3
def makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
This usage is identical to parallelize.
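For instance, a minimal sketch of the equivalence (the partition count 3 is arbitrary):

// same semantics as parallelize; the second argument sets the number of partitions
val rdd = sc.makeRDD(1 to 10, 3)
rdd.partitions.size // 3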
def makeRDD[T](seq: Seq[(T, Seq[String])])(implicit arg0: ClassTag[T]): RDD[T]
This overload lets you specify the preferredLocations of each partition.
scala> var collect = Seq((1 to 10,Seq("slave007.lxw1234.com","slave002.lxw1234.com")),(11 to 15,Seq("slave013.lxw1234.com","slave015.lxw1234.com")))
collect: Seq[(scala.collection.immutable.Range.Inclusive, Seq[String])] = List((Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),List(slave007.lxw1234.com, slave002.lxw1234.com)), (Range(11, 12, 13, 14, 15),List(slave013.lxw1234.com, slave015.lxw1234.com)))

scala> var rdd = sc.makeRDD(collect)
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at makeRDD at <console>:23

scala> rdd.partitions.size
res33: Int = 2

scala> rdd.preferredLocations(rdd.partitions(0))
res34: Seq[String] = List(slave007.lxw1234.com, slave002.lxw1234.com)

scala> rdd.preferredLocations(rdd.partitions(1))
res35: Seq[String] = List(slave013.lxw1234.com, slave015.lxw1234.com)
Specifying preferred locations for partitions helps the scheduler make locality-aware optimizations later on.
Creating an RDD from External Storage
// create from an HDFS file
scala> var rdd = sc.textFile("hdfs:///tmp/lxw1234/1.txt")
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at textFile at <console>:21

scala> rdd.count
res48: Long = 4

// create from a local file
scala> var rdd = sc.textFile("file:///etc/hadoop/conf/core-site.xml")
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[28] at textFile at <console>:21

scala> rdd.count
res49: Long = 97
Note that a local file path must exist on both the driver and every executor.
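If copying the file to every node is impractical, one workaround is to ship it with the job via SparkContext.addFile and resolve the per-executor copy with SparkFiles.get. A minimal sketch, assuming a hypothetical file /tmp/data.txt on the driver:

import org.apache.spark.SparkFiles

sc.addFile("/tmp/data.txt") // driver distributes the file to every executor

// each partition resolves its executor's local copy and reads it there
val lines = sc.parallelize(1 to 2, 2).mapPartitions { _ =>
  scala.io.Source.fromFile(SparkFiles.get("data.txt")).getLines()
}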
The following methods also create RDDs from Hadoop-supported storage formats (a short sketch of two of them follows the list):
hadoopFile
sequenceFile
objectFile
newAPIHadoopFile
hadoopRDD
newAPIHadoopRDD
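The original session does not exercise these; as a rough sketch of two of them, with hypothetical HDFS paths:

// objectFile reloads an RDD previously written with saveAsObjectFile (Java serialization)
val nums = sc.parallelize(1 to 10)
nums.saveAsObjectFile("hdfs:///tmp/obj") // hypothetical path
val restored = sc.objectFile[Int]("hdfs:///tmp/obj")

// sequenceFile reads a SequenceFile of (Text, IntWritable) pairs
import org.apache.hadoop.io.{IntWritable, Text}
val pairs = sc.sequenceFile("hdfs:///tmp/seq", classOf[Text], classOf[IntWritable]) // hypothetical path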
For example, creating an RDD from HBase:
scala> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}

scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

scala> import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HBaseAdmin

scala> val conf = HBaseConfiguration.create()

scala> conf.set(TableInputFormat.INPUT_TABLE,"lxw1234")

scala> var hbaseRDD = sc.newAPIHadoopRDD(conf,classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])

scala> hbaseRDD.count
res52: Long = 1
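The resulting RDD holds (ImmutableBytesWritable, Result) pairs, which normally need decoding before use. A minimal sketch, assuming a hypothetical column family "cf" and qualifier "c1":

import org.apache.hadoop.hbase.util.Bytes

// decode the row key and one cell value from each Result
val rows = hbaseRDD.map { case (key, result) =>
  val rowKey = Bytes.toString(key.get())
  val value = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("c1")))
  (rowKey, value)
}
rows.collect().foreach(println)

Decoding to plain strings before collect also sidesteps the fact that Result is not Java-serializable in many HBase versions.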