Flink in Practice: A Custom KafkaDeserializationSchema (Java/Scala)
Messages in Kafka usually come as key-value pairs, so here we define a custom deserialization schema that consumes keyed messages from Kafka. To make it easier to follow, I have implemented both a Java and a Scala version; since the code is simple, let's go straight to it.
I. Scala code:
1. Custom deserialization class:
package comhadoop.ljs.flink010.kafka

import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-25 18:31
 * @version: v1.0
 * @description: comhadoop.ljs.flink010.kafka
 */
class MyKafkaDeserializationSchema extends KafkaDeserializationSchema[ConsumerRecord[String, String]] {

  /* Whether the stream has ended, e.g. stop once a record with key "end" is read.
     We do not check for that here and simply return false: never end. */
  override def isEndOfStream(t: ConsumerRecord[String, String]): Boolean = {
    false
  }

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[String, String] = {
    new ConsumerRecord(record.topic(), record.partition(), record.offset(),
      new String(record.key(), "UTF-8"), new String(record.value(), "UTF-8"))
  }

  /* Tells Flink the type produced by this deserialization schema. */
  override def getProducedType: TypeInformation[ConsumerRecord[String, String]] = {
    TypeInformation.of(new TypeHint[ConsumerRecord[String, String]] {})
  }
}

2. Main class:
package comhadoop.ljs.flink010.kafka

import java.util.Properties

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-25 16:32
 * @version: v1.0
 * @description: comhadoop.ljs.flink010.kafka
 */
object KafkaDeserializerSchemaTest {

  def main(args: Array[String]): Unit = {
    /* Initialize the environment. */
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment()
    /* Enable checkpointing. Note that the message key/value is not null-checked here,
       so with checkpointing enabled a null key or value makes the job fail and restart endlessly. */
    senv.enableCheckpointing(2000)
    /* If topic3 does not exist, Kafka creates it automatically (one partition, partition 0). */
    val myConsumer = new FlinkKafkaConsumer[ConsumerRecord[String, String]]("topic3", new MyKafkaDeserializationSchema(), getKafkaConfig())
    /* Specify the start offsets. */
    val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
    /* Consume topic3, partition 0, from the first record. */
    specificStartOffsets.put(new KafkaTopicPartition("topic3", 0), 0L)
    myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
    /* Add the Kafka source. */
    val source: DataStream[ConsumerRecord[String, String]] = senv.addSource(myConsumer)
    val keyValue = source.map(new MapFunction[ConsumerRecord[String, String], String] {
      override def map(message: ConsumerRecord[String, String]): String = {
        "key" + message.key + " value:" + message.value
      }
    })
    /* Print the received records. */
    keyValue.print()
    /* Start the job. */
    senv.execute()
  }

  def getKafkaConfig(): Properties = {
    val props: Properties = new Properties()
    props.setProperty("bootstrap.servers", "worker1.hadoop.ljs:6667,worker2.hadoop.ljs:6667")
    props.setProperty("group.id", "topic_1")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("auto.offset.reset", "latest")
    props
  }
}
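Before moving on to the Java version: setStartFromSpecificOffsets is only one of the consumer's start-position modes. A brief sketch in Java (the consumer class is the same one used from Scala, so the same methods apply); the wrapper class and method exist only for illustration, and the timestamp is an arbitrary example value:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class StartPositionExamples {
    /* Illustrative helper: pick exactly one start-position mode per consumer. */
    static void configure(FlinkKafkaConsumer<String> consumer) {
        consumer.setStartFromEarliest();                   // read from the beginning, ignoring committed offsets
        // consumer.setStartFromLatest();                  // read only records produced from now on
        // consumer.setStartFromGroupOffsets();            // default: resume from offsets committed under group.id
        // consumer.setStartFromTimestamp(1587800000000L); // start from records at/after this epoch-millis timestamp
    }
}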
II. Java code:

1. Custom deserialization class:
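The Java class is the direct counterpart of the Scala one. Below is a minimal reconstruction that mirrors the Scala version, assuming the same package as the Java main class that follows; as in the Scala code, key and value are not null-checked:

package com.hadoop.ljs.flink110.kafka;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MyKafkaDeserializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    /* Whether the stream has ended, e.g. stop once a record with key "end" is read.
       As in the Scala version, we do not check and simply return false: never end. */
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }

    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        /* Note: as in the Scala version, this throws on a null key or value. */
        return new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
                new String(record.key(), "UTF-8"), new String(record.value(), "UTF-8"));
    }

    /* Tells Flink the type produced by this deserialization schema. */
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {});
    }
}

To actually implement the end-marker idea from the comment, isEndOfStream could instead return true when the record's key equals "end".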
2.主函數(shù)類:
package com.hadoop.ljs.flink110.kafka;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-25 18:41
 * @version: v1.0
 * @description: com.hadoop.ljs.flink110.kafka
 */
public class KafkaDeserializerSchemaTest {

    public static void main(String[] args) throws Exception {
        /* Initialize the environment. */
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Enable checkpointing. Note that the message key/value is not null-checked here,
           so with checkpointing enabled a null key or value makes the job fail and restart endlessly. */
        senv.enableCheckpointing(2000);
        /* If topic3 does not exist, Kafka creates it automatically (one partition, partition 0). */
        FlinkKafkaConsumer<ConsumerRecord<String, String>> myConsumer =
                new FlinkKafkaConsumer<>("topic3", new MyKafkaDeserializationSchema(), getKafkaConfig());
        /* Specify the start offsets. */
        Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
        /* Consume topic3, partition 0, from the first record. */
        specificStartOffsets.put(new KafkaTopicPartition("topic3", 0), 0L);
        myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
        DataStream<ConsumerRecord<String, String>> source = senv.addSource(myConsumer);
        DataStream<String> keyValue = source.map(new MapFunction<ConsumerRecord<String, String>, String>() {
            @Override
            public String map(ConsumerRecord<String, String> message) throws Exception {
                return "key" + message.key() + " value:" + message.value();
            }
        });
        /* Print the results. */
        keyValue.print();
        /* Start the job. */
        senv.execute();
    }

    public static Properties getKafkaConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "worker1.hadoop.ljs:6667,worker2.hadoop.ljs:6667");
        props.setProperty("group.id", "topic_group2");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }
}
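As the comments note, deserialize throws on a null key or value, and with checkpointing enabled the failed job then restarts without limit. One way to bound that, sketched here as a suggestion rather than part of the original code (the methods are the standard Flink 1.10 restart-strategy API; the helper class itself is hypothetical):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartConfig {
    /* Hypothetical helper: cap restarts for the environment used in
       KafkaDeserializerSchemaTest above. */
    public static void applyBoundedRestarts(StreamExecutionEnvironment senv) {
        /* Fail the job for good after 3 failed attempts, waiting 10 seconds between
           attempts, instead of the effectively unlimited restarts that apply by
           default once checkpointing is enabled. */
        senv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}

Calling applyBoundedRestarts(senv) right after enableCheckpointing(2000) makes a poisoned record surface as a terminal failure instead of an endless restart loop.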
1.KafkaProducer發(fā)送測試數(shù)據(jù)類:
package com.hadoop.ljs.kafka220;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Date;
import java.util.Properties;

public class KafkaPartitionProducer extends Thread {

    private static long count = 10;
    private static String topic = "topic3";
    private static String brokerList = "worker1.hadoop.ljs:6667,worker2.hadoop.ljs:6667";

    public static void main(String[] args) {
        KafkaPartitionProducer jproducer = new KafkaPartitionProducer();
        jproducer.start();
    }

    @Override
    public void run() {
        producer();
    }

    private void producer() {
        Properties props = config();
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = null;
        System.out.println("Number of records to produce: " + count);
        for (int i = 1; i <= count; i++) {
            String json = "{\"id\":" + i + ",\"ip\":\"192.168.0." + i + "\",\"date\":\"" + new Date().toString() + "\"}";
            String key = "key" + i;
            record = new ProducerRecord<>(topic, key, json);
            producer.send(record, (metadata, e) -> {
                /* Callback: log any send error and, on success, the record metadata. */
                if (null != e) {
                    e.printStackTrace();
                }
                if (null != metadata) {
                    System.out.println(String.format("offset: %s, partition:%s, topic:%s timestamp:%s",
                            metadata.offset(), metadata.partition(), metadata.topic(), metadata.timestamp()));
                }
            });
        }
        producer.close();
    }

    private Properties config() {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /* Custom partitioner, two possible forms: */
        /*props.put("partitioner.class", PartitionUtil.class.getName());*/
        return props;
    }
}

2. Test results

With the producer above writing ten keyed JSON records to topic3, the Flink job should print one line per record in the format built by the map function, key<key> value:<json>, for example: keykey1 value:{"id":1,"ip":"192.168.0.1","date":"..."}.