Hadoop学习记录(4)|MapReduce原理|API操作使用
MapReduce概念
MapReduce是一種分布式計算模型,由谷歌提出,主要用于搜索領域,解決海量數據計算問題。
MR由兩個階段組成:Map和Reduce,用戶只需要實現map()和reduce()兩個函數實現分布式計算。
這兩個函數的形參是key,value對,表示函數的輸入信息。
MP執行流程
客戶端提交給jobtracker,jobtracker分配給tasktracker。
trasktracker會對任務進行mapper和reducer操作。
MapReduce原理
一個map輸入k1、v1,數據由輸入文件中獲取
map會把數據提交到每一個shuffle,最后輸出到reducer任務。
reducer任務的數量跟mapper發送到shuffle的數量是一致的。
map任務處理
1.1、讀取輸入文件內容,解析成key,value對。對輸入文件的每一個解析成key\value對。每個鍵值對調用一次map函數。
1.2、寫自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。
1.3、對輸出的key、value進行分區
1.4、對不同分區的數據按照key進行排序、分組。相同key的value放到一個集合中。
1.5、(可選)分組后對數據進行規約
reduce任務處理
2.1、對多個map任務的輸出按照不同的分區通過網絡拷貝到reduce節點。
2.2、對多個map任務輸出進行合并、排序。寫reduce函數自己的邏輯,對輸入key、value處理,轉換成新的key、value輸出。
2.3、把輸出的結果保存到HDFS中。
MapReduce執行過程
1.1.讀取hdfs中文件,每個解析成<k,v>。每一個鍵值對調用一次map函數。
解析成兩個<k,v>,分別<0,hello you><10,hello me>。調用map函數兩次。
k是每行的開始位置,v則示每行的文本內容。
1.2.覆蓋map()函數,接收1.1產生的<k,v>,進行處理,轉換新的<k,v>輸出。
1.3.對1.2輸出的<k,v>進行分區
public void map(k,v,context){
String[] split = v.toString().split(“ ”);
for(String str : split){
context.write(str,1);
}
}
1.4.對不同分區中的數據進行排序(按照k)、分組,分別將key的value放到一個集合中。
map輸出后的數據是:<hello,1]>,<you,1>,<hello,1>,<me,1>
排序后: <hello,1]>, <hello,1>,<you,1>,<me,1>
分組后:<hello,{1,1}>,<you,{1}>,<me,{1}>
1.5.(可選)對分組后的數據進行規約。
2.1.多個map任務的輸出按照不同的分區,通過網絡copy到不同的reduce節點上。
2.2.對多個map的輸出進行合并,排序,覆蓋reduce函數,接收的是分組的數據,實現自己的業務邏輯,處理后產生新的<k,v>輸出。
reduce函數被調用3次,跟分組次數一致。
public void reduce(k,vs,context){
long sum =0L;
for(long num:vs){
sum +=num;
}
context.write(k,sum);
}
2.3.設置任務執行,對reduce輸出的<k,v>保存到hdfs中。
job.waitForCompletion(true);
?
整個流程我分了四步。簡單些可以這樣說,每個map task都有一個內存緩沖區,存儲著map的輸出結果,當緩沖區快滿的時候需要將緩沖區的數據以一個臨時文件的方式存放到磁盤,當整個map task結束后再對磁盤中這個map task產生的所有臨時文件做合并,生成最終的正式輸出文件,然后等待reduce task來拉數據。
MapReduce提交的源代碼分析
waitForCompletion函數中的submit方法連接和提交到jobtracker。
在eclipse中寫的代碼如何提交到JobTracker中的哪?
答(1)eclipse中調用的job.waitForCompletion(true),實際調用的是JobClient中的提交方法。
contect()
info = jobClient.submitJobInternal(conf)
(2)在contect()中,實際創建了一個JobClient對象,在調用該對象的構造方法時,獲得了JobTracker的客戶端代理對象JobSubmissionProtocol
jobSubmissionProtocol實現類是JobTracker
(3)在jobClient.submitJobInternal(conf)方法中,調用了jobSubmissionProtocol.submitJob()
即,執行的是JobTracker.submitJob(..)
Hadoop基本類型
Hadoop數據類型必須實現Writable接口。
Long LongWritable
Boolean BooleanWritable
String Text
Integer IntWritable
Java類型轉換為Hadoop基本類型:
直接調用hadoop類的構造方法,或者調用set()方法
new IntWritable(123)
Hadoop類型轉換成Java類型:
text需要調用toString方法
其他類型調用get()方法
使用Hadoop自定義類型處理手機上網流量
1、自定義類
class KpiWritable implements Writable{
long upPackNum;
long downPackNum;
long upPayLoad;
long downPayLoad;
public KpiWritable() {}
public KpiWritable(String upPackNum,String downPackNum,String upPayLoad,String downPayLoad){
this.upPackNum = Long.parseLong(upPackNum);
this.downPackNum = Long.parseLong(downPackNum);
this.upPayLoad = Long.parseLong(upPayLoad);
this.downPayLoad = Long.parseLong(downPayLoad);
}
@Override
public void write(DataOutput out) throws IOException {
//序列化出去
out.writeLong(upPackNum);
out.writeLong(downPackNum);
out.writeLong(upPayLoad);
out.writeLong(downPayLoad);
}
@Override
public void readFields(DataInput in) throws IOException {
//順序和寫出去一樣
this.upPackNum = in.readLong();
this.downPackNum = in.readLong();
this.upPayLoad = in.readLong();
this.downPayLoad = in.readLong();
}
@Override
public String toString() {
return upPackNum+"\t"+downPackNum+"\t"+upPayLoad+"\t"+downPayLoad;
}
2、自定義Map
static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{
protected void map(LongWritable k1, Text v1,
org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,KpiWritable>.Context context)
throws IOException, InterruptedException {
//處理接收的數據
String[] splits = v1.toString().split("\t");
//獲取手機號
String msisdn = splits[1];
Text k2 = new Text(msisdn);
KpiWritable v2 = new KpiWritable(splits[6],splits[7],splits[8],splits[9]);
//寫入context中交過reduce執行
context.write(k2, v2);
}
}
3、自定義Reduce
static class MyReduce extends Reducer<Text, KpiWritable, Text, KpiWritable>{
protected void reduce(Text k2, Iterable<KpiWritable> v2s,org.apache.hadoop.mapreduce.Reducer<Text, KpiWritable, Text, KpiWritable>.Context context)
throws IOException, InterruptedException {
/**
* k2 表示不同的手機號
* v2s 表示該手機號不同時段流量集合
*/
//定義計數器
long upPackNum = 0L;
long downPackNum = 0L;
long upPayLoad = 0L;
long downPayLoad = 0L;
//遍歷合并數據
for(KpiWritable kpi : v2s){
upPackNum += kpi.upPackNum;
downPackNum += kpi.downPackNum;
upPayLoad += kpi.upPayLoad;
downPayLoad += kpi.downPayLoad;
}
//封裝到對象中
KpiWritable v3 = new KpiWritable(upPackNum+"", downPackNum+"", upPayLoad+"", downPayLoad+"");
//寫入context中
context.write(k2, v3);
}
}
4、寫驅動程序
static final String INPUT_PATH = "hdfs://h1:9000/wlan";
static final String OUT_PATH = "hdfs://h1:9000/wlan_out";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(new URI("hdfs://h1:9000/"), conf);
Path outPut = new Path(OUT_PATH);
if(fileSystem.exists(outPut)){
fileSystem.delete(outPut,true);
}
/**
* 1.1、指定輸入文件路徑
* 1.1.1. 指定那個類來格式化輸入文
* 1.2、指定自定義的Mapper類
* 1.2.1.指定輸出<k2,v2>的類型
* 1.3、指定分區
* 1.4、排序分區(TODO)
* 1.5、(可選)合并
* 2.1、多個map任務的輸出,通過網絡copy到不同的reduce節點上,這個操作由hadoop自動完成
* 2.2、指定定義的reduce類
* 2.2.1.指定輸出<k3,v3>類型
* 2.3、指定輸出位置
* 2.3.1、設置輸出文件的格式化類
*
* 最后吧代碼提交到JobTracker執行
*/
//創建Job任務
Job job = new Job(conf,KpiApp.class.getSimpleName());
//設置輸入路徑
FileInputFormat.setInputPaths(job, INPUT_PATH);
//設置輸入數據使用的格式化類
job.setInputFormatClass(TextInputFormat.class);
//設置自定義Map類
job.setMapperClass(MyMapper.class);
//設置Map類輸出的key和value值類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(KpiWritable.class);
//設置分區類
job.setPartitionerClass(HashPartitioner.class);
//設置任務數
job.setNumReduceTasks(1);
//設置自定義Reduce類
job.setReducerClass(MyReduce.class);
//設置輸入數據使用的格式化類
job.setInputFormatClass(TextInputFormat.class);
//設置Reduce輸出的key和value值類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(KpiWritable.class);
//設置輸出文件位置
FileOutputFormat.setOutputPath(job, outPut);
//設置將任務提交到JobTracker
job.waitForCompletion(true);
}
MapReduce 0.x API區別
hadoop版本0.x
1、包一般是mapred。
2、使用是JobConf類創建Job任務。
3、使用JobClient.runJob(jobConf)提交任務。
4、自定義類需要繼承MapReduceBase實現Mapper和Reducer接口。
hadoop版本1.x
1、包一般是mapreduce。
2、使用的Job類創建任務。
3、ob.waitForCompletion(true)提交任務。
4、自定義類只需要繼承Mapper和Reducer類。
命令行運行指定參數
hadoop jar WordCount.jar hdfs://h1:9000/hello hdfs://h1:9000/cmd_out
跟eclipse直接運行的代碼區別:
1、類需要繼承org.apache.hadoop.conf.Configured,并實現org.apache.hadoop.util.Tool。
2、以前main方法中寫的驅動程序卸載覆寫的run方法中。
3、run()方法中
job.setJarByClass(CmdWordCount.class);
4、輸入輸出字符串定義為全局空字符串
5、main方法中使用org.apache.hadoop.util.ToolRunner的run方法,傳入new CmdWordCount()和args。args是main方法接收的字符串數組。
6、在覆寫的run方法中把接收到的args數組提取并賦值給INPUT和OUTPUT的路徑
INPUT_PATH = agrs0[0];
OUTPUT_PATH = args0[1];
6、打包時一定要記得選擇輸出類。
Hadoop計數器
File Input Format Counters
Bytes Read=19 讀取文件字節數
Map-Reduce Framework
Map output materialized bytes=65
1#Map input records=2 讀取記錄行
Reduce shuffle bytes=65
Spilled Records=8
Map output bytes=51
Total committed heap usage (bytes)=115675136
5#Combine input records=0 Map合并/規約輸入
SPLIT_RAW_BYTES=85
3# Reduce input records=4 reduce輸入行
4#Reduce input groups=3 Reduce輸入組數
Combine output records=0 Map合并/規約輸出
Reduce output records=3 Reduce輸出記錄
2#Map output records=4 Map輸出行
通過計數器數可以檢查出Map還是Reduce出現問題。
輿情監督示例,使用自定義計數器監控出現次數。
//自定義計數器
Counter helloCounter = context.getCounter("Sensitive Words", "hello");
String line = v1.toString();
if(line.contains("hello")){
helloCounter.increment(1L);
}
Combine操作
為什么使用Combine?
Combiner發生在Map端。對數據進行規約處理,數據量變小了,傳送到reduce的數據量變小,傳輸時間變短,作業整體時間變短。
為什么Combine不作為MapReduce的標配,而是可選配置?
因為不是所有的算法都適合使用Combine處理,例如求平均數。
適用于求和
Combine本身已經執行了Reduce操作,為什么Reduce階段還要執行reduce操作?
combine操作發送在map端,處理一個任務所接收的文件中的數據,不能跨map任務;只有reduce可以接收多個map任務處理數據。
設置參數
job.setCombinerClass(MyCombiner.class);
MyCombiner.class可以使用Reduce
Partitioner編程
對輸出的key,value進行分區。
指定自定義partition類,自定義類需要繼承HashPartitioner類。覆蓋getPartition方法。
分區的實例必須打成Jar包。
作用:
1、根據業務需要,產生多個輸出文件。
2、多個reduce任務在運行,提高整體job的運行效率。
根據實際情況來使用,如果有5臺機器,而分成100個分區來運行或出現延遲和整體效率低問題。因為需要排隊運行!
主要代碼:
設置成打包運行 job.setJarByClass(KpiApp.class);
設置分區job.setPartitionerClass(MyPartitioner.class);
設置Reduce任務數 job.setNumReduceTask(2);
自定義Partitioner類需要繼承HashPartition類,泛型使用K2,V2的類型。覆寫getPartition方法。
排序和分組
排序
在map和reduce階段進行排序時,比較的是k2。v2是不參與排序比較。如果讓v2也進行排序,需要將k2和v2組裝成心的類,作為k2,才能參與比較。
新類需要實現WritableCompareble,覆寫readFilds、write、compareTo、hasCode、equals方法。
在自定義map程序中將k2,v2封裝到新類中,當做k2寫入context。
編寫驅動main方法時,設置map輸出的類型(job.setMapOutputKeyClass(new.class))
分組
按照K2進行比較,這個k2是分裝到自定義類的k2。
自定義分組類實現RewComparetor,覆寫compare方法。
public int compare(NewK2 o1, NewK2 o2) {
return (int) (o1.k2 - o2.k2);
}
/**
* @param b1 表示第一個參與比較的字節數組
* @param s1 表示第一個參與比較的字節數組的起始位置
* @param l1 表示第一個參與比較的字節數組的偏移量
*
* @param b2 表示第二個參與比較的字節數組
* @param s2 表示第二個參與比較的字節數組的起始位置
* @param l2 表示第二個參與比較的字節數組的偏移量
*
*/
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
int l2) {
return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
}
job任務設置分組比較組 job.setGroupComparatorClass(MyGroup.class);
Shuffle
MapReduce的核心,俗稱洗牌、打亂。
shuffle在map任務傳送到reduce任務之間。
Map端
1、每個map有一個環形內存緩沖區,用于存儲任務的輸出,默認100MB,如果達到法制80MB后臺線程會把內容寫到指定磁盤(mapred.local.dir)下的新建的溢出文件。
2、寫入磁盤錢,要partition、sort。如果有combiner,combine后寫入。
3、最后記錄完成,合并全部溢寫文件為一個分區且排序的文件。
Reduce端
1、Reduce通過Http方式得到輸出文件的分區。
2、TaskTracker為分區文件運行Reduce任務。復制階段把Map輸出復制到Reducer的內存或磁盤。一個Map任務完成,Reduce開始復制輸出。
3、排序階段合并map輸出,最后運行Reduce階段。
MapReduce常見算法
單詞計數
數據去重
排序
TopK
選擇
投影
分組
多表連接
單邊關聯
轉載于:https://www.cnblogs.com/luguoyuanf/p/3593646.html
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的Hadoop学习记录(4)|MapReduce原理|API操作使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【转】awk 里的substr函数用法举
- 下一篇: 旁注原理