2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
目錄
整合?Kafka
說明
Kafka特定配置
???????KafkaSoure
1.消費一個Topic數據
2.消費多個Topic數據
3.消費通配符匹配Topic數據
???????KafkaSink
???????整合?Kafka
說明
http://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html
Apache Kafka 是目前最流行的一個分布式的實時流消息系統,給下游訂閱消費系統提供了并行處理和可靠容錯機制,現在大公司在流式數據的處理場景,Kafka基本是標配。
Structured Streaming很好的集成Kafka,可以從Kafka拉取消息,然后就可以把流數據看做一個DataFrame, 一張無限增長的大表,在這個大表上做查詢,Structured Streaming保證了端到端的 exactly-once,用戶只需要關心業務即可,不用費心去關心底層是怎么做的StructuredStreaming既可以從Kafka讀取數據,又可以向Kafka 寫入數據
添加Maven依賴:
dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency>
?
- 注意:
目前僅支持Kafka 0.10.+版本及以上,底層使用Kafka New Consumer API拉取數據 ???
- 消費位置
Kafka把生產者發送的數據放在不同的分區里面,這樣就可以并行進行消費了。每個分區里面的數據都是遞增有序的,跟structured commit log類似,生產者和消費者使用Kafka 進行解耦,消費者不管你生產者發送的速率如何,只要按照一定的節奏進行消費就可以了。每條消息在一個分區里面都有一個唯一的序列號offset(偏移量),Kafka 會對內部存儲的消息設置一個過期時間,如果過期了,就會標記刪除,不管這條消息有沒有被消費。
Kafka 可以被看成一個無限的流,里面的流數據是短暫存在的,如果不消費,消息就過期滾動沒了。如果開始消費,就要定一下從什么位置開始。
?
1.earliest:從最起始位置開始消費,當然不一定是從0開始,因為如果數據過期就清掉了,所以可以理解為從現存的數據里最小位置開始消費;
2.latest:從最末位置開始消費;
3.per-partition assignment:對每個分區都指定一個offset,然后從offset位置開始消費;
當第一次開始消費一個Kafka 流的時候,上述策略任選其一,如果之前已經消費了,而且做了 checkpoint ,這時候就會從上次結束的位置開始繼續消費。目前StructuredStreaming和Flink框架從Kafka消費數據時,都支持上述的策略。
?
???????Kafka特定配置
從Kafka消費數據時,相關配置屬性可以通過帶有kafka.prefix的DataStreamReader.option進行設置,例如前面設置Kafka Brokers地址屬性:stream.option("kafka.bootstrap.servers", "host:port"),更多關于Kafka 生產者Producer Config配置屬和消費者Consumer Config配置屬性,參考文檔:
?生產者配置(Producer Configs):
http://kafka.apache.org/20/documentation.html#producerconfigs
?消費者配置(New Consumer Configs):
http://kafka.apache.org/20/documentation.html#newconsumerconfigs
注意以下Kafka參數屬性可以不設置,如果設置的話,Kafka source或者sink可能會拋出錯誤:
?
1)、group.id:Kafka source將會自動為每次查詢創建唯一的分組ID;
2)、auto.offset.reset:在將source選項startingOffsets設置為指定從哪里開始。結構化流管理內部消費的偏移量,而不是依賴Kafka消費者來完成。這將確保在topic/partitons動態訂閱時不會遺漏任何數據。注意,只有在啟動新的流式查詢時才會應用startingOffsets,并且恢復操作始終會從查詢停止的位置啟動;
3)、key.deserializer/value.deserializer:Keys/Values總是被反序列化為ByteArrayDeserializer的字節數組,使用DataFrame操作顯式反序列化keys/values;
4)、key.serializer/value.serializer:keys/values總是使用ByteArraySerializer或StringSerializer進行序列化,使用DataFrame操作將keysvalues/顯示序列化為字符串或字節數組;
5)、enable.auto.commit:Kafka source不提交任何offset;
6)、interceptor.classes:Kafka source總是以字節數組的形式讀取key和value。使用ConsumerInterceptor是不安全的,因為它可能會打斷查詢;
?
???????KafkaSoure
Structured Streaming消費Kafka數據,采用的是poll方式拉取數據,與Spark Streaming中New Consumer API集成方式一致。從Kafka Topics中讀取消息,需要指定數據源(kafka)、Kafka集群的連接地址(kafka.bootstrap.servers)、消費的topic(subscribe或subscribePattern), 指定topic 的時候,可以使用正則來指定,也可以指定一個 topic 的集合。
官方提供三種方式從Kafka topic中消費數據,主要區別在于每次消費Topic名稱指定,
1.消費一個Topic數據
?
?
2.消費多個Topic數據
?
3.消費通配符匹配Topic數據
?
從Kafka 獲取數據后Schema字段信息如下,既包含數據信息有包含元數據信息:
?
在實際開發時,往往需要獲取每條數據的消息,存儲在value字段中,由于是binary類型,需要轉換為字符串String類型;此外了方便數據操作,通常將獲取的key和value的DataFrame轉換為Dataset強類型,偽代碼如下:
?
從Kafka數據源讀取數據時,可以設置相關參數,包含必須參數和可選參數:
- ?必須參數:kafka.bootstrap.servers和subscribe,可以指定開始消費偏移量assign。
?
?
- ?可選參數:
?
?
???????KafkaSink
往Kafka里面寫數據類似讀取數據,可以在DataFrame上調用writeStream來寫入Kafka,設置參數指定value,其中key是可選的,如果不指定就是null。
- 配置說明
將DataFrame寫入Kafka時,Schema信息中所需的字段:
?
需要寫入哪個topic,可以像上述所示在操作DataFrame 的時候在每條record上加一列topic字段指定,也可以在DataStreamWriter上指定option配置。
寫入數據至Kafka,需要設置Kafka Brokers地址信息及可選配置:
1.kafka.bootstrap.servers,使用逗號隔開【host:port】字符;
2.topic,如果DataFrame中沒有topic列,此處指定topic表示寫入Kafka Topic。
官方提供示例代碼如下:
?
總結
以上是生活随笔為你收集整理的2021年大数据Spark(四十九):Structured Streaming 整合 Kafka的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(四十八):S
- 下一篇: 2021年大数据Spark(五十):St