Apache Flink和Kafka入门
介紹
Apache Flink是用于分布式流和批處理數(shù)據(jù)處理的開源平臺。 Flink是具有多個API的流數(shù)據(jù)流引擎,用于創(chuàng)建面向數(shù)據(jù)流的應用程序。
Flink應用程序通常使用Apache Kafka進行數(shù)據(jù)輸入和輸出。 本文將指導您逐步使用Apache Flink和Kafka。
先決條件
- Apache Kafka 0.9.x
- 吉特
- Maven 3.x或更高版本
創(chuàng)建您的Flink流項目
第一步是創(chuàng)建Java應用程序,最簡單的方法是使用flink-quickstart-java原型,該原型包含核心依賴關系和打包任務。 本文與Apache Flink快速入門示例相似,重點明確介紹了MapR Streams的數(shù)據(jù)輸入和輸出。
在此應用程序中,我們將創(chuàng)建兩個作業(yè):
- WriteToKafka :生成隨機字符串,然后使用Kafka Flink連接器及其Producer API將其發(fā)布到MapR Streams主題。
- ReadFromKafka :讀取相同的主題,并使用Kafka Flink連接器及其使用方在標準輸出中顯示消息。 API。
完整項目可在GitHub上找到:
- Flink和Kakfa應用
讓我們使用Apache Maven創(chuàng)建項目:
mvn archetype:generate \-DarchetypeGroupId=org.apache.flink\-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.1.2 \-DgroupId=com.grallandco.demos \-DartifactId=kafka-flink-101 \-Dversion=1.0-SNAPSHOT \-DinteractiveMode=falseMaven將創(chuàng)建以下結構:
tree kafka-flink-101/ kafka-flink-101/ ├── pom.xml └── src└── main├── java│?? └── com│?? └── grallandco│?? └── demos│?? ├── BatchJob.java│?? ├── SocketTextStreamWordCount.java│?? ├── StreamingJob.java│?? └── WordCount.java└── resources└── log4j.properties7 directories, 6 files該項目被配置為創(chuàng)建一個Jar文件,該文件包含您的flink項目代碼,還包括運行該文件所需的所有依賴項。
該項目包含其他一些示例工作,本文不需要它們,您可以將其用于教育目的,也可以將其從項目中刪除。
添加Kafka連接器
打開pom.xml并將以下依賴項添加到您的項目中:
第一步,我們必須添加Flink Kafka連接器作為依賴項,以便我們可以使用Kafka接收器。 將此添加到“依賴項”部分的pom.xml文件中:
您現(xiàn)在必須添加Flink Kafka Connector依賴項才能使用Kafka接收器。 在<dependencies>元素中添加以下條目:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.9_2.10</artifactId><version>${flink.version}</version></dependency>現(xiàn)在,Flink項目已準備就緒,可以通過Kafka連接器使用DataStream,因此您可以從Apache Kafka發(fā)送和接收消息。
安裝并啟動Kafka
下載Kafka,在終端中輸入以下命令:
curl -O http://www.us.apache.org/dist/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz tar -xzf kafka_2.11-0.9.0.0.tgz cd kafka_2.11-0.9.0.0Kafka使用ZooKeeper,如果您沒有運行Zookeeper,則可以使用以下命令啟動它:
./bin/zookeeper-server-start.sh config/zookeeper.properties通過在新終端中運行以下命令來啟動Kafka代理:
./bin/kafka-server-start.sh config/server.properties在另一個終端中,運行以下命令來創(chuàng)建一個名為flink-demo的Kafka主題:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-demo使用Kafka工具將消息發(fā)布和使用到flink-demo主題。
制片人
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-demo消費者
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flink-demo --from-beginning在生產者窗口中,您可以發(fā)布一些消息,并在消費者窗口中查看它們。 我們將使用這些工具來跟蹤Kafka和Flink之間的交互。
編寫您的Flink應用程序
現(xiàn)在讓我們使用Flink Kafka Connector將消息發(fā)送到Kafka并使用它們。
制片人
生產者使用SimpleStringGenerator()類生成消息,并將該字符串發(fā)送到flink-demo主題。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “l(fā)ocalhost:9092"); DataStream<String> stream = env.addSource(new SimpleStringGenerator());stream.addSink(new FlinkKafkaProducer09<>("flink-demo", new SimpleStringSchema(), properties));env.execute();}SimpleStringGenerator()方法代碼在此處可用。
主要步驟是:
- 在任何Flink應用程序的基礎上創(chuàng)建一個新的StreamExecutionEnvironment
- 在應用程序環(huán)境中創(chuàng)建一個新的DataStream時, SimpleStringGenerator類將Flink中所有流數(shù)據(jù)源的Source接口實現(xiàn)SourceFunction 。
- 將FlinkKafkaProducer09器添加到主題。
消費者
使用者只需從flink-demo主題中讀取消息,然后將它們打印到控制臺中即可。
public static void main(String[] args) throws Exception {// create execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “l(fā)ocalhost:9092");properties.setProperty("group.id", "flink_consumer");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<>("flink-demo", new SimpleStringSchema(), properties) );stream.map(new MapFunction<String, String>() {private static final long serialVersionUID = -6867736771747690202L;@Overridepublic String map(String value) throws Exception {return "Stream Value: " + value;}}).print();env.execute();}主要步驟是:
- 在任何Flink應用程序的基礎上創(chuàng)建一個新的StreamExecutionEnvironment
- 使用消費者信息創(chuàng)建一組屬性,在此應用程序中,我們只能設置消費者group.id 。
- 使用FlinkKafkaConsumer09從主題flink-demo獲取消息
生成并運行應用程序
讓我們直接從Maven(或從您最喜歡的IDE)運行應用程序。
1-建立專案:
$ mvn clean package2-運行Flink生產者作業(yè)
$ mvn exec:java -Dexec.mainClass=com.mapr.demos.WriteToKafka3-運行Flink消費者工作
$ mvn exec:java -Dexec.mainClass=com.mapr.demos.ReadFromKafka在終端中,您應該看到生產者生成的消息
現(xiàn)在,您可以在Flink群集上部署并執(zhí)行此作業(yè)。
結論
在本文中,您學習了如何將Flink與kafka結合使用來寫入和讀取數(shù)據(jù)流。
翻譯自: https://www.javacodegeeks.com/2016/10/getting-started-apache-flink-kafka.html
總結
以上是生活随笔為你收集整理的Apache Flink和Kafka入门的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 独的拼音和组词 独的拼音怎样组词
- 下一篇: 裳多音字组词 裳多音字组词有哪些