Apache Kafka-消费端消费重试和死信队列
文章目錄
- 概述
- Code
- POM依賴
- 配置文件
- 配置類
- SeekToCurrentErrorHandler
- 自定義邏輯處理消費異常
- 生產者
- 消費者
- 單元測試
- 測速結果
- 源碼地址
概述
Spring-Kafka 提供消費重試的機制。當消息消費失敗的時候,Spring-Kafka 會通過消費重試機制,重新投遞該消息給 Consumer ,讓 Consumer 重新消費消息 。
默認情況下,Spring-Kafka 達到配置的重試次數時,【每條消息的失敗重試時間,由配置的時間隔決定】Consumer 如果依然消費失敗 ,那么該消息就會進入到死信隊列。
Spring-Kafka 封裝了消費重試和死信隊列, 將正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message),將存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)。
我們在應用中可以對死信隊列中的消息進行監控重發,來使得消費者實例再次進行消費,消費端需要做冪等性的處理。
Code
POM依賴
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 引入 Spring-Kafka 依賴 --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies>配置文件
spring:# Kafka 配置項,對應 KafkaProperties 配置類kafka:bootstrap-servers: 192.168.126.140:9092 # 指定 Kafka Broker 地址,可以設置多個,以逗號分隔# Kafka Producer 配置項producer:acks: 1 # 0-不應答。1-leader 應答。all-所有 leader 和 follower 應答。retries: 3 # 發送失敗時,重試發送的次數key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 key 的序列化value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化# Kafka Consumer 配置項consumer:auto-offset-reset: earliest # 設置消費者分組最初的消費進度為 earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.springframework.kafka.support.serializer.JsonDeserializerproperties:spring:json:trusted:packages: com.artisan.springkafka.domain# Kafka Consumer Listener 監聽器配置listener:missing-topics-fatal: false # 消費監聽接口監聽的主題不存在時,默認會報錯。所以通過設置為 false ,解決報錯logging:level:org:springframework:kafka: ERROR # spring-kafkaapache:kafka: ERROR # kafka配置類
首先要寫一個配置類,用于處理消費異常 ErrorHandler
package com.artisan.springkafka.configuration;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.*; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.FixedBackOff;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/18 14:32* @mark: show me the code , change the world*/@Configuration public class KafkaConfiguration {private Logger logger = LoggerFactory.getLogger(getClass());@Bean@Primarypublic ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {logger.warn("kafkaErrorHandler begin to Handle");// <1> 創建 DeadLetterPublishingRecoverer 對象ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);// <2> 創建 FixedBackOff 對象 設置重試間隔 10秒 次數為 3次BackOff backOff = new FixedBackOff(10 * 1000L, 3L);// <3> 創建 SeekToCurrentErrorHandler 對象return new SeekToCurrentErrorHandler(recoverer, backOff);}// @Bean // @Primary // public BatchErrorHandler kafkaBatchErrorHandler() { // // 創建 SeekToCurrentBatchErrorHandler 對象 // SeekToCurrentBatchErrorHandler batchErrorHandler = new SeekToCurrentBatchErrorHandler(); // // 創建 FixedBackOff 對象 // BackOff backOff = new FixedBackOff(10 * 1000L, 3L); // batchErrorHandler.setBackOff(backOff); // // 返回 // return batchErrorHandler; // } }Spring-Kafka 通過實現自定義的 SeekToCurrentErrorHandler ,當 Consumer 消費消息異常的時候,進行攔截處理:
- 重試小于最大次數時,重新投遞該消息給 Consumer
- 重試到達最大次數時,如果Consumer 還是消費失敗時,該消息就會發送到死信隊列。 死信隊列的 命名規則為: 原有 Topic + .DLT 后綴 = 其死信隊列的 Topic
創建 DeadLetterPublishingRecoverer 對象,它負責實現,在重試到達最大次數時,Consumer 還是消費失敗時,該消息就會發送到死信隊列。
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);也可以選擇 BackOff 的另一個子類 ExponentialBackOff 實現,提供指數遞增的間隔時間
new SeekToCurrentErrorHandler(recoverer, backOff);創建 SeekToCurrentErrorHandler 對象,負責處理異常,串聯整個消費重試的整個過程。
SeekToCurrentErrorHandler
在消息消費失敗時,SeekToCurrentErrorHandler 會將 調用 Kafka Consumer 的 seek(TopicPartition partition, long offset) 方法,將 Consumer 對于該消息對應的 TopicPartition 分區的本地進度設置成該消息的位置。
這樣,Consumer 在下次從 Kafka Broker 拉取消息的時候,又能重新拉取到這條消費失敗的消息,并且是第一條。
同時,Spring-Kafka 使用 FailedRecordTracker 對每個 Topic 的每個 TopicPartition 消費失敗次數進行計數,這樣相當于對該 TopicPartition 的第一條消費失敗的消息的消費失敗次數進行計數。
另外,在 FailedRecordTracker 中,會調用 BackOff 來進行計算,該消息的下一次重新消費的時間,通過 Thread#sleep(...) 方法,實現重新消費的時間間隔。
注意:
FailedRecordTracker 提供的計數是客戶端級別的,重啟 JVM 應用后,計數是會丟失的。所以,如果想要計數進行持久化,需要自己重新實現下 FailedRecordTracker 類,通過 ZooKeeper 存儲計數。
SeekToCurrentErrorHandler 是只針對消息的單條消費失敗的消費重試處理。如果想要有消息的批量消費失敗的消費重試處理,可以使用 SeekToCurrentBatchErrorHandler 。配置方式如下
@Bean @Primary public BatchErrorHandler kafkaBatchErrorHandler() {// 創建 SeekToCurrentBatchErrorHandler 對象SeekToCurrentBatchErrorHandler batchErrorHandler = new SeekToCurrentBatchErrorHandler();// 創建 FixedBackOff 對象BackOff backOff = new FixedBackOff(10 * 1000L, 3L);batchErrorHandler.setBackOff(backOff);// 返回return batchErrorHandler; }SeekToCurrentBatchErrorHandler 暫時不支持死信隊列的機制。
自定義邏輯處理消費異常
支持自定義 ErrorHandler 或 BatchErrorHandler 實現類,實現對消費異常的自定義的邏輯
比如 https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/LoggingErrorHandler.java
public class LoggingErrorHandler implements ErrorHandler {private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(LoggingErrorHandler.class));@Overridepublic void handle(Exception thrownException, ConsumerRecord<?, ?> record) {LOGGER.error(thrownException, () -> "Error while processing: " + ObjectUtils.nullSafeToString(record));}}配置方式同 SeekToCurrentErrorHandler 或 SeekToCurrentBatchErrorHandler。
生產者
package com.artisan.springkafka.producer;import com.artisan.springkafka.constants.TOPIC; import com.artisan.springkafka.domain.MessageMock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture;import java.util.Random; import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 22:25* @mark: show me the code , change the world*/@Component public class ArtisanProducerMock {@Autowiredprivate KafkaTemplate<Object,Object> kafkaTemplate ;public ListenableFuture<SendResult<Object, Object>> sendMsgASync() {// 模擬發送的消息Integer id = new Random().nextInt(100);MessageMock messageMock = new MessageMock(id,"messageSendByAsync-" + id);// 異步發送消息ListenableFuture<SendResult<Object, Object>> result = kafkaTemplate.send(TOPIC.TOPIC, messageMock);return result ;}}消費者
package com.artisan.springkafka.consumer;import com.artisan.springkafka.domain.MessageMock; import com.artisan.springkafka.constants.TOPIC; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 22:33* @mark: show me the code , change the world*/@Component public class ArtisanCosumerMock {private Logger logger = LoggerFactory.getLogger(getClass());private static final String CONSUMER_GROUP_PREFIX = "MOCK-A" ;@KafkaListener(topics = TOPIC.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPIC.TOPIC)public void onMessage(MessageMock messageMock){logger.info("【接受到消息][線程:{} 消息內容:{}]", Thread.currentThread().getName(), messageMock);// 模擬拋出一次一行throw new RuntimeException("MOCK Handle Exception Happened");}}在消費消息時候,拋出一個 RuntimeException 異常,模擬消費失敗
單元測試
package com.artisan.springkafka.produceTest;import com.artisan.springkafka.SpringkafkaApplication; import com.artisan.springkafka.producer.ArtisanProducerMock; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.support.SendResult; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback;import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;/*** @author 小工匠* * @version 1.0* @description: TODO* @date 2021/2/17 22:40* @mark: show me the code , change the world*/@RunWith(SpringRunner.class) @SpringBootTest(classes = SpringkafkaApplication.class) public class ProduceMockTest {private Logger logger = LoggerFactory.getLogger(getClass());@Autowiredprivate ArtisanProducerMock artisanProducerMock;@Testpublic void testAsynSend() throws ExecutionException, InterruptedException {logger.info("開始發送");artisanProducerMock.sendMsgASync().addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {@Overridepublic void onFailure(Throwable throwable) {logger.info(" 發送異常{}]]", throwable);}@Overridepublic void onSuccess(SendResult<Object, Object> objectObjectSendResult) {logger.info("回調結果 Result = topic:[{}] , partition:[{}], offset:[{}]",objectObjectSendResult.getRecordMetadata().topic(),objectObjectSendResult.getRecordMetadata().partition(),objectObjectSendResult.getRecordMetadata().offset());}});// 阻塞等待,保證消費new CountDownLatch(1).await();}}測速結果
我們把這個日志來梳理一下
2021-02-18 16:18:08.032 INFO 25940 --- [ main] c.a.s.produceTest.ProduceMockTest : 開始發送 2021-02-18 16:18:08.332 INFO 25940 --- [ad | producer-1] c.a.s.produceTest.ProduceMockTest : 回調結果 Result = topic:[C_RT_TOPIC] , partition:[0], offset:[0] 2021-02-18 16:18:08.371 INFO 25940 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=15, name='messageSendByAsync-15'}] 2021-02-18 16:18:18.384 ERROR 25940 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception...... ...... ......2021-02-18 16:18:18.388 INFO 25940 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=15, name='messageSendByAsync-15'}] 2021-02-18 16:18:28.390 ERROR 25940 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception...... ...... ......2021-02-18 16:18:28.394 INFO 25940 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=15, name='messageSendByAsync-15'}] 2021-02-18 16:18:38.395 ERROR 25940 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception...... ...... ......2021-02-18 16:18:38.399 INFO 25940 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=15, name='messageSendByAsync-15'}]清晰了么 老兄?
是不是和我們設置的消費重試
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);10秒 重試3次
3次處理后依然失敗,轉入死信隊列
看看數據
源碼地址
https://github.com/yangshangwei/boot2/tree/master/springkafkaRetries
總結
以上是生活随笔為你收集整理的Apache Kafka-消费端消费重试和死信队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apache Kafka-通过设置Con
- 下一篇: Apache Kafka-通过concu