Hadoop:The Definitive Guid 总结 Chapter 1~2 初识Hadoop、MapReduce
?
1.數(shù)據(jù)存儲與分析
問題:當磁盤的存儲量隨著時間的推移越來越大的時候,對磁盤上的數(shù)據(jù)的讀取速度卻沒有多大的增長
從多個磁盤上進行并行讀寫操作是可行的,但是存在以下幾個方面的問題:
1).第一個問題是硬件錯誤。使用的硬件越多出錯的幾率就越大。一種常用的解決方式是數(shù)據(jù)冗余,保留多分拷貝,即使一份數(shù)據(jù)處理出錯,還有另外的數(shù)據(jù)。HDFS使用的也是類似的方式,但稍有不同。
2).第二個問題是數(shù)據(jù)處理的相關性問題。例如很多分析工作在一快磁盤上處理出來的結果需要與其他磁盤上處理處理出來的結果合并才能完成任務。各種分布式系統(tǒng)也都給出了合并的策略,但是做好這方面確實是一個挑戰(zhàn)。MapReduce提供了一種編程模型,他將從硬盤上讀寫數(shù)據(jù)的問題抽象出來,轉(zhuǎn)化成對一系列鍵值對的計算
簡而言之,Hadoop提供了一個可靠的存儲和分析系統(tǒng)。存儲又HDFS提供,分析由MapReduce提供。
?
2.與其他系統(tǒng)比較
1).RDBMS
為什么需要MapReduce?
a.磁盤的尋道時間提高的速度低于數(shù)據(jù)的傳輸速度,如果數(shù)據(jù)訪問模式由尋道時間支配的話,在讀寫數(shù)據(jù)集的一大部分的時候速度就會較流式讀取慢很多,這樣就出現(xiàn)了瓶頸。
b.另一方面在更新數(shù)據(jù)集的少量數(shù)據(jù)的時候,傳統(tǒng)的B-樹工作的比較好,但是在更新數(shù)據(jù)集的大部分數(shù)據(jù)的時候B-樹就顯得比MapReduce方式慢了。MapReduce使用排序/合并操作去重建數(shù)據(jù)庫(完成數(shù)據(jù)更新).
c.MapReduce比較適合于需要分析整個數(shù)據(jù)集,并且要使用批處理方式,特別是特定的分析的情況;RDBMS點查詢方面占優(yōu)勢,或在已編制索引的數(shù)據(jù)集提供低延遲的檢索和更新的數(shù)據(jù),但是數(shù)據(jù)量不能太大。MapReduce適合一次寫入,多次讀取的操作,但是關系數(shù)據(jù)庫就比較適合對數(shù)據(jù)集的持續(xù)更新。
d.MapReduce比較適合處理半結構化,非結構化的數(shù)據(jù)
e.MapReduce是可以進行線性擴展的編程模型。一個對集群級別的數(shù)據(jù)量而寫的MapReduce可以不加修改的應用于小數(shù)據(jù)量或者更大數(shù)據(jù)量的處理上。更重要的是當你的輸入數(shù)據(jù)增長一倍的時候,相應的處理時間也會增加一倍。但是如果你把集群也增長一倍的話,處理的速度則會和沒有增加數(shù)據(jù)量時候的速度一樣快,這方面對SQL查詢來說不見得是正確的。
f.關系數(shù)據(jù)往往進行規(guī)則化以保證數(shù)據(jù)完整性,并刪除冗余。這樣做給MapReduce提出了新的問題:它使得讀數(shù)據(jù)變成了非本地執(zhí)行,而MapReduce的一個重要前提(假設)就是數(shù)據(jù)可以進行高速的流式讀寫。
?
2).Grid Compuing 網(wǎng)格計算
a.MapReduce使數(shù)據(jù)和計算在一個節(jié)點上完成,這樣就變成了本地的讀取。這是MapReduce高性能的核心.
b.MPI將控制權大大的交給了程序員,但是這就要求程序員明確的處理數(shù)據(jù)流等情況,而MapReduce只提供高層次的操作:程序員只需考慮處理鍵值對的函數(shù),而對數(shù)據(jù)流則是比較隱晦的。
c.MapReduce是一種非共享(Shared-nothing)的架構,當MapReduce實現(xiàn)檢測到map或者reduce過程出錯的時候,他可以將錯誤的部分再執(zhí)行一次。MPI程序員則需要明確的考慮檢查點和恢復,這雖然給程序員很大自由,但是也使得程序變得難寫。
?
3).志愿計算
MapReduce是針對在一個高聚合網(wǎng)絡連接的數(shù)據(jù)中心中進行的可信的、使用專用的硬件工作持續(xù)數(shù)分鐘或者數(shù)個小時而設計的。相比之下,志愿計算則是在不可信的、鏈接速度有很大差異的、沒有數(shù)據(jù)本地化特性的,互聯(lián)網(wǎng)上的計算機上運行永久的(超長時間的)計算,
?
3.天氣數(shù)據(jù)集
數(shù)據(jù)是NCDC的數(shù)據(jù),我們關注以下特點:
1)? 數(shù)據(jù)是半格式化的
2)? 目錄里面存放的是從1901-2001年一個世紀的記錄,是gzip壓縮過的文件。
3)? 以行為單位,使用ASCII格式存儲,每行就是一條記錄
4)? 每條記錄我們關注一些基本的元素,比如溫度,這些數(shù)據(jù)在每條數(shù)據(jù)中都會出現(xiàn),并且寬度也是固定的。
下面是一條記錄的格式,為了便于顯示,做了一部分調(diào)整。
?
?
?
4.使用Unix工具分析數(shù)據(jù)
以分析某年份的最高溫度為例,下面是一段Unix的腳本程序:
#!/usr/bin/env bash for year in all/* doecho -ne `basename $year .gz`"\t"gunzip -c $year | \awk '{ temp = substr($0, 88, 5) + 0;q = substr($0, 93, 1);if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }END { print max }' done這段腳本的執(zhí)行過程如下:
腳本循環(huán)處理每一個壓縮的年份文件,首先打印出年份,然后對每一個文件使用awk處理。Awk腳本從數(shù)據(jù)中解析出兩個字段:一個air temperature,一個quality code。air temperature值加0被轉(zhuǎn)換成整形。接下來查看溫度數(shù)據(jù)是否有效(9999表示在NCDC數(shù)據(jù)集中丟失的值),并且檢查quality code是不是可信并沒有錯誤的。如果讀取一切正常,temp將與目前的最大值比較,如果出現(xiàn)新的最大值,則更新當前max的值。當文件中所有行的數(shù)據(jù)都被處理之后,開始執(zhí)行End程序塊,并且打印出最大值。
下面是某次運行結果的起始部分:
為了加速處理速度,我們將程序的某些部分進行并行執(zhí)行。這在理論上是比較簡單的,我們可以按照年份來在不同的處理器上執(zhí)行,使用所有可用的硬件線程,但是還是有些問題:
1).把任務切分成相同大小的塊不總是那么容易的。
2).合并單獨處理出來的結果還需要進一步的處理
3).人們?nèi)耘f被單機的處理能力所束縛。
?
5.?使用Hadoop分析數(shù)據(jù)
1).Map和Reduce
MapReduce將工作分為map階段和reduce階段,每個階段都將鍵值對作為輸入輸入,鍵值對的類型可以由程序員選擇。程序員還指定兩個函數(shù):map和reduce函數(shù)。
Map階段的輸入數(shù)據(jù)是NCDC的原始數(shù)據(jù),我們選擇文本格式輸入,這樣可以把記錄中的每一行作為文本value。Key是當前行離開始行的偏移量,但是我們并不需要這個信息,所以忽略不要。
我們的Map函數(shù)比較簡單,僅僅從輸入中析取出temperature。其中,map函數(shù)僅僅是完成了數(shù)據(jù)的準備階段,這樣使得reducer函數(shù)可以基于它查找歷年的最高溫度。Map函數(shù)也是一個很好的過濾階段,這里可以過濾掉丟失、置疑、錯誤的temperature數(shù)據(jù)。
下面是輸入數(shù)據(jù)樣例:
下面這些行以鍵值對的方式來給map函數(shù)處理,其中加粗的是我們需要處理的數(shù)據(jù)
上面的鍵(key)是文件中的行偏移量,map函數(shù)不需要,這里的map函數(shù)的功能僅限于提取年份和氣溫,并將他們作為輸出:
map函數(shù)輸出經(jīng)由mapreduce框架中進行進一步的處理后,主要需要根據(jù)鍵對鍵/值對進行排序和分組。經(jīng)過這一番處理之后,Reduce收來的結果如下:
處理這些數(shù)據(jù),reduce所需要做的工作僅僅是遍歷這些數(shù)據(jù),找出最大值,產(chǎn)生最終的輸出結果:
MapReduce的邏輯數(shù)據(jù)流:
?
?
2).Java MapReduce 程序
這里需要三塊代碼:Map函數(shù)、Reduce函數(shù)、用來運行作業(yè)的main函數(shù)
Map函數(shù)的實現(xiàn)
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;public class MaxTemperatureMapper extendsMapper<LongWritable, Text, Text, IntWritable> {private static final int MISSING = 9999;@Overridepublic void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString();String year = line.substring(15, 19);int airTemperature;if (line.charAt(87) == '+') { // parseInt doesn't like leading plus// signsairTemperature = Integer.parseInt(line.substring(88, 92));} else {airTemperature = Integer.parseInt(line.substring(87, 92));}String quality = line.substring(92, 93);if (airTemperature != MISSING && quality.matches("[01459]")) {context.write(new Text(year), new IntWritable(airTemperature));}} }Hadoop提供了他自己的基本類型,這些類型為網(wǎng)絡序列化做了專門的優(yōu)化??梢栽?/span>org.apache.hadoop.io包中找到他們。比如LongWritable相當于Java中的Long,Text相當于String而IntWritable在相當于Integer。map()方法傳入一個key和一個value。我們將Text類型的value轉(zhuǎn)化成Java的String,然后用String的substring方法取出我偶們需要的部分,最后利用context.write按照鍵/值的形式收集數(shù)據(jù)。
?
Reduce函數(shù)的實現(xiàn)
?
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;public class MaxTemperatureReducer extendsReducer<Text, IntWritable, Text, IntWritable> {@Overridepublic void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int maxValue = Integer.MIN_VALUE;for (IntWritable value : values) {maxValue = Math.max(maxValue, value.get());}context.write(key, new IntWritable(maxValue));} }Reduce的輸入類型必須是:Text,IntWritable類型。Reduce在本例輸出結果是Text和IntWritbale類型,year和與其對應的maxValue是經(jīng)過遍歷、比較之后得到的。
?
負責運行MapReduce作業(yè)的main函數(shù)
import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class MaxTemperature {public static void main(String[] args) throws Exception {if (args.length != 2) {System.err.println("Usage: MaxTemperature <input path> <output path>");System.exit(-1);}Job job = new Job();job.setJarByClass(MaxTemperature.class);job.setJobName("Max temperature");FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);job.setReducerClass(MaxTemperatureReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);} }
?
6.數(shù)據(jù)流
1).MapReduce作業(yè)的工作單元由如下組成:輸入數(shù)據(jù)、MapReduce程序、配置信息
2).Hadoop將Task分成兩類:MapTask、ReduceTask.
3).為了控制Hadoop的Job運行,Hadoop有兩累節(jié)點:一種是jobtracker,一種是tasktracker。Jobtracker通過調(diào)度tasktracker協(xié)調(diào)所有工作的執(zhí)行。Tasktracker運行任務并將報告發(fā)送給jobtracker,jobtracker所有工作的進度。如果一個任務失敗,jobtracker再重新調(diào)度一個不同的tasktracker進行工作。但是在Hadoop 0.23和2.0版本ongoing他們已經(jīng)被Resource Manager、Node Manager和ApplicationMaster所替代,相應的資源并被Container所封裝
4).Hadoop將MapReduce的輸入數(shù)據(jù)分成固定大小的分片,成為數(shù)據(jù)分片(input split),然后為每個分片創(chuàng)建一個MapTask (分片默認64M大小)
5).Hadoop可以通過在存儲有輸入的數(shù)據(jù)節(jié)點上運行相應Map任務,提高性能(數(shù)據(jù)本地優(yōu)化),另外Map任務將其樹立后的數(shù)據(jù)寫到本地磁盤,減少Hadoop分布式Node之間的數(shù)據(jù)傳輸壓力。相比之下任務沒有數(shù)據(jù)本地優(yōu)化的優(yōu)勢-----單個Reduce任務的輸入通常來自所有的map輸出
下圖展現(xiàn)了Data-local(數(shù)據(jù)本地),Rack-local與Off-local Map任務的區(qū)別:
下圖給出MapReduce的集中執(zhí)行方式數(shù)據(jù)流的情況:
?一個Reduce的情況:
兩個Reduce的情況:
無Reduce情況:
?
?
?
?
?
?
7.Combiner函數(shù)
為了優(yōu)化集群數(shù)據(jù)傳輸,減少Map任務和Reduce任務之間的數(shù)據(jù)傳輸,Hadop針對Map任務指定一個Combiner函數(shù)(合并函數(shù))--對本地鍵(key)相同的做合并處理,實際上Combiner函數(shù)很像Reduce函數(shù),只不過運行在本地,最后Combiner函數(shù)的輸出作為作為Reduce的函數(shù)的輸入,Hadoop執(zhí)行函數(shù)的順序就變?yōu)镸ap--->Combiner--->Reduce
示例如下:
第一個Map的輸出:
第二個Map的輸出:
Reduce函數(shù)被調(diào)用時:
最后輸出結果:
?
指定一個Combiner函數(shù),這里用Reduce函數(shù)作為Combiner函數(shù)
?
public class MaxTemperatureWithCombiner {public static void main(String[] args) throws Exception {if (args.length != 2) {System.err.println("Usage: MaxTemperatureWithCombiner <input path> "+ "<output path>");System.exit(-1);}Job job = new Job();job.setJarByClass(MaxTemperatureWithCombiner.class);job.setJobName("Max temperature");FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);job.setCombinerClass(MaxTemperatureReducer.class);job.setReducerClass(MaxTemperatureReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);} }
?
8.Hadoop的Streaming
Streaming天生適合用于文本處理,在文本模式下使用時,它有一個數(shù)據(jù)的行視圖,map的輸入數(shù)據(jù)通過標準輸入流傳遞給map函數(shù),并且一行一行的傳輸,最后將結果行寫到標準輸出。
map輸出的鍵/值對是以一個制表符分隔的行,它以這樣的形式寫到標準輸出,reduce函數(shù)的輸入格式相同----通過制表符來分隔的鍵/值對----并通過標準輸入流進行傳輸
1)ruby版本
2)Python版本
?
9.Hadoop的Pipes
?
?
?
?
?
?
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/biyeymyhjob/archive/2012/08/08/2628265.html
總結
以上是生活随笔為你收集整理的Hadoop:The Definitive Guid 总结 Chapter 1~2 初识Hadoop、MapReduce的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 利用kickstart实现pxe的自动化
- 下一篇: 基于corosync和NFS服务器实现L