kafka seek方法
我們知道消息的拉取是根據poll()方法中的邏輯來處理的,這個poll()方法中的邏輯對于普通的開發人員而言是一個黑盒,無法精確地掌控其消費的起始位置。提供的auto.offset.reset 參數也只能在找不到消費位移或位移越界的情況下粗粒度地從開頭或
末尾開始消費。有些時候,我們需要一種更細粒度的掌控,可以讓我們從特定的位移處開始拉取消息,而KafkaConsumer 中的seek()方法正好提供了這個功能,讓我們得以追前消費或回溯消費。seek()方法的具體定義如下:
seek()方法中的參數partition 表示分區,而offset 參數用來指定從分區的哪個位置開始消費。seek()方法只能重置消費者分配到的分區的消費位置,而分區的分配是在po ll()方法的調用過程中實現的。也就是說,在執行seek()方法之前需要先執行一次p oll ()方法, 等到分配到分區之后才可以重置消費位置(如果用subscribe訂閱的話就需要poll一次,如果用assign()手動訂閱分區就不需要poll一次)。seek()方法的使用示例如代碼清單3 - 5 所示(只列出關鍵代碼〉。
如果對未分配到的分區執行seek()方法, 那么會報出IllegalStateException 的異常。類似在調用subscribe()方法之后直接調用seek()方法:
endOffsets()方法
用來獲取指定分區的末尾的消息位置,endOffsets 的具體方法定義如下:
public Map<TopicPartition , Long> endOffsets( Collection<TopicPartition> partitions )public Map<TopicPartition , Long> endOffsets( Collection<TopicPartition> partitions , Duration timeout)其中partitions 參數表示分區集合,而timeout 參數用來設置等待獲取的超時時間。如果沒有指定timeout 參數的值, 那么endOffets()方法的等待時間由客戶端參數request.timeout.ms 來設置, 默認值為3 0000 。與endOffsets 對應的是
beginningOffsets()方法
一個分區的起始位置起初是0,但并不代表每時每刻都為0 , 因為日志清理的動作會清理舊的數據,所以分區的起始位置會自然而然地增加, beginningOffsets()方法的具體定義如下:
public Map<TopicPartition , Long> beginningOffsets( Collection<TopicPartition> partitions )public Map<TopicPartition , Long> beginningOffsets( Collection<TopicPartition> partitions , Duration timeout)beginningOffsets()方法中的參數內容和含義都與endOffsets()方法中的一樣,配合這兩個方法我們就可以從分區的開頭或末尾開始消費。其實KafkaConsumer 中直接提供了seekToB eginning()方法和seekToEnd()方法來實現這兩個功能, 這兩個方法的具體定義如下:
有時候我們并不知道特定的消費位置, 卻知道一個相關的時間點, 比如我們想要消費昨天8 點之后的消息,這個需求更符合正常的思維邏輯。此時我們無法直接使用seek()方法來追溯到相應的位置。KafkaConsumer 同樣考慮到了這種情況,它提供了一個offsetsForTimes()方法,通過timestamp 來查詢與此對應的分區位置。
offsetsF orTimes()方法的參數timestamps T oSearch 是一個Map 類型, key 為待查詢的分區,而value 為待查詢的時間戳,該方法會返回時間戳大于等于待查詢時間的第一條消息對應的位置和時間戳,對應于OffsetAndTimestamp 中的off set 和time stamp 字段。下面的示例演示了offsetsForTimes()和seek()之間的使用方法, 首先通過offsetFor Times()方法獲取一天之前的消息位置,然后使用seek() 方法追溯到相應位置開始消費,示例中的assignment 變量和代碼清單3-7 中的一樣,表示消費者分配到的分區集合。
?
總結
以上是生活随笔為你收集整理的kafka seek方法的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C语言多任务,多进程,多线程
- 下一篇: 安卓system userdata镜像解