5. Getting Started with Flink and Kafka
Flink Connector Kafka

- 1. Kafka
  - 1.1. [Kafka official site](http://kafka.apache.org/)
  - 1.2. Kafka overview
  - 1.3. Kafka features
  - 1.4. Kafka use cases
  - 1.5. Deploying kafka-manager
  - 1.6. `Importing/exporting data with Kafka Connect`
  - 1.7. [How Kafka stores its logs](https://blog.csdn.net/shujuelin/article/details/80898624)
- 2. Integrating Kafka with Flink
  - 2.1. Connecting Kafka to Flink streaming: consuming Kafka data in Flink
  - 2.2. Reading from Kafka in Flink, assigning custom watermarks, and writing the results back to Kafka
- 3. How Airbnb scaled Spark Streaming real-time processing with a balanced Kafka reader
- 4. Parting words: the sea is wide enough for fish to leap, the sky high enough for birds to fly
1. Kafka
1.1. Kafka official site: http://kafka.apache.org/
1.2. Kafka overview
- Kafka is a distributed messaging system with producer and consumer roles. It offers features similar to JMS, but its design and implementation are entirely different, and it is not an implementation of the JMS specification. (A minimal consumer sketch follows below.)
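As a quick illustration of the consumer side of that model, here is a minimal sketch (not from the original article; the broker address, topic name, and group id are placeholders):

```scala
import java.time.Duration
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer

// Minimal Kafka consumer sketch: subscribe to one topic and print each record.
object SimpleConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.1.25:9091") // placeholder broker address
    props.put("group.id", "demo-group")                 // placeholder consumer group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("xiaofan01")) // placeholder topic

    while (true) {
      val records = consumer.poll(Duration.ofMillis(500))
      records.forEach(r => println(s"partition=${r.partition()} offset=${r.offset()} value=${r.value()}"))
    }
  }
}
```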
1.3. Kafka features

- Message persistence: messages are stored and cached on the file system
- High throughput
- Multi-client support: the core is written in Scala, and clients are available for many languages, including Java, Scala, C, C++, Python, Go, Erlang, Ruby, and Node.js
- Security (a client configuration sketch follows this list)
  - Authentication of producers and consumers when connecting to brokers, via SSL and SASL (Kerberos) as well as the SASL/PLAIN mechanism
  - Authentication of broker connections to ZooKeeper
  - Encryption of data in transit
  - Authorization of client read and write operations
  - Integration with external authentication and authorization services
- Data replication
- Lightweight
- Message compression
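As a rough illustration of what the client side of those security features looks like, the sketch below builds properties for SASL_SSL with the PLAIN mechanism; the broker address, credentials, and truststore path are placeholders, not values from this article:

```scala
import java.util.Properties

// Hypothetical client properties for SASL_SSL with the PLAIN mechanism;
// all hosts, paths, and credentials below are placeholders.
object SecureClientProps {
  def apply(): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9093")
    props.put("security.protocol", "SASL_SSL")
    props.put("sasl.mechanism", "PLAIN")
    props.put("sasl.jaas.config",
      """org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";""")
    props.put("ssl.truststore.location", "/etc/kafka/client.truststore.jks")
    props.put("ssl.truststore.password", "changeit")
    props
  }
}
```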
1.4. Kafka use cases

- Kafka as a messaging system
- Kafka as a storage system
- Kafka for stream processing
- Combining messaging, storage, and stream processing
1.5. Deploying kafka-manager

Kafka Manager (since renamed CMAK) was developed at Yahoo. It makes it easy to see how topics are distributed across a cluster, and it supports managing multiple clusters, rebalancing partitions, creating topics, and more.

- Installing kafka-manager on CentOS 7
- Startup script: `bin/cmak -Dconfig.file=conf/application.conf -java-home /usr/lib/jdk-11.0.6 -Dhttp.port=9008 &`
- UI screenshots
- Notes
1.6. Importing/exporting data with Kafka Connect

- Kafka Connect as a replacement for Flume
- Distributed mode
  - Note: in distributed mode, connector configuration is not passed on the command line; connectors are created, modified, and deleted through the REST API (see the sketch after this list).
- Understanding Kafka Connect connectors through an example
- Kafka Connect overview and deployment
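To make the REST-based workflow of distributed mode concrete, here is a sketch that registers a FileStreamSource connector by POSTing its configuration to the Connect REST API; the endpoint (localhost:8083), connector name, file path, and target topic are assumptions for illustration:

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

// Create a connector via the Kafka Connect REST API (distributed mode).
// Endpoint, connector name, file path, and topic below are placeholders.
object CreateFileSourceConnector {
  def main(args: Array[String]): Unit = {
    val body =
      """{
        |  "name": "demo-file-source",
        |  "config": {
        |    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        |    "tasks.max": "1",
        |    "file": "/tmp/test.txt",
        |    "topic": "connect-test"
        |  }
        |}""".stripMargin

    val conn = new URL("http://localhost:8083/connectors").openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    conn.getOutputStream.write(body.getBytes(StandardCharsets.UTF_8))
    println(s"HTTP ${conn.getResponseCode}") // 201 Created on success
  }
}
```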
1.7. How Kafka stores its logs

Kafka's message storage achieves its efficiency through three techniques: partitions, log segments (LogSegment), and sparse indexes.

- Inspect a partition's .index file:
  `bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files kafka-logs/t2-2/00000000000000000000.index`
- Inspect a .log file:
  `bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files t1-1/00000000000000000000.log --print-data-log`
- Inspect a .timeindex file:
  `bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files t1-2/00000000000000000000.timeindex --verify-index-only`
- Why timestamps were introduced: the .timeindex file maps timestamps to offsets, which enables time-based offset lookup as well as time-based log retention and rolling (see the lookup sketch below).
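The sparse time index is what makes timestamp-based lookups cheap: a consumer can ask, per partition, for the earliest offset whose timestamp is at or after a given instant. A minimal sketch (the broker address, topic, and one-hour cutoff are placeholders):

```scala
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._

// Look up, per partition, the first offset whose timestamp is >= one hour ago.
object OffsetsForTimeDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.1.25:9091") // placeholder
    props.put("group.id", "lookup-demo")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    val partitions = consumer.partitionsFor("t1").asScala
      .map(p => new TopicPartition(p.topic(), p.partition()))

    val oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000L
    val query: java.util.Map[TopicPartition, java.lang.Long] =
      partitions.map(tp => tp -> java.lang.Long.valueOf(oneHourAgo)).toMap.asJava

    // Each entry maps a partition to the offset/timestamp found in its time index (or null if none).
    consumer.offsetsForTimes(query).asScala.foreach { case (tp, ot) =>
      if (ot != null) println(s"$tp -> offset=${ot.offset()} timestamp=${ot.timestamp()}")
    }
    consumer.close()
  }
}
```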
2. Integrating Kafka with Flink

Flink ships with a dedicated Kafka connector for reading from and writing to Kafka topics. The Flink Kafka Consumer integrates with Flink's checkpointing mechanism and can provide exactly-once processing semantics. To achieve this, Flink does not rely solely on the offsets committed by the Kafka consumer group; instead it tracks and checkpoints the offsets internally. A short sketch of the relevant consumer settings follows.
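Here is a small sketch against the Flink 1.10 connector API (the broker, topic, and group id mirror the example in the next section) showing how the start position and offset-commit behaviour can be configured on the consumer:

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Sketch: configuring where a FlinkKafkaConsumer starts reading and how it commits offsets.
object ConsumerOffsetConfig {
  def build(): FlinkKafkaConsumer[String] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.1.25:9091")
    props.setProperty("group.id", "com.xiaofan.flink")

    val consumer = new FlinkKafkaConsumer[String]("xiaofan01", new SimpleStringSchema(), props)
    consumer.setStartFromGroupOffsets()          // default: resume from the group's committed offsets
    // consumer.setStartFromEarliest()           // or replay the topic from the beginning
    // consumer.setStartFromLatest()             // or only read records produced from now on
    consumer.setCommitOffsetsOnCheckpoints(true) // commit offsets back to Kafka only when a checkpoint completes
    consumer
  }
}
```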
2.1. Connecting Kafka to Flink streaming: consuming Kafka data in Flink

- Create the Flink project: `sbt new tillrohrmann/flink-project.g8`
- Configure sbt (build.sbt):
```scala
ThisBuild / resolvers ++= Seq(
  "Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
  Resolver.mavenLocal
)

name := "FlinkKafkaProject"
version := "1.0"
organization := "com.xiaofan"

ThisBuild / scalaVersion := "2.12.6"

val flinkVersion = "1.10.0"
val kafkaVersion = "2.2.0"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.kafka" %% "kafka" % kafkaVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
)

lazy val root = (project in file(".")).settings(libraryDependencies ++= flinkDependencies)

assembly / mainClass := Some("com.xiaofan.Job")

// make the run command include the "provided" dependencies
Compile / run := Defaults.runTask(Compile / fullClasspath, Compile / run / mainClass, Compile / run / runner).evaluated

// stay inside the sbt console when we press "ctrl-c" while a Flink program executes with "run" or "runMain"
Compile / run / fork := true
Global / cancelable := true

// exclude the Scala library from the assembly jar
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
```
- Source code:
```scala
package com.xiaofan

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * Consuming Kafka from Flink
 *
 * @author xiaofan
 */
object ReadingFromKafka {

  val ZOOKEEPER_HOST = "192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181"
  val KAFKA_BROKER = "192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091"
  val TRANSACTION_GROUP = "com.xiaofan.flink"

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // configure the Kafka consumer
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)

    val transaction: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer[String]("xiaofan01", new SimpleStringSchema(), kafkaProps))

    transaction.print()

    env.execute()
  }
}
```
- Start the Kafka cluster and run the job to see the results.
2.2. Reading from Kafka in Flink, assigning custom watermarks, and writing the results back to Kafka

- Requirements (custom windows, periodic record counts):
  - Read data from Kafka (topic: t1).
  - Each record carries an event-time timestamp; windows are assigned by that timestamp, with a window length of 10 seconds and a slide of 5 seconds.
  - In production, data may arrive late because of the network or other issues; for example, a record with event time 00:00:10 might not reach Kafka until 00:00:12. The Flink job therefore has to tolerate some out-of-orderness; in the code below it is set to 3 seconds and can be adjusted as needed.
  - Write the results back to Kafka (topic: t2) in the format `time:<window time> count:<number of records processed>`.
- Environment: Flink 1.10.0 + Kafka 2.2.0
- Create the topics:

  ```bash
  bin/kafka-topics.sh --create --bootstrap-server 192.168.1.25:9091 --replication-factor 2 --partitions 3 --topic t1
  bin/kafka-topics.sh --create --bootstrap-server 192.168.1.25:9091 --replication-factor 2 --partitions 3 --topic t2
  ```
- Produce data into t1:
```scala
package com.xiaofan

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProduceData {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.1.25:9091")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("batch.size", "16384") // 16 KB
    props.put("linger.ms", "1")
    props.put("buffer.memory", "33554432") // 32 MB
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    var i = 0
    while (true) {
      i += 1
      // simulate an event-time timestamp carried in the message payload
      val record = new ProducerRecord[String, String]("t1", i + "," + System.currentTimeMillis())
      // fire-and-forget: send the message without checking whether it succeeded
      producer.send(record)
      Thread.sleep(300)
    }
  }
}
```
- Consume the data from t1, process it, and write the results back to Kafka topic t2 (a note on the exactly-once producer configuration follows this walkthrough):
```scala
package com.xiaofan

import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

/**
 * Watermark example:
 * assign event time with a custom watermark, count the messages per second,
 * and write the result into Kafka.
 */
object StreamingWindowWatermarkScala {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val topic = "t1"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "192.168.1.25:9091")
    prop.setProperty("group.id", "con1")

    val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)

    // add the source
    val text = env.addSource(myConsumer)

    // the payload is "<key>,<event timestamp>"
    val inputMap = text.map(line => {
      val arr = line.split(",")
      (arr(0), arr(1).trim.toLong)
    })

    // assign timestamps and periodic watermarks
    val waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {
      var currentMaxTimestamp = 0L
      val maxOutOfOrderness = 3000L // maximum allowed out-of-orderness: 3 seconds
      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

      override def getCurrentWatermark(): Watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)

      override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
        val timestamp = element._2
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
        val id = Thread.currentThread().getId
        println("currentThreadId:" + id + ",key:" + element._1
          + ",eventtime:[" + element._2 + "|" + sdf.format(element._2) + "]"
          + ",currentMaxTimestamp:[" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp) + "]"
          + ",watermark:[" + getCurrentWatermark().getTimestamp + "|" + sdf.format(getCurrentWatermark().getTimestamp) + "]")
        timestamp
      }
    })

    val window = waterMarkStream.map(x => (x._2, 1))
      .timeWindowAll(Time.seconds(1), Time.seconds(1))
      .sum(1)
      .map(x => "time:" + tranTimeToString(x._1.toString) + " count:" + x._2)
    // .window(TumblingEventTimeWindows.of(Time.seconds(3))) // assigning windows by event time explicitly works the same way as timeWindowAll

    val topic2 = "t2"
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.1.25:9091")

    // use the exactly-once semantic of the producer
    val myProducer = new FlinkKafkaProducer[String](topic2,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), props,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

    window.addSink(myProducer)

    env.execute("StreamingWindowWatermarkScala")
  }

  def tranTimeToString(timestamp: String): String = {
    val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    fm.format(new Date(timestamp.toLong))
  }
}
```
- Run the job and observe the results.
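One configuration detail worth noting for the EXACTLY_ONCE sink used above (not covered in the original walkthrough): Kafka brokers cap transaction duration via `transaction.max.timeout.ms`, which defaults to 15 minutes, while the Flink Kafka producer defaults to a longer transaction timeout, so the sink's producer properties usually need an explicit `transaction.timeout.ms`. A sketch of the adjusted properties, with an illustrative value:

```scala
import java.util.Properties

// Producer properties for the exactly-once sink; transaction.timeout.ms must not
// exceed the broker's transaction.max.timeout.ms (15 minutes by default).
object ExactlyOnceProducerProps {
  def apply(): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.1.25:9091")
    props.setProperty("transaction.timeout.ms", "600000") // 10 minutes, illustrative value
    props
  }
}
```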
3. How Airbnb scaled Spark Streaming real-time processing with a balanced Kafka reader

- Reference link 1
- Reference link 2
4. Parting words: the sea is wide enough for fish to leap, and the sky high enough for birds to fly