Kafka 的 Lag 计算误区及正确实现
前言
消息堆積是消息中間件的一大特色,消息中間件的流量削峰、冗余存儲等功能正是得益于消息中間件的消息堆積能力。然而消息堆積其實是一把亦正亦邪的雙刃劍,如果應用場合不恰當反而會對上下游的業務造成不必要的麻煩,比如消息堆積勢必會影響上下游整個調用鏈的時效性,有些中間件如RabbitMQ在發生消息堆積時在某些情況下還會影響自身的性能。對于Kafka而言,雖然消息堆積不會對其自身性能帶來多大的困擾,但難免不會影響上下游的業務,堆積過多有可能會造成磁盤爆滿,或者觸發日志清除策略而造成消息丟失的情況。如何利用好消息堆積這把雙刃劍,監控是最為關鍵的一步。
正文
消息堆積是消費滯后(Lag)的一種表現形式,消息中間件服務端中所留存的消息與消費掉的消息之間的差值即為消息堆積量,也稱之為消費滯后(Lag)量。對于Kafka而言,消息被發送至Topic中,而Topic又分成了多個分區(Partition),每一個Partition都有一個預寫式的日志文件,雖然Partition可以繼續細分為若干個段文件(Segment),但是對于上層應用來說可以將Partition看成最小的存儲單元(一個由多個Segment文件拼接的“巨型文件”)。每個Partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到Partition中。我們來看下圖,其就是Partition的一個真實寫照:
上圖中有四個概念:
要計算Kafka中某個消費者的滯后量很簡單,首先看看其消費了幾個Topic,然后針對每個Topic來計算其中每個Partition的Lag,每個Partition的Lag計算就顯得非常的簡單了,參考下圖:
由圖可知消費Lag=HW - ConsumerOffset。對于這里大家有可能有個誤區,就是認為Lag應該是LEO與ConsumerOffset之間的差值。LEO是對消費者不可見的,既然不可見何來消費滯后一說。
那么這里就引入了一個新的問題,HW和ConsumerOffset的值如何獲取呢?
首先來說說ConsumerOffset,Kafka中有兩處可以存儲,一個是Zookeeper,而另一個是”**consumer_offsets這個內部topic中,前者是0.8.x版本中的使用方式,但是隨著版本的迭代更新,現在越來越趨向于后者。就拿1.0.0版本來說,雖然默認是存儲在”**consumer_offsets”中,但是保不齊用于就將其存儲在了Zookeeper中了。這個問題倒也不難解決,針對兩種方式都去拉取,然后哪個有值的取哪個。不過這里還有一個問題,對于消費位移來說,其一般不會實時的更新,而更多的是定時更新,這樣可以提高整體的性能。那么這個定時的時間間隔就是ConsumerOffset的誤差區間之一。
再來說說HW,其也是Kafka中Partition的一個狀態。有可能你會察覺到在Kafka的JMX中可以看到“kafka.log:type=Log,name=LogEndOffset,topic=[topic_name],partition=[partition_num]”這樣一個屬性,但是這個值不是LEO而是HW。
那么怎樣正確的計算消費的Lag呢?對Kafka熟悉的同學可能會想到Kafka中自帶的kafka-consumer_groups.sh腳本中就有Lag的信息,示例如下:
[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID topic-test1 0 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID topic-test1 1 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID topic-test1 2 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID topic-test1 3 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID我們深究一下kafka-consumer_groups.sh腳本,發現只有一句代碼:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"其含義就是執行kafka.admin.ConsumerGroupCommand而已。進一步深究,在ConsumerGroupCommand內部抓住了2句關鍵代碼:
val consumerGroupService = new KafkaConsumerGroupService(opts) val (state, assignments) = consumerGroupService.describeGroup()代碼詳解:consumerGroupService的類型是ConsumerGroupServicesealed trait類型),而KafkaConsumerGroupService只是ConsumerGroupService的一種實現,還有一種實現是ZkConsumerGroupService,分別對應新版的消費方式(消費位移存儲在__consumer_offsets中)和舊版的消費方式(消費位移存儲在zk中),詳細計算步驟參考下一段落的內容。opt參數是指“ –describe –bootstrap-server localhost:9092 –group CONSUMER_GROUP_ID”等參數。第2句代碼是調用describeGroup()方法來獲取具體的信息,即二元組中的assignments,這個assignments中保存了上面打印信息中的所有內容。
Scala小知識:
在Scala中trait(特征)相當于Java的接口,實際上它比接口更大強大。與Java中的接口不同的是,它還可以定義屬性和方法的實現(JDK8起的接口默認方法)。一般情況下Scala中的類只能繼承單一父類,但是如果是trait的話就可以繼承多個,從結果來看是實現了多重繼承。被sealed聲明的trait僅能被同一文件的類繼承。
ZkConsumerGroupService中計算消費lag的步驟如下:
KafkaConsumerGroupService中計算消費lag的步驟如下:
FindCoordinatorRequest請求來獲取coordinator信息,如果不了解coordinator在這里也沒影響。
可以看到KafkaConsumerGroupService與ZkConsumerGroupService的計算Lag的方式都差不多,但是KafkaConsumerGroupService能獲取更多消費詳情,并且ZkConsumerGroupService也被標注為@Deprecated的了,后面內容都針對KafkaConsumerGroupService來做說明。既然Kafka已經為我們提供了線程的方法來獲取Lag,那么我們有何必再重復造輪子,這里筆者寫了一個調用的KafkaConsumerGroupService的示例(KafkaConsumerGroupService是使用Scala語言編寫的,在Java的程序里使用類似scala.collection.Seq這樣的全名稱以防止混淆):
String[] agrs = {"--describe", "--bootstrap-server", brokers, "--group", groupId}; ConsumerGroupCommand.ConsumerGroupCommandOptions opts =new ConsumerGroupCommand.ConsumerGroupCommandOptions(agrs); ConsumerGroupCommand.KafkaConsumerGroupService kafkaConsumerGroupService =new ConsumerGroupCommand.KafkaConsumerGroupService(opts); scala.Tuple2<scala.Option<String>, scala.Option<scala.collection.Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = kafkaConsumerGroupService.describeGroup(); scala.collection.Seq<ConsumerGroupCommand.PartitionAssignmentState> pasSeq = res._2.get(); scala.collection.Iterator<ConsumerGroupCommand.PartitionAssignmentState> iterable = pasSeq.iterator(); while (iterable.hasNext()) {ConsumerGroupCommand.PartitionAssignmentState pas = iterable.next();System.out.println(String.format("\n%-30s %-10s %-15s %-15s %-10s %-50s%-30s %s",pas.topic().get(), pas.partition().get(), pas.offset().get(),pas.logEndOffset().get(), pas.lag().get(), pas.consumerId().get(),pas.host().get(), pas.clientId().get())); }在使用時,你可以封裝一下這段代碼然后返回一個類似List<ConsumerGroupCommand.PartitionAssignmentState>的東西給上層業務代碼做進一步的使用。ConsumerGroupCommand.PartitionAssignmentState的代碼如下:
case class PartitionAssignmentState(group: String, coordinator: Option[Node], topic: Option[String],partition: Option[Int], offset: Option[Long], lag: Option[Long],consumerId: Option[String], host: Option[String],clientId: Option[String], logEndOffset: Option[Long])Scala小知識:
對于case class, 在這里你可以簡單的把它看成是一個JavaBean,但是它遠比JavaBean強大,比如它會自動生成equals、hashCode、toString、copy、伴生對象、apply、unapply等等東西。在 scala 中,對保護(Protected)成員的訪問比 java 更嚴格一些。因為它只允許保護成員在定義了該成員的的類的子類中被訪問。而在java中,用protected關鍵字修飾的成員,除了定義了該成員的類的子類可以訪問,同一個包里的其他類也可以進行訪問。Scala中,如果沒有指定任何的修飾符,則默認為 public。這樣的成員在任何地方都可以被訪問。
如果你正在試著運行上面一段程序,你會發現編譯失敗,報錯:cannot access ‘kafka.admin.ConsumerGroupCommand.PartitionAssignmentState’ in ‘kafka.admin.ConsumerGroupCommand‘。這時候需要將所引入的kafka.core包中的kafka.admin.ConsumerGroupCommand中的PartitionAssignmentState類前面的protected修飾符去掉才能編譯通過。
總結
以上是生活随笔為你收集整理的Kafka 的 Lag 计算误区及正确实现的全部內容,希望文章能夠幫你解決所遇到的問題。