Storm KafkaSpout pitfall notes: the offset problem
Examples of integrating Kafka with Storm are easy to find online, so they are not repeated here.
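Problem description:
The Kafka cluster had been set up long before, and a newly built Storm cluster needed to consume one of its topics. Because the topic already held a large backlog of messages, Storm started consuming from the very beginning of the topic.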
Solution:
The following passage is quoted from the official documentation:
How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures
As shown in the above KafkaConfig properties, you can control from where in the Kafka topic the spout begins to read by setting KafkaConfig.startOffsetTime as follows:
As the topology runs the Kafka spout keeps track of the offsets it has read and emitted by storing state information under the ZooKeeper path SpoutConfig.zkRoot + "/" + SpoutConfig.id. In the case of failures it recovers from the last written offset in ZooKeeper.
Important: When re-deploying a topology make sure that the settings for SpoutConfig.zkRoot and SpoutConfig.id were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.
This means that when a topology has run once the setting KafkaConfig.startOffsetTime will not have an effect for subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in ZooKeeper to determine from where it should begin (more precisely: resume) reading. If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true. If true, the spout will always begin reading from the offset defined by KafkaConfig.startOffsetTime as described above.
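In short, the passage says the following. The starting position is set through the startOffsetTime field of the SpoutConfig object. Its default value is kafka.api.OffsetRequest.EarliestTime(), which means consumption starts from the earliest message; to start from the newest messages you have to set it explicitly to kafka.api.OffsetRequest.LatestTime(). The other point is that this field only takes effect the first time the topic is consumed. After that, consumption resumes from the offset recorded in ZooKeeper (according to the passage above, under the path SpoutConfig.zkRoot + "/" + SpoutConfig.id; I have not verified this myself).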
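The rule quoted above can be sketched as a small, self-contained Java model. This is a simplified illustration and not Storm's actual source: the class StartOffsetModel, the method resolveStartOffset and the earliestOffset/latestOffset inputs are hypothetical names introduced here, and timestamp values of startOffsetTime are deliberately not modelled. The constants -2 and -1 are the values returned by kafka.api.OffsetRequest.EarliestTime() and LatestTime().

```java
import java.util.OptionalLong;

// Simplified model of the start-offset rule from the quoted documentation.
// NOT Storm's source code; all names in this class are hypothetical.
final class StartOffsetModel {
    // Values returned by kafka.api.OffsetRequest.EarliestTime() and LatestTime().
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    /**
     * @param committedInZk   offset previously stored in ZooKeeper, empty on a first run
     * @param ignoreZkOffsets mirrors KafkaConfig.ignoreZkOffsets
     * @param startOffsetTime mirrors KafkaConfig.startOffsetTime (timestamps not modelled)
     * @param earliestOffset  oldest offset still available in the partition (assumed input)
     * @param latestOffset    offset the next new message would get (assumed input)
     */
    static long resolveStartOffset(OptionalLong committedInZk, boolean ignoreZkOffsets,
                                   long startOffsetTime, long earliestOffset, long latestOffset) {
        // Stored consumer state wins unless the spout is told to ignore it.
        if (committedInZk.isPresent() && !ignoreZkOffsets) {
            return committedInZk.getAsLong();
        }
        if (startOffsetTime == EARLIEST_TIME) {
            return earliestOffset;
        }
        if (startOffsetTime == LATEST_TIME) {
            return latestOffset;
        }
        throw new IllegalArgumentException("timestamp-based start offsets are not modelled");
    }

    public static void main(String[] args) {
        long earliest = 0L;
        long latest = 1000L;
        // First run with the default setting: start at the oldest message.
        System.out.println(resolveStartOffset(OptionalLong.empty(), false, EARLIEST_TIME, earliest, latest)); // 0
        // First run with LatestTime(): skip the backlog.
        System.out.println(resolveStartOffset(OptionalLong.empty(), false, LATEST_TIME, earliest, latest)); // 1000
        // Later run: the offset stored in ZooKeeper overrides startOffsetTime.
        System.out.println(resolveStartOffset(OptionalLong.of(420L), false, LATEST_TIME, earliest, latest)); // 420
        // Later run with ignoreZkOffsets = true: startOffsetTime applies again.
        System.out.println(resolveStartOffset(OptionalLong.of(420L), true, LATEST_TIME, earliest, latest)); // 1000
    }
}
```

The third call is the case that caused the problem in this post: once an offset has been stored, changing startOffsetTime alone has no effect.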
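If you want the current topology to continue from where the previous topology left off, do not change the id of the SpoutConfig object. Put the other way round: if the first run already started from the earliest message and you keep the same id, the spout has to work through everything from the earliest message up to the newest one. If at that point you want to skip the messages in between and start directly from the newest ones, changing the id of the SpoutConfig object is enough, provided startOffsetTime is set to kafka.api.OffsetRequest.LatestTime(); with a new id and the default EarliestTime(), consumption would simply start over from the earliest message.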
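Why changing the id has this effect can be illustrated with another small Java model, again a sketch under stated assumptions rather than Storm's implementation. A HashMap stands in for ZooKeeper, a single offset stands in for the spout's stored state, and the class ConsumerStateModel as well as the values "/kafka-spout", "orders-v1" and "orders-v2" are hypothetical. Only the path rule zkRoot + "/" + id comes from the quoted documentation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Toy model of consumer state keyed by zkRoot + "/" + id.
// NOT Storm's source code; a HashMap stands in for ZooKeeper.
final class ConsumerStateModel {
    private final Map<String, Long> fakeZooKeeper = new HashMap<>();

    // Path rule taken from the quoted documentation.
    static String statePath(String zkRoot, String id) {
        return zkRoot + "/" + id;
    }

    // Record the last offset processed by the spout with this zkRoot and id.
    void commit(String zkRoot, String id, long offset) {
        fakeZooKeeper.put(statePath(zkRoot, id), offset);
    }

    // Look up stored state; empty means the spout behaves like a first run.
    OptionalLong lookup(String zkRoot, String id) {
        Long stored = fakeZooKeeper.get(statePath(zkRoot, id));
        return stored == null ? OptionalLong.empty() : OptionalLong.of(stored);
    }

    public static void main(String[] args) {
        ConsumerStateModel state = new ConsumerStateModel();
        state.commit("/kafka-spout", "orders-v1", 420L);

        // Same id: the previous progress is found, so consumption resumes there.
        System.out.println(state.lookup("/kafka-spout", "orders-v1").isPresent()); // true

        // New id: nothing is stored under the new path, so startOffsetTime decides.
        System.out.println(state.lookup("/kafka-spout", "orders-v2").isPresent()); // false
    }
}
```

A new id therefore looks like a first run to the spout, which is why startOffsetTime takes effect again.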
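Below are the meanings of some fields of the SpoutConfig object. They are actually inherited from KafkaConfig; see the source code.
public boolean useStartOffsetTimeIfOffsetOutOfRange = true; // if the message at the requested offset no longer exists in Kafka, whether to fall back to startOffsetTime
public int metricsTimeBucketSizeInSecs = 60; // how often, in seconds, metrics are aggregated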
Reposted from: https://www.cnblogs.com/wsss/p/6745493.html