kafka重复消费问题
開篇提示:kafka重復(fù)消費(fèi)的根本原因就是“數(shù)據(jù)消費(fèi)了,但是offset沒更新”!而我們要探究一般什么情況下會導(dǎo)致offset沒更新?
今天查看Elasticsearch索引的時候發(fā)現(xiàn)有一個索引莫名的多了20w+的數(shù)據(jù),頓時心里一陣驚訝,然后趕緊打開訂閱服務(wù)的日志(消費(fèi)者),眼前的一幕讓我驚呆了,我的消費(fèi)服務(wù)的控制臺一直在不斷的刷著消費(fèi)日志(剛開始我并沒有意識到這是重復(fù)消費(fèi)造成的),我還傻傻的以為是因為今天有人在刷單,所以導(dǎo)致日志狂刷,畢竟之前也遇到過有人用自動交易軟件瘋狂刷單的,所以當(dāng)時也沒在意;等過了幾分鐘,又去瞅了一眼控制臺仍然在瘋狂的刷著日志,媽呀!頓時隱隱感覺不對勁,趕緊看了一眼es索引,我滴天一下子多了幾萬的數(shù)據(jù),突然在想是不是程序出問題了(因為頭一天晚上發(fā)了一個版本),然后就開始死盯這日志看,發(fā)現(xiàn)了一個奇葩的問題:tmd怎么日志打印的數(shù)據(jù)都是重復(fù)的呀!這才恍然大悟,不用想了絕逼是kakfa重復(fù)消費(fèi)了,好吧!能有什么辦法了,開始瘋狂的尋找解決的辦法......
既然之前沒有問題,那就是我昨天發(fā)版所導(dǎo)致的,那么我昨天究竟改了什么配置呢?對照了之前的版本比較了一下,發(fā)現(xiàn)這個參數(shù)enable-auto-commit被改成了true,即自動提交,理論上在數(shù)據(jù)并發(fā)不大,以及數(shù)據(jù)處理不耗時的情況下設(shè)置自動提交是沒有什么問題的,但是我的情況恰恰相反,可能突然會并發(fā)很大(畢竟交易流水不好說的),所以可能在規(guī)定的時間(session.time.out默認(rèn)30s)內(nèi)沒有消費(fèi)完,就會可能導(dǎo)致re-blance重平衡,導(dǎo)致一部分offset自動提交失敗,然后重平衡后重復(fù)消費(fèi)(這種很常見);或者關(guān)閉kafka時,如果在close之前,調(diào)用consumer.unsubscribe()則可能有部分offset沒提交,下次重啟會重復(fù)消費(fèi)
try {
consumer.unsubscribe();
} catch (Exception e) {
}
try {
consumer.close();
} catch (Exception e) {
}
?
所以一般情況下我們設(shè)置offset自動提交為false!
解決方法:
1.設(shè)置
spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=latest2.就是修改offset為最新的偏移量唄!我們都知道offset是存在zookeeper中的,所以我就不贅述了!
我的解決方法:
我并沒有去修改offset偏移量,畢竟生產(chǎn)環(huán)境還是不直接改這個了;
我重新指定了一個消費(fèi)組(group.id=order_consumer_group),然后指定auto-offset-reset=latest這樣我就只需要重啟我的服務(wù)了,而不需要動kafka和zookeeper了!
?
#consumer spring.kafka.consumer.group-id=order_consumer_group spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=latest注:如果你想要消費(fèi)者從頭開始消費(fèi)某個topic的全量數(shù)據(jù),可以重新指定一個全新的group.id=new_group,然后指定auto-offset-reset=earliest即可
?
補(bǔ)充:
在kafka0.9.0版本的時候,開始啟用了新的consumer config,這個新的consumer config采用bootstrap.servers替代之前版本的zookeeper.connect,主要是要漸漸弱化zk的依賴,把zk依賴隱藏到broker背后。
group coordinator
使用bootstrap.servers替代之前版本的zookeeper.connect,相關(guān)的有如下兩個改動:
從0.8.2版本開始Kafka開始支持將consumer的位移信息保存在Kafka內(nèi)部的topic中(從0.9.0版本開始默認(rèn)將offset存儲到系統(tǒng)topic中)
Coordinator一般指的是運(yùn)行在broker上的group Coordinator,用于管理Consumer Group中各個成員,每個KafkaServer都有一個GroupCoordinator實(shí)例,管理多個消費(fèi)者組,主要用于offset位移管理和Consumer Rebalance。
rebalance時機(jī)
在如下條件下,partition要在consumer中重新分配:
__consumer_offsets
Consumer通過發(fā)送OffsetCommitRequest請求到指定broker(偏移量管理者)提交偏移量。這個請求中包含一系列分區(qū)以及在這些分區(qū)中的消費(fèi)位置(偏移量)。偏移量管理者會追加鍵值(key-value)形式的消息到一個指定的topic(__consumer_offsets)。key是由consumerGroup-topic-partition組成的,而value是偏移量。
?
參考:https://segmentfault.com/a/1190000011441747
總結(jié)
以上是生活随笔為你收集整理的kafka重复消费问题的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 博客转移至 https://www.ba
- 下一篇: kafka自动提交offset失败:Au