Lag 應(yīng)該算是最最重要的監(jiān)控指標(biāo)了。它直接反映了一個(gè)消費(fèi)者的運(yùn)行情況。一個(gè)正常工作的消費(fèi)者,它的 Lag 值應(yīng)該很小,甚至是接近于 0 的,這表示該消費(fèi)者能夠及時(shí)地消費(fèi)生產(chǎn)者生產(chǎn)出來(lái)的消息,滯后程度很小。反之,如果一個(gè)消費(fèi)者 Lag 值很大,通常就表明它無(wú)法跟上生產(chǎn)者的速度,最終 Lag 會(huì)越來(lái)越大,從而拖慢下游消息的處理速度。
通常來(lái)說(shuō),Lag 的單位是消息數(shù),而且我們一般是在主題這個(gè)級(jí)別上討論 Lag 的,但實(shí)際上,Kafka 監(jiān)控 Lag 的層級(jí)是在分區(qū)上的。如果要計(jì)算主題級(jí)別的,你需要手動(dòng)匯總所有主題分區(qū)的 Lag,將它們累加起來(lái),合并成最終的 Lag 值。
第3 處就是執(zhí)行相應(yīng)的減法操作,獲取 Lag 值并封裝進(jìn)一個(gè) Map 對(duì)象。
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {Properties props = new Properties();props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);try (AdminClient client = AdminClient.create(props)) {ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);try {Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自動(dòng)提交位移props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));}} catch (InterruptedException e) {Thread.currentThread().interrupt();// 處理中斷異常// ...return Collections.emptyMap();} catch (ExecutionException e) {// 處理ExecutionException// ...return Collections.emptyMap();} catch (TimeoutException e) {throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);}}}
?
Kafka JMX 監(jiān)控指標(biāo)
Kafka 默認(rèn)提供的 JMX 監(jiān)控指標(biāo)來(lái)監(jiān)控消費(fèi)者的 Lag 值
Kafka 消費(fèi)者提供了一個(gè)名為?kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指標(biāo),里面有很多屬性。和我們今天所講內(nèi)容相關(guān)的有兩組屬性:records-lag-max?和?records-lead-min,它們分別表示此消費(fèi)者在測(cè)試窗口時(shí)間內(nèi)曾經(jīng)達(dá)到的最大的 Lag 值和最小的 Lead 值。
Lead 值是指消費(fèi)者最新消費(fèi)消息的位移與分區(qū)當(dāng)前第一條消息位移的差值。很顯然,Lag 和 Lead 是一體的兩個(gè)方面:Lag 越大的話(huà),Lead 就越小,反之也是同理。
監(jiān)控到 Lag 越來(lái)越大,消費(fèi)者程序變得越來(lái)越慢了,至少是追不上生產(chǎn)者程序了.
Kafka 消費(fèi)者還在分區(qū)級(jí)別提供了額外的 JMX 指標(biāo),用于單獨(dú)監(jiān)控分區(qū)級(jí)別的 Lag 和 Lead 值。JMX 名稱(chēng)為:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”