关于Kafka 的 consumer 消费者手动提交详解
前言
在上一篇 Kafka使用Java實現(xiàn)數(shù)據(jù)的生產(chǎn)和消費demo 中介紹如何簡單的使用kafka進行數(shù)據(jù)傳輸。本篇則重點介紹kafka中的 consumer 消費者的講解。
應(yīng)用場景
在上一篇kafka的consumer消費者,我們使用的是自動提交offset下標(biāo)。
但是offset下標(biāo)自動提交其實在很多場景都不適用,因為自動提交是在kafka拉取到數(shù)據(jù)之后就直接提交,這樣很容易丟失數(shù)據(jù),尤其是在需要事物控制的時候。
很多情況下我們需要從kafka成功拉取數(shù)據(jù)之后,對數(shù)據(jù)進行相應(yīng)的處理之后再進行提交。如拉取數(shù)據(jù)之后進行寫入mysql這種 , 所以這時我們就需要進行手動提交kafka的offset下標(biāo)。
這里順便說下offset具體是什么。
offset:指的是kafka的topic中的每個消費組消費的下標(biāo)。
簡單的來說就是一條消息對應(yīng)一個offset下標(biāo),每次消費數(shù)據(jù)的時候如果提交offset,那么下次消費就會從提交的offset加一那里開始消費。
比如一個topic中有100條數(shù)據(jù),我消費了50條并且提交了,那么此時的kafka服務(wù)端記錄提交的offset就是49(offset從0開始),那么下次消費的時候offset就從50開始消費。
測試
說了這么,那么我們開始進行手動提交測試。
首先,使用kafka 的producer 程序往kafka集群發(fā)送了100條測試數(shù)據(jù)。
程序打印中已經(jīng)成功發(fā)送了,這里我們在kafka服務(wù)器使用命令中來查看是否成功發(fā)送.
命令如下:
注:
1.master 是我在linux中做了IP映射的關(guān)系,實際可以換成IP。
2.因為kafka是集群,所以也可以在集群的其他機器進行消費。
可以看到已經(jīng)成功發(fā)送了100條。
成功發(fā)送消息之后,我們再使用kafka的consumer 進行數(shù)據(jù)消費。
因為是用來測試手動提交
所以 將 enable.auto.commit 改成 false 進行手動提交
并且設(shè)置每次拉取最大10條
將提交方式改成false之后
需要手動提交只需加上這段代碼
那么首先嘗試消費不提交,測試能不能重復(fù)消費。
右鍵運行main方法進行消費,不提交offset下標(biāo)。
成功消費之后,結(jié)束程序,再次運行main方法進行消費,也不提交offset下標(biāo)。
并未手動進行提交,而且并未更改消費組名,但是可以看到已經(jīng)重復(fù)消費了!
接下來,開始測試手動提交。
1.測試手動提交之后的offset,能不能再次消費。
2.測試未提交的offset,能不能再次進行消費。
為了達(dá)到上述目的,我們測試只需添加如下代碼即可:
if(list.size()==50){consumer.commitSync(); }更改代碼之后,開始運行程序
測試示例圖如下:
簡單的一看,和之前未提交的一樣,貌似沒有什么問題。
但是正常來說,未提交的下標(biāo)不應(yīng)該重復(fù)進行消費,直到它提交為止嗎?
因為要進行重復(fù)消費,但是messageNo 會一直累加,只會手動的提交前50條offset,
后面的50條offset會一直無法消費,所以打印的條數(shù)不應(yīng)該是100,而是應(yīng)該一直打印。
那么測試的結(jié)果和預(yù)想的為什么不一致呢?
之前不是已經(jīng)測試過可以重復(fù)消費未提交的offset嗎?
其實這點可以根據(jù)兩次啟動方式的不同而得出結(jié)論。
開始測試未提交重復(fù)消費的時候,實際我是啟動-暫停-啟動,那么本地的consumer實際是被初始化過兩次。
而剛剛測試的實際consumer只有初始化一次。
至于為什么初始化一次就不行呢?
因為kafka的offset下標(biāo)的記錄實際會有兩份,服務(wù)端會自己記錄一份,本地的消費者客戶端也會記錄一份,提交的offset會告訴服務(wù)端已經(jīng)消費到這了,但是本地的并不會因此而改變offset進行再次消費。
簡單的來說假如有10條數(shù)據(jù),在第5條的時候進行提交了offset下標(biāo),那么服務(wù)端就知道該組消費的下標(biāo)到第5條了,如果同組其他的consumer進行消費的時候就會從第6條開始進行消費。但是本地的消費者客戶端并不會因此而改變,它還是會繼續(xù)消費下去,并不會再次從第6條開始消費,所以會出現(xiàn)上圖情況。
但是項目中運行之后,是不會因此而重啟的,所以這時我們可以換一種思路。
就是如果觸發(fā)某個條件,所以導(dǎo)致offset未提交,我們就可以關(guān)閉之前的consumer,然后新new一個consumer,這樣就可以再次進行消費了! 當(dāng)然配置要和之前的一樣。
那么將之前的提交代碼更改如下:
if(list.size()==50){consumer.commitSync(); }else if(list.size()>50){consumer.close();init();list.clear();list2.clear(); }注:這里因為是測試,為了簡單明了,所以條件我寫的很簡單。實際情況請根據(jù)個人的為準(zhǔn)。
示例圖如下:
說明:
1.因為每次是拉取10條,所以在60條的時候kafka的配置初始化了,然后又從新拉取了50-60條的數(shù)據(jù),但是沒有提交,所以并不會影響實際結(jié)果。
2.這里為了方便截圖展示,所以打印條件改了,但是不影響程序!
從測試結(jié)果中,我們達(dá)到了之前想要測試的目的,未提交的offset可以重復(fù)進行消費。
這種做法一般也可以滿足大部分需求。
例如從kafka獲取數(shù)據(jù)入庫,如果一批數(shù)據(jù)入庫成功,就提交offset,否則不提交,然后再次拉取。
但是這種做法并不能最大的保證數(shù)據(jù)的完整性。比如在運行的時候,程序掛了之類的。
所以還有一種方法是手動的指定offset下標(biāo)進行獲取數(shù)據(jù),直到kafka的數(shù)據(jù)處理成功之后,將offset記錄下來,比如寫在數(shù)據(jù)庫中。那么這種做法,等到下一篇再進行嘗試吧!
該項目我放在github上了,有興趣的可以看看!
地址:https://github.com/xuwujing/kafka
到此,本文結(jié)束,謝謝閱讀!
轉(zhuǎn)載于:https://www.cnblogs.com/xuwujing/p/8432984.html
總結(jié)
以上是生活随笔為你收集整理的关于Kafka 的 consumer 消费者手动提交详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java9新特性系列(模块化系统: Ji
- 下一篇: 5G全球声量升级:Verizon宣布固定