Flink SQL 1.11 on Zeppelin 平台化实践
簡(jiǎn)介:?鑒于有很多企業(yè)都無(wú)法配備專門的團(tuán)隊(duì)來(lái)解決 Flink SQL 平臺(tái)化的問(wèn)題,那么到底有沒(méi)有一個(gè)開(kāi)源的、開(kāi)箱即用的、功能相對(duì)完善的組件呢?答案就是本文的主角——Apache Zeppelin。
作者:LittleMagic
大數(shù)據(jù)領(lǐng)域 SQL 化開(kāi)發(fā)的風(fēng)潮方興未艾(所謂"Everybody knows SQL"),Flink 自然也不能“免俗”。Flink SQL 是 Flink 系統(tǒng)內(nèi)部最高級(jí)別的 API,也是流批一體思想的集大成者。用戶可以通過(guò)簡(jiǎn)單明了的 SQL 語(yǔ)句像查表一樣執(zhí)行流任務(wù)或批任務(wù),屏蔽了底層 DataStream/DataSet API 的復(fù)雜細(xì)節(jié),降低了使用門檻。
但是,Flink SQL 的默認(rèn)開(kāi)發(fā)方式是通過(guò) Java/Scala API 編寫,與純 SQL 化、平臺(tái)化的目標(biāo)相去甚遠(yuǎn)。目前官方提供的 Flink SQL Client 僅能在配備 Flink 客戶端的本地使用,局限性很大。而 Ververica 開(kāi)源的 Flink SQL Gateway 組件是基于 REST API 的,仍然需要二次開(kāi)發(fā)才能供給上層使用,并不是很方便。
鑒于有很多企業(yè)都無(wú)法配備專門的團(tuán)隊(duì)來(lái)解決 Flink SQL 平臺(tái)化的問(wèn)題,那么到底有沒(méi)有一個(gè)開(kāi)源的、開(kāi)箱即用的、功能相對(duì)完善的組件呢?答案就是本文的主角——Apache Zeppelin。
Flink SQL on Zeppelin!
Zeppelin 是基于 Web 的交互式數(shù)據(jù)分析筆記本,支持 SQL、Scala、Python 等語(yǔ)言。Zeppelin 通過(guò)插件化的 Interpreter(解釋器)來(lái)解析用戶提交的代碼,并將其轉(zhuǎn)化到對(duì)應(yīng)的后端(計(jì)算框架、數(shù)據(jù)庫(kù)等)執(zhí)行,靈活性很高。其架構(gòu)簡(jiǎn)圖如下所示。
Flink Interpreter 就是 Zeppelin 原生支持的眾多 Interpreters 之一。只要配置好 Flink Interpreter 以及相關(guān)的執(zhí)行環(huán)境,我們就可以將 Zeppelin 用作 Flink SQL 作業(yè)的開(kāi)發(fā)平臺(tái)了(當(dāng)然,Scala 和 Python 也是沒(méi)問(wèn)題的)。接下來(lái)本文就逐步介紹 Flink on Zeppelin 的集成方法。
配置 Zeppelin
目前 Zeppelin 的最新版本是 0.9.0-preview2,可以在官網(wǎng)下載包含所有 Interpreters 的 zeppelin-0.9.0-preview2-bin-all.tgz,并解壓到服務(wù)器的合適位置。
接下來(lái)進(jìn)入 conf 目錄。將環(huán)境配置文件 zeppelin-env.sh.template 更名為 zeppelin-env.sh,并修改:
# JDK目錄 export JAVA_HOME=/opt/jdk1.8.0_172 # 方便之后配置Interpreter on YARN模式。注意必須安裝Hadoop,且hadoop必須配置在系統(tǒng)環(huán)境變量PATH中 export USE_HADOOP=true # Hadoop配置文件目錄 export HADOOP_CONF_DIR=/etc/hadoop/hadoop-conf將服務(wù)配置文件 zeppelin-site.xml.template 更名為 zeppelin-site.xml,并修改:
<!-- 服務(wù)地址。默認(rèn)為127.0.0.1,改為0.0.0.0使得可以在外部訪問(wèn) --> <property><name>zeppelin.server.addr</name><value>0.0.0.0</value><description>Server binding address</description> </property><!-- 服務(wù)端口。默認(rèn)為8080,如果已占用,可以修改之 --> <property><name>zeppelin.server.port</name><value>18080</value><description>Server port.</description> </property>最基礎(chǔ)的配置就完成了。運(yùn)行 bin/zeppelin-daemon.sh start 命令,返回 Zeppelin start [ OK ]的提示之后,訪問(wèn)<服務(wù)器地址>:18080,出現(xiàn)下面的頁(yè)面,就表示 Zeppelin 服務(wù)啟動(dòng)成功。
當(dāng)然,為了一步到位適應(yīng)生產(chǎn)環(huán)境,也可以適當(dāng)修改 zeppelin-site.xml 中的以下參數(shù):
<!-- 將Notebook repo更改為HDFS存儲(chǔ) --> <property><name>zeppelin.notebook.storage</name><value>org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo</value><description>Hadoop compatible file system notebook persistence layer implementation, such as local file system, hdfs, azure wasb, s3 and etc.</description> </property><!-- Notebook在HDFS上的存儲(chǔ)路徑 --> <property><name>zeppelin.notebook.dir</name><value>/zeppelin/notebook</value><description>path or URI for notebook persist</description> </property><!-- 啟用Zeppelin的恢復(fù)功能。當(dāng)Zeppelin服務(wù)掛掉并重啟之后,能連接到原來(lái)運(yùn)行的Interpreter --> <property><name>zeppelin.recovery.storage.class</name><value>org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage</value><description>ReoveryStorage implementation based on hadoop FileSystem</description> </property><!-- Zeppelin恢復(fù)元數(shù)據(jù)在HDFS上的存儲(chǔ)路徑 --> <property><name>zeppelin.recovery.dir</name><value>/zeppelin/recovery</value><description>Location where recovery metadata is stored</description> </property><!-- 禁止使用匿名用戶 --> <property><name>zeppelin.anonymous.allowed</name><value>true</value><description>Anonymous user allowed by default</description> </property>Zeppelin 集成了 Shiro 實(shí)現(xiàn)權(quán)限管理。禁止使用匿名用戶之后,可以在 conf 目錄下的 shiro.ini 中配置用戶名、密碼、角色等,不再贅述。注意每次修改配置都需要運(yùn)行 bin/zeppelin-daemon.sh restart 重啟 Zeppelin 服務(wù)。
配置 Flink Interpreter on YARN
在使用 Flink Interpreter 之前,我們有必要對(duì)它進(jìn)行配置,使 Flink 作業(yè)和 Interpreter 本身在 YARN 環(huán)境中運(yùn)行。
點(diǎn)擊首頁(yè)用戶名區(qū)域菜單中的 Interpreter 項(xiàng)(上一節(jié)圖中已經(jīng)示出),搜索 Flink,就可以看到參數(shù)列表。
Interpreter Binding
首先,將 Interpreter Binding 模式修改為 Isolated per Note,如下圖所示。
在這種模式下,每個(gè) Note 在執(zhí)行時(shí)會(huì)分別啟動(dòng) Interpreter 進(jìn)程,類似于 Flink on YARN 的 Per-job 模式,最符合生產(chǎn)環(huán)境的需要。
Flink on YARN 參數(shù)
以下是需要修改的部分基礎(chǔ)參數(shù)。注意這些參數(shù)也可以在 Note 中指定,每個(gè)作業(yè)自己的配置會(huì)覆蓋掉這里的默認(rèn)配置。
- FLINK_HOME:Flink 1.11所在的目錄;
- HADOOP_CONF_DIR:Hadoop 配置文件所在的目錄;
- flink.execution.mode:Flink 作業(yè)的執(zhí)行模式,指定為 YARN 以啟用 Flink on YARN;
- flink.jm.memory:JobManager 的內(nèi)存量(MB);
- flink.tm.memory:TaskManager 的內(nèi)存量(MB);
- flink.tm.slot:TaskManager 的 Slot 數(shù);
- flink.yarn.appName:YARN Application 的默認(rèn)名稱;
- flink.yarn.queue:提交作業(yè)的默認(rèn) YARN 隊(duì)列。
Hive Integration 參數(shù)
如果我們想訪問(wèn) Hive 數(shù)據(jù),以及用 HiveCatalog 管理 Flink SQL 的元數(shù)據(jù),還需要配置與 Hive 的集成。
- HIVE_CONF_DIR:Hive 配置文件(hive-site.xml)所在的目錄;
- zeppelin.flink.enableHive:設(shè)為 true 以啟用 Hive Integration;
- zeppelin.flink.hive.version:Hive 版本號(hào)。
- 復(fù)制與 Hive Integration 相關(guān)的依賴到 $FLINK_HOME/lib 目錄下,包括:
- flink-connector-hive_2.11-1.11.0.jar
- flink-hadoop-compatibility_2.11-1.11.0.jar
- hive-exec-..jar
- 如果 Hive 版本是1.x,還需要額外加入 hive-metastore-1.*.jar、libfb303-0.9.2.jar 和 libthrift-0.9.2.jar
- 保證 Hive 元數(shù)據(jù)服務(wù)(Metastore)啟動(dòng)。注意不能是 Embedded 模式,即必須以外部數(shù)據(jù)庫(kù)(MySQL、Postgres等)作為元數(shù)據(jù)存儲(chǔ)。
Interpreter on YARN 參數(shù)
在默認(rèn)情況下,Interpreter 進(jìn)程是在部署 Zeppelin 服務(wù)的節(jié)點(diǎn)上啟動(dòng)的。隨著提交的任務(wù)越來(lái)越多,就會(huì)出現(xiàn)單點(diǎn)問(wèn)題。因此我們需要讓 Interpreter 也在 YARN 上運(yùn)行,如下圖所示。
- zeppelin.interpreter.yarn.resource.cores:Interpreter Container 占用的vCore 數(shù)量;
- zeppelin.interpreter.yarn.resource.memory:Interpreter Container 占用的內(nèi)存量(MB);
- zeppelin.interpreter.yarn.queue:Interpreter 所處的 YARN 隊(duì)列名稱。
配置完成之后,Flink on Zeppelin 集成完畢,可以測(cè)試一下了。
測(cè)試 Flink SQL on Zeppelin
創(chuàng)建一個(gè) Note,Interpreter 指定為 Flink。然后寫入第一個(gè) Paragraph:
以 %flink.conf 標(biāo)記的 Paragraph 用于指定這個(gè) Note 中的作業(yè)配置,支持 Flink 的所有配置參數(shù)(參見(jiàn) Flink 官網(wǎng))。另外,flink.execution.packages 參數(shù)支持以 Maven GAV 坐標(biāo)的方式引入外部依賴項(xiàng)。
接下來(lái)創(chuàng)建第二個(gè) Paragraph,創(chuàng)建 Kafka 流表:
%flink.ssql 表示利用 StreamTableEnvironment 執(zhí)行流處理 SQL,相對(duì)地,%flink.bsql 表示利用 BatchTableEnvironment 執(zhí)行批處理 SQL。注意表參數(shù)中的 properties.bootstrap.servers 利用了 Zeppelin Credentials 來(lái)填寫,方便不同作業(yè)之間復(fù)用。
執(zhí)行上述 SQL 之后會(huì)輸出信息:
同時(shí)在 Hive 中可以看到該表的元數(shù)據(jù)。
最后寫第三個(gè) Paragraph,從流表中查詢,并實(shí)時(shí)展現(xiàn)出來(lái):
點(diǎn)擊右上角的 FLINK JOB 標(biāo)記,可以打開(kāi)作業(yè)的 Web UI。上述作業(yè)的 JobGraph 如下。
除 SELECT 查詢外,通過(guò) Zeppelin 也可以執(zhí)行 INSERT 查詢,實(shí)現(xiàn)更加豐富的功能。關(guān)于 Flink SQL on Zeppelin 的更多應(yīng)用,筆者在今后的文章中會(huì)繼續(xù)講解。
?
原文鏈接
 本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的Flink SQL 1.11 on Zeppelin 平台化实践的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
 
                            
                        - 上一篇: 阿里云研究员叔同:云原生是企业数字创新的
- 下一篇: 运维更简单、更智能,让运维人不再 “拼命
