Kafka Lag
1. How to record Kafka's lag
Three ways to obtain the lag:
- Use Kafka's built-in command-line tool, the kafka-consumer-groups script.
(The dividing line between the old and the new approach is Kafka 0.9.0.)

New-consumer approaches (two ways):

```bash
# Generic form
bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker list> --describe --group <group name>

# Concrete example
./kafka-consumer-groups.sh --bootstrap-server 10.0.90.74:9000,10.0.90.74:9000,10.0.90.74:9000 --describe --group zyj-in

# Equivalent invocation through kafka-run-class.sh
./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --group zyj-in --bootstrap-server 10.0.90.74:9000,10.0.90.74:9000,10.0.90.74:9000 --describe --new-consumer
```

Old-consumer approach (ZooKeeper-based):

```bash
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group zyj-in --topic zyj-in --zookeeper 10.0.90.74:2181,10.0.90.75:2181,10.0.90.76:2181
```

List all consumer groups:

```bash
./kafka-consumer-groups.sh --bootstrap-server 10.0.90.74:9000,10.0.90.74:9000,10.0.90.74:9000 --list
```
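For reference, the describe call prints the current offset, the log-end offset, and their difference (the lag) for each partition. The row below is illustrative with hypothetical values, and the exact column set varies slightly across Kafka versions:

```text
TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID   HOST          CLIENT-ID
zyj-in  0          1000            1050            50   consumer-1-x  /10.0.90.74   consumer-1
```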
- Program against the Kafka Java Consumer API (the code below requires Kafka 2.0.0 or later, which introduced AdminClient.listConsumerGroupOffsets).
Place (1) calls AdminClient.listConsumerGroupOffsets to get the consumer group's latest committed offsets; place (2) fetches the log-end offsets of the subscribed partitions; place (3) performs the subtraction and packs the resulting lag values into a Map.

```java
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    try (AdminClient client = AdminClient.create(props)) {
        // (1) latest committed offsets of the given consumer group
        ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
        try {
            Map<TopicPartition, OffsetAndMetadata> consumedOffsets =
                    result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // disable auto offset commit
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // (2) log-end offsets of the subscribed partitions
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
                // (3) end offset minus committed offset = lag, collected into a Map
                return endOffsets.entrySet().stream().collect(Collectors.toMap(
                        Map.Entry::getKey,
                        entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // handle the interruption
            return Collections.emptyMap();
        } catch (ExecutionException e) {
            // handle the ExecutionException
            return Collections.emptyMap();
        } catch (TimeoutException e) {
            throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
        }
    }
}
```
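A minimal caller for the method above, reusing the group and broker values from the CLI examples (purely illustrative):

```java
public static void main(String[] args) throws Exception {
    // Print the per-partition lag for consumer group "zyj-in".
    Map<TopicPartition, Long> lags = lagOf("zyj-in", "10.0.90.74:9000");
    lags.forEach((tp, lag) -> System.out.println(tp + " -> lag " + lag));
}
```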
- Use Kafka's built-in JMX monitoring metrics.
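The JMX route gets no example above, so here is a minimal sketch, assuming the KafkaConsumer runs in the same JVM (a remote process would be reached through JMXConnectorFactory instead); records-lag-max is the standard fetch-manager metric the consumer publishes:

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ConsumerLagJmx {
    // Returns the largest records-lag-max across all consumer clients in this JVM.
    public static double recordsLagMax() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Set<ObjectName> names = server.queryNames(
                new ObjectName("kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*"), null);
        double max = 0.0;
        for (ObjectName name : names) {
            // value may be NaN before the consumer has fetched anything
            max = Math.max(max, (Double) server.getAttribute(name, "records-lag-max"));
        }
        return max;
    }
}
```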
2. What do positive, negative, and zero lag values mean?
- Positive: messages are piling up in Kafka; data flows into the topic faster than it is consumed. With a as the log-end offset and b as the consumer's committed offset, a - b comes out positive: supply exceeds demand.
- Negative: occasionally, right after a is read and before the subtraction is done, b has already caught up with and passed a, so the result goes negative. It means the consumer works fast and clears its backlog within minutes: supply falls short of demand.
- 0: producer and consumer rates are roughly equal, meaning both sides are working normally.
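In code form, with hypothetical numbers, the sign falls straight out of the order in which the two offsets are read:

```java
long a = 1_050L;  // log-end offset, read first
long b = 1_000L;  // consumer's committed offset, read afterwards
long lag = a - b; // 50: a backlog, production is outpacing consumption
// If the consumer commits past 1_050 between the two reads, b > a and the lag turns negative.
```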
3. Can the metrics Flink itself exposes meet the need?
- register.consumer.metrics: specifies whether to register the metrics of the KafkaConsumer in the Flink metric group; enabling it turns these Kafka consumer metrics on.
- KafkaSourceReader.committedOffsets: the last successfully committed offsets to Kafka, for each partition. A particular partition's metric can be specified by topic name and partition id.
- KafkaSourceReader.currentOffsets: the consumer's current read offset, for each partition. A particular partition's metric can be specified by topic name and partition id.

These metrics expose offsets per partition but no lag value as such, so the lag still has to be computed by hand; enabling them looks roughly like the sketch below, and the next section builds the actual lag gauge.
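A sketch of switching these metrics on, assuming the newer KafkaSource connector (where the KafkaSourceReader metrics live) and reusing the broker, topic, and group values from the earlier examples:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class LagMetricsSource {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("10.0.90.74:9000")
                .setTopics("zyj-in")
                .setGroupId("zyj-in")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // register the KafkaConsumer's own metrics in the Flink metric group
                .setProperty("register.consumer.metrics", "true")
                .build();
    }
}
```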
4. Putting the above together: Flink code that connects to Kafka and reports its lag
The code below (from the reference at the end of this section) hooks a custom lag gauge into the legacy FlinkKafkaConsumer. Because it reaches into Flink and Kafka client internals via reflection, it is tightly coupled to the specific versions in use.

```java
// Metric group and gauge names used during registration; they are not defined in the
// original post, so the values here are assumptions.
private static final String DT_TOPIC_GROUP = "topic";
private static final String DT_PARTITION_GROUP = "partition";
private static final String DT_TOPIC_PARTITION_LAG_GAUGE = "topicPartitionLag";

// Custom source: wraps FlinkKafkaConsumer so the deserialization schema gets access
// to the fetcher and the runtime context, which metric registration needs later.
public static class CustomerKafkaConsumer<T> extends FlinkKafkaConsumer<Tuple2<Map<String, byte[]>, byte[]>> {

    private final MetaJsonKeyValueDeserializationSchema metaJsonKeyValueDeserializationSchema;

    public CustomerKafkaConsumer(List<String> topic,
                                 KafkaDeserializationSchema<Tuple2<Map<String, byte[]>, byte[]>> valueDeserializer,
                                 Properties props) {
        super(topic, valueDeserializer, props);
        this.metaJsonKeyValueDeserializationSchema = (MetaJsonKeyValueDeserializationSchema) valueDeserializer;
    }

    @Override
    protected AbstractFetcher<Tuple2<Map<String, byte[]>, byte[]>, ?> createFetcher(
            SourceContext<Tuple2<Map<String, byte[]>, byte[]>> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<WatermarkStrategy<Tuple2<Map<String, byte[]>, byte[]>>> watermarkStrategy,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
        AbstractFetcher<Tuple2<Map<String, byte[]>, byte[]>, ?> fetcher = super.createFetcher(
                sourceContext, assignedPartitionsWithInitialOffsets, watermarkStrategy,
                runtimeContext, offsetCommitMode, consumerMetricGroup, useMetrics);
        // hand the fetcher to the schema so registerPtMetric can reflect into it
        metaJsonKeyValueDeserializationSchema.setFetcher(fetcher);
        return fetcher;
    }

    @Override
    public void run(SourceContext<Tuple2<Map<String, byte[]>, byte[]>> sourceContext) throws Exception {
        metaJsonKeyValueDeserializationSchema.setRuntimeContext(getRuntimeContext());
        super.run(sourceContext);
    }
}

// Custom Gauge: lag for one partition = log-end offset - committed offset.
public static class CustomerKafkaLag implements Gauge<Long> {

    // Slf4j logger; not declared in the original post.
    private static final Logger log = LoggerFactory.getLogger(CustomerKafkaLag.class);

    private Map<TopicPartition, OffsetAndMetadata> consumedOffsets;
    private final Properties properties;
    private final Set<TopicPartition> assignedPartitions;
    private final Fetcher<Tuple2<Map<String, byte[]>, byte[]>, byte[]> fetcher;
    private final Time time;
    private final TopicPartition topicPartition;
    private final AdminClient adminClient;

    public CustomerKafkaLag(Map<TopicPartition, OffsetAndMetadata> consumedOffsets, Properties properties,
                            Set<TopicPartition> assignedPartitions,
                            Fetcher<Tuple2<Map<String, byte[]>, byte[]>, byte[]> fetcher,
                            Time time, TopicPartition topicPartition, AdminClient adminClient) {
        this.consumedOffsets = consumedOffsets;
        this.properties = properties;
        this.assignedPartitions = assignedPartitions;
        this.fetcher = fetcher;
        this.time = time;
        this.topicPartition = topicPartition;
        this.adminClient = adminClient;
    }

    @Override
    public Long getValue() {
        try {
            // committed offsets for the group, fetched through the AdminClient
            consumedOffsets = adminClient
                    .listConsumerGroupOffsets(properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG))
                    .partitionsToOffsetAndMetadata().get(60, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("kafka get currentOffset failed = {}", e.getMessage());
        }
        // log-end offsets straight from the consumer's internal fetcher
        Map<TopicPartition, Long> endOffsets =
                fetcher.endOffsets(assignedPartitions, time.timer(Duration.ofMillis(60000)));
        Map<TopicPartition, OffsetAndMetadata> finalConsumedOffsets = consumedOffsets;
        Map<TopicPartition, Long> collect = endOffsets.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey,
                        entry -> entry.getValue() - finalConsumedOffsets.get(entry.getKey()).offset()));
        return collect.get(topicPartition);
    }
}

// Custom deserialization schema: registers the lag gauges on the first record.
public static class MetaJsonKeyValueDeserializationSchema
        implements KafkaDeserializationSchema<Tuple2<Map<String, byte[]>, byte[]>> {

    // Guards one-time metric registration; referenced but not declared in the original post.
    private final AtomicBoolean first = new AtomicBoolean(true);

    private AbstractFetcher<Tuple2<Map<String, byte[]>, byte[]>, ?> fetcher;
    private RuntimeContext runtimeContext;
    private Properties properties;

    public MetaJsonKeyValueDeserializationSchema(Properties properties) {
        this.properties = properties;
    }

    public void setFetcher(AbstractFetcher<Tuple2<Map<String, byte[]>, byte[]>, ?> fetcher) {
        this.fetcher = fetcher;
    }

    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    @Override
    public Tuple2<Map<String, byte[]>, byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        if (first.get()) {
            first.set(false);
            registerPtMetric(properties);
        }
        Map<String, byte[]> map = new HashMap<>();
        for (Header header : record.headers().toArray()) {
            map.put(header.key(), header.value());
        }
        return new Tuple2<>(map, record.value());
    }

    @Override
    public boolean isEndOfStream(Tuple2<Map<String, byte[]>, byte[]> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<Map<String, byte[]>, byte[]>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<Map<String, byte[]>, byte[]>>() {});
    }

    // Register one gauge per assigned partition, digging the KafkaConsumer internals
    // (consumer thread, subscription state, fetcher, time) out via reflection.
    protected void registerPtMetric(Properties properties)
            throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException, TimeoutException {
        Field consumerThreadField = ((KafkaFetcher) fetcher).getClass().getDeclaredField("consumerThread");
        consumerThreadField.setAccessible(true);
        KafkaConsumerThread<Tuple2<Map<String, byte[]>, byte[]>> consumerThread =
                (KafkaConsumerThread) consumerThreadField.get(fetcher);

        Field hasAssignedPartitionsField = consumerThread.getClass().getDeclaredField("hasAssignedPartitions");
        hasAssignedPartitionsField.setAccessible(true);
        boolean hasAssignedPartitions = (boolean) hasAssignedPartitionsField.get(consumerThread);
        if (!hasAssignedPartitions) {
            throw new RuntimeException("wait 50 secs, but not assignedPartitions");
        }

        Field consumerField = consumerThread.getClass().getDeclaredField("consumer");
        consumerField.setAccessible(true);
        KafkaConsumer<Tuple2<Map<String, byte[]>, byte[]>, byte[]> kafkaConsumer =
                (KafkaConsumer) consumerField.get(consumerThread);

        Field subscriptionStateField = kafkaConsumer.getClass().getDeclaredField("subscriptions");
        subscriptionStateField.setAccessible(true);
        SubscriptionState subscriptionState = (SubscriptionState) subscriptionStateField.get(kafkaConsumer);
        Set<TopicPartition> assignedPartitions = subscriptionState.assignedPartitions();

        Field timeField = kafkaConsumer.getClass().getDeclaredField("time");
        timeField.setAccessible(true);
        Time time = (Time) timeField.get(kafkaConsumer);

        Field fetcherField = kafkaConsumer.getClass().getDeclaredField("fetcher");
        fetcherField.setAccessible(true);
        Fetcher<Tuple2<Map<String, byte[]>, byte[]>, byte[]> kafkaFetcher =
                (Fetcher) fetcherField.get(kafkaConsumer);

        AdminClient adminClient = AdminClient.create(properties);
        Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<>();
        for (TopicPartition topicPartition : assignedPartitions) {
            runtimeContext.getMetricGroup()
                    .addGroup(DT_TOPIC_GROUP, topicPartition.topic())
                    .addGroup(DT_PARTITION_GROUP, topicPartition.partition() + "")
                    .gauge(DT_TOPIC_PARTITION_LAG_GAUGE, new CustomerKafkaLag(consumedOffsets, properties,
                            assignedPartitions, kafkaFetcher, time, topicPartition, adminClient));
        }
    }
}
```

Reference: https://blog.csdn.net/daijiguo/article/details/107868359
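To round this off, a hedged wiring sketch for the three classes above, reusing the broker, topic, and group values from the CLI examples (the enclosing job class and its imports are assumed):

```java
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.90.74:9000");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "zyj-in");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// The schema registers the per-partition lag gauges on the first record it sees.
MetaJsonKeyValueDeserializationSchema schema = new MetaJsonKeyValueDeserializationSchema(props);
CustomerKafkaConsumer<Tuple2<Map<String, byte[]>, byte[]>> consumer =
        new CustomerKafkaConsumer<>(Collections.singletonList("zyj-in"), schema, props);

env.addSource(consumer).name("kafka-source-with-lag-gauge").print();
env.execute("kafka-lag-demo");
```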
Summary

Lag can be read off with the kafka-consumer-groups tooling, computed through the AdminClient and Consumer APIs, or pulled from JMX. Flink's connector metrics only expose committed and current offsets, so lag monitoring inside a Flink job still comes down to computing end offset minus committed offset yourself, which is what the custom source, gauge, and deserialization schema above do.