Level 1: QueueStream
Programming Requirements
Complete the code in the editor on the right so that the queue stream is cleaned into records of the following form:
Ip:124.132.29.10,visitTime:2019-04-22 11:08:33,startUrl:www/2,targetUrl:https://search.yahoo.com/search?p=反叛的魯魯修,statusCode:200
```java
package net.educoder;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;

public class Step1 {

    private static SparkConf conf;

    static {
        conf = new SparkConf().setMaster("local[*]").setAppName("Step1");
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true");
    }

    public static void main(String[] args) throws InterruptedException {
        /*********begin*********/
        // 1. Initialize the JavaStreamingContext and set the batch interval; Durations.seconds(1) --> one batch per second
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // 2. Obtain the queue stream (QueueStream is a helper class supplied by the exercise environment)
        LinkedBlockingQueue queue = QueueStream.queueStream(ssc);
        JavaDStream<String> dStream = ssc.queueStream(queue);

        /*
         * Data format:
         * 100.143.124.29,1509116285000,'GET www/1 HTTP/1.0',https://www.baidu.com/s?wd=反叛的魯魯修,404
         * Fields from left to right: user IP, visit timestamp, start URL and related info (request method, start URL, HTTP version), target URL, status code
         *
         * The raw data is delimited by commas (ASCII commas).
         *
         * Requirements:
         * 1. Convert the timestamp into the required format: yyyy-MM-dd HH:mm:ss
         * 2. Extract the start URL from the data (delimited by spaces)
         * 3. Concatenate the result in the following format:
         *    Ip:124.132.29.10,visitTime:2019-04-22 11:08:33,startUrl:www/2,targetUrl:https://search.yahoo.com/search?p=反叛的魯魯修,statusCode:200
         * 4. Check whether the RDD is empty. If it is, call ssc.stop(false, false) and System.exit(0);
         *    otherwise store the results in MySQL by calling JdbcTools.saveData(Iterator<String>)
         */

        // 3. Read the data from the queue stream and clean/transform it according to the requirements above
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        JavaDStream<String> map = dStream.map(x -> {
            String[] split = x.split(",");
            String ip = split[0];
            String time = simpleDateFormat.format(new Date(Long.parseLong(split[1])));
            String startUrl = split[2].split(" ")[1];
            String targetUrl = split[3];
            String statusCode = split[4];
            return "Ip:" + ip + ",visitTime:" + time + ",startUrl:" + startUrl
                    + ",targetUrl:" + targetUrl + ",statusCode:" + statusCode;
        });

        // 4. If the RDD is empty, stop the context and exit; otherwise save the results to MySQL via JdbcTools.saveData(Iterator<String>)
        map.foreachRDD(rdd -> {
            if (rdd.isEmpty()) {
                ssc.stop(false, false);
                System.exit(0);
            } else {
                rdd.foreachPartition(partitionOfRecords -> JdbcTools.saveData(partitionOfRecords));
            }
        });

        // 5. Start the streaming job
        ssc.start();

        // 6. Wait for the computation to terminate
        ssc.awaitTermination();
        /*********end*********/
    }
}
```
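The `JdbcTools` class used above is supplied by the exercise environment and its implementation is not shown. Purely as an illustration of what `saveData(Iterator<String>)` might do, here is a minimal sketch that batch-inserts each formatted line into MySQL; the table name `visit_log`, its single `log` column, and the connection settings are assumptions, not part of the exercise.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Iterator;

// Hypothetical sketch of the platform-provided JdbcTools helper;
// table name, column name, and credentials are assumptions.
public class JdbcTools {
    public static void saveData(Iterator<String> records) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/educoder", "root", "123123");
             PreparedStatement ps = conn.prepareStatement("INSERT INTO visit_log(log) VALUES(?)")) {
            while (records.hasNext()) {
                ps.setString(1, records.next());   // one formatted result line per row
                ps.addBatch();
            }
            ps.executeBatch();                     // write the whole partition in one batch
        }
    }
}
```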
Level 2: File Streams
Programming Requirements
Complete the code in the editor on the right according to the following requirements:
- There are two files under /root/step11_fils, with the following contents (one file per line):
  - hadoop hadoop hadoop hadoop hadoop hadoop hadoop hadoop spark spark
  - hello hello hello hello hello hello hello hello study study
- Clean the data, count the words in real time, and write the final result to MySQL.
Structure of the step table:

| Column | Data type | Length | Not null |
| ------ | --------- | ------ | -------- |
| word   | varchar   | 255    | √        |
| count  | int       | 255    | √        |
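If the step table does not exist yet, it can be created once before the job runs. The following is a minimal sketch under the assumption that the educoder database and the root/123123 credentials used by the exercise code below are in place; the column definitions follow the table above.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// One-off helper that creates the step table described above;
// the connection settings mirror the myconn() method in the exercise code.
public class CreateStepTable {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/educoder", "root", "123123");
             Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE TABLE IF NOT EXISTS step ("
                    + "word VARCHAR(255) NOT NULL, "
                    + "count INT NOT NULL)");
        }
    }
}
```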
```java
package com.educoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.sql.*;
import java.util.Arrays;
import java.util.Iterator;

public class SparkStreaming {

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("edu").setMaster("local");
        /********** Begin **********/
        // 1. Initialize the StreamingContext with a 1-second batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // 2. Create a file stream that monitors the directory /root/step11_fils
        JavaDStream<String> dStream = ssc.textFileStream("file:///root/step11_fils");

        /*
         * Data format:
         * hadoop hadoop spark spark
         * The delimiter is a space.
         *
         * Requirements:
         * Accumulate the word counts across batches.
         * Write the result to MySQL.
         * If a word already exists in the MySQL table, add the previously stored count to the new count before writing; otherwise insert it directly.
         * Database: educoder, table: step, word column: word, count column: count
         */

        // 3. Clean and transform the data
        JavaPairDStream<String, Integer> wordcount = dStream
                .flatMap(x -> Arrays.asList(x.split(" ")).iterator())
                .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
                .reduceByKey((x, y) -> x + y);

        // 4. Write the result to MySQL
        wordcount.foreachRDD(rdd -> {
            rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
                @Override
                public void call(Iterator<Tuple2<String, Integer>> r) throws Exception {
                    Connection connection = myconn();
                    while (r.hasNext()) {
                        Tuple2<String, Integer> record = r.next();
                        String querySql = "SELECT t.count FROM step t WHERE t.word = '" + record._1 + "'";
                        ResultSet queryResultSet = connection.createStatement().executeQuery(querySql);
                        boolean exists = queryResultSet.next();
                        if (!exists) {
                            String insertSql = "insert into step(word,count) values('" + record._1 + "'," + record._2 + ")";
                            connection.createStatement().execute(insertSql);
                        } else {
                            Integer newWordCount = queryResultSet.getInt("count") + record._2;
                            String updateSql = "UPDATE step SET count = " + newWordCount + " where word = '" + record._1 + "'";
                            connection.createStatement().execute(updateSql);
                        }
                    }
                    connection.close();
                }
            });
        });

        // 5. Start the streaming job
        ssc.start();
        /********** End **********/
        Thread.sleep(15000);
        ssc.stop();
        ssc.awaitTermination();
    }

    /**
     * Get a MySQL connection
     * @return a JDBC connection to the educoder database
     */
    public static Connection myconn() throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/educoder", "root", "123123");
        return conn;
    }
}
```
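Note that textFileStream normally only processes files that appear in the monitored directory after the stream has started, so in practice the input files are copied or moved into /root/step11_fils once the job is running (on the platform this may already be handled for you). The sketch below is one hypothetical way to trigger processing locally; the source path is an assumption.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Drops a file into the monitored directory so the running textFileStream job picks it up.
// The source file path is hypothetical; any text file with space-separated words will do.
public class DropFile {
    public static void main(String[] args) throws Exception {
        Path source = Paths.get("/root/data/words1.txt");            // assumed source file
        Path target = Paths.get("/root/step11_fils/words1.txt");     // directory monitored by the job
        Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
    }
}
```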
Level 3: socketTextStream
Programming Requirements
Complete the code in the editor on the right according to the following requirements:
- Clean the data, count the words in real time, and write the final result to MySQL.
Structure of the word table:

| Column    | Data type | Length | Primary key | Not null |
| --------- | --------- | ------ | ----------- | -------- |
| word      | varchar   | 255    | √           | √        |
| wordcount | int       | 11     |             | √        |
```java
package com;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.DriverManager;

public class JSocketSpark {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("socketSparkStreaming").setMaster("local[*]");
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2));

        /**********begin**********/
        // 1. Connect to the socket stream - host: localhost, port: 5566
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 5566);

        // 2. Split each line and flatten the words
        JavaDStream<String> rdd1 = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());

        // 3. Pair each word with an initial count of 1
        JavaPairDStream<String, Integer> rdd2 = rdd1.mapToPair(x -> new Tuple2<>(x, 1));

        // 4. Set the checkpoint directory (required by updateStateByKey)
        ssc.checkpoint("/root/check");

        // 5. Add the counts of each batch to the totals accumulated from previous batches
        JavaPairDStream<String, Integer> rdd3 = rdd2.updateStateByKey(
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                    // Accumulate the values for the same key across batches
                    @Override
                    public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
                        // The first argument holds the new values for this key in the current batch;
                        // the second argument holds the previously accumulated state.
                        // If there is no previous state (first batch), start from 0; otherwise start from the existing total.
                        Integer updatedValue = 0;
                        if (state.isPresent()) {
                            updatedValue = state.get();
                        }
                        // Add the values of the current batch; over time this keeps accumulating per key.
                        for (Integer value : values) {
                            updatedValue += value;
                        }
                        return Optional.of(updatedValue); // return the updated total
                    }
                });

        // 6. Write the result to MySQL
        //    If the word already exists, update its count; otherwise insert it.
        rdd3.foreachRDD(rdd -> {
            rdd.foreachPartition(x -> {
                Connection myconn = myconn();
                while (x.hasNext()) {
                    Tuple2<String, Integer> record = x.next();
                    String sql = "insert into wordcount (word,wordcount) values('" + record._1 + "'," + record._2
                            + ") on DUPLICATE key update wordcount=" + record._2;
                    myconn.createStatement().execute(sql);
                }
                myconn.close();
            });
        });
        /********** End **********/

        ssc.start();
        ssc.awaitTermination();
    }

    public static Connection myconn() throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/edu", "root", "123123");
        return conn;
    }
}
```
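socketTextStream connects to localhost:5566 as a client, so something must already be listening on that port and writing lines, typically `nc -lk 5566` on the Educoder shell. Purely for local testing, the following sketch stands in for that data source; the sample words it sends are arbitrary.

```java
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Minimal stand-in data source: listens on port 5566 and writes a line of
// space-separated words every second to the connected Spark receiver.
public class WordServer {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(5566);
             Socket client = server.accept();
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            while (true) {
                out.println("hadoop spark spark hello");   // arbitrary sample line
                Thread.sleep(1000);
            }
        }
    }
}
```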
Level 4: KafkaStreaming
Programming Requirements
Complete the code in the editor on the right so that the Kafka stream is cleaned into records of the following form:
Ip:124.132.29.10,visitTime:2019-04-22 11:08:33,startUrl:www/2,targetUrl:https://search.yahoo.com/search?p=反叛的魯魯修,statusCode:200
```java
package net.educoder;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.text.SimpleDateFormat;
import java.util.*;

public class Step2 {

    private static SparkConf conf;

    static {
        conf = new SparkConf().setMaster("local[*]").setAppName("Step2");
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true");
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "127.0.0.1:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "sparkStreaming");
        kafkaParams.put("enable.auto.commit", "false");

        TopicPartition topicPartition = new TopicPartition("test", 0);
        List<TopicPartition> topicPartitions = Arrays.asList(topicPartition);
        HashMap<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(topicPartition, 0L);

        /********** Begin **********/
        // 1. Initialize the JavaStreamingContext and set the batch interval; Durations.seconds(1) --> one batch per second
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // 2. Create the stream with KafkaUtils, subscribing with the Assign strategy.
        //    The topic-partition list (topicPartitions), Kafka parameters (kafkaParams) and offsets (offsets) are defined above.
        JavaInputDStream<ConsumerRecord<String, String>> javaInputDStream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Assign(topicPartitions, kafkaParams, offsets));
        JavaDStream<String> dStream = javaInputDStream.map(x -> x.value());

        /*
         * Data format:
         * 100.143.124.29,1509116285000,'GET www/1 HTTP/1.0',https://www.baidu.com/s?wd=反叛的魯魯修,404
         * Fields from left to right: user IP, visit timestamp, start URL and related info (request method, start URL, HTTP version), target URL, status code
         *
         * The raw data is delimited by commas (ASCII commas).
         *
         * Requirements:
         * 1. Convert the timestamp into the required format: yyyy-MM-dd HH:mm:ss
         * 2. Extract the start URL from the data (delimited by spaces)
         * 3. Concatenate the result in the following format:
         *    Ip:124.132.29.10,visitTime:2019-04-22 11:08:33,startUrl:www/2,targetUrl:https://search.yahoo.com/search?p=反叛的魯魯修,statusCode:200
         * 4. Check whether the RDD is empty. If it is, call ssc.stop(false, false) and System.exit(0);
         *    otherwise store the results in MySQL by calling JdbcTools.saveData2(Iterator<String>)
         */

        // 3. Read the data from the Kafka stream and clean/transform it according to the requirements above
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        JavaDStream<String> map = dStream.map(x -> {
            String[] split = x.split(",");
            String ip = split[0];
            String time = simpleDateFormat.format(new Date(Long.parseLong(split[1])));
            String startUrl = split[2].split(" ")[1];
            String targetUrl = split[3];
            String statusCode = split[4];
            return "Ip:" + ip + ",visitTime:" + time + ",startUrl:" + startUrl
                    + ",targetUrl:" + targetUrl + ",statusCode:" + statusCode;
        });

        // 4. If the RDD is empty, stop the context and exit; otherwise save the results to MySQL via JdbcTools.saveData2(Iterator<String>)
        map.foreachRDD(rdd -> {
            if (rdd.isEmpty()) {
                ssc.stop(false, false);
                System.exit(0);
            } else {
                rdd.foreachPartition(partitionOfRecords -> JdbcTools.saveData2(partitionOfRecords));
            }
        });

        // 5. Start the streaming job
        ssc.start();

        // 6. Wait for the computation to terminate
        ssc.awaitTermination();
        /********** End **********/
    }
}
```
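For the stream above to receive anything, comma-separated log records must be produced to partition 0 of the test topic. The sketch below is a minimal producer for that purpose; the sample record is taken from the data-format comment in the code, and the broker address matches the bootstrap.servers value in kafkaParams.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Sends one sample access-log record to partition 0 of the "test" topic read by Step2.
public class LogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        String record = "100.143.124.29,1509116285000,'GET www/1 HTTP/1.0',https://www.baidu.com/s?wd=反叛的魯魯修,404";
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", 0, null, record));  // explicit partition 0, no key
        }
    }
}
```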
Summary
The four levels above build Spark Streaming jobs in Java from four different sources (a queue stream, a monitored directory, a TCP socket, and Kafka), and in each case clean or aggregate the incoming records and persist the results to MySQL.