Spark _13_topN
生活随笔
收集整理的這篇文章主要介紹了
Spark _13_topN
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
數據準備:
class1 100 class2 86 class3 70 class1 102 class2 65 class1 45 class2 85 class3 70 class1 16 class2 88 class1 95 class2 37 class3 98 class1 99 class2 23代碼:【java API】
package ddd;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2;import java.util.Iterator;/*** @author George* @description**/ public class TopN {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("TopOps");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> linesRDD = sc.textFile("./data/scores.txt");JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String str) throws Exception {String[] splited = str.split("\t");String clazzName = splited[0];Integer score = Integer.valueOf(splited[1]);return new Tuple2<String, Integer> (clazzName,score);}});pairRDD.groupByKey().foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {@Overridepublic void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {String clazzName = tuple._1;Iterator<Integer> iterator = tuple._2.iterator();Integer[] top3 = new Integer[3];while (iterator.hasNext()) {Integer score = iterator.next();for (int i = 0; i < top3.length; i++) {if(top3[i] == null){top3[i] = score;break;}else if(score > top3[i]){for (int j = 2; j > i; j--) {top3[j] = top3[j-1];}top3[i] = score;break;}}}System.out.println("class Name:"+clazzName);for(Integer sscore : top3){System.out.println(sscore);}}});} }運行結果:
class Name:class3 98 70 70 class Name:class1 102 100 99 class Name:class2 88 86 85?
總結
以上是生活随笔為你收集整理的Spark _13_topN的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: flatmap和map的区别!
- 下一篇: no.4_药丸称重