Deploying a Spark Job on an Alibaba Cloud Serverless Kubernetes (ASK) Cluster and Connecting It to OSS (Detailed Steps)
Introduction
ASK is Alibaba Cloud's Serverless Kubernetes offering. In this experiment we run a Spark job (WordCount as the example) on an ASK cluster, and to keep compute and storage separate, the input data is stored in Alibaba Cloud OSS.
(The material I found on connecting to OSS was scattered and incomplete; code that ran fine locally kept failing as soon as it was put on the cluster, and it took a lot of debugging to get it working. I'm writing the steps down in the hope that they help others.)
Environment preparation
On your local machine you need:
Java JDK 1.8
IDEA
Maven
Docker (installed on Linux or Windows)
The Alibaba Cloud services you need to enable are:
ASK cluster: https://www.aliyun.com/product/cs/ask?spm=5176.166170.J_8058803260.27.586451643ru45z
OSS object storage: https://www.aliyun.com/product/oss?spm=5176.166170.J_8058803260.32.58645164XpoJle
ACR container registry: https://www.aliyun.com/product/acr?spm=5176.19720258.J_8058803260.31.281e2c4astzVxy
1. Prepare the data in OSS
Create an OSS bucket and upload a text file to it (in this post the file hp1.txt is uploaded to a bucket named spark-on-k8s-1). The file can then be accessed at an address of the form oss://spark-on-k8s-1/hp1.txt.
(Adapt this oss://bucket-name/path/file-name format to your own bucket and file; the code below uses this path.)
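You can upload the file through the OSS console. If you prefer the command line, Alibaba Cloud's ossutil tool can do the same; a sketch using the bucket and file names from this post (substitute your own) is:
ossutil cp hp1.txt oss://spark-on-k8s-1/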
2. Write the code
1. Create a new Maven project in IDEA
The layout is a standard Maven project: the only files you need to write are pom.xml at the project root and osstest.java under src/main/java. Both are given below.
(1) osstest.java
This is a word-count (WordCount) program. The steps are: read the text file from OSS, split each line into words, map every word to a (word, 1) pair, sum the counts with reduceByKey, sort by count in descending order, and write the result back to OSS.
The implementation is as follows:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class osstest {
    public static void main(String[] args) {
        // These are the OSS dependency jars. Without them the job runs locally
        // but fails once it is put on the cluster.
        List<String> jarList = new ArrayList<>();
        jarList.add("emr-core-1.4.1.jar");
        jarList.add("aliyun-sdk-oss-3.4.1.jar");
        jarList.add("commons-codec-1.9.jar");
        jarList.add("jdom-1.1.jar");
        jarList.add("commons-logging-1.2.jar");
        jarList.add("httpclient-4.5.1.jar");
        jarList.add("httpcore-4.4.1.jar");
        String ossDepPath = jarList.stream().map(s -> "/opt/spark/jars/" + s).collect(Collectors.joining(","));

        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        // To run locally in IDEA, append .setMaster("local") to the line above.
        conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem");
        // To run locally in IDEA, also uncomment the following line:
        // conf.set("spark.hadoop.mapreduce.job.run-local", "true");
        conf.set("spark.hadoop.fs.oss.endpoint", "oss-cn-shenzhen.aliyuncs.com"); // change to the region of the OSS bucket holding your text file
        conf.set("spark.hadoop.fs.oss.accessKeyId", "*****");      // change to your own accessKeyId
        conf.set("spark.hadoop.fs.oss.accessKeySecret", "******"); // change to your own accessKeySecret
        // The path to the OSS dependency jars must be set, otherwise the job fails.
        conf.set("spark.hadoop.fs.oss.core.dependency.path", ossDepPath);

        System.out.println("----------start-----------");
        // Create the SparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("oss://spark-on-k8s-1/hp1.txt", 10); // change to your own input path
        System.out.println("-----------read " + lines.count() + " lines----------------");
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        System.out.println("-----------3:" + words);
        // Pair each word with 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> new Tuple2<>(w, 1));
        System.out.println("-----------4:" + wordAndOne);
        // Aggregate the counts
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((m, n) -> m + n);
        System.out.println("-----------5:" + reduced);
        // Swap key and value
        JavaPairRDD<Integer, String> swaped = reduced.mapToPair(tp -> tp.swap());
        System.out.println("-----------6" + swaped);
        // Sort by count, descending
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
        System.out.println("-----------7" + sorted);
        // Swap back to (word, count)
        JavaPairRDD<String, Integer> result = sorted.mapToPair(tp -> tp.swap());
        System.out.println("-----------8" + result);
        // Save the result to OSS
        result.saveAsTextFile("oss://spark-on-k8s-1/hp1-result-1"); // change to your own output path
        System.out.println("-----------end------------------------");
        // Release resources
        sc.stop();
    }
}

The places you therefore need to change in the code above are:
spark.hadoop.fs.oss.endpoint: the endpoint of the region where your OSS bucket lives
spark.hadoop.fs.oss.accessKeyId and spark.hadoop.fs.oss.accessKeySecret: your own AccessKey pair
the input path passed to sc.textFile and the output path passed to saveAsTextFile
(2) pom.xml
pom.xml declares the Spark and OSS dependencies.
Note that although EMR is another Alibaba Cloud service, you do not need to enable it here. However, without the com.aliyun.emr (emr-core) dependency, addresses beginning with oss:// cannot be accessed, so it has to be added to pom.xml.
The maven-assembly-plugin is used for custom packaging: it builds a single jar-with-dependencies whose main class is osstest.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>osstest</artifactId>
    <version>2.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.aliyun.oss</groupId>
            <artifactId>aliyun-sdk-oss</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.dfs</groupId>
            <artifactId>aliyun-sdk-dfs</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.emr</groupId>
            <artifactId>emr-core</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>2.4.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>osstest</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
</project>

Package the project with Maven; a successful build produces osstest-2.0-SNAPSHOT.jar under the target directory.
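For reference, a minimal command-line build looks like this (you can also run the package phase from IDEA's Maven panel):
mvn clean package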
With that, the application jar is ready.
Next you need a machine with Docker installed (Linux or Windows both work).
Create a folder named test.
Put the packaged osstest-2.0-SNAPSHOT.jar and all the required third-party jars into the test folder, as sketched below.
The third-party jars are the ones listed in the code, and all of them can be downloaded online: emr-core-1.4.1.jar, aliyun-sdk-oss-3.4.1.jar, commons-codec-1.9.jar, jdom-1.1.jar, commons-logging-1.2.jar, httpclient-4.5.1.jar, httpcore-4.4.1.jar.
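For example (the download directory below is a placeholder; adjust the paths to your machine):
mkdir test
cp target/osstest-2.0-SNAPSHOT.jar test/
cp /path/to/downloaded-oss-jars/*.jar test/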
3. Prepare the image
What the Dockerfile does:
It takes the Spark 2.4.4 image provided by Alibaba Cloud as the base image, creates an /opt/spark/jars directory (note that this path matches the one used in the Java code), and copies in the jar built from our Java code together with the third-party jars.
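The original Dockerfile is not reproduced here, so the following is only a minimal sketch of what it might look like. The base image name is an assumption (substitute the Spark 2.4.4 image you actually pull from Alibaba Cloud), and the file is assumed to sit in the test folder so that the jars are in the build context:
# Base image: Alibaba Cloud's Spark 2.4.4 image (name is an assumption)
FROM registry.cn-hangzhou.aliyuncs.com/acs/spark:v2.4.4
# Create the directory referenced by spark.hadoop.fs.oss.core.dependency.path in osstest.java
RUN mkdir -p /opt/spark/jars
# Copy the application jar and the third-party OSS jars into it
COPY *.jar /opt/spark/jars/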
Next, use docker build to turn the Dockerfile into an image.
sudo docker build -t registry.cn-shenzhen.aliyuncs.com/sicilly/spark:0.9 -f Dockerfile .
Note that in the command above:
registry.cn-shenzhen.aliyuncs.com/sicilly needs to be changed to your own registry address
spark is the repository name, which you can choose yourself
0.9 is the image tag, which you can also choose yourself
the command ends with a ".", which is the build context (the current directory), so don't leave it out
Use docker push to push the image to your Alibaba Cloud image registry.
sudo docker push registry.cn-shenzhen.aliyuncs.com/sicilly/spark:0.9
As above, change the registry address to your own.
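If the push is rejected with an authentication error, you generally need to log in to your ACR instance first; a sketch (the username placeholder is yours to fill in, and the registry domain should match your own):
sudo docker login --username=<your-aliyun-account> registry.cn-shenzhen.aliyuncs.com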
4. Create the cluster
Create an ASK cluster in the Container Service console. Note that enabling these services costs money.
Install ack-spark-operator
In the navigation pane of the Container Service console, choose Marketplace > App Catalog, then select ack-spark-operator to deploy it.
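Once the chart has been deployed, you can check that the operator pod is running. The namespace depends on how the chart was installed; spark-operator is a common default, so adjust it if yours differs:
kubectl get pods -n spark-operator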
5. Submit the job to the cluster
Once everything above is ready, you can write a YAML file and submit the job to ASK for execution.
If you are using my code from above, the only thing you need to change is the image address.
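The full manifest (wordcount-operator-example.yaml) is not reproduced here; a minimal sketch of what such a SparkApplication for ack-spark-operator might look like follows. The image, main class, and jar path come from the steps above, while the driver/executor sizes and the spark service account are assumptions:
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: wordcount
  namespace: default
spec:
  type: Java
  mode: cluster
  image: "registry.cn-shenzhen.aliyuncs.com/sicilly/spark:0.9"   # change to your own image
  imagePullPolicy: Always
  mainClass: osstest
  mainApplicationFile: "local:///opt/spark/jars/osstest-2.0-SNAPSHOT.jar"
  sparkVersion: "2.4.4"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    memory: "2048m"
    serviceAccount: spark        # assumption: service account created for the operator
  executor:
    cores: 1
    instances: 1
    memory: "2048m"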
Method 1: use kubectl. Install the kubectl tool on Windows (see the official installation guide), connect to your ASK cluster, and run the following command to create the job.
kubectl create -f wordcount-operator-example.yaml
Method 2: if you don't want to install kubectl, you can also go to the Container Service console and choose Applications > Stateless > Create from YAML to create the resource.
Execution finished
The first run takes a few minutes because the Spark image has to be pulled, so be patient. When the status shows Completed, the job has finished.
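To watch the status from the command line, something like the following works for the manifest sketched above (the names will differ if you changed them):
kubectl get sparkapplication wordcount
kubectl get pods -w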
Viewing the logs
View the logs with kubectl, or click Logs next to the container group (pod) in the console.
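With kubectl, the driver pod for the sketch above is typically named wordcount-driver (confirm the exact name with kubectl get pods first):
kubectl logs -f wordcount-driver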
No errors means it succeeded; if there are errors, use the log to troubleshoot the problem.
Below is the log of a successful run.
21/12/21 06:33:33 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1161
21/12/21 06:33:33 INFO DAGScheduler: Submitting 10 missing tasks from ResultStage 5 (MapPartitionsRDD[10] at saveAsTextFile at osstest.java:63) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
21/12/21 06:33:33 INFO TaskSchedulerImpl: Adding task set 5.0 with 10 tasks
21/12/21 06:33:33 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 40, 192.168.59.99, executor 1, partition 0, NODE_LOCAL, 7681 bytes)
21/12/21 06:33:33 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on 192.168.59.99:41645 (size: 27.2 KB, free: 116.9 MB)
21/12/21 06:33:33 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to 192.168.59.99:57144
21/12/21 06:33:34 INFO TaskSetManager: Starting task 1.0 in stage 5.0 (TID 41, 192.168.59.99, executor 1, partition 1, NODE_LOCAL, 7681 bytes)
21/12/21 06:33:34 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID 40) in 945 ms on 192.168.59.99 (executor 1) (1/10)
21/12/21 06:33:34 INFO TaskSetManager: Starting task 2.0 in stage 5.0 (TID 42, 192.168.59.99, executor 1, partition 2, NODE_LOCAL, 7681 bytes)
21/12/21 06:33:34 INFO TaskSetManager: Finished task 1.0 in stage 5.0 (TID 41) in 316 ms on 192.168.59.99 (executor 1) (2/10)
21/12/21 06:33:34 INFO TaskSetManager: Starting task 3.0 in stage 5.0 (TID 43, 192.168.59.99, executor 1, partition 3, NODE_LOCAL, 7681 bytes)
21/12/21 06:33:34 INFO TaskSetManager: Finished task 2.0 in stage 5.0 (TID 42) in 316 ms on 192.168.59.99 (executor 1) (3/10)
21/12/21 06:33:35 INFO TaskSetManager: Starting task 4.0 in stage 5.0 (TID 44, 192.168.59.99, executor 1, partition 4, NODE_LOCAL, 7681 bytes)
21/12/21 06:33:35 INFO TaskSetManager: Finished task 3.0 in stage 5.0 (TID 43) in 313 ms on 192.168.59.99 (executor 1) (4/10)
21/12/21 06:33:35 INFO TaskSetManager: Starting task 5.0 in stage 5.0 (TID 45, 192.168.59.99, executor 1, partition 5, NODE_LOCAL, 7681 bytes)
21/12/21 06:33:35 INFO TaskSetManager: Finished task 4.0 in stage 5.0 (TID 44) in 312 ms on 192.168.59.99 (executor 1) (5/10)
21/12/21 06:33:35 INFO TaskSetManager: Starting task 6.0 in stage 5.0 (TID 46, 192.168.59.99, executor 1, partition 6, NODE_LOCAL, 7681 bytes)
21/12/21 06:33:35 INFO TaskSetManager: Finished task 5.0 in stage 5.0 (TID 45) in 350 ms on 192.168.59.99 (executor 1) (6/10)
21/12/21 06:33:36 INFO TaskSetManager: Starting task 7.0 in stage 5.0 (TID 47, 192.168.59.99, executor 1, partition 7, NODE_LOCAL, 7681 bytes)
21/12/21 06:33:36 INFO TaskSetManager: Finished task 6.0 in stage 5.0 (TID 46) in 324 ms on 192.168.59.99 (executor 1) (7/10)
21/12/21 06:33:36 INFO TaskSetManager: Starting task 8.0 in stage 5.0 (TID 48, 192.168.59.99, executor 1, partition 8, NODE_LOCAL, 7681 bytes)
21/12/21 06:33:36 INFO TaskSetManager: Finished task 7.0 in stage 5.0 (TID 47) in 429 ms on 192.168.59.99 (executor 1) (8/10)
21/12/21 06:33:36 INFO TaskSetManager: Starting task 9.0 in stage 5.0 (TID 49, 192.168.59.99, executor 1, partition 9, NODE_LOCAL, 7681 bytes)
21/12/21 06:33:36 INFO TaskSetManager: Finished task 8.0 in stage 5.0 (TID 48) in 335 ms on 192.168.59.99 (executor 1) (9/10)
21/12/21 06:33:37 INFO TaskSetManager: Finished task 9.0 in stage 5.0 (TID 49) in 376 ms on 192.168.59.99 (executor 1) (10/10)
21/12/21 06:33:37 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool
21/12/21 06:33:37 INFO DAGScheduler: ResultStage 5 (runJob at SparkHadoopWriter.scala:78) finished in 4.022 s
21/12/21 06:33:37 INFO DAGScheduler: Job 2 finished: runJob at SparkHadoopWriter.scala:78, took 4.741556 s
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 84
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 94
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 120
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 100
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 97
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 119
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 81
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 118
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 77
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 82
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 99
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 121
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 107
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 102
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 105
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 101
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 110
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 80
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 85
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 75
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 83
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 76
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 96
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 91
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 98
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 124
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 122
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 112
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 95
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 93
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 79
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 116
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 106
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 109
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 88
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 113
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 123
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 104
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 78
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 117
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 89
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 108
21/12/21 06:33:38 INFO BlockManagerInfo: Removed broadcast_5_piece0 on wordcount-1640068323479-driver-svc.default.svc:7079 in memory (size: 27.2 KB, free: 116.9 MB)
21/12/21 06:33:38 INFO BlockManagerInfo: Removed broadcast_5_piece0 on 192.168.59.99:41645 in memory (size: 27.2 KB, free: 116.9 MB)
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 92
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 103
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 90
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 111
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 86
21/12/21 06:33:38 INFO BlockManagerInfo: Removed broadcast_4_piece0 on wordcount-1640068323479-driver-svc.default.svc:7079 in memory (size: 3.0 KB, free: 116.9 MB)
21/12/21 06:33:38 INFO BlockManagerInfo: Removed broadcast_4_piece0 on 192.168.59.99:41645 in memory (size: 3.0 KB, free: 116.9 MB)
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 87
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 114
21/12/21 06:33:38 INFO ContextCleaner: Cleaned accumulator 115
21/12/21 06:33:39 INFO NativeOssFileSystem: OutputStream for key 'hp1-result-1/_SUCCESS' writing to tempfile '/tmp/hadoop-root/dfs/data/data/root/oss/output-4047609689034382569.data' for block 0
21/12/21 06:33:39 INFO NativeOssFileSystem: OutputStream for key 'hp1-result-1/_SUCCESS' closed. Now beginning upload
21/12/21 06:33:39 INFO NativeOssFileSystem: OutputStream for key 'hp1-result-1/_SUCCESS' upload complete
21/12/21 06:33:39 INFO SparkHadoopWriter: Job job_20211221063332_0010 committed.
-----------over------
21/12/21 06:33:39 INFO SparkUI: Stopped Spark web UI at http://wordcount-1640068323479-driver-svc.default.svc:4040
21/12/21 06:33:39 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
21/12/21 06:33:39 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
21/12/21 06:33:39 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)
21/12/21 06:33:39 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/12/21 06:33:39 INFO MemoryStore: MemoryStore cleared
21/12/21 06:33:39 INFO BlockManager: BlockManager stopped
21/12/21 06:33:39 INFO BlockManagerMaster: BlockManagerMaster stopped
21/12/21 06:33:39 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/12/21 06:33:39 INFO SparkContext: Successfully stopped SparkContext
21/12/21 06:33:39 INFO ShutdownHookManager: Shutdown hook called
21/12/21 06:33:39 INFO ShutdownHookManager: Deleting directory /tmp/spark-fe376e0d-8552-41fa-9620-685390a8ccbb
21/12/21 06:33:39 INFO ShutdownHookManager: Deleting directory /var/data/spark-528397fc-176a-4897-9129-9f4f14327b16/spark-21a04125-581f-48a0-8b84-36704c279704
6. View the results
Part of the result (from the output files under oss://spark-on-k8s-1/hp1-result-1) is:
(the,3306)
(,3056)
(to,1827)
(and,1787)
(a,1577)
(of,1235)
(was,1148)
(he,1018)
(Harry,903)
(in,898)
(his,893)
(had,691)
(--,688)
(said,659)
(at,580)
(you,578)
(it,547)
(on,544)
Done!
Note: delete the ASK cluster once you finish the experiment, otherwise you will keep being charged!
References
ECI Spark: https://github.com/aliyuneci/BestPractice-Serverless-Kubernetes/tree/master/eci-spark
Accessing HDFS data from ECI: https://help.aliyun.com/document_detail/146235.html
Accessing OSS data from ECI: https://help.aliyun.com/document_detail/146237.html
Best practices for big data analytics on the cloud: https://developer.aliyun.com/live/2196
ECI best practice: Spark applications: https://help.aliyun.com/document_detail/146249.html
Creating Spark jobs through ASK: https://help.aliyun.com/document_detail/165079.htm?spm=a2c4g.11186623.0.0.427a3edeER2KDl#task-2495864