kafka可靠数据传递
【README】
本文闡述了kafka可靠消息傳遞機制;
本文部分內容總結于《kafka權威指南》(一本好書,墻裂推薦),再加上自己的理解;
【1】可靠性保證
1,在討論可靠性時,一般使用保證這個詞;
保證指的是, 確保系統在各種不同的環境下能夠發生一致的行為;?
2,kafka在哪些方面做了保證呢?
【2】復制
1,kafka復制機制和分區多副本架構是kafka可靠性保證的核心;
2,把消息寫入多個副本可以使kafka 在發生崩潰時仍能保證消息的持久性;
3,分區:分區是kafka存儲數據的基本單位,一個主題的數據被分到多個分區進行存儲;分區內的數據是有序的;
4,分區副本:每個分區可以有多個副本;副本又分為首領副本,跟隨者副本;生產者消費者只與首領副本交互;跟隨者副本只需要及時從首領副本復制最新事件,以與首領副本保持同步;
當首領副本不可用時,其中一個同步副本選舉為新首領;(注意是同步副本才可以選舉為新首領)
5,跟隨者副本成為同步副本的條件
【補充】非同步副本問題
- 1,如果一個副本在同步與不同步狀態間頻繁切換,說明集群內部出現了問題,通常是由于jvm 不恰當gc導致的,需要優化系統性能;
- 2,一個滯后的同步副本會導致生產者消費者變慢,因為消息被認為已提交前,客戶端會等待所有同步副本接收消息;
- 2.1, 當一個副本不再同步了,我們就不需要關心它是否接收到消息(參見跟隨者副本稱為同步副本的條件,即一個同步副本變為不同步中間的時長是可以配置的);
- 2.2,更少的同步副本,意味著更低的有效復制系統,發生宕機時丟失數據的風險更大;
【3】broker配置
1, broker指的是kakfa服務器,又稱中心點(算是知識review了);
2,broker有3個參數影響可靠性(非常重要*)
2.1)創建topic的復制系數,replication.factor
- 每個分區有多少個副本; 建議在要求可用性場景里把副本系數設置為3;
2.2)是否允許不完全的首領選舉,unclean.leader.election
- 在首領不可用但其他副本都是不同步的,我們應該怎么辦?
情況1:分區有3個副本,1個正常的首領副本,2個不可用的跟隨者副本;消息被寫入首領副本后,首領所在broker宕機了;這個時候,如果之前的一個跟隨者副本重新啟動,他就成為了分區的唯一不同步副本;
問題來了:是否選擇它作為首領副本,即便它是不同步副本?
情況2:分區有3個副本,1個正常首領副本,2個因為網絡問題導致同步滯后的跟隨者副本;盡管跟隨者副本還在復制消息,但已經不同步了;首領副本作為唯一同步副本還在接收生產者消費者請求。這個時候如果首領不可用,另外兩個副本就再也無法變成同步的了;
問題來了:是否選擇它作為首領副本,即便它是不同步副本?
如何選擇?
- 選擇1,如果不同步副本不能提升為新首領,則分區在舊首領恢復前都是不可用的;有時候這種狀態會持續數個小時(在舊首領恢復前會導致整個集群不可用,甚至長時間不可用);
- 選擇2,如果不同步副本提升為新首領,則在這個副本變為不同步之后寫入舊首領的消息全部丟失,這會導致數據不一致問題;
這需要我們根據具體的業務場景在系統可用性和一致性兩方面做出權衡;
小結:
不完全首領選舉的意思就是, 允許不同步副本成為首領(unclean.leader.election 設置為true);
2.3)最少同步副本,min.insync.replicas:;
默認情況下,一條消息被寫入所有副本,才被認為是已提交的;才可以繼續向分區寫入和消費下一條數據;
如果設置了 min.insync.replicas=X,則一條消息被寫入了X個副本(而無需寫入所有副本),則消息就會被認為已提交;
如,對于一個包含3個副本的主題,若 min.insync.replicas 設置為2,則至少要存在兩個同步副本才能向分區寫入數據;若只剩下一個同步副本,集群就變成只讀了,這是為了避免在發生不完全選舉時數據的寫入和讀取出現非預期的行為(數據不一致);
【4】在可靠系統里使用生產者
即便把broker配置為可靠,生產者若沒有進行可靠性配置,仍有可能發生數據丟失風險;
即kafka可靠系統依賴 broker,生產者,消費者這三者的可靠性配置;
1)看個例子
情況1,為broker配置了3個副本,禁用不完全首領選舉;生產者發送消息設置acks=1;
生產者發送消息A給首領,首領成功寫入,并告訴生產者成功寫入,但跟隨者副本還沒有收到這個消息;這是首領崩潰了,而此時,消息還沒有被跟隨者副本復制過去。
結果:另外兩個副本仍然被認為是同步的(畢竟判斷一個副本不同步需要一段時間),而其中一個副本稱為新首領。
小結:從生產者角度來講,實際上就是丟失了一條消息;即便kafka系統看起來數據是一致的;
情況2,為broker配置了3個副本,禁用不完全首領選舉;生產者發送消息設置acks=all;
生產者往kafka發送消息,分區首領剛好崩潰了,新首領正在選舉中,kafka會向生產者返回首領不可用的響應;
這個時候,若生產者沒有正確處理錯誤或沒有重試直到發送成功,則消息就有可能丟失;
結果:
問題在于, 生產者沒有正確處理錯誤,弄丟消息的是它自己;
2)如何避免上述兩種問題?
【4.1】發送確認
acks的3個值, 0, 1 , all;
- 0 表示 發送到kafka broker就認為寫入成功,而不管是否寫入首領副本和所有副本;
- 1 表示消息寫入首領副本就算成功;
- all 表示消息寫入所有副本才算成功;
【4.2】配置生產者重試次數
1)錯誤: 生產者需要處理的錯誤分為兩類,包括自動處理的錯誤,手動處理的錯誤;
2)重試: 若broker返回的錯誤可以通過重試來解決,則生產者自動處理這些錯誤;
3)錯誤響應碼
注意:重試發送一個已經失敗的消息會帶來風險,因為如果兩個消息都寫入成功,則消息重復;這需要消費者在處理消息時保證冪等性;
冪等性: 服務器對先后多次相同客戶端請求的響應是相同的;如轉賬;
4)其他錯誤場景
【5】在可靠系統里使用消費者
1)消費者讀取消息時不丟失消息的關鍵:
- 消費者需要跟蹤哪些消息是讀取過的,哪些還沒有讀取;
2)丟失消息
如果消費者1 提交了消息X偏移量T, 卻沒有處理完消息X,那就有可能造成消息X丟失;
- 因為如果消費者1宕機,其他消費者接手處理,它是不會再次消費消息X的,會被忽略;
【5.1】消費者可靠性配置
消費者需要注意以下4個參數配置, 如下:
- 1)group.id:消費者組編號,兩個消費者具有相同組id,每個消費者會分到主題消息的子集;如果希望看到所有消息,消費者組編號需要唯一;
- 2)auto.offset.reset:重置消費者讀取消息的偏移量;兩個值如下:
- earliest, 從分區開始位置讀取數據;
- latest,從分區末尾位置讀取數據;
- 3)enable.auto.commit, 啟動自動提交偏移量,也可以在代碼里手動提交; 取值 [true | false]?
- 如果在輪詢里處理所有數據,那么自動提交可以保證只提交已經處理過的消息的偏移量;
- 但如果在子線程處理數據,自動提交可能在消息沒有處理完就提交了(有風險);
- 4)auto.commit.interval.ms: 設置自動提交偏移量的頻率;默認值是每5秒自動提交一次;?
消費者屬性配置例子
/* 1.創建消費者配置信息 */ Properties props = new Properties(); /*2.給配置信息賦值*/ /*2.1連接的集群*/ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092"); /*2.2開啟自動提交 */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); /*2.3 自動提交的間隔時間*/ props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); /*2.4 key value的反序列化 */ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); /*2.5 消費者組 */ props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello04Group01"); // group.id /*2.6 重置消費者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認值是 lastest /*2.7 關閉自動提交 */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);【5.2】顯式提交偏移量(手動提交)
可靠消費者需要注意的8個事項:
1)總是在處理完事件后再提交偏移量;
- 如果消費處理過程都在輪詢里完成,且不需要在輪詢間維護狀態,可以使用自動提交,或在輪詢結束時使用手動提交;?
2)提交頻度是性能和重復消息數量之間的權衡;
- 如果消費處理過程都在輪詢里完成,且不需要在輪詢間維護狀態,可以在一個循環里多次提交偏移量(每循環一次提交一次也是可以的),或者循環退出后提交一次;這取決于在性能和重復處理消息間做出的權衡;
3)確保對提交的偏移量心里有數;
- 處理完消息后再提交偏移量是非常關鍵的,否則會導致消費者錯過消息;
4)消費者再均衡;
- 在分區被撤銷之前提交偏移量,并在分配到新分區是清理之前的狀態;
- 消費者再均衡 :在同一個消費者組當中,分區所有權從一個消費者轉移到另外一個消費者的機制;
5)消費者可能需要重試;
- 情況1:在遇到可重試錯誤時,提交最后一個處理成功的偏移量,然后把沒有處理好的消息保存到緩存里(下一個輪詢就不會把它覆蓋掉),調用消費者的 pause() 來確保其他輪詢不會返回數據;如果重試成功或重試次數達到上限,把錯誤消息丟棄,調用 resume()? 讓消費者從輪詢重新獲取新數據;
- 情況2:在遇到可重試錯誤時,把錯誤消息寫入另外的主題B(解耦);由主題B的消費者來處理錯誤;類似于MQ的死信隊列;
6)消費者可能需要維護狀態;
- 不建議多個輪詢間維護狀態,太復雜;
- 建議嘗試使用 kafkaStreams 類庫,為聚合,連接,時間窗和其他復雜分析提供了高級的dsl api;
7)長時間處理;
- 暫停輪詢時間不能超過幾秒鐘;即使不想獲取更多數據,也要保持輪詢,這樣消費者才會往 broker 發送心跳;才不會發生消費者再均衡;
- 推薦做法: 把數據交給工作線程(線程池)去處理,然后暫停消費者但保持輪詢(以防止消費者再均衡),不獲取新數據;當工作線程處理完成后,讓消費者繼續獲取新數據;(干貨——消費者處理大量數據的推薦做法)
注意: 區分暫停輪詢 與 暫停消費者間的區別;
8)僅一次傳遞;
- 方案1: 最常用的方法是把結果寫到一個支持唯一鍵的系統里,如存儲引擎,關系型數據庫,es;要么消息自帶一個唯一鍵,要么使用消費者組+主題+分區+偏移量的組合創建唯一鍵;這可以唯一標識一個kafka記錄;且消費者邏輯保證冪等性即可;
- 方案2:若消費者系統支持事務,可以把消息和偏移量持久化到數據庫;當消費者啟動時,從數據庫讀取偏移量,并調用seek() 方法從該偏移量的下一個位置讀取數據即可;
【6】驗證系統可靠性
【6.1】配置驗證
【6.2】應用程序驗證
【6.3】在生產環境監控可靠性
1)kafka的java客戶端包含了 JMX 度量指標;? 可以監控客戶端的狀態和事件;
- 對于生產者,最重要的可靠性指標是 error-rate 和 retry-rate ;
- 對于消費者,最重要的可靠性指標是 consumer-lag ;
總結
以上是生活随笔為你收集整理的kafka可靠数据传递的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 手机怎么清除跳转网址(手机怎么清除跳转网
- 下一篇: 淘宝app怎么扫描(淘宝app怎么扫描二