10、自定义排序
文本中有如下內容(每行兩列,以空格分隔):
1 5
2 4
3 6
1 3
2 1
現要求對文本行進行排序,第一列相同時,比較第二列
package sparkcore.java;

import java.io.Serializable;

import scala.math.Ordered;

/**
 * Custom key for a secondary sort.
 *
 * <p>Keys are ordered by {@code first}; when two keys have the same {@code first},
 * they are ordered by {@code second}. All comparison operators are derived from
 * {@link #compare(SecondarySortKey)} so they always agree with each other.
 */
public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {

    private static final long serialVersionUID = -2366006422945129991L;

    // The columns that take part in the ordering.
    private int first;
    private int second;

    /**
     * @param first  value of the primary sort column
     * @param second value of the secondary sort column
     */
    public SecondarySortKey(int first, int second) {
        this.first = first;
        this.second = second;
    }

    /** Greater than. */
    @Override
    public boolean $greater(SecondarySortKey other) {
        return compare(other) > 0;
    }

    /** Greater than or equal to. */
    @Override
    public boolean $greater$eq(SecondarySortKey other) {
        return compare(other) >= 0;
    }

    /** Less than. */
    @Override
    public boolean $less(SecondarySortKey other) {
        return compare(other) < 0;
    }

    /** Less than or equal to. */
    @Override
    public boolean $less$eq(SecondarySortKey other) {
        return compare(other) <= 0;
    }

    /**
     * Compares this key with {@code other}, first column first, second column as tie-breaker.
     *
     * @return a negative value, zero, or a positive value when this key is less than,
     *         equal to, or greater than {@code other}
     */
    @Override
    public int compare(SecondarySortKey other) {
        // Integer.compare rather than subtraction: "a - b" overflows when the operands
        // have opposite signs and large magnitude, which would flip the ordering.
        int byFirst = Integer.compare(this.first, other.getFirst());
        if (byFirst != 0) {
            return byFirst;
        }
        return Integer.compare(this.second, other.getSecond());
    }

    @Override
    public int compareTo(SecondarySortKey other) {
        return compare(other);
    }

    // Getters and setters for the sort columns, plus hashCode and equals.

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        SecondarySortKey other = (SecondarySortKey) obj;
        return first == other.first && second == other.second;
    }
}
package?sparkcore.java;import?org.apache.spark.SparkConf;import?org.apache.spark.api.java.JavaPairRDD;import?org.apache.spark.api.java.JavaRDD;import?org.apache.spark.api.java.JavaSparkContext;import?org.apache.spark.api.java.function.Function;import?org.apache.spark.api.java.function.PairFunction;import?org.apache.spark.api.java.function.VoidFunction;import?scala.Tuple2;/**?*?二次排序??*?1、實現自定義的key,要實現Ordered接口和Serializable接口,在key中實現自己對多個列的排序算法?*?2、將包含文本的RDD,映射成key為自定義key,value為文本的JavaPairRDD??*?3、使用sortByKey算子按照自定義的key進行排序?*?4、再次映射,剔除自定義的key,只保留文本行?*/public?class?SecondarySort?{????public?static?void?main(String[]?args)?{????????SparkConf?conf?=?new?SparkConf().setAppName("SecondarySort").setMaster("local");????????JavaSparkContext?sc?=?new?JavaSparkContext(conf);????????JavaRDD<String>?lines?=?sc.textFile("test.txt");????????JavaPairRDD<SecondarySortKey,?String>?pairs?=?lines.mapToPair(????????????????new?PairFunction<String,?SecondarySortKey,?String>()?{????????????????????private?static?final?long?serialVersionUID?=?1L;????????????????????@Override????????????????????public?Tuple2<SecondarySortKey,?String>?call(String?line)?throws?Exception?{????????????????????????String[]?lineSplited?=?line.split("?");????????????????????????SecondarySortKey?key?=?new?SecondarySortKey(Integer.valueOf(lineSplited[0]),????????????????????????????????Integer.valueOf(lineSplited[1]));????????????????????????//自定義Key做為key,原文本行作為value值????????????????????????return?new?Tuple2<SecondarySortKey,?String>(key,?line);????????????????????}????????????????});????????JavaPairRDD<SecondarySortKey,?String>?sortedPairs?=?pairs.sortByKey();????????JavaRDD<String>?sortedLines?=?sortedPairs.map(????????????????new?Function<Tuple2<SecondarySortKey,?String>,?String>()?{????????????????????private?static?final?long?serialVersionUID?=?1L;????????????????????@Override????????????????????public?String?call(Tuple2<SecondarySortKey,?String>?v1)?throws?Exception?{????????????????????????return?v1._2;//key丟棄
,取文本行????????????????????}????????????????});????????sortedLines.foreach(new?VoidFunction<String>()?{????????????private?static?final?long?serialVersionUID?=?1L;????????????@Override????????????public?void?call(String?t)?throws?Exception?{????????????????System.out.println(t);????????????}????????});????????sc.close();????}}
package sparkcore.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * Secondary sort driver: reads "test.txt", keys every line by a SecondSortKey built from
 * its first two columns, sorts by that key and prints the lines in sorted order.
 */
object SecondSort {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SecondSort")
      .setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.textFile("test.txt", 1)
      val pairs = lines.map { line =>
        // Split once per line. Columns are assumed to be separated by a single space,
        // as in the sample input; a bare "?" here would be an invalid regex.
        val columns = line.split(" ")
        (new SecondSortKey(columns(0).toInt, columns(1).toInt), line)
      }
      val sortedPairs = pairs.sortByKey()
      // Drop the key, keep the text line.
      val sortedLines = sortedPairs.map(sortedPair => sortedPair._2)
      sortedLines.foreach { sortedLine => println(sortedLine) }
    } finally {
      // Release the context even when the job fails.
      sc.stop()
    }
  }
}
現要求對文本行進行排序,第一列相同時,比較第二列
Java版:
package sparkcore.java;

import java.io.Serializable;

import scala.math.Ordered;

/**
 * Custom key for a secondary sort.
 *
 * <p>Keys are ordered by {@code first}; when two keys have the same {@code first},
 * they are ordered by {@code second}. All comparison operators are derived from
 * {@link #compare(SecondarySortKey)} so they always agree with each other.
 */
public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {

    private static final long serialVersionUID = -2366006422945129991L;

    // The columns that take part in the ordering.
    private int first;
    private int second;

    /**
     * @param first  value of the primary sort column
     * @param second value of the secondary sort column
     */
    public SecondarySortKey(int first, int second) {
        this.first = first;
        this.second = second;
    }

    /** Greater than. */
    @Override
    public boolean $greater(SecondarySortKey other) {
        return compare(other) > 0;
    }

    /** Greater than or equal to. */
    @Override
    public boolean $greater$eq(SecondarySortKey other) {
        return compare(other) >= 0;
    }

    /** Less than. */
    @Override
    public boolean $less(SecondarySortKey other) {
        return compare(other) < 0;
    }

    /** Less than or equal to. */
    @Override
    public boolean $less$eq(SecondarySortKey other) {
        return compare(other) <= 0;
    }

    /**
     * Compares this key with {@code other}, first column first, second column as tie-breaker.
     *
     * @return a negative value, zero, or a positive value when this key is less than,
     *         equal to, or greater than {@code other}
     */
    @Override
    public int compare(SecondarySortKey other) {
        // Integer.compare rather than subtraction: "a - b" overflows when the operands
        // have opposite signs and large magnitude, which would flip the ordering.
        int byFirst = Integer.compare(this.first, other.getFirst());
        if (byFirst != 0) {
            return byFirst;
        }
        return Integer.compare(this.second, other.getSecond());
    }

    @Override
    public int compareTo(SecondarySortKey other) {
        return compare(other);
    }

    // Getters and setters for the sort columns, plus hashCode and equals.

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        SecondarySortKey other = (SecondarySortKey) obj;
        return first == other.first && second == other.second;
    }
}
package?sparkcore.java;import?org.apache.spark.SparkConf;import?org.apache.spark.api.java.JavaPairRDD;import?org.apache.spark.api.java.JavaRDD;import?org.apache.spark.api.java.JavaSparkContext;import?org.apache.spark.api.java.function.Function;import?org.apache.spark.api.java.function.PairFunction;import?org.apache.spark.api.java.function.VoidFunction;import?scala.Tuple2;/**?*?二次排序??*?1、實現自定義的key,要實現Ordered接口和Serializable接口,在key中實現自己對多個列的排序算法?*?2、將包含文本的RDD,映射成key為自定義key,value為文本的JavaPairRDD??*?3、使用sortByKey算子按照自定義的key進行排序?*?4、再次映射,剔除自定義的key,只保留文本行?*/public?class?SecondarySort?{????public?static?void?main(String[]?args)?{????????SparkConf?conf?=?new?SparkConf().setAppName("SecondarySort").setMaster("local");????????JavaSparkContext?sc?=?new?JavaSparkContext(conf);????????JavaRDD<String>?lines?=?sc.textFile("test.txt");????????JavaPairRDD<SecondarySortKey,?String>?pairs?=?lines.mapToPair(????????????????new?PairFunction<String,?SecondarySortKey,?String>()?{????????????????????private?static?final?long?serialVersionUID?=?1L;????????????????????@Override????????????????????public?Tuple2<SecondarySortKey,?String>?call(String?line)?throws?Exception?{????????????????????????String[]?lineSplited?=?line.split("?");????????????????????????SecondarySortKey?key?=?new?SecondarySortKey(Integer.valueOf(lineSplited[0]),????????????????????????????????Integer.valueOf(lineSplited[1]));????????????????????????//自定義Key做為key,原文本行作為value值????????????????????????return?new?Tuple2<SecondarySortKey,?String>(key,?line);????????????????????}????????????????});????????JavaPairRDD<SecondarySortKey,?String>?sortedPairs?=?pairs.sortByKey();????????JavaRDD<String>?sortedLines?=?sortedPairs.map(????????????????new?Function<Tuple2<SecondarySortKey,?String>,?String>()?{????????????????????private?static?final?long?serialVersionUID?=?1L;????????????????????@Override????????????????????public?String?call(Tuple2<SecondarySortKey,?String>?v1)?throws?Exception?{????????????????????????return?v1._2;//key丟棄
,取文本行????????????????????}????????????????});????????sortedLines.foreach(new?VoidFunction<String>()?{????????????private?static?final?long?serialVersionUID?=?1L;????????????@Override????????????public?void?call(String?t)?throws?Exception?{????????????????System.out.println(t);????????????}????????});????????sc.close();????}}
Scala版本:
// ---- SecondSortKey.scala ----
package sparkcore.scala

/**
 * Custom key for a secondary sort: ordered by `first`, ties broken by `second`.
 *
 * @param first  value of the primary sort column
 * @param second value of the secondary sort column
 */
class SecondSortKey(val first: Int, val second: Int)
    extends Ordered[SecondSortKey] with Serializable {

  /** Returns a negative value, zero, or a positive value as this key is <, == or > `that`. */
  def compare(that: SecondSortKey): Int = {
    // Integer.compare rather than subtraction: "a - b" overflows when the operands
    // have opposite signs and large magnitude, which would flip the ordering.
    val byFirst = java.lang.Integer.compare(this.first, that.first)
    if (byFirst != 0) {
      byFirst
    } else {
      java.lang.Integer.compare(this.second, that.second)
    }
  }
}

// ---- SecondSort.scala ----
package sparkcore.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * Secondary sort driver: reads "test.txt", keys every line by a SecondSortKey built from
 * its first two columns, sorts by that key and prints the lines in sorted order.
 */
object SecondSort {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SecondSort")
      .setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.textFile("test.txt", 1)
      val pairs = lines.map { line =>
        // Split once per line. Columns are assumed to be separated by a single space,
        // as in the sample input; a bare "?" here would be an invalid regex.
        val columns = line.split(" ")
        (new SecondSortKey(columns(0).toInt, columns(1).toInt), line)
      }
      val sortedPairs = pairs.sortByKey()
      // Drop the key, keep the text line.
      val sortedLines = sortedPairs.map(sortedPair => sortedPair._2)
      sortedLines.foreach { sortedLine => println(sortedLine) }
    } finally {
      // Release the context even when the job fails.
      sc.stop()
    }
  }
}
轉載于:https://www.cnblogs.com/jiangzhengjun/p/7265173.html
總結
- 上一篇: java中日期格式转换
- 下一篇: JustOj 1486: Hello,