string 中的offset_Kafka+Spark Streaming管理offset的两种方法
?Kafka配合Spark Streaming是大數據領域常見的黃金搭檔之一,主要是用于數據實時入庫或分析。
為了應對可能出現的引起Streaming程序崩潰的異常情況,我們一般都需要手動管理好Kafka的offset,而不是讓它自動提交,即需要將enable.auto.commit設為false。只有管理好offset,才能使整個流式系統最大限度地接近exactly once語義。
管理offset的流程
下面這張圖能夠簡要地說明管理offset的大致流程。
offset管理流程
- 在Kafka DirectStream初始化時,取得當前所有partition的存量offset,以讓DirectStream能夠從正確的位置開始讀取數據。
- 讀取消息數據,處理并存儲結果。
- 提交offset,并將其持久化在可靠的外部存儲中。
- 圖中的“process and store results”及“commit offsets”兩項,都可以施加更強的限制,比如存儲結果時保證冪等性,或者提交offset時采用原子操作。
- 圖中提出了4種offset存儲的選項,分別是HBase、Kafka自身、HDFS和ZooKeeper。綜合考慮實現的難易度和效率,我們目前采用過的是Kafka自身與ZooKeeper兩種方案。
Kafka自身
在Kafka 0.10+版本中,offset的默認存儲由ZooKeeper移動到了一個自帶的topic中,名為__consumer_offsets。Spark Streaming也專門提供了commitAsync() API用于提交offset。使用方法如下。
stream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 確保結果都已經正確且冪等地輸出了 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}上面是Spark Streaming官方文檔中給出的寫法。但在實際上我們總會對DStream進行一些運算,這時我們可以借助DStream的transform()算子。
var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange] stream.transform(rdd => { // 利用transform取得OffsetRanges offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }).mapPartitions(records => { var result = new ListBuffer[...]() // 處理流程 result.toList.iterator }).foreachRDD(rdd => { if (!rdd.isEmpty()) { // 數據入庫 session.createDataFrame... } // 提交offset stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) })特別需要注意,在轉換過程中不能破壞RDD分區與Kafka分區之間的映射關系。亦即像map()/mapPartitions()這樣的算子是安全的,而會引起shuffle或者repartition的算子,如reduceByKey()/join()/coalesce()等等都是不安全的。
另外需要注意的是,HasOffsetRanges是KafkaRDD的一個trait,而CanCommitOffsets是DirectKafkaInputDStream的一個trait。從spark-streaming-kafka包的源碼中,可以看得一清二楚。
private[spark] class KafkaRDD[K, V]( sc: SparkContext, val kafkaParams: ju.Map[String, Object], val offsetRanges: Array[OffsetRange], val preferredHosts: ju.Map[TopicPartition, String], useConsumerCache: Boolean) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRangesprivate[spark] class DirectKafkaInputDStream[K, V]( _ssc: StreamingContext, locationStrategy: LocationStrategy, consumerStrategy: ConsumerStrategy[K, V], ppc: PerPartitionConfig ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {這就意味著不能對stream對象做transformation操作之后的結果進行強制轉換(會直接報ClassCastException),因為RDD與DStream的類型都改變了。只有RDD或DStream的包含類型為ConsumerRecord才行。
ZooKeeper
雖然Kafka將offset從ZooKeeper中移走是考慮到可能的性能問題,但ZooKeeper內部是采用樹形node結構存儲的,這使得它天生適合存儲像offset這樣細碎的結構化數據。并且我們的分區數不是很多,batch間隔也相對長(20秒),因此并沒有什么瓶頸。
Kafka中還保留了一個已經標記為過時的類ZKGroupTopicDirs,其中預先指定了Kafka相關數據的存儲路徑,借助它,我們可以方便地用ZooKeeper來管理offset。為了方便調用,將存取offset的邏輯封裝成一個類如下。
class ZkKafkaOffsetManager(zkUrl: String) { private val logger = LoggerFactory.getLogger(classOf[ZkKafkaOffsetManager]) private val zkClientAndConn = ZkUtils.createZkClientAndConnection(zkUrl, 30000, 30000); private val zkUtils = new ZkUtils(zkClientAndConn._1, zkClientAndConn._2, false) def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = { val offsets = mutable.HashMap.empty[TopicPartition, Long] val partitionsForTopics = zkUtils.getPartitionsForTopics(topics) // /consumers//offsets// partitionsForTopics.foreach(partitions => { val topic = partitions._1 val groupTopicDirs = new ZKGroupTopicDirs(groupId, topic) partitions._2.foreach(partition => { val path = groupTopicDirs.consumerOffsetDir + "/" + partition try { val data = zkUtils.readData(path) if (data != null) { offsets.put(new TopicPartition(topic, partition), data._1.toLong) logger.info( "Read offset - topic={}, partition={}, offset={}, path={}總結
以上是生活随笔為你收集整理的string 中的offset_Kafka+Spark Streaming管理offset的两种方法的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spark如何防止内存溢出_Spark
- 下一篇: stm32 adc过采样_产生ADC误差