sparkstreaming 读取mysql_第十篇|SparkStreaming手动维护Kafka Offset的几种方式
Spark Streaming No Receivers 方式的createDirectStream 方法不使用接收器,而是創建輸入流直接從Kafka 集群節點拉取消息。輸入流保證每個消息從Kafka 集群拉取以后只完全轉換一次,保證語義一致性。但是當作業發生故障或重啟時,要保障從當前的消費位點去處理數據(即Exactly Once語義),單純的依靠SparkStreaming本身的機制是不太理想的,生產環境中通常借助手動管理offset的方式來維護kafka的消費位點。本文分享將介紹如何手動管理Kafka的Offset,希望對你有所幫助。本文主要包括以下內容:
- 如何使用MySQL管理Kafka的Offset
- 如何使用Redis管理Kafka的OffSet
如何使用MySQL管理Kafka的Offset
我們可以從Spark Streaming 應用程序中編寫代碼來手動管理Kafka偏移量,偏移量可以從每一批流處理中生成的RDDS偏移量來獲取,獲取方式為:
KafkaUtils.createDirectStream(...).foreachRDD { rdd => // 獲取偏移量 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges...}當獲取到偏移量之后,可以將將其保存到外部存儲設備中(MySQL、Redis、Zookeeper、HBase等)。
使用案例代碼
- MySQL中用于保存偏移量的表
- 常量配置類:ConfigConstants
- JDBC連接工具類:JDBCConnPool
- Kafka生產者:KafkaProducerTest
- 讀取和保存Offset:
該對象的作用是從外部設備中讀取和寫入Offset,包括MySQL和Redis
object OffsetReadAndSave {/*** 從MySQL中獲取偏移量** @param groupid* @param topic* @return*/def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {val conn = JDBCConnPool.getConnection()val selectSql = "select * from topic_par_group_offset where groupid = ? and topic = ?"val ppst = conn.prepareStatement(selectSql)ppst.setString(1, groupid)ppst.setString(2, topic)val result: ResultSet = ppst.executeQuery()// 主題分區偏移量val topicPartitionOffset = mutable.Map[TopicPartition, Long]()while (result.next()) {val topicPartition: TopicPartition = new TopicPartition(result.getString("topic"), result.getInt("partition"))topicPartitionOffset += (topicPartition -> result.getLong("offset"))}JDBCConnPool.closeConnection(ppst, conn)topicPartitionOffset}/*** 從Redis中獲取偏移量** @param groupid* @param topic* @return*/def getOffsetFromRedis(groupid: String, topic: String): Map[TopicPartition, Long] = {val jedis: Jedis = JedisConnPool.getConnection()var offsets = mutable.Map[TopicPartition, Long]()val key = s"${topic}_${groupid}"val fields : java.util.Map[String, String] = jedis.hgetAll(key)for (partition <- JavaConversions.mapAsScalaMap(fields)) {offsets.put(new TopicPartition(topic, partition._1.toInt), partition._2.toLong)}offsets.toMap}/*** 將偏移量寫入MySQL** @param groupid 消費者組ID* @param offsetRange 消息偏移量范圍*/def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {val conn = JDBCConnPool.getConnection()val insertSql = "replace into topic_par_group_offset(`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)"val ppst = conn.prepareStatement(insertSql)for (offset <- offsetRange) {ppst.setString(1, offset.topic)ppst.setInt(2, offset.partition)ppst.setString(3, groupid)ppst.setLong(4, offset.untilOffset)ppst.executeUpdate()}JDBCConnPool.closeConnection(ppst, conn)}/*** 將偏移量保存到Redis中* @param groupid* @param offsetRange*/def saveOffsetToRedis(groupid: String, offsetRange: Array[OffsetRange]) = {val jedis :Jedis = JedisConnPool.getConnection()for(offsetRange<-offsetRange){val topic=offsetRange.topicval partition=offsetRange.partitionval offset=offsetRange.untilOffset// key為topic_groupid,field為partition,value為offsetjedis.hset(s"${topic}_${groupid}",partition.toString,offset.toString)}} }- 業務處理類
該對象是業務處理邏輯,主要是消費Kafka數據,再處理之后進行手動將偏移量保存到MySQL中。在啟動程序時,會判斷外部存儲設備中是否存在偏移量,如果是首次啟動則從最初的消費位點消費,如果存在Offset,則從當前的Offset去消費。
觀察現象:當首次啟動時會從頭消費數據,手動停止程序,然后再次啟動,會發現會從當前提交的偏移量消費數據。object ManualCommitOffset {def main(args: Array[String]): Unit = {val brokers = ConfigConstants.kafkaBrokersval groupId = ConfigConstants.groupIdval topics = ConfigConstants.kafkaTopicsval batchInterval = ConfigConstants.batchIntervalval conf = new SparkConf().setAppName(ManualCommitOffset.getClass.getSimpleName).setMaster("local[1]").set("spark.serializer",ConfigConstants.sparkSerializer)val ssc = new StreamingContext(conf, batchInterval)// 必須開啟checkpoint,否則會報錯ssc.checkpoint(ConfigConstants.checkpointDir)ssc.sparkContext.setLogLevel("OFF")//使用broker和topic創建direct kafka streamval topicSet = topics.split(" ").toSet// kafka連接參數val kafkaParams = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,ConsumerConfig.GROUP_ID_CONFIG -> groupId,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")// 從MySQL中讀取該主題對應的消費者組的分區偏移量val offsetMap = OffsetReadAndSave.getOffsetMap(groupId, topics)var inputDStream: InputDStream[ConsumerRecord[String, String]] = null//如果MySQL中已經存在了偏移量,則應該從該偏移量處開始消費if (offsetMap.size > 0) {println("存在偏移量,從該偏移量處進行消費!!")inputDStream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))} else {//如果MySQL中沒有存在了偏移量,從最早開始消費inputDStream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))}// checkpoint時間間隔,必須是batchInterval的整數倍inputDStream.checkpoint(ConfigConstants.checkpointInterval)// 保存batch的offsetvar offsetRanges = Array[OffsetRange]()// 獲取當前DS的消息偏移量val transformDS = inputDStream.transform { rdd =>// 獲取offsetoffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd}/*** 狀態更新函數* @param newValues:新的value值* @param stateValue:狀態值* @return*/def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {var oldvalue = stateValue.getOrElse(0) // 獲取狀態值// 遍歷當前數據,并更新狀態for (newValue <- newValues) {oldvalue += newValue}// 返回最新的狀態Option(oldvalue)}// 業務邏輯處理// 該示例統計消息key的個數,用于查看是否是從已經提交的偏移量消費數據transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()// 打印偏移量和數據信息,觀察輸出的結果transformDS.foreachRDD { (rdd, time) =>// 遍歷打印該RDD數據rdd.foreach { record =>println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")}// 打印消費偏移量信息for (o <- offsetRanges) {println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")}//將偏移量保存到到MySQL中OffsetReadAndSave.saveOffsetRanges(groupId, offsetRanges)}ssc.start()ssc.awaitTermination()} }如何使用Redis管理Kafka的OffSet
- Redis連接類
- 業務邏輯處理
該對象與上面的基本類似,只不過使用的是Redis來進行存儲Offset,存儲到Redis的數據類型是Hash,基本格式為:[key field value] -> [ topic_groupid partition offset],即 key為topic_groupid,field為partition,value為offset。
object ManualCommitOffsetToRedis {def main(args: Array[String]): Unit = {val brokers = ConfigConstants.kafkaBrokersval groupId = ConfigConstants.groupIdval topics = ConfigConstants.kafkaTopicsval batchInterval = ConfigConstants.batchIntervalval conf = new SparkConf().setAppName(ManualCommitOffset.getClass.getSimpleName).setMaster("local[1]").set("spark.serializer", ConfigConstants.sparkSerializer)val ssc = new StreamingContext(conf, batchInterval)// 必須開啟checkpoint,否則會報錯ssc.checkpoint(ConfigConstants.checkpointDir)ssc.sparkContext.setLogLevel("OFF")//使用broker和topic創建direct kafka streamval topicSet = topics.split(" ").toSet// kafka連接參數val kafkaParams = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,ConsumerConfig.GROUP_ID_CONFIG -> groupId,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")// 從Redis中讀取該主題對應的消費者組的分區偏移量val offsetMap = OffsetReadAndSave.getOffsetFromRedis(groupId, topics)var inputDStream: InputDStream[ConsumerRecord[String, String]] = null//如果Redis中已經存在了偏移量,則應該從該偏移量處開始消費if (offsetMap.size > 0) {println("存在偏移量,從該偏移量處進行消費!!")inputDStream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))} else {//如果Redis中沒有存在了偏移量,從最早開始消費inputDStream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))}// checkpoint時間間隔,必須是batchInterval的整數倍inputDStream.checkpoint(ConfigConstants.checkpointInterval)// 保存batch的offsetvar offsetRanges = Array[OffsetRange]()// 獲取當前DS的消息偏移量val transformDS = inputDStream.transform { rdd =>// 獲取offsetoffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd}/*** 狀態更新函數** @param newValues :新的value值* @param stateValue :狀態值* @return*/def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {var oldvalue = stateValue.getOrElse(0) // 獲取狀態值// 遍歷當前數據,并更新狀態for (newValue <- newValues) {oldvalue += newValue}// 返回最新的狀態Option(oldvalue)}// 業務邏輯處理// 該示例統計消息key的個數,用于查看是否是從已經提交的偏移量消費數據transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()// 打印偏移量和數據信息,觀察輸出的結果transformDS.foreachRDD { (rdd, time) =>// 遍歷打印該RDD數據rdd.foreach { record =>println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")}// 打印消費偏移量信息for (o <- offsetRanges) {println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")}//將偏移量保存到到Redis中OffsetReadAndSave.saveOffsetToRedis(groupId, offsetRanges)}ssc.start()ssc.awaitTermination()}}總結
本文介紹了如何使用外部存儲設備來保存Kafka的消費位點,通過詳細的代碼示例說明了使用MySQL和Redis管理消費位點的方式。當然,外部存儲設備很多,用戶也可以使用其他的存儲設備進行管理Offset,比如Zookeeper和HBase等,其基本處理思路都十分相似。
大數據技術與數倉總結
以上是生活随笔為你收集整理的sparkstreaming 读取mysql_第十篇|SparkStreaming手动维护Kafka Offset的几种方式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: gogs可以自动化部署吗_三千、五千平方
- 下一篇: html中加减号怎么输入,jQuery