string 中的offset_Kafka+Spark Streaming管理offset的几种方法
來源:大數據技術與架構作者:王知無
大數據技術與架構
點擊右側關注,大數據開發領域最強公眾號!
暴走大數據
點擊右側關注,暴走大數據!
By 大數據技術與架構
場景描述:Kafka配合Spark Streaming是大數據領域常見的黃金搭檔之一,主要是用于數據實時入庫或分析。為了應對可能出現的引起Streaming程序崩潰的異常情況,我們一般都需要手動管理好Kafka的offset,而不是讓它自動提交,即需要將enable.auto.commit設為false。只有管理好offset,才能使整個流式系統最大限度地接近exactly once語義。
關鍵詞:offset Spark Streaming
Kafka+Spark Streaming主要用于實時流處理。到目前為止,在大數據領域中是一種非常常見的架構。Kafka在其中主要起著一個緩沖的作用,所有的實時數據都會經過kafka。所以對kafka offset的管理是其中至關重要的一環。
我們一般都需要手動管理好Kafka的offset,而不是讓它自動提交,即需要將enable.auto.commit設為false。
一但管理不善,就會到導致數據丟失或重復消費。
offset的管理方式
一個簡單的流程如下:
- 在Kafka DirectStream初始化時,取得當前所有partition的存量offset,以讓DirectStream能夠從正確的位置開始讀取數據。
- 讀取消息數據,處理并存儲結果。
- 提交offset,并將其持久化在可靠的外部存儲中。
- 圖中的“process and store results”及“commit offsets”兩項,都可以施加更強的限制,比如存儲結果時保證冪等性,或者提交offset時采用原子操作。
保存offset的方式
Checkpoint:
Spark Streaming的checkpoints是最基本的存儲狀態信息的方式,一般是保存在HDFS中。但是最大的問題是如果streaming程序升級的話,checkpoints的數據無法使用,所以幾乎沒人使用。
offset的三種管理方式:
自動提交offset:
- enable.auto.commit=true。
- 一但consumer掛掉,就會導致數據丟失或重復消費。
- offset不可控。
Kafka自身的offset管理:
- (屬于At-least-once語義,如果做好了冪等性,可以使用這種方式):
- 在Kafka 0.10+版本中,offset的默認存儲由ZooKeeper移動到了一個自帶的topic中,名為__consumer_offsets。
- Spark Streaming也專門提供了commitAsync() API用于提交offset。
- 需要將參數修改為enable.auto.commit=false。
- 在我實際測試中發現,這種offset的管理方式,不會丟失數據,但會出現重復消費。
- 停掉streaming應用程序再次啟動后,會再次消費停掉前最后的一個批次數據,應該是由于offset是異步提交的方式導致,offset更新不及時引起的。
- 因此需要做好數據的冪等性。
- (修改源碼將異步改為同步,應該是可以做到Exactly-once語義的)
自定義offset:
- (推薦,采用這種方式,可以做到At-least-once語義):
- 可以將offset存放在第三方儲中,包括RDBMS、Redis、ZK、ES等。
- 若消費數據存儲在帶事務的組件上,則強烈推薦將offset存儲在一起,借助事務實現 Exactly-once 語義。
示例
Kafka自身管理offset:
在Kafka 0.10+版本中,offset的默認存儲由ZooKeeper移動到了一個自帶的topic中,名為__consumer_offsets。所以我們讀寫offset的對象正是這個topic,Spark Streaming也專門提供了commitAsync() API用于提交offset。實際上,一切都已經封裝好了,直接調用相關API即可。
stream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 確保結果都已經正確且冪等地輸出了 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}ZooKeeper
在Spark Streaming連接Kafka應用中使用Zookeeper來存儲offsets也是一種比較可靠的方式。
在這個方案中,Spark Streaming任務在啟動時會去Zookeeper中讀取每個分區的offsets。如果有新的分區出現,那么他的offset將會設置在最開始的位置。在每批數據處理完之后,用戶需要可以選擇存儲已處理數據的一個offset或者最后一個offset。此外,新消費者將使用跟舊的Kafka 消費者API一樣的格式將offset保存在ZooKeeper中。因此,任何追蹤或監控Zookeeper中Kafka Offset的工具仍然生效的。
一個典型的工具類:
class ZkKafkaOffsetManager(zkUrl: String) { private val logger = LoggerFactory.getLogger(classOf[ZkKafkaOffsetManager]) private val zkClientAndConn = ZkUtils.createZkClientAndConnection(zkUrl, 30000, 30000); private val zkUtils = new ZkUtils(zkClientAndConn._1, zkClientAndConn._2, false) def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = { val offsets = mutable.HashMap.empty[TopicPartition, Long] val partitionsForTopics = zkUtils.getPartitionsForTopics(topics) // /consumers//offsets// partitionsForTopics.foreach(partitions => { val topic = partitions._1 val groupTopicDirs = new ZKGroupTopicDirs(groupId, topic) partitions._2.foreach(partition => { val path = groupTopicDirs.consumerOffsetDir + "/" + partition try { val data = zkUtils.readData(path) if (data != null) { offsets.put(new TopicPartition(topic, partition), data._1.toLong) logger.info( "Read offset - topic={}, partition={}, offset={}, path={}總結
以上是生活随笔為你收集整理的string 中的offset_Kafka+Spark Streaming管理offset的几种方法的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python面向对象还是过程_Pytho
- 下一篇: python处理数据库_python操作