kafka使用_kafka使用Interceptors实现消息端到端跟踪
現在,Kafka指標的采集都僅包括客戶端或broker,這使得用戶跟蹤消息在集群內的傳遞路徑,構建系統端到端的性能和行為畫像變的困難。從技術上講,通過修改應用以收集或跟蹤額外的信息來測量系統端到端的性能是可行的,但對于關鍵的基礎設施應用來說,這種方案并不一定是切實可行的。在生產環境中,能夠快速部署工具來觀察,測量和監控Kafka客戶端行為(粒度直至消息級別),是非常有用的。同時,不同應用的度量指標需要的上下文元數據各異。無需重新編寫代碼或重新編譯即可實現監控客戶端的能力十分重要(在某些場景下,這種能力有助于連接到正在運行的應用程序)。
為了實現這個功能,kafka 更加傾向于增加生產者和消費者攔截器,攔截器可以在生產者和消費者處理消息的不同階段攔截消息。在Apache Flume 攔截器接口的啟發下,kafka開發了現在的機制。雖然,有很多功能都可以使用攔截器實現(例如,異常檢測,數據加密,字段過濾等),但是每個功能都需要仔細的評估是否應該使用攔截器還是其他機制來完成。當這些場景有明確的使用動機時,提供明確的API是一種良好的實踐。因此,kafka提供了最小化的生產者和消費者攔截器接口,旨在僅支持測量和監控。
盡管增加更多的指標或改進kafka的監控是可能的,但是基于以下原因我們認為提供靈活的,用戶可定制的接口更加有益:
構建通用監控工具。在一家大公司,不同的團隊合作構建系統。通常來說,隨著時間的推移,不同的團隊開發部署不同的組件。此外,組織對于通用的指標、數據格式和數據系統希望實現標準化。對于一個組織,我們認為開發部署通用的Kafka客戶端監控工具并在所有使用Kafka的應用中部署該工具是非常有價值的。
高昂的監控代價。向kafka添加其他指標可能會影響kafka的性能。不幸的是,有時候需要在系統性能和數據收集之間進行權衡。舉個例子,考慮檢測消息大小的場景。代價最低,最簡單,最直接的方法是計算消息的平均大小。計算分布式系統中的百分比要比計算簡單的平均值代價更高,更復雜,但是在很多應用中這是非常有用的。我們希望能夠讓客戶使用不同的算法收集指標數據,或者不收集。
應用對指標的要求不同。例如,一個用戶可能認為監控kafka中不同key的消息數非常的重要。在kafka內部提供所有的指標是不切實際的。插件化的攔截系統為指標的定制化提供了簡單可行的能力。
在一個組織中kafka通常是大型基礎設施的一部分,在基礎設施中實現端到端的跟蹤是非常有用的。攔截器提供了在相同基礎設施中跟蹤kafka客戶端的能力。
為了支持攔截器功能,Kafka在0.10.0.0版本增加了兩個全新的接口:ProducerInterceptor和ConsumerInterceptor并支持實現和配置攔截器鏈。攔截器API允許修改消息以支持給消息增加額外元數據實現端到端跟蹤的能力。
生產者攔截器 ProducerInterceptor
public interface ProducerInterceptor<K, V> extends Configurable {?public?ProducerRecord?onSend(ProducerRecord?record); public void onAcknowledgement(RecordMetadata metadata, Exception exception);? public void close();}消費者攔截器 ConsumerInterceptor
public interface ConsumerInterceptor extends Configurable { ConsumerRecords onConsume(ConsumerRecords var1); void onCommit(Map var1); void close();}下面以實現一個簡單的kafka指標采集小功能為例,進一步了解kafka攔截器的功能和使用方法。采集指標包括:生產和消費消費的線程名
生產者生產消息成功失敗次數統計
3.1 修改消息,增加處理線程名
在生產端,實現ProducerInterceptor接口并覆寫onSend方法,修改ProducerRecord,在Heads中增加生產者線程名:
public class TraceProducerInterceptor implements ProducerInterceptor { @Override public ProducerRecord onSend(ProducerRecord record) { Header producerThread = new RecordHeader("producerThread",Thread.currentThread().getName().getBytes()); record.headers().add(producerThread); return new ProducerRecord<>(record.topic(),record.partition(),record.timestamp(),record.key(),record.value(),record.headers()); }}在消費端,實現ConsumerInterceptor接口并覆寫onConsume方法,修改ConsumerRecord,在Heads中增加消費者線程名:
public class TraceConsumerInterceptor implements ConsumerInterceptor { @Override public ConsumerRecords onConsume(ConsumerRecords records) { byte[] currentThreadName = Thread.currentThread().getName().getBytes(); Header header = new RecordHeader("consumer Thread", currentThreadName); records.forEach(record -> record.headers().add(header)); return records; }}3.2 實現生產者消息成功失敗統計
在生產端,實現ProducerInterceptor接口并覆寫onAcknowledgement方法,對發送成功和失敗的消息進行統計,并在攔截器關閉時將數據打印到控制臺:
public class TraceProducerInterceptor implements ProducerInterceptor { private AtomicLong successCounts = new AtomicLong(0); private AtomicLong failedCounts = new AtomicLong(0); @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { if (null == exception) { successCounts.getAndIncrement(); } else { failedCounts.getAndIncrement(); } } @Override public void close() { System.out.println("success counts " + successCounts.get()); System.out.println("failed counts " + failedCounts); }}3.3 . 攔截器配置:
生產者和消費者可以通過interceptor.classes屬性配置攔截器,屬性的值為一個字符串集合,集合中的元素為攔截器類的全路徑名(包括包名)。
生產者只包含攔截器的配置如下:
Properties props = new Properties();List interceptors = new ArrayList<>();interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceConsumerInterceptor");props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);消費者只包含攔截器的配置如下:Properties props = new Properties();List interceptors = new ArrayList<>();interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceConsumerInterceptor");props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);3.4. 測試生產者使用三個線程,每個線程發送一個消息到kafka,在主線程啟動消費者消費kafka的消息,收到的每條消息打印消息的Heads信息。
生產者
創建發送消息的線程池:
private?static?ExecutorService?executor?=?Executors.newFixedThreadPool(3);為了避免主線程退出導致發送消息失敗,在添加任務時,將返回的Future對象保存到隊列中,然后逐個檢查任務是否完成,詳細的代碼如下:Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("delivery.timeout.ms", 300000); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); List interceptors = new ArrayList<>(); interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceProducerInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); Producer producer = new KafkaProducer<>(props); List futures = new ArrayList<>(3); for (int i = 0; i < 3; i++) { futures.add(executor.submit(() -> { producer.send(new ProducerRecord<>("TEST", "hello world ")); })); } futures.forEach(future -> { try { future.get(); } catch (Exception e) { System.out.println(e.getMessage()); } }); producer.close();代碼的輸出結果如下:
success counts 3failed counts 0消費者
消費者拉取消息,打印收到消息的Heads信息以驗證攔截器是否生效。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "chentong"); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); List interceptors = new ArrayList<>(); interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceConsumerInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); Consumer consumer = new KafkaConsumer<>(props); consumer.assign(Arrays.asList(new TopicPartition("TEST", 0))); consumer.seek(new TopicPartition("TEST", 0), 0L); while (true) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); records.forEach(record -> { record.headers().headers("producer thread") .forEach(header -> System.out.print("producer thread = " + new String(header.value()))); record.headers().headers("consumer thread") .forEach(header -> System.out.println("\t consumer thread = " + new String(header.value()))); }); }代碼輸出結果如下:
producer thread = pool-1-thread-2 consumer thread = mainproducer thread = pool-1-thread-1 consumer thread = mainproducer thread = pool-1-thread-3 consumer thread = main本文首先介紹了kafka攔截器引入的動機,主要為了解決當前kafka指標采集和監控的痛點問題;接著簡單介紹了ProducerInterceptor和ConsumerInterceptor兩個接口,最后以一個實際修改kafka消息Heads的例子進一步闡述了如何使用kafka提供的攔截器功能。轉自:https://www.jianshu.com/p/a344b3bba8f0原文閱讀可以跳至作者專欄推薦閱讀:
HDFS卷(磁盤)選擇策略
java實操|mysql數據增量同步到kafka
RabbitMQ和Kafka的比較
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的kafka使用_kafka使用Interceptors实现消息端到端跟踪的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: vb获取textbox数字_Spectr
- 下一篇: matlab怎么删除上一条命令_怎么恢复