最经典的大数据案例解析(附代码)
生活随笔
收集整理的這篇文章主要介紹了
最经典的大数据案例解析(附代码)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
首先我們來說說需求
假設以上就是我們需要處理的數據,我們需要計算出每個月天氣最熱的兩天。
這個案例用到的東西很多,如果你能靜下心來好好看完,你一定會受益匪淺的
首先我們對自己提出幾個問題
1.怎么劃分數據,怎么定義一組???
2.考慮reduce的計算復雜度???
3.能不能多個reduce???
4.如何避免數據傾斜???
5.如何自定義數據類型???
----記錄特點
每年
每個月
溫度最高
2天
1天多條記錄怎么處理?
----進一步思考
年月分組
溫度升序
key中要包含時間和溫度!
----MR原語:相同的key分到一組
通過GroupCompartor設置分組規則
----自定義數據類型Weather
包含時間
包含溫度
自定義排序比較規則
----自定義分組比較
年月相同被視為相同的key
那么reduce迭代時,相同年月的記錄有可能是同一天的,reduce中需要判斷是否同一天
注意OOM
----數據量很大
全量數據可以切分成最少按一個月份的數據量進行判斷
這種業務場景可以設置多個reduce
通過實現partition
一>>>MainClass的實現
package com.huawei.mr.weather;import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** @author Lpf.* @version 創建時間:2019年4月13日 下午7:43:40*/ public class MainClass {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 輸入錯誤返回提示if (args == null || args.length != 2) {System.out.println("輸入格式有誤");System.out.println("正確格式為:yarn jar weather.jar com.huawei.mr.weather.MainClass args[0] args[1]");}// 初始化hadoop默認配置文件,如果有指定的配置,則覆蓋默認配置Configuration conf = new Configuration(true);// 創建Job對象,用到系統配置信息Job job = Job.getInstance(conf);// 指定job入口程序job.setJarByClass(MainClass.class);// 設置job名稱job.setJobName("weather");// 指定文件從哪里讀取,從hdfs加載一個輸入文件給jobFileInputFormat.addInputPath(job, new Path(args[0]));// 指定hdfs上一個不存在的路徑作為job的輸出路徑FileOutputFormat.setOutputPath(job, new Path(args[1]));// 自主設置reduce的數量job.setNumReduceTasks(2);// 指定map輸出中key的類型job.setMapOutputKeyClass(Weather.class);// 指定map輸出中value的類型job.setMapOutputValueClass(Text.class);// 設置map中的比較器,如果不設置默認采用key類型自帶的比較器/*** 由于map里面的排序和這兒的排序不一樣,稱之為二次排序*/job.setSortComparatorClass(WetherComparator.class);// 設置分區器類型 避免數據傾斜job.setPartitionerClass(WeatherPartitioner.class);job.setMapperClass(WeatherMapper.class);job.setReducerClass(WeatherReduce.class);job.waitForCompletion(true);} }二 >>>Weather 自定義key的實現
package com.huawei.mr.weather;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.WritableComparable;/*** @author Lpf.* @version 創建時間:2019年4月13日 下午8:15:26* map中輸出key的自定義*/public class Weather implements WritableComparable<Weather> {private String year;private String month;private String day;private Integer weather;public String getYear() {return year;}public void setYear(String year) {this.year = year;}public String getMonth() {return month;}public void setMonth(String month) {this.month = month;}public String getDay() {return day;}public void setDay(String day) {this.day = day;}public Integer getWeather() {return weather;}public void setWeather(Integer weather) {this.weather = weather;}@Overridepublic void write(DataOutput out) throws IOException {// 把封裝的數據序列化之后寫出去out.writeUTF(year);out.writeUTF(month);out.writeUTF(day);out.writeInt(weather);}/** 讀寫的順序要一致*/@Overridepublic void readFields(DataInput in) throws IOException {// 把封裝的數據序列化之后讀進來setYear(in.readUTF());setMonth(in.readUTF());setDay(in.readUTF());setWeather(in.readInt());}@Overridepublic int compareTo(Weather that) {int result = 0;result = this.getYear().compareTo(that.getYear());if (result == 0) {result = this.getMonth().compareTo(that.getMonth());if (result == 0) {result = this.getDay().compareTo(that.getDay());if (result == 0) {// 如果年月日都相同,把溫度按照高到低倒序排列result = that.getWeather().compareTo(this.getWeather());}}}return result;}}三 >>>自定義map中key的比較器用于排序package com.huawei.mr.weather;import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator;/*** @author Lpf.* @version 創建時間:2019年4月13日 下午8:29:41* map中的比較器設置*/ public class WetherComparator extends WritableComparator {public WetherComparator() {super(Weather.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {int result = 0;Weather wa = (Weather) a;Weather wb = (Weather) b;// 分組比較器要保證同年同月為一組 和Weather里面的排序規則不一樣result = wa.getYear().compareTo(wb.getYear());if (result == 0) {result = wa.getMonth().compareTo(wb.getMonth());if (result == 0) {result = wb.getWeather().compareTo(wa.getWeather());}}return result;} }四>>>設置分區器避免數據傾斜
package com.huawei.mr.weather;import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner;/*** @author Lpf.* @version 創建時間:2019年4月13日 下午8:47:46 * 分區器,避免數據傾斜*/ public class WeatherPartitioner extends Partitioner<Weather, Text> {@Overridepublic int getPartition(Weather key, Text value, int numPartitions) {String month = key.getMonth();int partitionNum = (month.hashCode() & Integer.MAX_VALUE) % numPartitions;return partitionNum;} }五>>>map里面對每一行的處理
package com.huawei.mr.weather;import java.io.IOException;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;/*** @author Lpf.* @version 創建時間:2019年4月13日 下午8:55:29 map里面的處理*/public class WeatherMapper extends Mapper<LongWritable, Text, Weather, Text> {private SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-mm-dd");private Weather wea = new Weather();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 每一行的數據格式為 1949-10-01 14:21:02 34cString linStr = value.toString();// {"1949-10-01 14:21:02","34c"}String[] linStrs = linStr.split("\t");// 得到溫度int weather = Integer.parseInt(linStrs[1].substring(0, linStrs[1].length() - 1));// 獲取時間try {Date date = DATE_FORMAT.parse(linStrs[0]);Calendar calendar = Calendar.getInstance();calendar.setTime(date);int year = calendar.get(Calendar.YEAR);int month = calendar.get(Calendar.MONTH);int day = calendar.get(Calendar.DAY_OF_MONTH);wea.setYear(year + "");wea.setMonth(month + "");wea.setDay(day + "");wea.setWeather(weather);// 把map中的值輸出context.write(wea, value);} catch (ParseException e) {e.printStackTrace();}}} 六>>>reduce里面的輸出 package com.huawei.mr.weather;import java.io.IOException; import java.util.Iterator;import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;/*** @author Lpf.* @version 創建時間:2019年4月13日 下午8:55:35 * reduce 里面的處理*/ public class WeatherReduce extends Reducer<Weather, Text, Text, NullWritable> {@Overrideprotected void reduce(Weather key, Iterable<Text> values, Context context)throws IOException, InterruptedException {Iterator<Text> iterator = values.iterator();Text text = null;String day = null;while (iterator.hasNext()) {text = iterator.next();if (day != null) {if (!day.equals(key.getDay())) {// 輸出本月溫度最高的第二天context.write(text, NullWritable.get());break;}} else {// 輸出本月溫度最高的第一天context.write(text, NullWritable.get());day = key.getDay();}}} }年紀上來了 坐一下腰就酸的要死注釋補充的不是很完整,有不明白的留言,樂意解答
總結
以上是生活随笔為你收集整理的最经典的大数据案例解析(附代码)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: esp8266 擦拭_【一起玩esp82
- 下一篇: jmeter聚个报告怎么看qps_【jm