flink 作业提交流程
之前給大家介紹了DataStream API中 Environment 和 Transformation 連個體系的源代碼,今天來了小插曲,給大家宏觀介紹下 Flink 作業(yè)的提交流程,希望對大家有幫助。
一、DataStream 作業(yè)提交流程
1)、首先,先給大家展示下流程圖:
2)、提交流程說明:
FlinkCli 先創(chuàng)建一個 Flink 環(huán)境變量
然后將環(huán)境變量存入到ThreadLocal中
在啟動 Flink 作業(yè)jar包的 main 方法
Flink 應(yīng)用程序通過 StreamExecutionEnvironment.getExecutionEnvironment() 獲取到相應(yīng)的執(zhí)行環(huán)境變量
Flink 應(yīng)用程序?qū)⒂脩艟帉懙淖鳂I(yè)轉(zhuǎn)換成 jobGraph 提交給Flink 集群
3)、Flink 作業(yè)以哪種方式提交,取決于 StreamExecutionEnvironment 的配置信息;
起到主要作用的配置參數(shù)是 execution.target;
execution.target 取值:
remote
local
yarn-per-job
yarn-session
kubernetes-session
yarn-application
kubernetes-application
StreamExecutionEnvironment 會根據(jù) execution.target 配置的不同取值創(chuàng)建相應(yīng)的 PipelineExecutorFactory, 再由 PipelineExecutorFactory 創(chuàng)建相應(yīng)的 PipelineExecutor, PipelineExecutor執(zhí)行相應(yīng)的作業(yè)提交工作;
源代碼探究:
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute()
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(String jobName)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamGraph streamGraph)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamGraph streamGraph)
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(final Configuration configuration) (見 代碼 3-1)
ExecutorFactory 舉例,org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory,(見代碼 3-2)
代碼 3-1
@Override
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
checkNotNull(configuration);
// 通過 java SPI 技術(shù)加載 實現(xiàn)了 PipelineExecutorFactory 接口的類
final ServiceLoader loader =
ServiceLoader.load(PipelineExecutorFactory.class);
}
代碼 3-2
@Internal
public class YarnSessionClusterExecutorFactory implements PipelineExecutorFactory {
}
// 配置選項
public static final ConfigOption TARGET =
key(“execution.target”)
4)、FlinkCli 創(chuàng)建 Flink 環(huán)境變量相關(guān)流程:
org.apache.flink.client.cli.CliFrontend.main()
org.apache.flink.client.cli.CliFrontend.executeProgram()
org.apache.flink.client.ClientUtils.executeProgram()
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout)
throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
// 設(shè)置流環(huán)境變量
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
}
5)、StreamExecutionEnvironment.getExecutionEnvironment() 獲取執(zhí)行環(huán)境的邏輯:
先從 threadLocal 獲取環(huán)境變量
如果 threadLocal 中沒有相應(yīng)的環(huán)境變量,則創(chuàng)建一個本地環(huán)境變量
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
.orElseGet(StreamExecutionEnvironment::createLocalEnvironment);
public static Optional resolveFactory(ThreadLocal threadLocalFactory, @Nullable T staticFactory) {
final T localFactory = threadLocalFactory.get();
final T factory = localFactory == null ? staticFactory : localFactory;
return Optional.ofNullable(factory);
}
二、Flink Table
1)、flink Sql 作業(yè)提交流程
2)、提交流程說明
TableEnvironmentImpl 在創(chuàng)建的過程中創(chuàng)建了 Executor , ExecutorBase 中包含了StreamExecutionEnvironment 的實例, StreamExecutionEnvironment 的實例由 StreamExecutionEnvironment .getExecutionEnvironment() 方法創(chuàng)建。
TableEnvironmentImpl 作業(yè)的提交依賴 StreamExecutionEnvironment 的作業(yè)提交流程。
TableEnvironmentImpl 借助Parser組件將 SQL 語句轉(zhuǎn)換成 Operation,然后借助 Planner組件將Operation轉(zhuǎn)換成 List。
使用StreamExecutionEnvironment 將 List 轉(zhuǎn)換成 StreamGraph。
后續(xù)操作與DataStream提交流程一樣。
3)、 TableEnvironmentImpl .executeSql() 執(zhí)行邏輯:
Sql 解析, 將Sql語句解析為 List 變量;
Transformation轉(zhuǎn)換,將 List 轉(zhuǎn)換為 List<Transformation<?>>
PipeLine轉(zhuǎn)換, 將List<Transformation<?>> 轉(zhuǎn)換為 PipeLine
4)、TableEnvironmentImpl 創(chuàng)建過程:
ModuleManager 的創(chuàng)建
CatalogManager 的創(chuàng)建
FunctionCatalog 的創(chuàng)建
Executor (執(zhí)行環(huán)境)的創(chuàng)建, 先通過 java SPI 加載 Executor 工廠, 通過EnvironmentSettings.Builder.useBlinkPlanner() 指定為 org.apache.flink.table.planner.delegation.BlinkExecutorFactory
Planner的創(chuàng)建(包括Parser的構(gòu)造),先通過 java SPI 加載 Planner 工廠,通過EnvironmentSettings.Builder.useBlinkPlanner() 指定為org.apache.flink.table.planner.delegation.BlinkPlannerFactory
構(gòu)造TableEnvironmentImpl
5)、Sql解析 (Blink Planner: StreamPlanner / BatchPlanner)
基本流程:
Sql語句解析成Sql 抽象語法樹
Planner對sql 語法樹進行驗證
將驗證過的語法樹轉(zhuǎn)換成關(guān)系代數(shù)樹
將關(guān)系代數(shù)樹封裝成Flink對應(yīng)的Operation
public List parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
// parse the sql query
SqlNode parsed = parser.parse(statement);
Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
Calsite :Sql 解析框架
SqlNode 代表Sql 抽象語法樹中的節(jié)點,CalciteParser 內(nèi)部使用 SqlParser 將Sql語句解析成Sql 抽象語法樹。
Operation (Flink Table API中抽象出來的概念) 代表任意類型的Sql操作行為,例如 Select 、Insert、Drop 等sql操作可以表示為QueryOperation、CatalogSinkModifyOperation、DropOperation。FlinkPlannerImpl內(nèi)部使用 Calsite 的 SqlToRelConverter 將驗證后的抽象語法樹轉(zhuǎn)換成關(guān)系代數(shù)樹。
6)、Operation 轉(zhuǎn)換為 Transformation 邏輯 (Blink Planner : StreamPlanner / BatchPlanner)
基本流程:
從Operation中 獲取到 關(guān)系代數(shù)樹
根據(jù)優(yōu)化規(guī)則優(yōu)化關(guān)系代數(shù)樹
生成物理執(zhí)行計劃
將物理執(zhí)行計劃轉(zhuǎn)換成 List<Transformation<?>>
override def translate(
modifyOperations: util.List[ModifyOperation]): util.List[Transformation[]] = {
validateAndOverrideConfiguration()
if (modifyOperations.isEmpty) {
return List.empty[Transformation[]]
}
val relNodes = modifyOperations.map(translateToRel)
val optimizedRelNodes = optimize(relNodes)
val execGraph = translateToExecNodeGraph(optimizedRelNodes)
val transformations = translateToPlan(execGraph)
cleanupInternalConfigurations()
transformations
}
總結(jié)
以上是生活随笔為你收集整理的flink 作业提交流程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何理解几何分布与指数分布的无记忆性?
- 下一篇: 传统僵尸网络家族