2021 Big Data Spark (50): Structured Streaming Case 1 - Real-Time Data ETL Architecture
Table of Contents

Case 1: Real-Time Data ETL Architecture
Prepare the Topics
Simulate Base Station Log Data
Real-Time Incremental ETL
Case 1: Real-Time Data ETL Architecture
In real-world streaming projects, whether the stream is processed with Storm, Spark Streaming, Flink, or Structured Streaming, the raw data is usually consumed from Kafka first, run through ETL, and then written back to a Kafka topic so that other downstream applications can consume it for real-time processing and analysis. The technical architecture is shown in the flow diagram below:
[Figure: real-time ETL architecture flow: Kafka source topic -> stream processing engine (ETL) -> Kafka sink topic -> downstream applications]
Next we simulate carrier base station data and send it to Kafka in real time, consume it with Structured Streaming, apply ETL (keep only records whose call status is success), and write the result back to Kafka so that other real-time applications can consume, process, and analyze it.
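Note that reading from and writing to Kafka with Structured Streaming requires the spark-sql-kafka connector on the application classpath. A minimal sbt sketch of the dependency (the version 3.0.1 and the Scala 2.12 binary are assumptions here; the version must match the Spark build actually used):

// build.sbt (sketch): Kafka source/sink support for Structured Streaming
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.1"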
Prepare the Topics
Create the topics with the following commands:
# List topics
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181

# Delete topics
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic stationTopic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic etlTopic

# Create topics
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic stationTopic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic etlTopic

# Console producers (manual testing)
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic stationTopic
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic etlTopic

# Console consumers (manual testing)
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic stationTopic --from-beginning
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic etlTopic --from-beginning
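If you prefer to create the topics from code instead of the shell scripts, Kafka's AdminClient API can do the same thing. A minimal sketch (the object name CreateEtlTopics is made up here; the broker address and topic settings mirror the commands above):

package cn.itcast.structedstreaming

import java.util.{Arrays, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateEtlTopics {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092")
    val admin = AdminClient.create(props)
    // 3 partitions, replication factor 1, matching the CLI commands above
    val topics = Arrays.asList(
      new NewTopic("stationTopic", 3, 1.toShort),
      new NewTopic("etlTopic", 3, 1.toShort)
    )
    admin.createTopics(topics).all().get() // block until creation completes
    admin.close()
  }
}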
Simulate Base Station Log Data
Run the following code to generate mock log data continuously and send it to the Kafka topic:
package cn.itcast.structedstreaming

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.util.Random

/**
 * Generates mock base station log data and sends it to a Kafka topic in real time.
 * Record fields: station ID, caller number, callee number, call status, call time, call duration
 */
object MockStationLog {
  def main(args: Array[String]): Unit = {
    // Kafka producer configuration
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    val random = new Random()
    val allStatus = Array(
      "fail", "busy", "barring", "success", "success", "success",
      "success", "success", "success", "success", "success", "success"
    )

    while (true) {
      val callOut: String = "1860000%04d".format(random.nextInt(10000))
      val callIn: String = "1890000%04d".format(random.nextInt(10000))
      val callStatus: String = allStatus(random.nextInt(allStatus.length))
      val callDuration = if ("success".equals(callStatus)) (1 + random.nextInt(10)) * 1000L else 0L

      // Build one random base station log record
      val stationLog: StationLog = StationLog(
        "station_" + random.nextInt(10),
        callOut,
        callIn,
        callStatus,
        System.currentTimeMillis(),
        callDuration
      )
      println(stationLog.toString)
      Thread.sleep(100 + random.nextInt(100))

      val record = new ProducerRecord[String, String]("stationTopic", stationLog.toString)
      producer.send(record)
    }

    producer.close() // close the connection (unreachable while the loop runs forever)
  }

  /**
   * Base station call log record
   */
  case class StationLog(
    stationId: String,  // base station ID
    callOut: String,    // caller number
    callIn: String,     // callee number
    callStatus: String, // call status
    callTime: Long,     // call time
    duration: Long      // call duration
  ) {
    override def toString: String = {
      s"$stationId,$callOut,$callIn,$callStatus,$callTime,$duration"
    }
  }
}
Run the program; the base station call log records look like this:
station_0,18600004405,18900009049,success,1589711564033,9000
(one CSV record per line: stationId,callOut,callIn,callStatus,callTime,duration)
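The six comma-separated fields map directly to the StationLog case class used by the producer. As an illustration only (this helper is not part of the original code, and it assumes the StationLog case class is visible, e.g. moved out of MockStationLog into its own file), one record can be parsed back into the case class like this:

// Sketch: parse one CSV record back into StationLog
def parseStationLog(line: String): Option[StationLog] = {
  val fields = line.trim.split(",")
  if (fields.length == 6)
    Some(StationLog(fields(0), fields(1), fields(2), fields(3),
      fields(4).toLong, fields(5).toLong))
  else
    None // malformed record
}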
Real-Time Incremental ETL
Write code that consumes data from the Kafka topic stationTopic in real time, processes and analyzes it, and stores the result in the Kafka topic etlTopic. A checkpoint directory must be configured so that the application has exactly-once semantics.
package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Consumes base station log data from a Kafka topic in real time, keeps only records
 * whose call status is success, and stores them in another Kafka topic.
 * 1. Read the base station log data from the Kafka topic
 * 2. ETL: keep only log records whose call status is success
 * 3. Store the ETL result in a Kafka topic
 */
object StructuredEtlSink {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // 1. Read from Kafka
    val kafkaStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "stationTopic")
      .load()

    // 2. ETL on the base station log data
    // Example record: station_0,18600004405,18900009049,success,1589711564033,9000
    val etlStreamDF: Dataset[String] = kafkaStreamDF
      // Take the value column and cast it to String
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      // Keep only records whose call status is success
      .filter(log => StringUtils.isNoneBlank(log) && "success".equals(log.trim.split(",")(3)))
    etlStreamDF.printSchema()

    // 3. For a streaming application, the output is itself a stream
    val query: StreamingQuery = etlStreamDF.writeStream
      // Output mode of the streaming sink
      .outputMode(OutputMode.Append())
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("topic", "etlTopic")
      // Checkpoint directory; note that a fixed path (rather than one derived from the
      // current timestamp) is needed for exactly-once recovery across restarts
      .option("checkpointLocation", "./ckp" + System.currentTimeMillis())
      .start()
    query.awaitTermination()
    query.stop()
  }
}
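Besides the kafka-console-consumer script shown earlier, the ETL output can also be sanity-checked with a small Structured Streaming job that reads etlTopic and prints the records to the console sink. A minimal sketch (the object name EtlTopicConsolePreview is made up here; the broker address matches the rest of this case):

package cn.itcast.structedstreaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object EtlTopicConsolePreview {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("EtlTopicConsolePreview")
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // Read the ETL output topic and print each record to the console for inspection
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "etlTopic")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .outputMode(OutputMode.Append())
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()
  }
}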
Summary

This case produces mock base station call logs into the Kafka topic stationTopic, consumes them with Structured Streaming, keeps only records whose call status is success, and writes the cleaned result to the Kafka topic etlTopic with a checkpoint directory configured, so that other real-time applications can consume and analyze the ETL output.