Spark _07_ Supplementary Operators [II]
Continued from Spark _06_ Supplementary Operators [I]:
https://blog.csdn.net/qq_41946557/article/details/102673673
Scala API
package ddd.henu.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object ElseFun2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ddd").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")

    /**
     * cogroup
     */
    val nameRDD: RDD[(String, Int)] = sc.parallelize(List[(String, Int)](
      ("george", 18), ("george", 180), ("george", 2000), ("MM", 18)), 4)
    val scoreRDD: RDD[(String, Int)] = sc.parallelize(List[(String, Int)](
      ("george", 100), ("george", 200), ("kk", 300), ("ll", 400)), 4)
    val result: RDD[(String, (Iterable[Int], Iterable[Int]))] = nameRDD.cogroup(scoreRDD)
    result.foreach(println)
    /**
     * (ll,(CompactBuffer(),CompactBuffer(400)))
     * (kk,(CompactBuffer(),CompactBuffer(300)))
     * (MM,(CompactBuffer(18),CompactBuffer()))
     * (george,(CompactBuffer(18, 180, 2000),CompactBuffer(100, 200)))
     */

    /**
     * distinct
     */
//    val rdd = sc.parallelize(List[String]("a", "b", "a", "c", "d", "d"))
//    // deduplicate by hand: map to (word, 1), reduceByKey, then keep only the key
//    rdd.map(one => {(one, 1)}).reduceByKey((v1, v2) => {v1 + v2}).map(one => {one._1}).foreach(println)
//    // deduplicate with the built-in operator
//    val result: RDD[String] = rdd.distinct()
//    result.foreach(println)

    /**
     * mapPartitions && foreachPartition
     */
//    val rdd = sc.parallelize(List[String]("george", "love", "kk", "ll", "like", "dd"), 2)
    /*
    val l = rdd.map(one => {
      println("open database connection")
      println(s"insert record: $one")
      println("close database connection")
      one + "!"
    }).count()
    println(l) // 6 -- one connection per record; frequent connection setup wastes resources
    */

    // process the data one partition at a time
    /*
    val l = rdd.mapPartitions(iter => {
      val list = new ListBuffer[String]
      println("open database connection")
      while (iter.hasNext) {
        val str = iter.next()
        list.+=(str)
        println(s"insert record: $str")
      }
      println("close database connection")
      list.iterator
    }).count()
    println(l) // 2 -- the connection is opened only twice, once per partition
    */

    /*
    rdd.foreachPartition(iter => {
      println("open database connection!")
      while (iter.hasNext) {
        val str: String = iter.next()
        println(s"insert into database: $str")
      }
      println("close database connection!!!")
    })
    */

    /*
    val rdd1 = sc.parallelize(List[Int](1, 2, 3, 4))
    val rdd2 = sc.parallelize(List[Int](3, 4, 5, 6))
    */

    // subtract -- set difference: rdd1.subtract(rdd2) keeps the elements of rdd1 not in rdd2
    /*
    val result = rdd1.subtract(rdd2)
    result.foreach(println)
    */
    /**
     * 1
     * 2
     */

    // intersection -- set intersection
    /*
    val result: RDD[Int] = rdd1.intersection(rdd2)
    result.foreach(println)
    */
    /**
     * 4
     * 3
     */
  }
}
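Because the mapPartitions and foreachPartition examples above are left commented out, here is a minimal, self-contained Scala sketch of the same idea. The println calls merely stand in for opening a real connection, writing, and closing it, and the object name PartitionSketch is illustrative only. With two partitions, the setup and teardown run twice in total rather than once per record:

package ddd.henu.transformations

import org.apache.spark.{SparkConf, SparkContext}

object PartitionSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partition-sketch").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")

    // same data and partition count as the commented-out example above
    val rdd = sc.parallelize(List("george", "love", "kk", "ll", "like", "dd"), 2)

    // the "connection" printlns stand in for real setup/teardown work;
    // with 2 partitions they run twice in total, not once per record
    rdd.foreachPartition { iter =>
      println("open database connection")
      iter.foreach(str => println(s"insert record: $str"))
      println("close database connection")
    }

    sc.stop()
  }
}

The same trade-off applies to mapPartitions when you need a transformed RDD back instead of a side effect.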
Java API

package eee;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * @author George
 * @description join two pair RDDs by key
 */
public class ElseFun {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("test");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<String, Integer> nameRdd = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("george", 22),
                new Tuple2<String, Integer>("lucy", 22),
                new Tuple2<String, Integer>("dk", 22),
                new Tuple2<String, Integer>("ll", 22)));
        JavaPairRDD<String, Integer> scoreRdd = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("george", 22),
                new Tuple2<String, Integer>("lucy", 22),
                new Tuple2<String, Integer>("dk", 22),
                new Tuple2<String, Integer>("ll", 22)));

        // join pairs up the values of keys that appear in both RDDs
        JavaPairRDD<String, Tuple2<Integer, Integer>> result = nameRdd.join(scoreRdd);
        result.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Integer>>>() {
            @Override
            public void call(Tuple2<String, Tuple2<Integer, Integer>> s) throws Exception {
                System.out.println(s);
            }
        });
        /**
         * (ll,(22,22))
         * (dk,(22,22))
         * (george,(22,22))
         * (lucy,(22,22))
         */
    }
}
Summary

This post walked through cogroup, distinct, mapPartitions/foreachPartition, subtract, and intersection in the Scala API, plus join in the Java API, continuing the supplementary operators from part one.
 
                            