新版Hadoop MapReduce-Yarn
原 Hadoop MapReduce 框架的問題
對(duì)于業(yè)界的大數(shù)據(jù)存儲(chǔ)及分布式處理系統(tǒng)來說,Hadoop 是耳熟能詳?shù)淖吭介_源分布式文件存儲(chǔ)及處理框架,對(duì)于 Hadoop 框架的介紹在此不再累述,讀者可參考 Hadoop 官方簡介。使用和學(xué)習(xí)過老 Hadoop 框架(0.20.0 及之前版本)的同仁應(yīng)該很熟悉如下的原 MapReduce 框架圖:
圖 1.Hadoop 原 MapReduce 架構(gòu)
從上圖中可以清楚的看出原 MapReduce 程序的流程及設(shè)計(jì)思路:
- 首 先用戶程序 (JobClient) 提交了一個(gè) job,job 的信息會(huì)發(fā)送到 Job Tracker 中,Job Tracker 是 Map-reduce 框架的中心,他需要與集群中的機(jī)器定時(shí)通信 (heartbeat), 需要管理哪些程序應(yīng)該跑在哪些機(jī)器上,需要管理所有 job 失敗、重啟等操作。
- TaskTracker 是 Map-reduce 集群中每臺(tái)機(jī)器都有的一個(gè)部分,他做的事情主要是監(jiān)視自己所在機(jī)器的資源情況。
- TaskTracker 同時(shí)監(jiān)視當(dāng)前機(jī)器的 tasks 運(yùn)行狀況。TaskTracker 需要把這些信息通過 heartbeat 發(fā)送給 JobTracker,JobTracker 會(huì)搜集這些信息以給新提交的 job 分配運(yùn)行在哪些機(jī)器上。上圖虛線箭頭就是表示消息的發(fā)送 - 接收的過程。
可以看得出原來的 map-reduce 架構(gòu)是簡單明了的,在最初推出的幾年,也得到了眾多的成功案例,獲得業(yè)界廣泛的支持和肯定,但隨著分布式系統(tǒng)集群的規(guī)模和其工作負(fù)荷的增長,原框架的問題逐漸浮出水面,主要的問題集中如下:
- JobTracker 是 Map-reduce 的集中處理點(diǎn),存在單點(diǎn)故障。
- JobTracker 完成了太多的任務(wù),造成了過多的資源消耗,當(dāng) map-reduce job 非常多的時(shí)候,會(huì)造成很大的內(nèi)存開銷,潛在來說,也增加了 JobTracker fail 的風(fēng)險(xiǎn),這也是業(yè)界普遍總結(jié)出老 Hadoop 的 Map-Reduce 只能支持 4000 節(jié)點(diǎn)主機(jī)的上限。
- 在 TaskTracker 端,以 map/reduce task 的數(shù)目作為資源的表示過于簡單,沒有考慮到 cpu/ 內(nèi)存的占用情況,如果兩個(gè)大內(nèi)存消耗的 task 被調(diào)度到了一塊,很容易出現(xiàn) OOM。
- 在 TaskTracker 端,把資源強(qiáng)制劃分為 map task slot 和 reduce task slot, 如果當(dāng)系統(tǒng)中只有 map task 或者只有 reduce task 的時(shí)候,會(huì)造成資源的浪費(fèi),也就是前面提過的集群資源利用的問題。
- 源代碼層面分析的時(shí)候,會(huì)發(fā)現(xiàn)代碼非常的難讀,常常因?yàn)橐粋€(gè) class 做了太多的事情,代碼量達(dá) 3000 多行,,造成 class 的任務(wù)不清晰,增加 bug 修復(fù)和版本維護(hù)的難度。
- 從 操作的角度來看,現(xiàn)在的 Hadoop MapReduce 框架在有任何重要的或者不重要的變化 ( 例如 bug 修復(fù),性能提升和特性化 ) 時(shí),都會(huì)強(qiáng)制進(jìn)行系統(tǒng)級(jí)別的升級(jí)更新。更糟的是,它不管用戶的喜好,強(qiáng)制讓分布式集群系統(tǒng)的每一個(gè)用戶端同時(shí)更新。這些更新會(huì)讓用戶為了驗(yàn)證他們之前的應(yīng) 用程序是不是適用新的 Hadoop 版本而浪費(fèi)大量時(shí)間。
新 Hadoop Yarn 框架原理及運(yùn)作機(jī)制
從 業(yè)界使用分布式系統(tǒng)的變化趨勢和 hadoop 框架的長遠(yuǎn)發(fā)展來看,MapReduce 的 JobTracker/TaskTracker 機(jī)制需要大規(guī)模的調(diào)整來修復(fù)它在可擴(kuò)展性,內(nèi)存消耗,線程模型,可靠性和性能上的缺陷。在過去的幾年中,hadoop 開發(fā)團(tuán)隊(duì)做了一些 bug 的修復(fù),但是最近這些修復(fù)的成本越來越高,這表明對(duì)原框架做出改變的難度越來越大。
為 從根本上解決舊 MapReduce 框架的性能瓶頸,促進(jìn) Hadoop 框架的更長遠(yuǎn)發(fā)展,從 0.23.0 版本開始,Hadoop 的 MapReduce 框架完全重構(gòu),發(fā)生了根本的變化。新的 Hadoop MapReduce 框架命名為 MapReduceV2 或者叫 Yarn,其架構(gòu)圖如下圖所示:
圖 2. 新的 Hadoop MapReduce 框架(Yarn)架構(gòu)
重 構(gòu)根本的思想是將 JobTracker 兩個(gè)主要的功能分離成單獨(dú)的組件,這兩個(gè)功能是資源管理和任務(wù)調(diào)度 / 監(jiān)控。新的資源管理器全局管理所有應(yīng)用程序計(jì)算資源的分配,每一個(gè)應(yīng)用的 ApplicationMaster 負(fù)責(zé)相應(yīng)的調(diào)度和協(xié)調(diào)。一個(gè)應(yīng)用程序無非是一個(gè)單獨(dú)的傳統(tǒng)的 MapReduce 任務(wù)或者是一個(gè) DAG( 有向無環(huán)圖 ) 任務(wù)。ResourceManager 和每一臺(tái)機(jī)器的節(jié)點(diǎn)管理服務(wù)器能夠管理用戶在那臺(tái)機(jī)器上的進(jìn)程并能對(duì)計(jì)算進(jìn)行組織。
事實(shí)上,每一個(gè)應(yīng)用的 ApplicationMaster 是一個(gè)詳細(xì)的框架庫,它結(jié)合從 ResourceManager 獲得的資源和 NodeManager 協(xié)同工作來運(yùn)行和監(jiān)控任務(wù)。
上圖中 ResourceManager 支持分層級(jí)的應(yīng)用隊(duì)列,這些隊(duì)列享有集群一定比例的資源。從某種意義上講它就是一個(gè)純粹的調(diào)度器,它在執(zhí)行過程中不對(duì)應(yīng)用進(jìn)行監(jiān)控和狀態(tài)跟蹤。同樣,它也不能重啟因應(yīng)用失敗或者硬件錯(cuò)誤而運(yùn)行失敗的任務(wù)。
ResourceManager 是基于應(yīng)用程序?qū)Y源的需求進(jìn)行調(diào)度的 ; 每一個(gè)應(yīng)用程序需要不同類型的資源因此就需要不同的容器。資源包括:內(nèi)存,CPU,磁盤,網(wǎng)絡(luò)等等。可以看出,這同現(xiàn) Mapreduce 固定類型的資源使用模型有顯著區(qū)別,它給集群的使用帶來負(fù)面的影響。資源管理器提供一個(gè)調(diào)度策略的插件,它負(fù)責(zé)將集群資源分配給多個(gè)隊(duì)列和應(yīng)用程序。調(diào)度 插件可以基于現(xiàn)有的能力調(diào)度和公平調(diào)度模型。
上圖中 NodeManager 是每一臺(tái)機(jī)器框架的代理,是執(zhí)行應(yīng)用程序的容器,監(jiān)控應(yīng)用程序的資源使用情況 (CPU,內(nèi)存,硬盤,網(wǎng)絡(luò) ) 并且向調(diào)度器匯報(bào)。
每一個(gè)應(yīng)用的 ApplicationMaster 的職責(zé)有:向調(diào)度器索要適當(dāng)?shù)馁Y源容器,運(yùn)行任務(wù),跟蹤應(yīng)用程序的狀態(tài)和監(jiān)控它們的進(jìn)程,處理任務(wù)的失敗原因。
新舊 Hadoop MapReduce 框架比對(duì)
讓我們來對(duì)新舊 MapReduce 框架做詳細(xì)的分析和對(duì)比,可以看到有以下幾點(diǎn)顯著變化:
首先客戶端不變,其調(diào)用 API 及接口大部分保持兼容,這也是為了對(duì)開發(fā)使用者透明化,使其不必對(duì)原有代碼做大的改變 ( 詳見 2.3 Demo 代碼開發(fā)及詳解),但是原框架中核心的 JobTracker 和 TaskTracker 不見了,取而代之的是 ResourceManager, ApplicationMaster 與 NodeManager 三個(gè)部分。
我 們來詳細(xì)解釋這三個(gè)部分,首先 ResourceManager 是一個(gè)中心的服務(wù),它做的事情是調(diào)度、啟動(dòng)每一個(gè) Job 所屬的 ApplicationMaster、另外監(jiān)控 ApplicationMaster 的存在情況。細(xì)心的讀者會(huì)發(fā)現(xiàn):Job 里面所在的 task 的監(jiān)控、重啟等等內(nèi)容不見了。這就是 AppMst 存在的原因。ResourceManager 負(fù)責(zé)作業(yè)與資源的調(diào)度。接收 JobSubmitter 提交的作業(yè),按照作業(yè)的上下文 (Context) 信息,以及從 NodeManager 收集來的狀態(tài)信息,啟動(dòng)調(diào)度過程,分配一個(gè) Container 作為 App Mstr
NodeManager 功能比較專一,就是負(fù)責(zé) Container 狀態(tài)的維護(hù),并向 RM 保持心跳。
ApplicationMaster 負(fù)責(zé)一個(gè) Job 生命周期內(nèi)的所有工作,類似老的框架中 JobTracker。但注意每一個(gè) Job(不是每一種)都有一個(gè) ApplicationMaster,它可以運(yùn)行在 ResourceManager 以外的機(jī)器上。
Yarn 框架相對(duì)于老的 MapReduce 框架什么優(yōu)勢呢?我們可以看到:
- 這個(gè)設(shè)計(jì)大大減小了 JobTracker(也就是現(xiàn)在的 ResourceManager)的資源消耗,并且讓監(jiān)測每一個(gè) Job 子任務(wù) (tasks) 狀態(tài)的程序分布式化了,更安全、更優(yōu)美。
- 在新的 Yarn 中,ApplicationMaster 是一個(gè)可變更的部分,用戶可以對(duì)不同的編程模型寫自己的 AppMst,讓更多類型的編程模型能夠跑在 Hadoop 集群中,可以參考 hadoop Yarn 官方配置模板中的 mapred-site.xml 配置。
- 對(duì)于資源的表示以內(nèi)存為單位 ( 在目前版本的 Yarn 中,沒有考慮 cpu 的占用 ),比之前以剩余 slot 數(shù)目更合理。
- 老 的框架中,JobTracker 一個(gè)很大的負(fù)擔(dān)就是監(jiān)控 job 下的 tasks 的運(yùn)行狀況,現(xiàn)在,這個(gè)部分就扔給 ApplicationMaster 做了,而 ResourceManager 中有一個(gè)模塊叫做 ApplicationsMasters( 注意不是 ApplicationMaster),它是監(jiān)測 ApplicationMaster 的運(yùn)行狀況,如果出問題,會(huì)將其在其他機(jī)器上重啟。
- Container 是 Yarn 為了將來作資源隔離而提出的一個(gè)框架。這一點(diǎn)應(yīng)該借鑒了 Mesos 的工作,目前是一個(gè)框架,僅僅提供 java 虛擬機(jī)內(nèi)存的隔離 ,hadoop 團(tuán)隊(duì)的設(shè)計(jì)思路應(yīng)該后續(xù)能支持更多的資源調(diào)度和控制 , 既然資源表示成內(nèi)存量,那就沒有了之前的 map slot/reduce slot 分開造成集群資源閑置的尷尬情況。
新的 Yarn 框架相對(duì)舊 MapRduce 框架而言,其配置文件 , 啟停腳本及全局變量等也發(fā)生了一些變化,主要的改變?nèi)缦?#xff1a;
表 1. 新舊 Hadoop 腳本 / 變量 / 位置變化表
| 改變項(xiàng) | 原框架中 | 新框架中(Yarn) | 備注 |
| 配置文件位置 | ${hadoop_home_dir}/conf | ${hadoop_home_dir}/etc/hadoop/ | Yarn 框架也兼容老的 ${hadoop_home_dir}/conf 位置配置,啟動(dòng)時(shí)會(huì)檢測是否存在老的 conf 目錄,如果存在將加載 conf 目錄下的配置,否則加載 etc 下配置 |
| 啟停腳本 | ${hadoop_home_dir}/bin/start(stop)-all.sh | ${hadoop_home_dir}/sbin/start(stop)-dfs.sh ${hadoop_home_dir}/bin/start(stop)-all.sh | 新的 Yarn 框架中啟動(dòng)分布式文件系統(tǒng)和啟動(dòng) Yarn 分離,啟動(dòng) / 停止分布式文件系統(tǒng)的命令位于 ${hadoop_home_dir}/sbin 目錄下,啟動(dòng) / 停止 Yarn 框架位于 ${hadoop_home_dir}/bin/ 目錄下 |
| JAVA_HOME 全局變量 | ${hadoop_home_dir}/bin/start-all.sh 中 | ${hadoop_home_dir}/etc/hadoop/hadoop-env.sh ${hadoop_home_dir}/etc/hadoop/Yarn-env.sh | Yarn 框架中由于啟動(dòng) hdfs 分布式文件系統(tǒng)和啟動(dòng) MapReduce 框架分離,JAVA_HOME 需要在 hadoop-env.sh 和 Yarn-env.sh 中分別配置 |
| HADOOP_LOG_DIR 全局變量 | 不需要配置 | ${hadoop_home_dir}/etc/hadoop/hadoop-env.sh | 老框架在 LOG,conf,tmp 目錄等均默認(rèn)為腳本啟動(dòng)的當(dāng)前目錄下的 log,conf,tmp 子目錄 Yarn 新框架中 Log 默認(rèn)創(chuàng)建在 Hadoop 用戶的 home 目錄下的 log 子目錄,因此最好在 ${hadoop_home_dir}/etc/hadoop/hadoop-env.sh 配置 HADOOP_LOG_DIR,否則有可能會(huì)因?yàn)槟銌?dòng) hadoop 的用戶的 .bashrc 或者 .bash_profile 中指定了其他的 PATH 變量而造成日志位置混亂,而該位置沒有訪問權(quán)限的話啟動(dòng)過程中會(huì)報(bào)錯(cuò) |
由于新的 Yarn 框架與原 Hadoop MapReduce 框架相比變化較大,核心的配置文件中很多項(xiàng)在新框架中已經(jīng)廢棄,而新框架中新增了很多其他配置項(xiàng),看下表所示會(huì)更加清晰:
表 2. 新舊 Hadoop 框架配置項(xiàng)變化表
| 配置文件 | 配置項(xiàng) | Hadoop 0.20.X 配置 | Hadoop 0.23.X 配置 | 說明 |
| core-site.xml | 系統(tǒng)默認(rèn)分布式文件 URI | fs.default.name | fs.defaultFS | ? |
| hdfs-site.xml | DFS name node 存放 name table 的目錄 | dfs.name.dir | dfs.namenode.name.dir | 新框架中 name node 分成 dfs.namenode.name.dir( 存放 naname table 和 dfs.namenode.edits.dir(存放 edit 文件),默認(rèn)是同一個(gè)目錄 |
| ? | DFS data node 存放數(shù)據(jù) block 的目錄 | dfs.data.dir | dfs.datanode.data.dir | 新 框架中 DataNode 增加更多細(xì)節(jié)配置,位于 dfs.datanode. 配置項(xiàng)下,如dfs.datanode.data.dir.perm(datanode local 目錄默認(rèn)權(quán)限);dfs.datanode.address(datanode 節(jié)點(diǎn)監(jiān)聽端口);等 |
| ? | 分布式文件系統(tǒng)數(shù)據(jù)塊復(fù)制數(shù) | dfs.replication | dfs.replication | 新框架與老框架一致,值建議配置為與分布式 cluster 中實(shí)際的 DataNode 主機(jī)數(shù)一致 |
| mapred-site.xml | Job 監(jiān)控地址及端口 | mapred.job.tracker | 無 | 新 框架中已改為 Yarn-site.xml 中的 resouceManager 及 nodeManager 具體配置項(xiàng),新框架中歷史 job 的查詢已從 Job tracker 剝離,歸入單獨(dú)的mapreduce.jobtracker.jobhistory 相關(guān)配置, |
| ? | 第三方 MapReduce 框架 | 無 | mapreduce.framework.name | 新框架支持第三方 MapReduce 開發(fā)框架以支持如 SmartTalk/DGSG 等非 Yarn 架構(gòu),注意通常情況下這個(gè)配置的值都設(shè)置為 Yarn,如果沒有配置這項(xiàng),那么提交的 Yarn job 只會(huì)運(yùn)行在 locale 模式,而不是分布式模式。 |
| ? | ? | ? | ? | ? |
| Yarn-site.xml | The address of the applications manager interface in the RM | 無 | Yarn.resourcemanager.address | 新框架中 NodeManager 與 RM 通信的接口地址 |
| ? | The address of the scheduler interface | 無 | Yarn.resourcemanager.scheduler.address | 同上,NodeManger 需要知道 RM 主機(jī)的 scheduler 調(diào)度服務(wù)接口地址 |
| ? | The address of the RM web application | 無 | Yarn.resourcemanager.webapp.address | 新框架中各個(gè) task 的資源調(diào)度及運(yùn)行狀況通過通過該 web 界面訪問 |
| ? | The address of the resource tracker interface | 無 | Yarn.resourcemanager.resource-tracker.address | 新框架中 NodeManager 需要向 RM 報(bào)告任務(wù)運(yùn)行狀態(tài)供 Resouce 跟蹤,因此 NodeManager 節(jié)點(diǎn)主機(jī)需要知道 RM 主機(jī)的 tracker 接口地址 |
?
回頁首
Hadoop Yarn 框架 Demo 示例
Demo 場景介紹:Weblogic 應(yīng)用服務(wù)器日志分析
了解了 hadoop 新的 Yarn 框架的架構(gòu)和思路后,我們用一個(gè) Demo 示例來檢驗(yàn)新 Yarn 框架下 Map-Reduce 程序的開發(fā)部署。
我們考慮如下應(yīng)用場景:用戶的生產(chǎn)系統(tǒng)由多臺(tái) Weblogic 應(yīng)用服務(wù)器組成,每天需要每臺(tái)對(duì)應(yīng)用服務(wù)器的日志內(nèi)容進(jìn)行檢查,統(tǒng)計(jì)其日志級(jí)別和日志模塊的總數(shù)。
WebLogic 的日志范例如下圖所示:
圖 3.Weblogic 日志示例
如 上圖所示,<Info> 為 weblogic 的日志級(jí)別,<Security>,<Management> 為 Weblogic 的日志模塊,我們主要分析 loglevel 和 logmodule 這兩個(gè)維度分別在 WebLogic 日志中出現(xiàn)的次數(shù),每天需要統(tǒng)計(jì)出 loglevel 和 logmodule 分別出現(xiàn)的次數(shù)總數(shù)。
Demo 測試環(huán)境 Yarn 框架搭建
由 于 Weblogic 應(yīng)用服務(wù)器分布于不同的主機(jī),且日志數(shù)據(jù)量巨大,我們采用 hadoop 框架將 WebLogic 各個(gè)應(yīng)用服務(wù)器主機(jī)上建立分布式目錄,每天將 WebLogic 日志裝載進(jìn) hadoop 分布式文件系統(tǒng),并且編寫基于 Yarn 框架的 MapReduce 程序?qū)θ罩具M(jìn)行處理,分別統(tǒng)計(jì)出 LogLevel 和 Logmodule 在日志中出現(xiàn)的次數(shù)并計(jì)算總量,然后輸出到分布式文件系統(tǒng)中,輸出目錄命名精確到小時(shí)為后綴以便區(qū)分每次 Demo 程序運(yùn)行的處理結(jié)果。
我 們搭建一個(gè) Demo 測試環(huán)境以驗(yàn)證 Yarn 框架下分布式程序處理該案例的功能,以兩臺(tái)虛擬機(jī)作為該 Demo 的運(yùn)行平臺(tái),兩機(jī)均為 Linux 操作系統(tǒng),機(jī)器 hostname 為 OEL 和 Stephen,OEL 作為 NameNode 和 ResouceManager 節(jié)點(diǎn)主機(jī),64 位,Stephen 作為 DataNode 和 NodeManager 節(jié)點(diǎn)主機(jī),32 位(Hadoop 支持異構(gòu)性), 具體如下:
表 3.Demo 測試環(huán)境表
| 主機(jī)名 | 角色 | 備注 |
| OEL(192.168.137.8) | NameNode 節(jié)點(diǎn)主機(jī) | linux 操作系統(tǒng) |
| Stephen(192.168.l37.2) | DataNode 節(jié)點(diǎn)主機(jī) | linux 操作系統(tǒng) |
我 們把 hadoop 安裝在兩臺(tái)測試機(jī)的 /hadoop 文件系統(tǒng)目錄下,安裝后的 hadoop 根目錄為:/hadoop/hadoop-0.23.0,規(guī)劃分布式文件系統(tǒng)存放于 /hadoop/dfs 的本地目錄,對(duì)應(yīng)分布式系統(tǒng)中的目錄為 /user/oracle/dfs
我們根據(jù) Yarn 框架要求,分別在 core-site.xml 中配置分布式文件系統(tǒng)的 URL,詳細(xì)如下:
清單 1.core-site.xml 配置
| ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ???<configuration>? ? <property>? ? <name>fs.defaultFS</name>? ? <value>hdfs://192.168.137.8:9100</value>? ? </property>??</configuration> |
在 hdfs-site.xml 中配置 nameNode,dataNode 的本地目錄信息,詳細(xì)如下:
清單 2.hdfs-site.xml 配置
| ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ???<configuration>??<property>? ?<name>dfs.namenode.name.dir</name>? ?<value>/hadoop/dfs/name</value>? ?<description>??</description>??</property>??<property>? ?<name>dfs.datanode.data.dir</name>? ?<value>/hadoop/dfs/data</value>? ?<description> </description>??</property>??<property>? ? <name>dfs.replication</name>? ? <value>2</value>??</property>??</configuration> |
在 mapred-site.xml 中配置其使用 Yarn 框架執(zhí)行 map-reduce 處理程序,詳細(xì)如下:
清單 3.mapred-site.xml 配置
| ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ???<configuration>? ?<property>? ?<name>mapreduce.framework.name</name>? ?<value>Yarn</value>? ?</property>??</configuration> |
最后在 Yarn-site.xml 中配置 ResourceManager,NodeManager 的通信端口,web 監(jiān)控端口等,詳細(xì)如下:
清單 4.Yarn-site.xml 配置
| ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ???<?xml version="1.0"?>??<configuration>??<!-- Site specific YARN configuration properties -->? ?<property>? ?<name>Yarn.nodemanager.aux-services</name>? ?<value>mapreduce.shuffle</value>? ?</property>? ?<property>? ?<description>The address of the applications manager interface in the RM.</description>? ?<name>Yarn.resourcemanager.address</name>? ?<value>192.168.137.8:18040</value>? ?</property>? ?<property>? ?<description>The address of the scheduler interface.</description>? ?<name>Yarn.resourcemanager.scheduler.address</name>? ?<value>192.168.137.8:18030</value>? ?</property>? ?<property>? ?<description>The address of the RM web application.</description>? ?<name>Yarn.resourcemanager.webapp.address</name>? ?<value>192.168.137.8:18088</value>? ?</property>? ???<property>? ?<description>The address of the resource tracker interface.</description>? ?<name>Yarn.resourcemanager.resource-tracker.address</name>? ?<value>192.168.137.8:8025</value>? ?</property>??</configuration> |
具體配置項(xiàng)的含義,在 hadoop 官方網(wǎng)站有詳細(xì)的說明,讀者可以參見 hadoop 0.23.0 官方配置模板。
Demo 代碼開發(fā)及詳解
以下我們詳細(xì)介紹一下新的 Yarn 框架下針對(duì)該應(yīng)用場景的 Demo 代碼的開發(fā), 在 Demo 程序的每個(gè)類都有詳細(xì)的注釋和說明,Yarn 開發(fā)為了兼容老版本,API 變化不大,可以參考 官方 Hadoop Yarn 框架 API。
在 Map 程序中,我們以行號(hào)為 key,行文本為 value 讀取每一行 WebLogic 日志輸入,將 loglevel 和 logmodule 的值讀出作為 Map 處理后的新的 key 值,由于一行中 loglevel 和 logmodule 的出現(xiàn)次數(shù)應(yīng)該唯一,所以經(jīng) Map 程序處理后的新的 record 記錄的 value 應(yīng)該都為 1:
清單 5. Map 業(yè)務(wù)邏輯
| ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ???public static class MapClass extends Mapper<Object, Text, Text, IntWritable>? ?{? ?private Text record = new Text();? ?private static final IntWritable recbytes = new IntWritable(1);? ?public void map(Object key, Text value,Context context)? ???throws IOException,InterruptedException {? ? String line = value.toString();??// 沒有配置 RecordReader,所以默認(rèn)采用 line 的實(shí)現(xiàn), //key 就是行號(hào),value 就是行內(nèi)容, // 按行 key-value 存放每行 loglevel 和 logmodule 內(nèi)容??if (line == null || line.equals(""))? ???return;? ?String[] words = line.split("> <");? ?if (words == null || words.length < 2)? ???return;? ?String logLevel = words[1];? ?String moduleName = words[2];? ???record.clear();? ?record.set(new StringBuffer("logLevel::").append(logLevel).toString());? ?context.write(record, recbytes);? ?// 輸出日志級(jí)別統(tǒng)計(jì)結(jié)果,通過 logLevel:: 作為前綴來標(biāo)示。? ? record.clear();? ? record.set(new StringBuffer("moduleName::").append(moduleName).toString());? ?context.write(record, recbytes);? ? // 輸出模塊名的統(tǒng)計(jì)結(jié)果,通過 moduleName:: 作為前綴來標(biāo)示??}? ? } |
由 于有 loglevel 和 logmodule 兩部分的分析工作,我們設(shè)定兩個(gè) Reduce 來分別處理這兩部分,loglevel 的交給 reduce1,logmodule 交給 reduce2。因此我們編寫 Patitioner 類,根據(jù) Map 傳過來的 Key 中包含的 logLevel 和 moduleName 的前綴,來分配到不同的 Reduce:
清單 6.Partition 業(yè)務(wù)邏輯
| public static class PartitionerClass extends Partitioner<Text, IntWritable> { public int getPartition(Text key, IntWritable value, int numPartitions) { if (numPartitions >= 2)//Reduce 個(gè)數(shù),判斷 loglevel 還是 logmodule 的統(tǒng)計(jì),分配到不同的 Reduce if (key.toString().startsWith("logLevel::")) return 0; else if(key.toString().startsWith("moduleName::")) return 1; else return 0; else return 0; } } |
在 Reduce 程序中,累加并合并 loglevel 和 logmodule 的出現(xiàn)次數(shù)
清單 7. Reduce 業(yè)務(wù)邏輯
| ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ???public static class ReduceClass extends??Reducer<Text, IntWritable,Text, IntWritable>? ?? ?? ?{? ?? ?? ?? ? private IntWritable result = new IntWritable();? ?? ?? ?? ? public void reduce(Text key, Iterable<IntWritable> values,? ?? ?? ?? ?? ?? ?? ?Context context)throws IOException,? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?InterruptedException {? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?int tmp = 0;? ?? ?? ?? ?? ???for (IntWritable val : values) {? ?? ?? ?? ?? ?? ?? ?tmp = tmp + val.get();? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ???}? ?? ?? ?? ?? ???result.set(tmp);? ?? ?? ?? ?? ???context.write(key, result);// 輸出最后的匯總結(jié)果? ?? ?? ?? ?}? ?? ?? ?? ?} |
以上完成了 MapReduce 的主要處理邏輯,對(duì)于程序入口,我們使用 Hadoop 提供的 Tools 工具包方便的進(jìn)行 May-Reduce 程序的啟動(dòng)和 Map/Reduce 對(duì)應(yīng)處理 class 的配置。
清單 8. Main 執(zhí)行類
| ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ???import java.io.File;??import java.io.IOException;??import java.text.SimpleDateFormat;??import java.util.Date;??import java.util.Iterator;??import org.apache.hadoop.conf.Configuration;??import org.apache.hadoop.conf.Configured;??import org.apache.hadoop.fs.Path;??import org.apache.hadoop.io.IntWritable;??import org.apache.hadoop.io.Text;??import org.apache.hadoop.mapreduce.Job;??import org.apache.hadoop.mapreduce.Reducer;??import org.apache.hadoop.mapreduce.Mapper;??import org.apache.hadoop.mapreduce.Partitioner;??import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;??import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;??import org.apache.hadoop.util.Tool;??import org.apache.hadoop.util.ToolRunner;??public class LogAnalysiser extends Configured implements Tool {? ? public static void main(String[] args)? ?{? ???try? ?{? ?int res;? ?res = ToolRunner.run(new Configuration(),new LogAnalysiser(), args);? ?System.exit(res);? ?} catch (Exception e)? ?{? ?e.printStackTrace();? ?}? ?}? ?public int run(String[] args) throws Exception? ?{? ?if (args == null || args.length <2)? ?{? ?System.out.println("need inputpath and outputpath");? ?return 1;? ?}? ?String inputpath = args[0];? ?String outputpath = args[1];? ?String shortin = args[0];? ?String shortout = args[1];? ?if (shortin.indexOf(File.separator) >= 0)? ?shortin = shortin.substring(shortin.lastIndexOf(File.separator));? ?if (shortout.indexOf(File.separator) >= 0)? ?shortout = shortout.substring(shortout.lastIndexOf(File.separator));? ?SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd.HH.mm");? ?shortout = new StringBuffer(shortout).append("-")? ?.append(formater.format(new Date())).toString();? ?? ? if (!shortin.startsWith("/"))? ?shortin = "/" + shortin;? ?if (!shortout.startsWith("/"))? ?shortout = "/" + shortout;? ?shortin = "/user/oracle/dfs/" + shortin;? ?shortout = "/user/oracle/dfs/" + shortout;? ???File inputdir = new File(inputpath);? ?File outputdir = new File(outputpath);? ???if (!inputdir.exists() || !inputdir.isDirectory())? ?{? ?System.out.println("inputpath not exist or isn't dir!");? ?return 0;? ?}? ?if (!outputdir.exists())? ?{? ?new File(outputpath).mkdirs();? ?}??// 以下注釋的是 hadoop 0.20.X 老版本的 Job 代碼,在 hadoop0.23.X 新框架中已經(jīng)大大簡化 //? ?Configuration conf = getConf();??//? ?JobConf job = new JobConf(conf, LogAnalysiser.class);? ? //? ? JobConf conf = new JobConf(getConf(),LogAnalysiser.class);// 構(gòu)建 Config??//? ? conf.setJarByClass(MapClass.class);??//? ? conf.setJarByClass(ReduceClass.class);??//? ? conf.setJarByClass(PartitionerClass.class);??//? ? conf.setJar("hadoopTest.jar");??//? ? job.setJar("hadoopTest.jar");??// 以下是新的 hadoop 0.23.X Yarn 的 Job 代碼 job job = new Job(new Configuration());? ???job.setJarByClass(LogAnalysiser.class);? ???job.setJobName("analysisjob");? ???job.setOutputKeyClass(Text.class);// 輸出的 key 類型,在 OutputFormat 會(huì)檢查? ? job.setOutputValueClass(IntWritable.class); // 輸出的 value 類型,在 OutputFormat 會(huì)檢查? ? job.setJarByClass(LogAnalysiser.class);? ???job.setMapperClass(MapClass.class);? ???job.setCombinerClass(ReduceClass.class);? ???job.setReducerClass(ReduceClass.class);? ???job.setPartitionerClass(PartitionerClass.class);? ???job.setNumReduceTasks(2);// 強(qiáng)制需要有兩個(gè) Reduce 來分別處理流量和次數(shù)的統(tǒng)計(jì)? ? FileInputFormat.setInputPaths(job, new Path(shortin));//hdfs 中的輸入路徑? ? FileOutputFormat.setOutputPath(job,new Path(shortout));//hdfs 中輸出路徑? ?? ???Date startTime = new Date();? ???System.out.println("Job started: " + startTime);? ???job.waitForCompletion(true);? ?? ???Date end_time = new Date();? ???System.out.println("Job ended: " + end_time);? ???System.out.println("The job took " +? ???(end_time.getTime() - startTime.getTime()) /1000 + " seconds.");? ???// 刪除輸入和輸出的臨時(shí)文件 //? ? fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));??//? ? fileSys.delete(new Path(shortin),true);??//? ? fileSys.delete(new Path(shortout),true);? ???return 0;? ?}??} |
Demo 部署及運(yùn)行
Demo 輸入輸出的控制
本 demo 中我們將從 Weblogic 日志目錄中拷貝原始待處理日志文件作為 Yarn 程序的輸入,使用 hadoop dfs 命令將其放入分布式目錄的 input 目錄,處理完后將生成以時(shí)間戳為文件目錄后綴的輸出目錄
Weblogic 日志存放的原始目錄位于:/u01/app/Oracle/Middleware/user_projects/domains/test_domain/AdminServer/logs
分布式文件系統(tǒng)中的輸入目錄:/user/oracle/dfs/input
分布式文件系統(tǒng)中的輸出目錄:/user/oracle/dfs/output_%YYYY-MM-DD-hh-mm%
Demo 打包和部署
可以使用 JDeveloper 或者 Eclipse 等 IDE 工具將開發(fā)的 Hadoop Demo 代碼打包為 jar,并指定 Main 類為 LoyAnalyze,本文中我們采用 JDeveloper 打包 Demo 代碼,如下圖示例:
圖 4.Yarn Demo 程序打包示例
Demo 執(zhí)行與跟蹤
我們在 OEL 主機(jī)(NameNode&ResourceManager 主機(jī),192.168.137.8)上啟動(dòng) dfs 分布式文件系統(tǒng):
圖 5. 啟動(dòng) Demo dfs 文件系統(tǒng)
從上圖可以看出 dfs 分布式文件系統(tǒng)已經(jīng)在 OEL 和 Stephen 主機(jī)上成功啟動(dòng),我們通過默認(rèn)的分布式文件系統(tǒng) Web 監(jiān)控 端口http://192.168.137.8:50070(也可以在上文中 core-site.xml 中配置 dfs.namenode.http-address 項(xiàng)指定其他端口 ) 來驗(yàn)證其文件系統(tǒng)情況:
圖 6.hadoop 文件系統(tǒng) web 監(jiān)控頁面
從上圖中我們可以看到 /user/oracle/dfs 分布式文件系統(tǒng)已成功建立。
接下來我們在 NameNode 主機(jī)(OEL,192.168.137.8)上啟動(dòng) Yarn 框架:
圖 7. 啟動(dòng) Demo Yarn 框架
從上圖我們可以看到 ResouceManager 在 OEL 主機(jī)上成功啟動(dòng),NodeManager 進(jìn)程在 Stephen 節(jié)點(diǎn)主機(jī)上也已經(jīng)啟動(dòng),至此整個(gè)新的 Hadoop Yarn 框架已經(jīng)成功啟動(dòng)。
我 們將打好的 testHadoop.jar 包上傳至 NameNode 主機(jī)(OEL)的 /hadoop/hadoop-0.23.0/ 根目錄下,我們使用 Hadoop 自帶的 hadoop 命令行工具執(zhí)行 Demo 的 jar 包,具體步驟為,先使用 hadoop dfs 命令將輸入文件(weblogic 原始日志)拷貝至 dfs 分布式目錄的 input 輸入目錄,清理 dfs 分布式目錄下的 output 輸出子目錄。然后使用 hadoop jar 命令執(zhí)行 testHadoop 的 jar 包。
執(zhí)行 Demo 的 shell 腳本示例如下:
| ./bin/hadoop dfs -rmr /user/oracle/dfs/output*??./bin/hadoop dfs -rmr /user/oracle/dfs/input??./bin/hadoop dfs -mkdir /user/oracle/dfs/input??./bin/hadoop dfs -copyFromLocal ./input/*.log /user/oracle/dfs/input/??./bin/hadoop jar ./hadoopTest.jar /hadoop/hadoop-0.23.0/input? ?? ?? ?? ?? ?? ?/hadoop/hadoop-0.23.0/output |
清單 9.Demo 執(zhí)行腳本
然后我們使用上文中的腳本啟動(dòng) demo 并執(zhí)行:
圖 8.Demo 程序運(yùn)行
查看大圖
從上圖的 console 輸出中我們可以看到 Demo 程序的結(jié)果和各項(xiàng)統(tǒng)計(jì)信息輸出,下面我們通過 Web 監(jiān)控界面詳細(xì)中觀察程序執(zhí)行的執(zhí)行流程和步驟細(xì)節(jié)。
Job 啟動(dòng)后我們可以通過 ResourceManager 的 Web 端口(在上文中 Yarn-site.xml 配置文件中 Yarn.resourcemanager.webapp.address 配置項(xiàng)) http://192.168.137.8:18088 來監(jiān)控其 job 的資源調(diào)度。
圖 9. 接收請求和生成 job application
查看大圖
上圖中我們可以看到 Yarn 框架接受到客戶端請求 , 如上圖所示 ID 為 application_1346564668712_0003 的 job 已經(jīng)是 accepted 狀態(tài)
我們點(diǎn)擊該 ID 的鏈接進(jìn)入到該 application 的 Map-Reduce 處理監(jiān)控頁面,該界面中有動(dòng)態(tài)分配的 ApplicationMaster 的 Web 跟蹤端口可以監(jiān)視 MapReduce 程序的步驟細(xì)節(jié)
圖 10.hadoop MapReduce Application Web 監(jiān)控頁面 (1)
點(diǎn)擊上圖中 ApplicationMaster 的 URL 可以進(jìn)入該 ApplicationMaster 負(fù)責(zé)管理的 Job 的具體 Map-Reduce 運(yùn)行狀態(tài):
圖 11.hadoop MasterApplication Web 監(jiān)控頁面(2)
上圖中我們可以看到 ID 為 application_1346564668712_0003 的 Job 正在執(zhí)行,有 2 個(gè) Map 進(jìn)程,已經(jīng)處理完畢,有 2 個(gè) Reduce 正在處理,這跟我們程序設(shè)計(jì)預(yù)期的是一樣的。
當(dāng) 狀態(tài)變?yōu)?successful 后,進(jìn)入 dfs 文件系統(tǒng)可以看到,輸出的 dfs 文件系統(tǒng)已經(jīng)生成,位置位于 /user/oracle/dfs 下,目錄名為 output-2012.09.02.13.52,可以看到格式和命名方式與 Demo 設(shè)計(jì)是一致的,如下圖所示:
圖 12.Demo 輸出目錄(1)
我們進(jìn)入具體的輸出目錄,可以清楚的看到程序處理的輸出結(jié)果,正如我們 Demo 中設(shè)計(jì)的,兩個(gè) Reduce 分別生成了兩個(gè)輸出文件,分別是 part-r-00000 和 part-r-00001,對(duì)應(yīng) Module 和 Log Level 的處理輸出信息:
圖 13.Demo 輸出目錄(2)
點(diǎn)擊 part-r-00000 的輸出文件鏈接,可以看到程序處理后的 log level 的統(tǒng)計(jì)信息:
圖 14.Demo 輸出結(jié)果(1)
點(diǎn)擊 part-r-00001 的輸出文件鏈接,可以看到程序處理后 Module 的統(tǒng)計(jì)信息:
圖 15.Demo 輸出結(jié)果(2)
至 此我們基于新的 Yarn 框架的 Demo 完全成功運(yùn)行,實(shí)現(xiàn)功能與預(yù)期設(shè)計(jì)完全一致,運(yùn)行狀態(tài)和 NameNode/DataNode 部署,Job/MapReduece 程序的調(diào)度均和設(shè)計(jì)一致。讀者可參考該 Demo 的配置及代碼進(jìn)行修改,做為實(shí)際生產(chǎn)環(huán)境部署和實(shí)施的基礎(chǔ)。
轉(zhuǎn)載于:https://www.cnblogs.com/jessen163/articles/3825547.html
總結(jié)
以上是生活随笔為你收集整理的新版Hadoop MapReduce-Yarn的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: js获取datagrid行,但是行改变了
- 下一篇: 招才猫直聘如何申诉