spark中各种数量的确定和查询(持续更新中)
?
| 數量 | 決定/設置方式 | 函數查詢方式 | 備注 |
| partition數量 | sqlContext.setConf("spark.sql.shuffle.partitions", "300") | rdd1.getNumPartitions 或者 rdd1.partitions.size ? | ?configures the number of partitions that are used when shuffling data for joins or aggregations. |
| partition數量 | sqlContext.setConf("spark.default.parallelism", "300") | rdd1.getNumPartitions 或者 rdd1.partitions.size | default number of partitions in?RDDs returned by transformations like?join,?reduceByKey, and?parallelize |
| task數量 | number-of-tasks = number-of-partitions或者"one task per stage per partition”[1] | 不存在查命令/函數行詢方式 | ? |
| executor數量 | --num-executors | 不需要查詢,直接命令行或者配置文件中設定即可 | ? |
| job數量 | 每次spark-submit/spark-shell提交任務就是一個job | 不存在查命令/函數行詢方式 | ? |
| stage數量 | 根據DAG依賴圖來確定(其實還是根據代碼來確定的) | 不存在查命令/函數行詢方式 | ? |
?
根據[1]可知,task數量就是partition的數量。
stage只是一個時間單位,
stage0執行完了執行stage1,然后執行stage2
?
注意不要搞混“并行度”(parallelism)和"并發數"(concurrency)兩個概念
?
[3]spark.default.parallelism???默認是沒有值的,如果設置了值比如說10,是在shuffle的過程才會起作用(val rdd2 = rdd1.reduceByKey(_+_) //rdd2的分區數就是10,rdd1的分區數不受這個參數的影響)
new SparkConf().set(“spark.default.parallelism”,”“500)
spark.sql.shuffle.partitions?//spark sql中shuffle過程中partitions的數量
?
[4]
如果是讀取hdfs的文件,一般來說,partition的數量等于文件的數量。
如果單個文件的大小大于hdfs的分塊大小,partition的數量就等于 “文件大小/分塊大小”。
同時,也可以使用rdd的repartition方法重新劃分partition。
另外,在使用聚合函數比如 reducebykey, groupbykey,可以通過指定partitioner來指定partition的數量。
例如:
?val rdd2 = rdd1.reduceByKey(_+_,10)
這里的10可以覆蓋前面的spark.default.parallelism
?
val rdd3 = rdd1.join(rdd2) rdd3里面partiiton的數量是由父RDD(這里指的是rdd1和rdd2)中最多的partition數量來決定,因此使用join算子的時候,增加父RDD中partition的數量。
?
num-executors * executor-cores的2~3倍即為task(partition)數量,此時可以比較充分地利用集群資源[5]
?
Reference:
[1]http://How are stages split into tasks in Spark?
[2]What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism?
[3]Spark性能調優之合理設置并行度
[4]Spark作業中Partition數目的劃分是由什么決定的?
[5]spark 體驗點滴- executor 數量 和task 并行數
總結
以上是生活随笔為你收集整理的spark中各种数量的确定和查询(持续更新中)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 天龙八部手游都有哪些礼包
- 下一篇: 如何在不删除任何数据的情况下恢复默认的