【小白视角】大数据基础实践(五) MapReduce编程基础操作
目錄
- 1. MapReduce 簡介
- 1.1 起源
- 1.2 模型簡介
- 1.3 MRv1體系結構
- 1.4 YARN
- 1.4.1 YARN體系結構
- 1.4.2 YARN工作流程
- 2. MapReduce 工作流程
- 3. Java Api要點
- 4. 實驗過程
- 最后
1. MapReduce 簡介
1.1 起源
在函數式語言里,map表示對一個列表(List)中的每個元素做計算,reduce表示對一個列表中的每個元素做迭代計算。
它們具體的計算是通過傳入的函數來實現的,map和reduce提供的是計算的框架。
- 在MapReduce里,map處理的是原始數據,每條數據之間互相沒有關系;
- 到了reduce階段,數據是以key后面跟著若干個value來組織的,這些value有相關性,至少它們都在一個key下面,于是就符合函數式語言里map和reduce的基本思想了。
- “map”和“reduce”的概念和它們的主要思想,都是從函數式編程語言借用來的,還有從矢量編程語言里借來的特性。極大地方便了編程人員在不會分布式并行編程的情況下,將自己的程序運行在分布式系統上。
1.2 模型簡介
1.3 MRv1體系結構
MapReduce體系結構主要由四個部分組成,分別是:Client、JobTracker、TaskTracker以及Task
結點說明:
- Client
用戶編寫的MapReduce程序通過Client提交到JobTracker端,用戶可通過Client提供的一些接口查看作業運行狀態。 - JobTracker
JobTracker負責資源監控和作業調度;JobTracker監控所有TaskTracker與Job的健康狀況,一旦發現失敗,就將相應的任務轉移到其他節點;JobTracker會跟蹤任務的執行進度、資源使用量等信息,并將這些信息告訴任務調度器(TaskScheduler),而調度器會在資源出現空閑時,選擇合適的任務去使用這些資源。 - TaskTracker
TaskTracker會周期性地通過“心跳”將本節點上資源的使用情況和任務的運行進度匯報給JobTracker,同時接收JobTracker發送過來的命令并執行相應的操作(如啟動新任務、殺死任務等)。TaskTracker使用“slot”等量劃分本節點上的資源量(CPU、內存等)。一個Task獲取到一個slot后才有機會運行,而Hadoop調度器的作用就是將各個TaskTracker上的空閑slot分配給Task使用。slot 分為Map slot和Reduce slot兩種,分別供Map Task和Reduce Task使用。 - Task
Task分為Map Task和Reduce Task兩種,均由TaskTracker啟動。
結構缺點:
- 存在單點故障
- JobTracker“大包大攬”導致任務過重(任務多時內存開銷大,上限4000節點)
- 容易出現內存溢出(分配資源只考慮MapReduce任務數,不考慮CPU、內存)
- 資源劃分不合理(強制劃分為slot ,包括Map slot和Reduce slot)
1.4 YARN
1.4.1 YARN體系結構
架構思想
體系結構
ResourceManager
? 處理客戶端請求
? 啟動/監控ApplicationMaster
? 監控NodeManager
? 資源分配與調度
NodeManager
? 單個節點上的資源管理
? 處理來自ResourceManger的命令
? 處理來自ApplicationMaster的命令
ApplicationMaster
? 為應用程序申請資源,并分配給內部任務
? 任務調度、監控與容錯
1.4.2 YARN工作流程
步驟1:用戶編寫客戶端應用程序,向YARN提交應用程序,提交的內容包括ApplicationMaster程序、啟動ApplicationMaster的命令、用戶程序等
步驟2:YARN中的ResourceManager負責接收和處理來自客戶端的請求,為應用程序分配一個容器,在該容器中啟動一個ApplicationMaster
步驟3:ApplicationMaster被創建后會首先向ResourceManager注冊
步驟4:ApplicationMaster采用輪詢的方式向ResourceManager申請資源
步驟5:ResourceManager以“容器”的形式向提出申請的ApplicationMaster分配資源
步驟6:在容器中啟動任務(運行環境、腳本)
步驟7:各個任務向ApplicationMaster匯報自己的狀態和進度
步驟8:應用程序運行完成后,ApplicationMaster向ResourceManager的應用程序管理器注銷并關閉自己
2. MapReduce 工作流程
? 不同的Map任務之間不會進行通信
? 不同的Reduce任務之間也不會發生任何信息交換
? 用戶不能顯式地從一臺機器向另一臺機器發送消息
? 所有的數據交換都是通過MapReduce框架自身去實現的
例子
3. Java Api要點
- Writable
Hadoop 自定義的序列化接口。當要在進程間傳遞對象或持久化對象的時候,就需要序列化對象成字節流,反之當要將接收到或從磁盤讀取的字節流轉換為對象,就要進行反序列化。Map 和 Reduce 的 key、value 數據格式均為 Writeable 類型,其中 key 還需實現WritableComparable 接口。Java 基本類型對應 writable 類型的封裝如下:
| boolean | BooleanWritable |
| byte | ByteWritable |
| int | ShortWritable |
| float | FloatWritable |
| long | LongWritable |
| double | DoubleWritable |
| enum | EnumWritable |
| Map | MapWritable |
(2)InputFormat
用于描述輸入數據的格式。提供兩個功能:
getSplits()數據分片,按照某個策略將輸入數據切分成若干個split,以便確定Map任務個數以及對應的 split;createRecordReader(),將某個split解析成一個個 key-value 對。
FileInputFormat是所有以文件作為數據源的 InputFormat實現基類,小文件不會進行分片,記錄讀取調用子類 TextInputFormat實現;
- TextInputFormat是默認處理類,處理普通文本文件,以文件中每一行作為一條記錄,行起始偏移量為key,每一行文本為 value;
- CombineFileInputFormat 針對小文件設計,可以合并小文件;
- KeyValueTextInputFormat適合處理一行兩列并以tab作為分隔符的數據;
- NLineInputFormat控制每個 split中的行數。
(3)OutputFormat
主要用于描述輸出數據的格式。Hadoop 自帶多種 OutputFormat 的實現。
- TextOutputFormat默認的輸出格式,key 和 value 中間用 tab 分隔;
- SequenceFileOutputFormat,將 key 和 value 以 SequenceFile 格式輸出;
- SequenceFileAsOutputFormat,將 key 和 value 以原始二進制格式輸出;
- MapFileOutputFormat,將 key 和 value 寫入 MapFile 中;
- MultipleOutputFormat,默認情況下 Reducer 會產生一個輸出,用該格式可以實現一個Reducer 多個輸出。
(4)Mapper/Reducer
封裝了應用程序的處理邏輯,主要由 map、reduce 方法實現。
(5)Partitioner
根據 map 輸出的 key 進行分區,通過 getPartition()方法返回分區值,默認使用哈希函
數。分區的數目與一個作業的reduce任務的數目是一樣的。HashPartitioner是默認的Partioner。
4. 實驗過程
1、計數統計類應用
仿照 WordCount 例子,編寫“TelPubXxx”類實現對撥打公共服務號碼的電話信息的統計。給出的一個文本輸入文件如下,第一列為電話號碼、第二列為公共服務號碼,中間以空格隔開。
13718855152 11216810117315 110
39451849 112
13718855153 110
13718855154 112
18610117315 114
18610117315 114
MapReduce 程序執行后輸出結果如下,電話號碼之間用“|”連接:
110 13718855153|16810117315
112 13718855154|39451849|13718855152
114 18610117315|18610117315
運行成功
2、兩表聯結 Join 應用
仿照單表關聯例子,編寫“RelationXxx”類實現多表關聯。中文文本文件轉成 UTF-8 編碼格式,否則會亂碼。
輸入 score.txt:
| s003001 | fd3003 | 84 |
| s003001 | fd3004 | 90 |
| s003002 | fd2001 | 71 |
| s002001 | fd1001 | 66 |
| s001001 | fd1001 | 98 |
| s001001 | fd1002 | 60 |
輸入 major.txt:
| fd1001 | 數據挖掘 | 數學系 |
| fd2001 | 電子工程 | 電子系 |
| fd2002 | 電子技術 | 電子系 |
| fd3001 | 大數據 | 計算機系 |
| fd3002 | 網絡工程 | 計算機系 |
| fd3003 | Java 應用 | 計算機系 |
| fd3004 | web 前端 | 計算機系 |
輸出結果:
| fd1001 | 數據挖掘 | 數學系 | s001001 | 98 |
| fd1001 | 數據挖掘 | 數學系 | s002001 | 66 |
| fd2001 | 電子工程 | 電子系 | s003002 | 71 |
| fd3003 | Java 應用 | 計算機系 | s003001 | 84 |
| fd3004 | web 前端 | 計算機系 | s003001 | 90 |
將其中需要的東西傳到hdfs中去。
沒有報錯。查看結果
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;import java.io.IOException;public class RelationZqc {public static int time = 0;public static class RelationMap extends Mapper<Object, Text, Text, Text> {private Text classID = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String filename=((FileSplit)context.getInputSplit()).getPath().getName();String[] s = value.toString().split(" ");if(filename.equals("score.txt")){classID.set(s[1]);String val="1," + s[0] + "," + s[2];context.write(classID,new Text(val));}else if (filename.equals("major.txt")){if(!s[0].equals("classid")){classID.set(s[0]);String val = "2," + s[1] + "," + s[2];context.write(classID,new Text(val));}}}}public static class RelationReduce extends Reducer<Text, Text, Text, Text> {private Text result = new Text();public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {String[][] studentTable=new String[10][2];String[] data;String classID = "nil";if(time == 0){context.write(new Text("classid"), new Text("classname deptname studentid score"));time++;}int cnt = 0;for (Text val : values) {data = val.toString().split(",");if(data[0].equals("1")){studentTable[cnt][0] = data[1];studentTable[cnt][1] = data[2];cnt = cnt + 1;}else if(data.length == 3 && data[0].equals("2")){classID = data[1] + " " + data[2];}}for(int i = 0; i < cnt; i++){if(classID.equals("nil")) continue;String s=classID+" "+studentTable[i][0]+" "+studentTable[i][1];result.set(s);context.write(key, result);}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();// 加載hadoop配置conf.set("fs.defaultFS", "hdfs://localhost:9000");String[] otherArgs = new String[]{"input/score.txt", "input/major.txt", "output/outputRelationZqc"}; // String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: Relation <in> <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "RelationZqc");// 設置環境參數job.setJarByClass(RelationZqc.class);// 設置程序主類job.setMapperClass(RelationMap.class);// 設置用戶實現的Mapper類job.setReducerClass(RelationReduce.class);// 設置用戶實現的Reducer類job.setOutputKeyClass(Text.class);// 設置輸出key類型job.setOutputValueClass(Text.class); // 設置輸出value類型for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));// 添加輸入文件路徑}FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));// 設置輸出文件路徑System.exit(job.waitForCompletion(true) ? 0 : 1); // 提交作業并等待結束} }3、簡單排序類應用編寫 MapReduce 程序“SortXxx” 類,要求輸入文件 sort1.txt、sort2.txt、sort3.txt 內容,由程序隨機生成若干條數據并存儲到 HDFS 上,每條數據占一行,數據可以是日期也可以是數字;輸出結果為兩列數據,第一列是輸入文件中的原始數據,第二列是該數據的排位。
運行成功
最后
小生凡一,期待你的關注。
總結
以上是生活随笔為你收集整理的【小白视角】大数据基础实践(五) MapReduce编程基础操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2022-05-20 工作记录--Rea
- 下一篇: 听书记录《人性中的善良天使》