MaxCompute MapReduce
摘要: 大數據計算服務(MaxCompute)的功能詳解和使用心得
點此查看原文:http://click.aliyun.com/m/41384/
前言
MapReduce已經有文檔,用戶可以參考文檔使用。本文是在文檔的基礎上做一些類似注解及細節解釋上的工作。
功能介紹
MapReduce
說起MapReduce就少不了WordCount,我特別喜歡文檔里的這個圖片。
比如有一張很大的表。表里有個String字段記錄的是用空格分割開單詞。最后需要統計所有記錄中,每個單詞出現的次數是多少。那整體的計算流程是:
輸入階段:根據工作量,生成幾個Mapper,把這些表的數據分配給這些Mapper。每個Mapper分配到表里的一部分記錄。
Map階段:每個Mapper針對每條數據,解析里面的字符串,用空格切開字符串,得到一組單詞。針對其中每個單詞,寫一條記錄
快速開始
運行環境
工欲善其事,必先利其器。MR的開發提供了基于IDEA和Eclipse的插件。其中比較推薦用IDEA的插件,因為IDEA我們還在持續做迭代,而Eclipse已經停止做更新了。而且IDEA的功能也比較豐富。
具體的插件的安裝方法步驟可以參考文檔,本文不在贅言。
另外后續還需要用到客戶端,可以參考文檔安裝。
后續為了更加清楚地說明問題,我會盡可能地在客戶端上操作,而不用IDEA里已經集成的方法。
線上運行
以WordCount為例,文檔可以參考這里
步驟為
做數據準備,包括創建表和使用Tunnel命令行工具導入數據
將代碼拷貝到IDE里,編譯打包成mapreduce-examples.jar
在odpscmd里執行add jar命令:
add jar /JarPath/mapreduce-examples.jar -f;
這里的 /JarPath/mapreduce-examples.jar的路徑要替換成本地實際的文件路徑。這個命令能把本地的jar包傳到服務器上,-f是如果已經有同名的jar包就覆蓋,實際使用中對于是報錯還是覆蓋需要謹慎考慮。
在odpscmd里執行
`jar -resources mapreduce-examples.jar -classpath mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.WordCount wc_in wc_out`
等待作業執行成功后,可以在SQL通過查詢wc_out表的數據,看到執行的結果
功能解讀
任務提交
任務的是在MaxComput(ODPS)上運行的,客戶端通過jar命令發起請求。
對比前面的快速開始,可以看到除去數據準備階段,和MR相關的,有資源的上傳(add jar步驟)和jar命令啟動MR作業兩步。
客戶端發起add jar/add file等資源操作,把在客戶端的機器(比如我測試的時候是從我的筆記本)上,運行任務涉及的資源文件傳到服務器上。這樣后面運行任務的時候,服務器上才能有對應的代碼和文件可以用。如果以前已經傳過了,這一步可以省略。
jar -resources mapreduce-examples.jar -classpath mapreduce-examples.jar com.aliyun.odps.mapred.open.example.WordCount wc_in wc_out
這個命令發起作業。MapReduce的任務是運行在MaxCompute集群上的,客戶端需要通過這個命令把任務運行相關的信息告訴集群。
客戶端先解析-classpath參數,找到main方法相關的jar包的位置
根據com.aliyun.odps.mapred.open.example.WordCount,找到main方法所在類的路徑和名字
wc_in wc_out是傳給main方法的參數,通過解析main方法傳入參數String[] args獲得這個參數
-resources告訴服務器,在運行任務的時候,需要用到的資源有哪些。
JobConfig
JobConf定義了這個任務的細節,還是這個圖,解釋一下JobConf的其他設置項的用法。
輸入數據
InputUtils.addTable(TableInfo table, JobConf conf)設置了輸入的表。
setSplitSize(long size)通過調整分片大小來調整Mapper個數,單位 MB,默認256。Mapper個數不通過void setNumMapTasks(int n)設置。
setMemoryForJVM(int mem)設置 JVM虛擬機的內存資源,單位:MB,默認值 1024.
Map階段
setMapperClass(Class theClass)設置Mapper使用的Java類。
setMapOutputKeySchema(Column[] schema)設置 Mapper 輸出到 Reducer 的 Key 行屬性。
setMapOutputValueSchema(Column[] schema)設置 Mapper 輸出到 Reducer 的 Value 行屬性。和上個設置一起定義了Mapper到Reducer的數據格式。
Shuffle-合并排序
setOutputKeySortColumns(String[] cols)設置 Mapper 輸出到 Reducer 的 Key 排序列。
setOutputKeySortOrder(JobConf.SortOrder[] order)設置 Key 排序列的順序。
setCombinerOptimizeEnable(boolean isCombineOpt)設置是否對Combiner進行優化。
setCombinerClass(Class theClass)設置作業的 combiner。
Shuffle-分配Reduce
setNumReduceTasks(int n)設置 Reducer 任務數,默認為 Mapper 任務數的 1/4。如果是Map only的任務,需要設置成0??梢詤⒖歼@里。
setPartitionColumns(String[] cols)設置作業的分區列,定義了數據分配到Reducer的分配策略。
Reduce階段
setOutputGroupingColumns(String[] cols)數據在Reducer里排序好了后,是哪些數據進入到同一個reduce方法的,就是看這里的設置。一般來說,設置的和setPartitionColumns(String[] cols)一樣。可以看到二次排序的用法。
setReducerClass(Class theClass)設置Reducer使用的Java類。
數據輸出
setOutputOverwrite(boolean isOverwrite)設置對輸出表是否進行覆蓋。類似SQL里的Insert into/overwrite Talbe的區別。
OutputUtils.addTable(TableInfo table, JobConf conf)設置了輸出的表。
其他
void setResources(String resourceNames)有和jar命令的-resources一樣的功能,但是優先級高于-resources(也就是說代碼里的設置優先級比較高)
最后通過JobClient.runJob(job);客戶端往服務器發起了這個MapReduce作業。
詳細的SDK的文檔,可以在Maven里下載。
Map/Reduce
讀表
在一個Mapper里,只會讀一張表,不同的表的數據會在不同的Mapper worker上運行,所以可以用示例里的這個方法先獲得這個Mapper讀的是什么表。
資源表/文件
資源表和文件可以讓一些小表/小文件可以方便被讀取。鑒于讀取數據的限制需要小于64次,一般是在setup里讀取后緩存起來。
生產及周期調度
任務提交
客戶端做的就是給服務器發起任務的調度的指令。之前提到的jar命令就是一種方法。鑒于實際上運行場景的多樣性,這里介紹其他的幾種常見方法:
odpscmd -e/-f:odpscmd的-e命令可以在shell腳本里直接運行一個odpscmd里的命令,所以可以在shell腳本里運行odpscmd -e ‘jar -resources xxxxxx’這樣的命令,在shell腳本里調用MapReduce作業。一個完整的例子是
odpscmd -u accessId -p accessKey –project=testproject –endpoint=http://service.odps.aliyun.com/api -e “jar -resources aaa.jar -classpath ./aaa.jar com.XXX.A”
如果在odpscmd的配置文件里已經配置好了,那只需要寫-e的部分。
-f和-e一樣,只是把命令寫到文件里,然后用odpscmd -f xxx.sql引用這個文件,那這個文件里的多個指令都會被執行。
大數據開發套件可以配置MapReduce作業。
大數據開發套件可以配置Shell作業??梢栽赟hell作業里參考上面的方法用odpscmd -e/-f來調度MapReduce作業。
在JAVA代碼里直接調用MapReduce作業,可以通過設置SessionState.setLocalRun(false); 實現。
定時調度
大數據開發套件的定時任務/工作流可以配置調度周期和任務依賴,配合前面提到的方法里的MapReduce作業/Shell作業,實現任務的調度。
產品限制
安全沙箱
沙箱是MaxCompute的一套安全體系,使得在MaxCompute上運行的作業無法獲得其他用戶的信息,也無法獲得系統的一些信息。
無法訪問外部數據源(不能當爬蟲,不能讀RDS等)
無法起多線程/多進程
不支持反射/自定義類加載器(所以不支持一些第三方包)
不允許讀本地文件(比如JSON里就用到了,就需要改用GSON)
不允許JNI調用
其他限制
詳見MaxCompute MR 限制項匯總
總結
以上是生活随笔為你收集整理的MaxCompute MapReduce的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python库大全(涵盖了Python应
- 下一篇: 下一代智能数据工厂,阿里云发布全新Dat