Java – Kafka KStreams – processing timeouts
I'm trying to use <KStream>.process() together with TimeWindows.of("name", 30000) to batch up some KTable values and send them on. It seems that 30 seconds exceeds the consumer session timeout, after which Kafka considers the consumer gone and releases its partitions.
I've tried raising the poll frequency and the commit interval to avoid this:
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");  // commit offsets every 5 s
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");             // block at most 5 s per poll()
Unfortunately these errors keep occurring:

(lots of these)
ERROR o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0
Followed by these:
INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
Apparently I need to send heartbeats back to the server more often. How?
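For reference, the CommitFailedException above points at two client-side knobs: a longer session timeout, or smaller batches per poll(). A minimal sketch of setting them from the Streams application, assuming Kafka Streams forwards unrecognised config keys to its internal consumer (the values here are illustrative guesses, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties config = new Properties();
// ... application.id, bootstrap.servers, serdes, etc. as before ...

// More headroom before the broker declares this consumer dead.
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");

// Fewer records per poll(), so each poll-loop iteration stays short.
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

StreamsConfig streamsConfig = new StreamsConfig(config);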
My topology is:
KStreamBuilder kStreamBuilder = new KStreamBuilder();

KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

// group values by key into 30-second windows
KTable<Windowed<String>, String> kt = lines.aggregateByKey(
        new DBAggregateInit(),
        new DBAggregate(),
        TimeWindows.of("write_aggregate2", 30000));

DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();
kt.toStream().process(dbProcessorSupplier);

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
The KTable groups values by key every 30 seconds. In Processor.init() I call context.schedule(30000).

DBProcessorSupplier provides an instance of DBProcessor, an implementation of AbstractProcessor with every override supplied. All they do is LOG, so I know each one is being hit.
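Roughly, the supplier and processor look like this (a trimmed sketch of the classes described above, separate files in the real project, with the bodies reduced to their logging calls):

import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DBProcessorSupplier implements ProcessorSupplier<Windowed<String>, String> {
    @Override
    public Processor<Windowed<String>, String> get() {
        return new DBProcessor();
    }
}

public class DBProcessor extends AbstractProcessor<Windowed<String>, String> {

    private static final Logger LOG = LoggerFactory.getLogger(DBProcessor.class);

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        context.schedule(30000);  // request punctuate() every 30 seconds
    }

    @Override
    public void process(Windowed<String> key, String value) {
        LOG.info("process: {} -> {}", key, value);  // this one fires
    }

    @Override
    public void punctuate(long timestamp) {
        LOG.info("punctuate: {}", timestamp);  // this is the one that never fires on time
    }

    @Override
    public void close() {
        LOG.info("close");
    }
}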
It's a pretty simple topology, but clearly I'm missing a step somewhere.
EDIT:

I understand that I can adjust this on the server side, but I'm hoping for a client-side solution. I like the notion of partitions becoming available again fairly quickly when a client exits/dies.
EDIT:

In an attempt to simplify the problem, I removed the aggregation step from the graph. It is now just consumer -> processor(). (If I send the consumer straight to .print() it works quickly, so I know that part is fine. Likewise, if I output the aggregation (KTable) via .print(), that seems to be OK too.)
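In other words, the working and failing variants differ only in the terminal operation (a sketch, generics omitted):

// works: records show up immediately
kStreamBuilder.stream(TOPIC).print();

// stalls: the same source, terminated by the custom processor instead
kStreamBuilder.stream(TOPIC).process(new DBProcessorSupplier());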
What I've found is that .process(), which should be calling .punctuate() every 30 seconds, actually blocks for variable lengths of time and produces output somewhat randomly (if at all).
Further:

I set the log level to debug and restarted. I'm seeing lots of these messages:
DEBUG o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord
but a breakpoint in the .punctuate() function is never hit. So it's doing lots of work, but never giving me a chance to use it.