生活随笔
收集整理的這篇文章主要介紹了
Spark算子:RDD创建操作
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
關鍵字:Spark RDD 創建、parallelize、makeRDD、textFile、hadoopFile、hadoopRDD、newAPIHadoopFile、newAPIHadoopRDD
從集合創建RDD
def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
從一個Seq集合創建RDD。
參數1:Seq集合,必須。
參數2:分區數,默認為該Application分配到的資源的CPU核數
?
scala>?var?rdd?=?sc.parallelize(1?to?10)rdd:?org.apache.spark.rdd.RDD[Int]?=?ParallelCollectionRDD[2]?at parallelize at?:21?scala>?rdd.collectres3:?Array[Int]?=?Array(1,?2,?3,?4,?5,?6,?7,?8,?9,?10)?scala>?rdd.partitions.sizeres4:?Int?=?15?//設置RDD為3個分區scala>?var?rdd2?=?sc.parallelize(1?to?10,3)rdd2:?org.apache.spark.rdd.RDD[Int]?=?ParallelCollectionRDD[3]?at parallelize at?:21?scala>?rdd2.collectres5:?Array[Int]?=?Array(1,?2,?3,?4,?5,?6,?7,?8,?9,?10)?scala>?rdd2.partitions.sizeres6:?Int?=?3?
def makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
這種用法和parallelize完全相同
def makeRDD[T](seq: Seq[(T, Seq[String])])(implicit arg0: ClassTag[T]): RDD[T]
該用法可以指定每一個分區的preferredLocations。
?
scala>?var?collect?=?Seq((1?to?10,Seq("slave007.lxw1234.com","slave002.lxw1234.com")),(11?to?15,Seq("slave013.lxw1234.com","slave015.lxw1234.com")))collect:?Seq[(scala.collection.immutable.Range.Inclusive,?Seq[String])]?=?List((Range(1,?2,?3,?4,?5,?6,?7,?8,?9,?10),List(slave007.lxw1234.com,?slave002.lxw1234.com)),?(Range(11,?12,?13,?14,?15),List(slave013.lxw1234.com,?slave015.lxw1234.com)))?scala>?var?rdd?=?sc.makeRDD(collect)rdd:?org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive]?=?ParallelCollectionRDD[6]?at makeRDD at?:23?scala>?rdd.partitions.sizeres33:?Int?=?2?scala>?rdd.preferredLocations(rdd.partitions(0))res34:?Seq[String]?=?List(slave007.lxw1234.com,?slave002.lxw1234.com)?scala>?rdd.preferredLocations(rdd.partitions(1))res35:?Seq[String]?=?List(slave013.lxw1234.com,?slave015.lxw1234.com)??
指定分區的優先位置,對后續的調度優化有幫助。
?
從外部存儲創建RDD
//從hdfs文件創建.
?
//從hdfs文件創建scala>?var?rdd?=?sc.textFile("hdfs:///tmp/lxw1234/1.txt")rdd:?org.apache.spark.rdd.RDD[String]?=?MapPartitionsRDD[26]?at textFile at?:21?scala>?rdd.countres48:?Long?=?4?//從本地文件創建scala>?var?rdd?=?sc.textFile("file:///etc/hadoop/conf/core-site.xml")rdd:?org.apache.spark.rdd.RDD[String]?=?MapPartitionsRDD[28]?at textFile at?:21?scala>?rdd.countres49:?Long?=?97?
注意這里的本地文件路徑需要在Driver和Executor端存在。
hadoopFile
sequenceFile
objectFile
newAPIHadoopFile
hadoopRDD
newAPIHadoopRDD
比如:從HBase創建RDD
?
scala>?import?org.apache.hadoop.hbase.{HBaseConfiguration,?HTableDescriptor,?TableName}import?org.apache.hadoop.hbase.{HBaseConfiguration,?HTableDescriptor,?TableName}?scala>?import?org.apache.hadoop.hbase.mapreduce.TableInputFormatimport?org.apache.hadoop.hbase.mapreduce.TableInputFormat?scala>?import?org.apache.hadoop.hbase.client.HBaseAdminimport?org.apache.hadoop.hbase.client.HBaseAdmin?scala>?val conf?=?HBaseConfiguration.create()scala>?conf.set(TableInputFormat.INPUT_TABLE,"lxw1234")scala>?var?hbaseRDD?=?sc.newAPIHadoopRDD(conf,classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])?scala>?hbaseRDD.countres52:?Long?=?1
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎
總結
以上是生活随笔為你收集整理的Spark算子:RDD创建操作的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。