flink启动命令参数_Flink集群部署
部署方式
一般來講有三種方式:
- Local
- Standalone
- Flink On Yarn/Mesos/K8s…
單機模式
參考上一篇Flink從入門到放棄(入門篇2)-本地環(huán)境搭建&構(gòu)建第一個Flink應(yīng)用
Standalone模式部署
我們基于CentOS7虛擬機搭建一個3個節(jié)點的集群:
角色分配:
Master: 192.168.246.134 Slave: 192.168.246.135 Slave: 192.168.246.136 復(fù)制代碼
192.168.246.134 jobmanager 192.168.246.135 taskmanager 192.168.246.136 taskmanager 復(fù)制代碼
假設(shè)三臺機器都存在: 用戶root 密碼為123
192.168.246.134 master 192.168.246.135 slave1 192.168.246.136 slave2 復(fù)制代碼
三臺機器首先要做ssh免登,具體方法很簡單,可以百度。
下載一個包到本地: 這里我選擇了1.7.2版本+Hadoop2.8+Scala2.11版本 然后,分發(fā)
scp flink-1.7.2-bin-hadoop28-scala_2.11.tgz root@192.168.246.13X:~ scp jdk-8u11-linux-x64.tar.gz root@192.168.246.13X:~ 注意:X代表4、5、6,分發(fā)到3臺機器 修改解壓后目錄屬主: Chown -R root:root flink/ Chown -R root:root jdk8/ export JAVA_HOME=/root/jdk8 export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib export PATH=${JAVA_HOME}/bin:$PATH 復(fù)制代碼
分別修改master和slave的flink-conf.yaml文件
Vim flink/conf/flink-conf.yaml ##配置master節(jié)點ip jobmanager.rpc.address: 192.168.1.100 ##配置slave節(jié)點可用內(nèi)存,單位MB taskmanager.heap.mb: 25600 ##配置每個節(jié)點的可用slot,1 核CPU對應(yīng) 1 slot ##the number of available CPUs per machine taskmanager.numberOfTaskSlots: 30 ##默認并行度 1 slot資源 parallelism.default: 1 修改slave節(jié)點配置文件slaves: 192.168.246.135 192.168.246.136 復(fù)制代碼
啟動集群:
##在master節(jié)點上執(zhí)行此腳本,就可以啟動集群,前提要保證master節(jié)點到slaver節(jié)點可以免密登錄, ##因為它的啟動過程是:先在master節(jié)點啟動jobmanager進程,然后ssh到各slaver節(jié)點啟動taskmanager進程 ./bin/start-cluster.sh 停止集群: ./bin/stop-cluster.sh 復(fù)制代碼
Flink on yarn集群部署
Yarn的簡介:
- ResourceManager ResourceManager 負責(zé)整個集群的資源管理和分配,是一個全局的資源管理系統(tǒng)。 NodeManager 以心跳的方式向 ResourceManager 匯報資源使用情況(目前主要是 CPU 和內(nèi)存的使用情況)。RM 只接受 NM 的資源回報信息,對于具體的資源處理則交給 NM 自己處理。
- NodeManager NodeManager 是每個節(jié)點上的資源和任務(wù)管理器,它是管理這臺機器的代理,負責(zé)該節(jié)點程序的運行,以及該節(jié)點資源的管理和監(jiān)控。YARN 集群每個節(jié)點都運行一個NodeManager。 NodeManager 定時向 ResourceManager 匯報本節(jié)點資源(CPU、內(nèi)存)的使用情況和Container 的運行狀態(tài)。當 ResourceManager 宕機時 NodeManager 自動連接 RM 備用節(jié)點。 NodeManager 接收并處理來自 ApplicationMaster 的 Container 啟動、停止等各種請求。
- ApplicationMaster 負責(zé)與 RM 調(diào)度器協(xié)商以獲取資源(用 Container 表示)。 將得到的任務(wù)進一步分配給內(nèi)部的任務(wù)(資源的二次分配)。 與 NM 通信以啟動/停止任務(wù)。 監(jiān)控所有任務(wù)運行狀態(tài),并在任務(wù)運行失敗時重新為任務(wù)申請資源以重啟任務(wù)
Flink on yarn 集群啟動步驟
- 步驟1 用戶向YARN中提交應(yīng)用程序,其中包括ApplicationMaster程序、啟動ApplicationMaster的命令、用戶程序等。
- 步驟2 ResourceManager為該應(yīng)用程序分配第一個Container,并與對應(yīng)的Node-Manager通信,要求它在這個Container中啟動應(yīng)用程序的ApplicationMaster。
- 步驟3 ApplicationMaster首先向ResourceManager注冊,這樣用戶可以直接通過ResourceManager查看應(yīng)用程序的運行狀態(tài),然后它將為各個任務(wù)申請資源,并監(jiān)控它的運行狀態(tài),直到運行結(jié)束,即重復(fù)步驟4~7。
- 步驟4 ApplicationMaster采用輪詢的方式通過RPC協(xié)議向ResourceManager申請和領(lǐng)取資源。
- 步驟5 一旦ApplicationMaster申請到資源后,便與對應(yīng)的NodeManager通信,要求它啟動任務(wù)。
- 步驟6 NodeManager為任務(wù)設(shè)置好運行環(huán)境(包括環(huán)境變量、JAR包、二進制程序等)后,將任務(wù)啟動命令寫到一個腳本中,并通過運行該腳本啟動任務(wù)。
- 步驟7 各個任務(wù)通過某個RPC協(xié)議向ApplicationMaster匯報自己的狀態(tài)和進度,以讓ApplicationMaster隨時掌握各個任務(wù)的運行狀態(tài),從而可以在任務(wù)失敗時重新啟動任務(wù)。 在應(yīng)用程序運行過程中,用戶可隨時通過RPC向ApplicationMaster查詢應(yīng)用程序的當前運行狀態(tài)。
- 步驟8 應(yīng)用程序運行完成后,ApplicationMaster向ResourceManager注銷并關(guān)閉自己
on yarn 集群部署
設(shè)置Hadoop環(huán)境變量:
[root@hadoop2 flink-1.7.2]# vi /etc/profile export HADOOP_CONF_DIR=這里是你自己的hadoop路徑 復(fù)制代碼
bin/yarn-session.sh -h 查看使用方法:
bin/yarn-session.sh -h Usage: Required -n,--container <arg> 為YARN分配容器的數(shù)量 (=Number of Task Managers) Optional -D <property=value> 動態(tài)屬性 -d,--detached 以分離模式運行作業(yè) -h,--help Yarn session幫助. -id,--applicationId <arg> 連接到一個正在運行的YARN session -j,--jar <arg> Flink jar文件的路徑 -jm,--jobManagerMemory <arg> JobManager的內(nèi)存大小,driver-memory [in MB] -m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. -n,--container <arg> TaskManager的數(shù)量,相當于executor的數(shù)量 -nm,--name <arg> 設(shè)置YARN應(yīng)用自定義名稱 -q,--query 顯示可用的YARN資源 (memory, cores) -qu,--queue <arg> 指定YARN隊列 -s,--slots <arg> 每個JobManager的core的數(shù)量,executor-cores。建議將slot的數(shù)量設(shè)置每臺機器的處理器數(shù)量 -st,--streaming 在流模式下啟動Flink -t,--ship <arg> 在指定目錄中傳送文件(t for transfer) -tm,--taskManagerMemory <arg> 每個TaskManager的內(nèi)存大小,executor-memory [in MB] -yd,--yarndetached 如果存在,則以分離模式運行作業(yè) (deprecated; use non-YARN specific option instead) -z,--zookeeperNamespace <arg> 為高可用性模式創(chuàng)建Zookeeper子路徑的命名空間 復(fù)制代碼
在啟動的是可以指定TaskManager的個數(shù)以及內(nèi)存(默認是1G),也可以指定JobManager的內(nèi)存,但是JobManager的個數(shù)只能是一個
我們開啟動一個YARN session:
./bin/yarn-session.sh -n 4 -tm 8192 -s 8 復(fù)制代碼
上面命令啟動了4個TaskManager,每個TaskManager內(nèi)存為8G且占用了8個核(是每個TaskManager,默認是1個核)。在啟動YARN session的時候會加載conf/flink-config.yaml配置文件,我們可以根據(jù)自己的需求去修改里面的相關(guān)參數(shù).
YARN session啟動之后就可以使用bin/flink來啟動提交作業(yè):
例如:
./bin/flink run -c com.demo.wangzhiwu.WordCount $DEMO_DIR/target/flink-demo-1.0.SNAPSHOT.jar --port 9000 復(fù)制代碼
flink run的用法如下:
用法: run [OPTIONS] <jar-file> <arguments> "run" 操作參數(shù): -c,--class <classname> 如果沒有在jar包中指定入口類,則需要在這里通過這個參數(shù)指定 -m,--jobmanager <host:port> 指定需要連接的jobmanager(主節(jié)點)地址 使用這個參數(shù)可以指定一個不同于配置文件中的jobmanager -p,--parallelism <parallelism> 指定程序的并行度。可以覆蓋配置文件中的默認值。 復(fù)制代碼
使用run 命令向yarn集群提交一個job。客戶端可以確定jobmanager的地址。當然,你也可以通過-m參數(shù)指定jobmanager。jobmanager的地址在yarn控制臺上可以看到。
值得注意的是:
上面的YARN session是在Hadoop YARN環(huán)境下啟動一個Flink cluster集群,里面的資源是可以共享給其他的Flink作業(yè)。我們還可以在YARN上啟動一個Flink作業(yè)。這里我們還是使用./bin/flink,但是不需要事先啟動YARN session:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar --input hdfs://user/hadoop/input.txt --output hdfs://user/hadoop/output.txt 復(fù)制代碼
上面的命令同樣會啟動一個類似于YARN session啟動的頁面。其中的-yn是指TaskManager的個數(shù),必須要指定。
后臺運行 yarn session
如果你不希望flink yarn client一直運行,也可以啟動一個后臺運行的yarn session。使用這個參數(shù):-d 或者 --detached 在這種情況下,flink yarn client將會只提交任務(wù)到集群然后關(guān)閉自己。注意:在這種情況下,無法使用flink停止yarn session。 必須使用yarn工具來停止yarn session
yarn application -kill <applicationId> 復(fù)制代碼
flink on yarn的故障恢復(fù)
flink 的 yarn 客戶端通過下面的配置參數(shù)來控制容器的故障恢復(fù)。這些參數(shù)可以通過conf/flink-conf.yaml 或者在啟動yarn session的時候通過-D參數(shù)來指定。
- yarn.reallocate-failed:這個參數(shù)控制了flink是否應(yīng)該重新分配失敗的taskmanager容器。默認是true。
- yarn.maximum-failed-containers:applicationMaster可以接受的容器最大失敗次數(shù),達到這個參數(shù),就會認為yarn session失敗。默認這個次數(shù)和初始化請求的taskmanager數(shù)量相等(-n 參數(shù)指定的)。
- yarn.application-attempts:applicationMaster重試的次數(shù)。如果這個值被設(shè)置為1(默認就是1),當application master失敗的時候,yarn session也會失敗。設(shè)置一個比較大的值的話,yarn會嘗試重啟applicationMaster。
日志文件查看
在某種情況下,flink yarn session 部署失敗是由于它自身的原因,用戶必須依賴于yarn的日志來進行分析。最有用的就是yarn log aggregation 。啟動它,用戶必須在yarn-site.xml文件中設(shè)置yarn.log-aggregation-enable 屬性為true。一旦啟用了,用戶可以通過下面的命令來查看一個失敗的yarn session的所有詳細日志。
yarn logs -applicationId <application ID> 復(fù)制代碼
完。
公眾號推薦
- 全網(wǎng)唯一一個從0開始幫助Java開發(fā)者轉(zhuǎn)做大數(shù)據(jù)領(lǐng)域的公眾號~
- 大數(shù)據(jù)技術(shù)與架構(gòu)或者搜索import_bigdata關(guān)注~
- 海量【java和大數(shù)據(jù)的面試題+視頻資料】整理在公眾號,關(guān)注后可以下載~
總結(jié)
以上是生活随笔為你收集整理的flink启动命令参数_Flink集群部署的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: oracle对比两列数据_oracle与
- 下一篇: 4种实例 advice aop_Java