2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
目錄
輸出終端/位置
文件接收器
???????Memory Sink
Foreach和ForeachBatch Sink
Foreach
???????ForeachBatch
???????代碼演示
輸出終端/位置
Structured Streaming 非常顯式地提出了輸入(Source)、執行(StreamExecution)、輸出(Sink)的3個組件,并且在每個組件顯式地做到fault-tolerant(容錯),由此得到整個streaming程序的 end-to-end exactly-once guarantees。
目前Structured Streaming內置FileSink、Console Sink、Foreach Sink(ForeachBatch Sink)、Memory Sink及Kafka Sink,其中測試最為方便的是Console Sink。
?
?
???????文件接收器
將輸出存儲到目錄文件中,支持文件格式:parquet、orc、json、csv等,示例如下:
?
相關注意事項如下:
- ?支持OutputMode為:Append追加模式;
- ?必須指定輸出目錄參數【path】,必選參數,其中格式有parquet、orc、json、csv等等;
- ?容災恢復支持精確一次性語義exactly-once;
- ?此外支持寫入分區表,實際項目中常常按時間劃分;
?
???????Memory Sink
此種接收器作為調試使用,輸出作為內存表存儲在內存中,?支持Append和Complete輸出模式。這應該用于低數據量的調試目的,因為整個輸出被收集并存儲在驅動程序的內存中,因此,請謹慎使用,示例如下:
?
?
Foreach和ForeachBatch Sink
Foreach
?????Structured Streaming提供接口foreach和foreachBatch,允許用戶在流式查詢的輸出上應用任意操作和編寫邏輯,比如輸出到MySQL表、Redis數據庫等外部存系統。其中foreach允許每行自定義寫入邏輯,foreachBatch允許在每個微批量的輸出上進行任意操作和自定義邏輯,建議使用foreachBatch操作。
foreach表達自定義編寫器邏輯具體來說,需要編寫類class繼承ForeachWriter,其中包含三個方法來表達數據寫入邏輯:打開,處理和關閉。
https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html
streamingDatasetOfString.writeStream.foreach(new ForeachWriter[String] {def open(partitionId: Long, version: Long): Boolean = {// Open connection}def process(record: String): Unit = {// Write string to connection}def close(errorOrNull: Throwable): Unit = {// Close the connection}}).start()
?
???????ForeachBatch
方法foreachBatch允許指定在流式查詢的每個微批次的輸出數據上執行的函數,需要兩個參數:微批次的輸出數據DataFrame或Dataset、微批次的唯一ID。
?
使用foreachBatch函數輸出時,以下幾個注意事項:
1.重用現有的批處理數據源,可以在每個微批次的輸出上使用批處理數據輸出Output;
2.寫入多個位置,如果要將流式查詢的輸出寫入多個位置,則可以簡單地多次寫入輸出 DataFrame/Dataset 。但是,每次寫入嘗試都會導致重新計算輸出數據(包括可能重新讀取輸入數據)。要避免重新計算,您應該緩存cache輸出 DataFrame/Dataset,將其寫入多個位置,然后 uncache 。
?
?
3.應用其他DataFrame操作,流式DataFrame中不支持許多DataFrame和Dataset操作,使用foreachBatch可以在每個微批輸出上應用其中一些操作,但是,必須自己解釋執行該操作的端到端語義。
4.默認情況下,foreachBatch僅提供至少一次寫保證。 但是,可以使用提供給該函數的batchId作為重復數據刪除輸出并獲得一次性保證的方法。
5.foreachBatch不適用于連續處理模式,因為它從根本上依賴于流式查詢的微批量執行。 如果以連續模式寫入數據,請改用foreach。
?
???????代碼演示
使用foreachBatch將詞頻統計結果輸出到MySQL表中,代碼如下:
package cn.itcast.structedstreamingimport org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** 使用Structured Streaming從TCP Socket實時讀取數據,進行詞頻統計,將結果存儲到MySQL數據庫表中*/
object StructuredForeachBatch {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "2").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._import org.apache.spark.sql.functions._val inputStreamDF: DataFrame = spark.readStream.format("socket").option("host", "node1").option("port", 9999).load()val resultStreamDF: DataFrame = inputStreamDF.as[String].filter(StringUtils.isNotBlank(_)).flatMap(_.trim.split("\\s+")).groupBy($"value").count()val query: StreamingQuery = resultStreamDF.writeStream.outputMode(OutputMode.Complete()).foreachBatch((batchDF: DataFrame, batchId: Long) => {println(s"BatchId = ${batchId}")if (!batchDF.isEmpty) {batchDF.coalesce(1).write.mode(SaveMode.Overwrite).format("jdbc")//.option("driver", "com.mysql.cj.jdbc.Driver")//MySQL-8//.option("url", "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")//MySQL-8.option("url", "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8").option("user", "root").option("password", "root").option("dbtable", "bigdata.t_struct_words").save()}}).start()query.awaitTermination()query.stop()}
}
?
總結
以上是生活随笔為你收集整理的2021年大数据Spark(四十八):Structured Streaming 输出终端/位置的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(四十六):S
- 下一篇: 2021年大数据Spark(四十九):S