2021 Big Data Spark (51): Structured Streaming IoT Device Data Analysis
Table of Contents
IoT Device Data Analysis
Device Monitoring Data Preparation
Creating the Topic
Mock Data
SQL Style
DSL Style
IoT Device Data Analysis
In the IoT era, vast numbers of sensors collect and produce data across every domain, day after day. Because IoT devices emit a continuous stream of records, real-time stream processing is the natural tool for analyzing them.

We will simulate the statistics pipeline of a smart IoT system: a generator produces device data and sends it to Kafka, and Structured Streaming consumes it and computes statistics in real time. For the device status signal data, we compute, in real time:
1) the devices whose signal strength is greater than 30;
2) the number of devices per device type;
3) the average signal strength per device type.
Device Monitoring Data Preparation
Write a program that simulates device monitoring data and sends it to a Kafka topic. To keep the demo simple the records carry only a few fields; real production data has many more.

Creating the Topic
Start the Kafka broker service and create the topic 【iotTopic】 with the following commands:
# list existing topics
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181
# delete the topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic iotTopic
# create the topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic iotTopic
# console producer, for manual testing
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic iotTopic
# console consumer, for manual testing
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic iotTopic --from-beginning
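If you would rather manage the topic from code than from the shell scripts, the Kafka AdminClient can create it programmatically. The sketch below (a hypothetical helper, not part of the original tutorial) uses the same assumptions as the commands above: broker at node1:9092, 3 partitions, replication factor 1. It needs the kafka-clients library on the classpath.

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

object CreateIotTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092")
    val admin = AdminClient.create(props)
    // 3 partitions, replication factor 1, mirroring the shell command above
    val topic = new NewTopic("iotTopic", 3, 1.toShort)
    admin.createTopics(Collections.singleton(topic)).all().get()
    admin.close()
  }
}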
Mock Data
The simulated device monitoring log records are modeled by the case class 【DeviceData】. The generator class 【MockIotDatas】 is as follows:
package cn.itcast.structedstreaming

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

object MockIotDatas {
  def main(args: Array[String]): Unit = {
    // properties for the Kafka producer
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    val deviceTypes = Array(
      "db", "bigdata", "kafka", "route", "bigdata", "db", "bigdata", "bigdata", "bigdata"
    )
    val random: Random = new Random()
    while (true) {
      val index: Int = random.nextInt(deviceTypes.length)
      val deviceId: String = s"device_${(index + 1) * 10 + random.nextInt(index + 1)}"
      val deviceType: String = deviceTypes(index)
      val deviceSignal: Int = 10 + random.nextInt(90)
      // build a mock device record
      val deviceData = DeviceData(deviceId, deviceType, deviceSignal, System.currentTimeMillis())
      // serialize it to a JSON string
      val deviceJson: String = new Json(org.json4s.DefaultFormats).write(deviceData)
      println(deviceJson)
      Thread.sleep(100 + random.nextInt(500))

      val record = new ProducerRecord[String, String]("iotTopic", deviceJson)
      producer.send(record)
    }
    // close the producer (unreachable while the loop above runs forever)
    producer.close()
  }

  /**
   * Status data sent by an IoT device
   */
  case class DeviceData(
    device: String,     // device identifier (ID)
    deviceType: String, // device type, e.g. a server running mysql, redis, kafka, or a router (route)
    signal: Double,     // signal strength
    time: Long          // timestamp when the record was sent
  )
}
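The generator needs the Kafka client and json4s on the classpath, plus Spark for the streaming jobs later in this section. A minimal build.sbt sketch follows; the version numbers are assumptions and should be aligned with your cluster:

// build.sbt -- version numbers are illustrative, match them to your environment
libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "2.4.1",
  "org.json4s" %% "json4s-jackson" % "3.6.6",
  "org.apache.spark" %% "spark-sql" % "3.0.1",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.1"
)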
This mimics the servers in a large machine room periodically sending their monitoring data to Kafka; the servers host services such as databases (db), big data clusters (bigdata), message queues (kafka), and routers (route). Sample records:

{"device":"device_50","deviceType":"bigdata","signal":91.0,"time":1590660338429}
{"device":"device_20","deviceType":"bigdata","signal":17.0,"time":1590660338790}
{"device":"device_32","deviceType":"kafka","signal":93.0,"time":1590660338908}
{"device":"device_82","deviceType":"bigdata","signal":72.0,"time":1590660339380}
{"device":"device_32","deviceType":"kafka","signal":10.0,"time":1590660339972}
{"device":"device_30","deviceType":"kafka","signal":81.0,"time":1590660340442}
{"device":"device_32","deviceType":"kafka","signal":29.0,"time":1590660340787}
{"device":"device_96","deviceType":"bigdata","signal":18.0,"time":1590660343554}
SQL Style
Per the business requirements: consume the log data from Kafka, extract the fields with the get_json_object function (which pulls individual field values out of a JSON string), register the DataFrame as a temporary view, run the analysis with SQL, and print the result to the console.
The code is as follows:
package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Real-time analysis of IoT device status signal data, SQL style:
 * 1) devices with signal strength greater than 30
 * 2) the number of devices per device type
 * 3) the average signal strength per device type
 */
object IotStreamingOnlineSQL {
  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession instance and set its properties
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // 2. Read from Kafka (backed by the new consumer API)
    val iotStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "iotTopic")
      // cap the number of offsets consumed per micro-batch
      .option("maxOffsetsPerTrigger", "100000")
      .load()

    // 3. Parse the raw data into the DeviceData fields
    val etlStreamDF: DataFrame = iotStreamDF
      // cast the value column to String
      .selectExpr("CAST(value AS STRING)")
      // convert to a Dataset (single field named value)
      .as[String]
      // drop blank records
      .filter(StringUtils.isNotBlank(_))
      // parse the JSON: {"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}
      .select(
        get_json_object($"value", "$.device").as("device_id"),
        get_json_object($"value", "$.deviceType").as("device_type"),
        get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
        get_json_object($"value", "$.time").cast(LongType).as("time")
      )

    // 4. Analyze per the business requirements
    // TODO: keep records with signal > 30, group by device type, count and average the signal
    // 4.1 Register the DataFrame as a temporary view
    etlStreamDF.createOrReplaceTempView("t_iots")
    // 4.2 Run the analysis with SQL
    val resultStreamDF: DataFrame = spark.sql(
      """
        |SELECT
        |  device_type,
        |  COUNT(device_type) AS count_device,
        |  ROUND(AVG(signal), 2) AS avg_signal
        |FROM t_iots
        |WHERE signal > 30 GROUP BY device_type
        |""".stripMargin)

    // 5. Start the streaming query and print results to the console
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .foreachBatch((batchDF: DataFrame, batchId: Long) => {
        println("===========================================")
        println(s"BatchId = ${batchId}")
        println("===========================================")
        if (!batchDF.isEmpty) {
          batchDF.coalesce(1).show(20, truncate = false)
        }
      })
      .start()
    query.awaitTermination()
    query.stop()
  }
}
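Note that calling get_json_object once per field parses the same JSON string four times. As an alternative sketch (not part of the original code, but reusing the iotStreamDF defined above), the extraction can be done in a single pass with from_json and an explicit schema matching the mock records:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// schema matching the mock JSON records
val deviceSchema = StructType(Seq(
  StructField("device", StringType),
  StructField("deviceType", StringType),
  StructField("signal", DoubleType),
  StructField("time", LongType)
))

// parse each record once, then unpack the resulting struct
val parsedStreamDF = iotStreamDF
  .selectExpr("CAST(value AS STRING) AS value")
  .select(from_json($"value", deviceSchema).as("data"))
  .select(
    $"data.device".as("device_id"),
    $"data.deviceType".as("device_type"),
    $"data.signal".as("signal"),
    $"data.time".as("time")
  )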
DSL Style
Per the business requirements, consume the log data from Kafka and run the analysis by calling functions directly on the DataFrame. The code is as follows:
package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Real-time analysis of IoT device status signal data:
 * 1) devices with signal strength greater than 30
 * 2) the number of devices per device type
 * 3) the average signal strength per device type
 */
object IotStreamingOnlineDSL {
  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession instance and set its properties
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // 2. Read from Kafka (backed by the new consumer API)
    val iotStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "iotTopic")
      // cap the number of offsets consumed per micro-batch
      .option("maxOffsetsPerTrigger", "100000")
      .load()

    // 3. Parse the raw data into the DeviceData fields
    val etlStreamDF: DataFrame = iotStreamDF
      // cast the value column to String
      .selectExpr("CAST(value AS STRING)")
      // convert to a Dataset (single field named value)
      .as[String]
      // drop blank records
      .filter(StringUtils.isNotBlank(_))
      // parse the JSON: {"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}
      .select(
        get_json_object($"value", "$.device").as("device_id"),
        get_json_object($"value", "$.deviceType").as("device_type"),
        get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
        get_json_object($"value", "$.time").cast(LongType).as("time")
      )

    // 4. Analyze per the business requirements
    // TODO: keep records with signal > 30, group by device type, count and average the signal
    val resultStreamDF: DataFrame = etlStreamDF
      // signal strength greater than 30
      .filter($"signal" > 30)
      // group by device type
      .groupBy($"device_type")
      // count devices and compute the average signal strength
      .agg(
        count($"device_type").as("count_device"),
        round(avg($"signal"), 2).as("avg_signal")
      )

    // 5. Start the streaming query and print results to the console
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()
    query.awaitTermination()
    query.stop()
  }
}
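For anything beyond a demo you would normally also set an explicit trigger interval and a checkpoint location so the query can recover after a restart. Here is a sketch of the same console sink with those options added; the checkpoint path is an example only and in production should point at reliable storage such as HDFS:

import org.apache.spark.sql.streaming.Trigger

val query: StreamingQuery = resultStreamDF.writeStream
  .outputMode(OutputMode.Complete())
  .format("console")
  .option("numRows", "10")
  .option("truncate", "false")
  // run one micro-batch every 5 seconds instead of as fast as possible
  .trigger(Trigger.ProcessingTime("5 seconds"))
  // example path; required for recovery after a restart
  .option("checkpointLocation", "datas/structured/iot-ckpt")
  .start()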
Summary
This section streamed mock IoT device data into Kafka and analyzed it with Structured Streaming in two equivalent styles: SQL over a temporary view and the DataFrame DSL. Both express the same query (filter signal > 30, group by device type, count and average the signal) and produce the same results.