kafka rebalance与数据重复消费问题
問題和現(xiàn)象:
某個程序在消費(fèi)kafka數(shù)據(jù)時,總是重復(fù)消費(fèi)相關(guān)數(shù)據(jù),仿佛在數(shù)據(jù)消費(fèi)完畢之后,沒有提交相應(yīng)的偏移量。然而在程序中設(shè)置了自動提交:enable.auto.commit為true
檢查日志,發(fā)現(xiàn)日志提示:
2020-03-26 17:20:21.414 ?WARN 28800 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator ?: [Consumer clientId=consumer-1,?
groupId=test-consumer-group] Synchronous auto-commit of offsets
?{E2C-GDFS-0=OffsetAndMetadata{offset=9632, leaderEpoch=8, metadata=''}} failed:?
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.?
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.
?You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
日志顯示時當(dāng)前消費(fèi)者組中某個消費(fèi)者自動提交出現(xiàn)錯誤,查看對應(yīng)消費(fèi)者信息:
可以看到,最后兩個topic對應(yīng)的消費(fèi)者處于離線狀態(tài),程序雖然在消費(fèi)數(shù)據(jù),但無法和kafka同步偏移量等信息。
原因:
這里就涉及到問題是消費(fèi)者在創(chuàng)建時會有一個屬性max.poll.interval.ms,
該屬性意思為kafka消費(fèi)者在每一輪poll()調(diào)用之間的最大延遲,消費(fèi)者在獲取更多記錄之前可以空閑的時間量的上限。如果此超時時間期滿之前poll()沒有被再次調(diào)用,則消費(fèi)者被視為失敗,并且分組將重新平衡,以便將分區(qū)重新分配給別的成員。
通俗點(diǎn)講,就是消費(fèi)者消費(fèi)一次數(shù)據(jù),對于該數(shù)據(jù)的處理過程太慢,導(dǎo)致消費(fèi)下一條消息的時間延后,超過這個門限,默認(rèn)300秒,消費(fèi)者就會唄認(rèn)為失敗,被踢出消費(fèi)者組。
解決
1、修改提交方式,改為手動提交(默認(rèn)為自動提交);
2、根據(jù)實(shí)際情況,修改提交時間;
max.poll.interval.ms=300
3、kafkaConsumer調(diào)用一次輪詢方法只是拉取一次消息。客戶端為了不斷拉取消息,會用一個外部循環(huán)不斷調(diào)用輪詢方法poll()。每次輪詢后,在處理完這一批消息后,才會繼續(xù)下一次的輪詢。
max.poll.records = 50
其他問題
如果一個消費(fèi)者組有多個消費(fèi)者,因某些原因其中一個消費(fèi)者被踢出,當(dāng)重啟程序繼續(xù)消費(fèi)時,如果剛好kafka 消費(fèi)者組在rebalance狀態(tài),很可能報JVM運(yùn)行時錯誤。解決辦法是,盡量將不相干的數(shù)據(jù)處理設(shè)置在不同的消費(fèi)者組。
sh kafka-consumer-groups.sh --bootstrap-server node5:9092 --describe --group ?test-consumer-group
Warning: Consumer group 'test-consumer-group' is rebalancing.
?
總結(jié)
以上是生活随笔為你收集整理的kafka rebalance与数据重复消费问题的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kafka一直rebalance故障,重
- 下一篇: Kafka Consumer多线程实例