RDD 编程
Table of Contents
- 1. Creating RDDs
- 2. RDD Transformations
- 3. RDD Actions
- 4. Persistence
- 5. Partitioning
- 6. Reading and Writing File Data
- 6.1 Local files
- 6.2 HDFS
- 6.3 JSON files
- 6.4 HBase
Notes taken from the MOOC course "Spark编程基础" (Fundamentals of Spark Programming).
1. Creating RDDs
- From a local file
- From HDFS
- From a parallelized collection
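A minimal spark-shell sketch of the three creation methods (the paths and sample data below are placeholders):

```scala
// From a local file (path is a placeholder)
val localRDD = sc.textFile("file:///home/hadoop/workspace/word.txt")
// From HDFS (host, port, and path are placeholders)
val hdfsRDD = sc.textFile("hdfs://localhost:9000/user/word.txt")
// From a parallelized in-memory collection
val listRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
```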
2. RDD Transformations
- filter(func): keep only the elements for which func returns true
- map(func): apply func to every element
Output (e.g. mapping each line to the array of its words): n elements, each a String array
- flatMap(func): map each element and flatten the results
Output: all the individual words
- groupByKey(), reduceByKey(func)
Both merge values by key into a value list; reduceByKey additionally reduces each value list with func (see the sketch after this list)
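A minimal sketch of these transformations in spark-shell, assuming a plain text file of space-separated words (the file path and the filter condition are only illustrative):

```scala
val lines = sc.textFile("file:///home/hadoop/workspace/word.txt")

val sparkLines = lines.filter(line => line.contains("spark"))  // filter: keep lines containing "spark"
val wordArrays = lines.map(line => line.split(" "))            // map: n elements, each a String array
val words = lines.flatMap(line => line.split(" "))             // flatMap: one flat RDD of all words

val pairs = words.map(word => (word, 1))
val grouped = pairs.groupByKey()                               // (word, Iterable(1, 1, ...))
val counts = pairs.reduceByKey((a, b) => a + b)                // (word, total count)
```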
3. RDD Actions
Spark only starts actual execution when it encounters an RDD action; transformations are merely recorded at that point and not executed (lazy evaluation).
- count(): return the number of elements in the RDD
- collect(): return all elements as an array
- first(): return the first element
- take(n): return the first n elements
- reduce(func): aggregate the elements with func
- foreach(func): apply func to each element
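A minimal sketch of the actions on a small parallelized RDD (the sample data is illustrative):

```scala
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))

rdd.count()                  // 5
rdd.collect()                // Array(1, 2, 3, 4, 5)
rdd.first()                  // 1
rdd.take(3)                  // Array(1, 2, 3)
rdd.reduce((a, b) => a + b)  // 15
rdd.foreach(println)         // prints each element (output appears on the executors)
```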
4. Persistence
- persist() marks an RDD as persistent; it is actually persisted only when the first action on it is executed
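A minimal sketch of marking an RDD as persistent (the sample data is illustrative; cache() is shorthand for the memory-only level):

```scala
import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(List("Hadoop", "Spark", "Hive"))
rdd.persist(StorageLevel.MEMORY_ONLY)   // only marks the RDD; nothing is computed yet

println(rdd.count())                    // first action: computes the RDD and caches it
println(rdd.collect().mkString(","))    // reuses the cached data instead of recomputing
rdd.unpersist()                         // release the cache when it is no longer needed
```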
5. Partitioning
- Increases parallelism
- Reduces communication overhead
Partitioning principle: keep the number of partitions as close as possible to the number of CPU cores in the cluster.
- Specify the number of partitions when creating an RDD: sc.textFile(path, partitionNum)
- Change the number of partitions afterwards with repartition()
- Word-count example
- Averaging example (sketches of both follow this list)
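Minimal sketches of the two examples named above, with the partition count passed at creation time (paths, data, and partition numbers are placeholders):

```scala
// Word count with an explicit number of partitions when reading the file
val lines = sc.textFile("file:///home/hadoop/workspace/word.txt", 2)
val wordCounts = lines.flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wordCounts.collect().foreach(println)

// Changing the number of partitions afterwards
println(wordCounts.partitions.size)
val repartitioned = wordCounts.repartition(1)
println(repartitioned.partitions.size)

// Per-key average: carry (sum, count) through reduceByKey, then divide
val scores = sc.parallelize(Array(("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)), 2)
val averages = scores.mapValues(v => (v, 1))
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
  .mapValues(p => p._1.toDouble / p._2)
averages.collect().foreach(println)    // (spark,4.0) and (hadoop,5.0)
```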
6. Reading and Writing File Data
6.1 Local files
```
scala> val textFile = sc.textFile("file:///home/hadoop/workspace/word.txt")
textFile: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/workspace/word.txt MapPartitionsRDD[5] at textFile at <console>:25

scala> textFile.saveAsTextFile("file:///home/hadoop/workspace/writeword")
```
Note that saveAsTextFile takes a directory, not a file name:
```
$ ls /home/hadoop/workspace/writeword/
part-00000  part-00001  _SUCCESS
hadoop@dblab-VirtualBox:/usr/local/spark/bin$ cat /home/hadoop/workspace/writeword/part-00000
i love programming
it is very interesting
```
- Reading the written output back (all files under the directory are read), as shown below
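A minimal sketch of reading the written directory back: passing the directory to sc.textFile loads every part file under it.

```scala
val textFile = sc.textFile("file:///home/hadoop/workspace/writeword")
textFile.foreach(println)
```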
6.2 HDFS
```
scala> val textFile = sc.textFile("hdfs://localhost:9000/user/word.txt")
textFile: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/word.txt MapPartitionsRDD[11] at textFile at <console>:25

scala> textFile.first()
res6: String = i love programming
```
Save to HDFS (the default path prefix is the current user's home directory, /user/<username>/):
```
scala> textFile.saveAsTextFile("writeword")
```
Check the result on HDFS:
```
hadoop@dblab-VirtualBox:/usr/local/hadoop/bin$ ./hdfs dfs -ls -R /user/
drwxr-xr-x   - hadoop supergroup          0 2021-04-22 16:01 /user/hadoop
drwxr-xr-x   - hadoop supergroup          0 2021-04-21 22:48 /user/hadoop/.sparkStaging
drwx------   - hadoop supergroup          0 2021-04-21 22:48 /user/hadoop/.sparkStaging/application_1618998320460_0002
-rw-r--r--   1 hadoop supergroup      73189 2021-04-21 22:48 /user/hadoop/.sparkStaging/application_1618998320460_0002/__spark_conf__.zip
-rw-r--r--   1 hadoop supergroup  120047699 2021-04-21 22:48 /user/hadoop/.sparkStaging/application_1618998320460_0002/__spark_libs__4686608713384839717.zip
drwxr-xr-x   - hadoop supergroup          0 2021-04-22 16:01 /user/hadoop/writeword
-rw-r--r--   1 hadoop supergroup          0 2021-04-22 16:01 /user/hadoop/writeword/_SUCCESS
-rw-r--r--   1 hadoop supergroup         42 2021-04-22 16:01 /user/hadoop/writeword/part-00000
-rw-r--r--   1 hadoop supergroup         20 2021-04-22 16:01 /user/hadoop/writeword/part-00001
drwxr-xr-x   - hadoop supergroup          0 2017-11-05 21:57 /user/hive
drwxr-xr-x   - hadoop supergroup          0 2017-11-05 21:57 /user/hive/warehouse
drwxr-xr-x   - hadoop supergroup          0 2017-11-05 21:57 /user/hive/warehouse/hive.db
-rw-r--r--   1 hadoop supergroup         62 2021-04-21 20:06 /user/word.txt
```
6.3 JSON files
```
hadoop@dblab-VirtualBox:/usr/local/hadoop/bin$ cat /usr/local/spark/examples/src/main/resources/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
```
```
scala> val jsonStr = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
jsonStr: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/examples/src/main/resources/people.json MapPartitionsRDD[14] at textFile at <console>:25

scala> jsonStr.foreach(println)
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
```
- Parsing the JSON file
Write a standalone program:
```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.parsing.json.JSON

object JSONRead {
  def main(args: Array[String]) {
    val inputFile = "file:///usr/local/spark/examples/src/main/resources/people.json"
    val conf = new SparkConf().setAppName("JSONRead")
    val sc = new SparkContext(conf)
    val jsonStrs = sc.textFile(inputFile)
    val res = jsonStrs.map(s => JSON.parseFull(s))
    res.foreach({ r => r match {
      case Some(map: Map[String, Any]) => println(map)
      case None => println("parsing failed")
      case other => println("unknown data structure: " + other)
    }})
  }
}
```
Compile and package it into a jar with sbt, then run spark-submit --class "JSONRead" <path to jar> (still to be tried in practice).
Reference: Writing Spark Applications with IntelliJ IDEA (Scala + SBT), http://dblab.xmu.edu.cn/blog/1492-2/
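A rough sketch of the sbt build file for the packaging step (the project name and the Scala/Spark versions below are assumptions and must match the local installation):

```scala
// simple.sbt -- versions here are placeholders; align them with your Spark install
name := "JSONRead Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
```

After `sbt package`, the jar lands under `target/scala-2.11/`, and the submission would look roughly like `spark-submit --class "JSONRead" target/scala-2.11/jsonread-project_2.11-1.0.jar` (the jar name depends on the project name and versions).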
6.4 HBase
```
hadoop@dblab-VirtualBox:/usr/local/hbase/bin$ ./hbase shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hbase/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.1.5, r239b80456118175b340b2e562a5568b5c744252e, Sun May 8 20:29:26 PDT 2016

hbase(main):001:0> disable "student"
0 row(s) in 3.0730 seconds

hbase(main):002:0> drop "student"
0 row(s) in 1.3530 seconds

hbase(main):003:0> create "student","info"
0 row(s) in 1.3570 seconds
=> Hbase::Table - student

hbase(main):004:0> put "student","1","info:name","michael"
0 row(s) in 0.0920 seconds

hbase(main):005:0> put "student","1","info:gender","M"
0 row(s) in 0.0410 seconds

hbase(main):006:0> put "student","1","info:age","18"
0 row(s) in 0.0080 seconds
```
Reading this table from Spark also requires writing a standalone program and packaging it with sbt; a sketch follows below.
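A minimal sketch of such a program, assuming the student table created above and using newAPIHadoopRDD with HBase's TableInputFormat (the HBase jars must be on Spark's classpath; the object name and output format are illustrative):

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object SparkReadHBase {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SparkReadHBase"))
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "student")   // the table created in the shell session

    // Each record is a (row key, Result) pair
    val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    stuRDD.foreach { case (_, result) =>
      val key    = Bytes.toString(result.getRow)
      val name   = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
      val gender = Bytes.toString(result.getValue("info".getBytes, "gender".getBytes))
      val age    = Bytes.toString(result.getValue("info".getBytes, "age".getBytes))
      println(s"Row key: $key  Name: $name  Gender: $gender  Age: $age")
    }
  }
}
```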
Summary