1.12.Flink Kafka-Connector详解、Consumer消费策略设置、动态加载Topic、Consumers Offset 自动提交、Producer、容错等
1.12.Flink Kafka-Connector詳解
1.12.1.Kafka Consumer消費策略設置
1.12.2.Kafka Consumer的容錯
1.12.3.動態加載Topic
1.12.4.Kafka Consumers Offset 自動提交
1.12.5.Kafka Producer
1.12.6.Kafka Producer的容錯-Kafka 0.9 and 0.10
1.12.7.Kafka Producer的容錯-Kafka 0.11
1.12.Flink Kafka-Connector詳解
?Kafka中的partition機制和Flink的并行度機制深度結合。
?Kafka可以作為Flink的source和sink
?任務失敗,通過設置kafka的offset來恢復應用
Scala案例:
import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.environment.CheckpointConfig import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011} import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper/*** Created by xxxx on 2020/10/09 on 2018/10/23.*/ object StreamingKafkaSinkScala {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//隱式轉換import org.apache.flink.api.scala._//checkpoint配置env.enableCheckpointing(5000);env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);env.getCheckpointConfig.setCheckpointTimeout(60000);env.getCheckpointConfig.setMaxConcurrentCheckpoints(1);env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//設置statebackend//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));val text = env.socketTextStream("hadoop100",9001,'\n')val topic = "t1"val prop = new Properties()prop.setProperty("bootstrap.servers","hadoop110:9092")//第一種解決方案,設置FlinkKafkaProducer011里面的事務超時時間//設置事務超時時間//prop.setProperty("transaction.timeout.ms",60000*15+"");//第二種解決方案,設置kafka的最大事務超時時間//FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());//使用支持僅一次語義的形式val myProducer = new FlinkKafkaProducer011[String](topic,new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)text.addSink(myProducer)env.execute("StreamingFromCollectionScala")}} import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.environment.CheckpointConfig import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011/*** Created by xxxx on 2020/10/09 on 2018/10/23.*/ object StreamingKafkaSourceScala {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//隱式轉換import org.apache.flink.api.scala._//checkpoint配置env.enableCheckpointing(5000);env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);env.getCheckpointConfig.setCheckpointTimeout(60000);env.getCheckpointConfig.setMaxConcurrentCheckpoints(1);env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//設置statebackend//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));val topic = "t1"val prop = new Properties()prop.setProperty("bootstrap.servers","hadoop110:9092")prop.setProperty("group.id","con1")val myConsumer = new FlinkKafkaConsumer011[String](topic,new SimpleStringSchema(),prop)val text = env.addSource(myConsumer)text.print()env.execute("StreamingFromCollectionScala")}}1.12.1.Kafka Consumer消費策略設置
?setStartFromGroupOffsets() 【默認消費策略】
- ?默認讀取上次保存的offset信息
- ?如果是應用第一次啟動,讀取不到上次的offset信息,則會根據這個參數auto.offset.reset的值來進行消費數據。
?setStartFromEarliest() - ?從最早的數據開始進行消費,忽略存儲的offset信息。
?setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>)
1.12.2.Kafka Consumer的容錯
?當checkpoint機制開啟的時候,Kafka Consumer會定期把kafka的offset信息還有其他operator的狀態信息一塊保存起來。當job失敗重啟的時候,Flink會從最近一次的checkpoint中進行恢復數據,重新消費kafka中的數據。
?為了能夠使用支持容錯的kafka Consumer,需要開啟checkpoint
- ?env.enableCheckpointing(5000); // 每5s checkpoint一次
1.12.3.動態加載Topic
1.12.4.Kafka Consumers Offset 自動提交
?針對job是否開啟checkpoint來區分
?Checkpoint關閉時: 可以通過下面兩個參數配置
- ?enable.auto.commit
- ?auto.commit.interval.ms
?Checkpoint開啟時:當執行checkpoint的時候才會保存offset,這樣保證了kafka的offset和checkpoint的狀態偏移量保持一致。
- ?可以通過這個參數設置setCommitOffsetsOnCheckpoints(boolean)
- ?這個參數默認就是true。表示在checkpoint的時候提交offset
- ?此時,kafka中的自動提交機制就會被忽略
1.12.5.Kafka Producer
1.12.6.Kafka Producer的容錯-Kafka 0.9 and 0.10
?如果Flink開啟了checkpoint,針對FlinkKafkaProducer09 和FlinkKafkaProducer010 可以提供 at-least-once的語義,還需要配置下面兩個參數
- ?setLogFailuresOnly(false)
- ?setFlushOnCheckpoint(true)
?注意:建議修改kafka 生產者的重試次數
retries【這個參數的值默認是0】
1.12.7.Kafka Producer的容錯-Kafka 0.11
?如果Flink開啟了checkpoint,針對FlinkKafkaProducer011 就可以提供 exactly-once的語義
?但是需要選擇具體的語義
- ?Semantic.NONE
- ?Semantic.AT_LEAST_ONCE【默認】
- ?Semantic.EXACTLY_ONCE
總結
以上是生活随笔為你收集整理的1.12.Flink Kafka-Connector详解、Consumer消费策略设置、动态加载Topic、Consumers Offset 自动提交、Producer、容错等的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 160Hz的屏幕刷新率即将到来 160W
- 下一篇: 网贷为什么要开通存管账户