关于SparkStreaming的checkpoint的弊端
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
框架版本
spark2.1.0
kafka0.9.0.0
當(dāng)使用sparkstreaming處理流式數(shù)據(jù)的時(shí)候,它的數(shù)據(jù)源搭檔大部分都是Kafka,尤其是在互聯(lián)網(wǎng)公司頗為常見(jiàn)。 當(dāng)他們集成的時(shí)候我們需要重點(diǎn)考慮就是如果程序發(fā)生故障,或者升級(jí)重啟,或者集群宕機(jī),它究竟能否做到數(shù)據(jù)不丟不重呢?
也就是通常我們所說(shuō)的高可靠和穩(wěn)定性,通常框架里面都帶有不同層次的消息保證機(jī)制,一般來(lái)說(shuō)有三種就是:
at most once 最多一次 at least once 最少一次 exactly once 準(zhǔn)確一次在storm里面是通過(guò)ack和Trident,在sparkstreaming里面,如果是1.3版本之前是通過(guò)Receiver方式讀取kafka數(shù)據(jù),1.3之后通過(guò)Direct Approach方式直接讀取kafka的數(shù)據(jù),直接分配每個(gè)Batch及RDD最新的Topic partition offset,任務(wù)運(yùn)行后使用kafka的Simple Consumer API去獲取那一段的offset的數(shù)據(jù),這樣的好處是避免了原來(lái)Receiver接受數(shù)據(jù)宕機(jī)帶來(lái)的數(shù)據(jù)可靠性風(fēng)險(xiǎn),相當(dāng)于原來(lái)的數(shù)據(jù)是在內(nèi)存中而現(xiàn)在的數(shù)據(jù)是在kafka的磁盤中,通過(guò)偏移量可隨時(shí)再次消費(fèi)數(shù)據(jù),從而實(shí)現(xiàn)了數(shù)據(jù)的Exactly Once處理,此外還有個(gè)不同之處在于1.3之后,使用的checkpoint保存當(dāng)前消費(fèi)的kafka的offset,而之前用zk保存的,這就是今天這篇文章重點(diǎn)吐槽的地方。
在sparkstreaming如何做到數(shù)據(jù)不丟失呢?
(1)使用checkpoint (2)自己維護(hù)kafka偏移量
checkpoint配合kafka能夠在特定環(huán)境下保證不丟不重,注意為什么要加上特定環(huán)境呢,這里有一些坑,checkpoint是對(duì)sparkstreaming運(yùn)行過(guò)程中的元數(shù)據(jù)和 每次rdds的數(shù)據(jù)狀態(tài)保存到一個(gè)持久化系統(tǒng)中,當(dāng)然這里面也包含了offset,一般是HDFS,S3,如果程序掛了,或者集群掛了,下次啟動(dòng)仍然能夠從checkpoint中恢復(fù),從而做到生產(chǎn)環(huán)境的7*24高可用。
但是checkpoint的最大的弊端在于,一旦你的流式程序代碼或配置改變了,或者更新迭代新功能了,這個(gè)時(shí)候,你先停舊的sparkstreaming程序,然后新的程序打包編譯后執(zhí)行運(yùn)行,會(huì)發(fā)現(xiàn)兩種情況: (1)啟動(dòng)報(bào)錯(cuò),反序列化異常 (2)啟動(dòng)正常,但是運(yùn)行的代碼仍然是上一次的程序的代碼。
為什么會(huì)出現(xiàn)上面的兩種情況,這是因?yàn)閏heckpoint第一次持久化的時(shí)候會(huì)把整個(gè)相關(guān)的jar給序列化成一個(gè)二進(jìn)制文件,每次重啟都會(huì)從里面恢復(fù),但是當(dāng)你新的 程序打包之后序列化加載的仍然是舊的序列化文件,這就會(huì)導(dǎo)致報(bào)錯(cuò)或者依舊執(zhí)行舊代碼。有的同學(xué)可能會(huì)說(shuō),既然如此,直接把上次的checkpoint刪除了,不就能啟動(dòng)了嗎? 確實(shí)是能啟動(dòng),但是一旦你刪除了舊的checkpoint,新啟動(dòng)的程序,只能從kafka的smallest或者largest的偏移量消費(fèi),默認(rèn)是從最新的,如果是最新的,而不是上一次程序停止的那個(gè)偏移量 就會(huì)導(dǎo)致有數(shù)據(jù)丟失,如果是老的,那么就會(huì)導(dǎo)致數(shù)據(jù)重復(fù)。不管怎么樣搞,都有問(wèn)題。 https://spark.apache.org/docs/2.1.0/streaming-programming-guide.html#upgrading-application-code
針對(duì)這種問(wèn)題,spark官網(wǎng)給出了2種解決辦法:
(1)舊的不停機(jī),新的程序繼續(xù)啟動(dòng),兩個(gè)程序并存一段時(shí)間消費(fèi)。 評(píng)價(jià):仍然有丟重復(fù)消費(fèi)的可能 (2)停機(jī)的時(shí)候,記錄下最后一次的偏移量,然后新恢復(fù)的程序讀取這個(gè)偏移量繼續(xù)工作,從而達(dá)到不丟消息。 評(píng)價(jià):官網(wǎng)沒(méi)有給出具體怎么操作,只是給了個(gè)思路
第二種思路是正確的,但還需要自己維護(hù)一個(gè)offset狀態(tài),這樣以來(lái)checkpoint這個(gè)功能只能在程序?qū)懞弥蟛辉试S再次變動(dòng),但可以重啟的情況保證高可靠。
但實(shí)際情況是大多數(shù)公司的代碼都會(huì)頻繁迭代和升級(jí),與checkpoint剛好相悖,這樣以來(lái)checkpoint的作用便顯的有點(diǎn)沒(méi)用了,既然還是需要自己維護(hù)offset狀態(tài), 那么不用checkpoint也罷,完全自己維護(hù)offset狀態(tài)到zk中即可。所以果斷棄用checkpoint,采用自己維護(hù)offset。其原理如下:
首次啟動(dòng),先從zk中找是否有上次存儲(chǔ)的偏移量,如果沒(méi)有就從最新的消費(fèi),然后保存偏移量至zk中
如果從zk中找到了偏移量,那么就從指定的偏移量處開始消費(fèi)處理,每個(gè)批處理處理完畢后,都會(huì)更新新的offset到zk中, 這樣以來(lái)無(wú)論是程序故障,還是宕機(jī),再次啟動(dòng)后都會(huì)從上次的消費(fèi)的偏移量處繼續(xù)開始消費(fèi),而且程序的升級(jí)或功能改動(dòng)新版本的發(fā)布都能正常運(yùn)行 并做到了消息不丟。
需要注意的是,雖然上游能夠做到準(zhǔn)確一次的消費(fèi),但是下游的落地存儲(chǔ)輸出,比如寫入hbase,redis,mysql,es等等如果失敗了,整條消息依舊會(huì)失敗,這個(gè)完全要靠自己的設(shè)計(jì)了,要么記錄log,針對(duì)特定數(shù)據(jù)記錄,如果失敗定期 重新打入kafka走程序恢復(fù)或者手動(dòng)恢復(fù)。
或者設(shè)計(jì)存儲(chǔ)的時(shí)候,有復(fù)合主鍵,把偏移量提前,就算重復(fù)消費(fèi),但主鍵一樣,最終只會(huì)有一條數(shù)據(jù)落地,這個(gè)要分場(chǎng)景和具體業(yè)務(wù)結(jié)合使用了。
回到主題,自己維護(hù)kafka的offset狀態(tài),如何做? github上已經(jīng)有大神貢獻(xiàn)了,我們只需要拿過(guò)來(lái)稍加改動(dòng)即可,使用自己維護(hù)的offset之后,就沒(méi)有必要再使用 checkpoint,github連接如下,有興趣的朋友可以了解下:
https://github.com/cpbaranwal/Spark-Streaming-DirectKafka-Examples/blob/master/src/main/scala/CustomDirectKafkaExample.scala
使用zk維護(hù)offset也是比較不錯(cuò)的選擇,如果將checkpoint存儲(chǔ)在HDFS上,每隔幾秒都會(huì)向HDFS上進(jìn)行一次寫入操作而且大部分都是小文件,且不說(shuō)寫入性能怎么樣,就小文件過(guò)多,對(duì)整個(gè)Hadoop集群都不太友好。因?yàn)橹挥涗浧屏啃畔?#xff0c;所以數(shù)據(jù)量非常小,zk作為一個(gè)分布式高可靠的的內(nèi)存文件系統(tǒng),非常適合這種場(chǎng)景。
所有參考鏈接:
http://aseigneurin.github.io/
http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
http://why-not-learn-something.blogspot.jp/2016/08/upgrading-running-spark-streaming.html
http://www.binwang.me/2015-11-03-the-proper-way-to-use-spark-checkpoint.html
https://github.com/cpbaranwal/Spark-Streaming-DirectKafka-Examples/blob/master/src/main/scala/CustomDirectKafkaExample.scala
https://github.com/ippontech/spark-kafka-source
有什么問(wèn)題可以掃碼關(guān)注微信公眾號(hào):我是攻城師(woshigcs),在后臺(tái)留言咨詢。 技術(shù)債不能欠,健康債更不能欠, 求道之路,與君同行。
轉(zhuǎn)載于:https://my.oschina.net/u/1027043/blog/836036
總結(jié)
以上是生活随笔為你收集整理的关于SparkStreaming的checkpoint的弊端的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: json 和 jsonp
- 下一篇: Linux 源代码在线(http://l