Big Data: Flink Data Consistency
I. Writing Flink results to Redis
1. Download the flink-hadoop integration package and place it on every node (the checkpoint state backend below writes to HDFS, so the Hadoop classes must be on the classpath).
2. KafkaToRedisWordCount
package cn._51doit.flink.day08;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Can this program tolerate failures, i.e. guarantee data consistency?
 * If it can, is the guarantee exactly-once or at-least-once? It is at-least-once.
 * KafkaSource: records the offsets and stores them in state (OperatorState).
 * sum after keyBy: sum is stateful (ValueState).
 * RedisSink: HSET overwrites existing fields, so the writes are idempotent.
 */
public class KafkaToRedisWordCount {

    // --topic doit2021 --groupId g02 --redisHost node-3.51doit.cn
    // --redisPwd 123456 --fsBackend hdfs://node-1.51doit.cn:9000/flinkck2021
    public static void main(String[] args) throws Exception {
        //System.setProperty("HADOOP_USER_NAME", "root");
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // periodically persist the in-memory state to the state backend
        env.enableCheckpointing(parameterTool.getLong("chkInterval", 30000));
        // configure the state backend
        env.setStateBackend(new FsStateBackend(parameterTool.getRequired("fsBackend")));
        // keep the job's checkpoint data when the job is cancelled manually
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Kafka parameters
        Properties properties = new Properties();
        // Kafka broker addresses
        properties.setProperty("bootstrap.servers",
                "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092");
        // offset reset strategy: start from the beginning if no offset was recorded,
        // otherwise continue from the recorded offset
        properties.setProperty("auto.offset.reset", "earliest");
        // consumer group id
        properties.setProperty("group.id", parameterTool.get("groupId"));
        // checkpointing is enabled, so do not let the Kafka consumer auto-commit offsets on its own
        properties.setProperty("enable.auto.commit", "false");

        // create the FlinkKafkaConsumer
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                parameterTool.getRequired("topic"), // topic to read from
                new SimpleStringSchema(),           // deserialization schema
                properties                          // Kafka properties
        );
        // optional: do not write offsets back to Kafka's special offsets topic on checkpoint
        //kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        // add the Kafka source
        DataStreamSource<String> lines = env.addSource(kafkaConsumer);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                });

        // group by word
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(t -> t.f0);
        // aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

        // write the aggregated result to Redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost(parameterTool.getRequired("redisHost"))
                .setPassword(parameterTool.getRequired("redisPwd"))
                .setDatabase(9)
                .build();
        // use the RedisWordCountMapper defined below
        summed.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));

        env.execute();
    }

    private static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
        }

        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}

Note: if Redis goes down while the job keeps producing results, the results missed during the outage still end up in Redis once it recovers, because the running counts are held in Flink's keyed state and every update writes the full current count with HSET, which simply overwrites the existing field (idempotent).
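To see why the idempotent HSET sink tolerates replays, note that writing the same hash field twice leaves a single, current value rather than a duplicate. Below is a minimal sketch using the plain Jedis client (the Flink Redis connector uses Jedis internally); the localhost address, port and sample values are placeholders, not taken from the article:

import redis.clients.jedis.Jedis;

public class HsetIdempotenceDemo {
    public static void main(String[] args) {
        // connect to a Redis instance (placeholder address and port)
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            jedis.select(9); // same database index as the Flink sink above

            // first write of the current count for the word "flink"
            jedis.hset("WORD_COUNT", "flink", "3");
            // replayed write after a restart: overwrites the field, does not duplicate it
            jedis.hset("WORD_COUNT", "flink", "3");

            // still a single field holding the latest value
            System.out.println(jedis.hget("WORD_COUNT", "flink")); // prints 3
        }
    }
}

Because each update carries the full current count for a word, re-emitting results after a restart never double-counts; at worst the same value is written again.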
Note: if the Flink job is cancelled without deleting its checkpoint data and is then restarted with the last checkpoint specified, the computation resumes where it left off; the example above relies on this. Alternatively, use a savepoint, which is saved manually when the job is cancelled.
II. Reading data from Kafka, processing it, and writing it back to Kafka
1. KafkaToKafka

package cn._51doit.flink.day09;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * Read data from Kafka, process it, and write it back to Kafka.
 * Requirement: guarantee data consistency.
 * Exactly-once: the source can record offsets and replay data, and on failure the offsets
 * are not advanced; the sink must support transactions.
 * With checkpointing enabled, the source offsets are stored in state (OperatorState) and
 * the processed data is covered by the checkpoint as well.
 */
public class KafkaToKafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpointing
        env.enableCheckpointing(30000);
        env.setStateBackend(new FsStateBackend("file:///Users/xing/Desktop/flinkck20210123"));

        // Kafka parameters
        Properties properties = new Properties();
        // Kafka broker addresses
        properties.setProperty("bootstrap.servers",
                "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092");
        // offset reset strategy: start from the beginning if no offset was recorded,
        // otherwise continue from the recorded offset
        properties.setProperty("auto.offset.reset", "earliest");
        // consumer group id
        properties.setProperty("group.id", "g1");
        // do not let the Kafka consumer auto-commit offsets; Flink manages them via checkpoints
        properties.setProperty("enable.auto.commit", "false");

        // create the FlinkKafkaConsumer
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "doit2021",               // topic to read from
                new SimpleStringSchema(), // deserialization schema
                properties                // Kafka properties
        );
        // do not write offsets back to Kafka's special offsets topic on checkpoint
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        DataStreamSource<String> lines = env.addSource(kafkaConsumer);

        SingleOutputStreamOperator<String> filtered = lines.filter(e -> !e.startsWith("error"));

        // this simpler constructor would only give at-least-once semantics
        //FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
        //        "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092", "out2021", new SimpleStringSchema()
        //);

        // topic to write to
        String topic = "out2021";
        // transaction timeout for the producer (must not exceed the broker's transaction.max.timeout.ms)
        properties.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "");

        // create the FlinkKafkaProducer
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
                topic,                                     // target topic
                new KafkaStringSerializationSchema(topic), // serialization schema for writing to Kafka
                properties,                                // Kafka properties
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE   // write with exactly-once semantics
        );

        filtered.addSink(kafkaProducer);

        env.execute();
    }
}
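Because the producer writes transactionally, downstream consumers only see end-to-end exactly-once results if they skip uncommitted records. Below is a minimal sketch of a plain Kafka consumer reading the output topic with isolation.level=read_committed, assuming a kafka-clients 2.x dependency; the group id is a placeholder, not taken from the article:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ReadCommittedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "out-reader"); // placeholder group id
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // only read records whose Kafka transaction has been committed,
        // i.e. data covered by a completed Flink checkpoint
        props.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("out2021"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

With EXACTLY_ONCE, the transactions are committed when a checkpoint completes, so records on out2021 become visible to read_committed consumers only after each checkpoint; the checkpoint interval therefore bounds the output latency.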
2. Define KafkaStringSerializationSchema

package cn._51doit.flink.day09;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.Charset;

/**
 * Custom Kafka serialization schema for String records.
 */
public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {

    private String topic;
    private String charset;

    // constructor takes the target topic; the charset defaults to UTF-8
    public KafkaStringSerializationSchema(String topic) {
        this(topic, "UTF-8");
    }

    public KafkaStringSerializationSchema(String topic, String charset) {
        this.topic = topic;
        this.charset = charset;
    }

    // called to serialize each record
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        // convert the string to a byte array using the configured charset
        byte[] bytes = element.getBytes(Charset.forName(charset));
        // wrap it in a ProducerRecord for the target topic
        return new ProducerRecord<>(topic, bytes);
    }
}

Summary

The Redis example relies on replayable Kafka offsets plus an idempotent HSET sink: processing is at-least-once, but overwriting each word with its current count keeps the stored results consistent. The Kafka-to-Kafka example relies on checkpointed offsets plus a transactional FlinkKafkaProducer with EXACTLY_ONCE semantics, which gives end-to-end exactly-once delivery.