2021年大数据Spark(四十五):Structured Streaming Sources 输入源
目錄
Sources 輸入源
Socket數據源-入門案例
需求
編程實現
???????文件數據源-了解
???????需求
???????代碼實現
???????Rate source-了解
Sources 輸入源
從Spark 2.0至Spark 2.4版本,目前支持數據源有4種,其中Kafka 數據源使用作為廣泛,其他數據源主要用于開發測試程序。
文檔:http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#input-sources
?
?
?????可以認為Structured Streaming = SparkStreaming + SparkSQL,對流式數據處理使用SparkSQL數據結構,應用入口為SparkSession,對比SparkSQL與SparkStreaming編程:
?Spark Streaming:將流式數據按照時間間隔(BatchInterval)劃分為很多Batch,每批次數據封裝在RDD中,底層RDD數據,構建StreamingContext實時消費數據;
?Structured Streaming屬于SparkSQL模塊中一部分,對流式數據處理,構建SparkSession對象,指定讀取Stream數據和保存Streamn數據,具體語法格式:
靜態數據
讀取spark.read
保存ds/df.write
?
流式數據
讀取spark.readStream
保存ds/df.writeStrem
?
Socket數據源-入門案例
需求
http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#quick-example
實時從TCP Socket讀取數據(采用nc)實時進行詞頻統計WordCount,并將結果輸出到控制臺Console。
?
- Socket 數據源
從Socket中讀取UTF8文本數據。一般用于測試,使用nc -lk?端口號向Socket監聽的端口發送數據,用于測試使用,有兩個參數必須指定:
1.host
2.port
?
- Console 接收器
?????將結果數據打印到控制臺或者標準輸出,通常用于測試或Bedug使用,三種輸出模式OutputMode(Append、Update、Complete)都支持,兩個參數可設置:
1.numRows,打印多少條數據,默認為20條;
2.truncate,如果某列值字符串太長是否截取,默認為true,截取字符串;
?
編程實現
?
完整案例代碼如下:
package cn.itcast.structedstreamingimport org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 使用Structured Streaming從TCP Socket實時讀取數據,進行詞頻統計,將結果打印到控制臺。*/
object StructuredWordCount {def main(args: Array[String]): Unit = {//TODO: 0. 環境val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "2") // 設置Shuffle分區數目.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._import org.apache.spark.sql.functions._// TODO: 1. 從TCP Socket 讀取數據val inputStreamDF: DataFrame = spark.readStream.format("socket").option("host", "node1").option("port", 9999).load()//注意:返回的df不是普通的分布式表,而是實時流數據對應的分布式的無界表!//df.show()//注意:該寫法是離線的寫法,會報錯,所以應使用實時的寫法:Queries with streaming sources must be executed with writeStream.start();inputStreamDF.printSchema()// TODO: 2. 業務分析:詞頻統計WordCountval resultStreamDF: DataFrame = inputStreamDF.as[String].filter(StringUtils.isNotBlank(_)).flatMap(_.trim.split("\\s+")).groupBy($"value").count()//.orderBy($"count".desc)resultStreamDF.printSchema()// TODO: 3. 設置Streaming應用輸出及啟動val query: StreamingQuery = resultStreamDF.writeStream//- append:默認的追加模式,將新的數據輸出!只支持簡單查詢,如果涉及的聚合就不支持了//- complete:完整模式,將完整的數據輸出,支持聚合和排序//- update:更新模式,將有變化的數據輸出,支持聚合但不支持排序,如果沒有聚合就和append一樣//.outputMode(OutputMode.Append())//.outputMode(OutputMode.Complete()).outputMode(OutputMode.Update()).format("console").option("numRows", "10").option("truncate", "false")// 流式應用,需要啟動start.start()// 流式查詢等待流式應用終止query.awaitTermination()// 等待所有任務運行完成才停止運行query.stop()}
}
?
???????文件數據源-了解
將目錄中寫入的文件作為數據流讀取,支持的文件格式為:text、csv、json、orc、parquet
?
???????需求
監聽某一個目錄,讀取csv格式數據,統計年齡小于25歲的人群的愛好排行榜。
測試數據
jack1;23;runningjack2;23;runningjack3;23;runningbob1;20;swimmingbob2;20;swimmingtom1;28;footballtom2;28;footballtom3;28;footballtom4;28;football
?
???????代碼實現
package cn.itcast.structedstreamingimport org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}/*** 使用Structured Streaming從目錄中讀取文件數據:統計年齡小于25歲的人群的愛好排行榜*/
object StructuredFileSource {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "2").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._import org.apache.spark.sql.functions._// TODO: 從文件系統,監控目錄,讀取CSV格式數據// 數據格式:// jack;23;runningval csvSchema: StructType = new StructType().add("name", StringType, nullable = true).add("age", IntegerType, nullable = true).add("hobby", StringType, nullable = true)val inputStreamDF: DataFrame = spark.readStream.option("sep", ";").option("header", "false")// 指定schema信息.schema(csvSchema).csv("data/input/persons")// 依據業務需求,分析數據:統計年齡小于25歲的人群的愛好排行榜val resultStreamDF: Dataset[Row] = inputStreamDF.filter($"age" < 25).groupBy($"hobby").count().orderBy($"count".desc)// 設置Streaming應用輸出及啟動val query: StreamingQuery = resultStreamDF.writeStream//- append:默認的追加模式,將新的數據輸出!只支持簡單查詢,如果涉及的聚合就不支持了//- complete:完整模式,將完整的數據輸出,支持聚合和排序//- update:更新模式,將有變化的數據輸出,支持聚合但不支持排序,如果沒有聚合就和append一樣.outputMode(OutputMode.Complete()).format("console").option("numRows", "10").option("truncate", "false").start()query.awaitTermination()query.stop()}
}
???????Rate source-了解
以每秒指定的行數生成數據,每個輸出行包含2個字段:timestamp和value。
其中timestamp是一個Timestamp含有信息分配的時間類型,并且value是Long(包含消息的計數從0開始作為第一行)類型。此源用于測試和基準測試,可選參數如下:
?
演示范例代碼如下:
package cn.itcast.structedstreamingimport org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 數據源:Rate Source,以每秒指定的行數生成數據,每個輸出行包含一個timestamp和value。*/
object StructuredRateSource {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "2").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._import org.apache.spark.sql.functions._// TODO:從Rate數據源實時消費數據val rateStreamDF: DataFrame = spark.readStream.format("rate").option("rowsPerSecond", "10") // 每秒生成數據條數.option("rampUpTime", "0s") // 每條數據生成間隔時間.option("numPartitions", "2") // 分區數目.load()rateStreamDF.printSchema()//root// |-- timestamp: timestamp (nullable = true)// |-- value: long (nullable = true)// 3. 設置Streaming應用輸出及啟動val query: StreamingQuery = rateStreamDF.writeStream//- append:默認的追加模式,將新的數據輸出!只支持簡單查詢,如果涉及的聚合就不支持了//- complete:完整模式,將完整的數據輸出,支持聚合和排序//- update:更新模式,將有變化的數據輸出,支持聚合但不支持排序,如果沒有聚合就和append一樣.outputMode(OutputMode.Append()).format("console").option("numRows", "10").option("truncate", "false").start()query.awaitTermination()query.stop()}
}
?
總結
以上是生活随笔為你收集整理的2021年大数据Spark(四十五):Structured Streaming Sources 输入源的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(二十九):S
- 下一篇: 2021年大数据Spark(四十六):S