Flink JAR upload and run logic
https://blog.csdn.net/xianzhen376/article/details/86774348
Contents
Overview
Starting the RestServer
Registering handlers
Upload JAR
Run Jar
Generating the JobGraph
Invoking the user program's main method
Executing the user program's main method
Executing execute() (much like stub testing)
Submitting the JobGraph
Deploying the ExecutionGraph
Overview
Goal: walk through the Flink client code behind Upload JAR and Run JAR
Source version: 1.6.1
Deployment mode: Standalone
Background knowledge: Netty, CompletableFuture
Starting the RestServer
The entry point is RestServerEndpoint.start().
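start() initializes the handler list (covered in the next section), registers each handler with a Netty-based router, and binds an HTTP server on the configured port. As background for the Netty side, here is a minimal, self-contained sketch of that server-bootstrap pattern; it is not Flink's code, and the port and handler are made-up examples:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;

public class MiniRestServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline()
                          .addLast(new HttpServerCodec())                 // HTTP decode/encode
                          .addLast(new HttpObjectAggregator(64 * 1024))   // assemble FullHttpRequest
                          .addLast(new SimpleChannelInboundHandler<FullHttpRequest>() {
                              @Override
                              protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
                                  // in Flink, a router would pick the registered handler for req.uri() here
                                  FullHttpResponse resp = new DefaultFullHttpResponse(
                                      HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                                  ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
                              }
                          });
                    }
                });
            bootstrap.bind(8081).sync().channel().closeFuture().sync();   // 8081: example port only
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}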
Registering handlers
Code from DispatcherRestEndpoint.java:
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
    List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = super.initializeHandlers(restAddressFuture);
    ...
    JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
        restAddressFuture,
        leaderRetriever,
        timeout,
        responseHeaders,
        executor,
        clusterConfiguration);

    if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
        try {
            // this is where the handlers for JAR upload and run get registered
            webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
                leaderRetriever,
                restAddressFuture,
                timeout,
                responseHeaders,
                uploadDir,
                executor,
                clusterConfiguration);
            // register extension handlers
            handlers.addAll(webSubmissionExtension.getHandlers());
        } catch (FlinkException e) {
            ...
        }
    } else {
        log.info("Web-based job submission is not enabled.");
    }
    ...
    return handlers;
}
WebSubmissionExtension defines the handlers for Upload, Run, List, Delete and Plan; these back the /jars REST endpoints (e.g. /jars/upload and /jars/:jarid/run).
Upload JAR
The handling code is in JarUploadHandler.handleRequest.
The uploaded JAR is stored at:
jarDir.resolve(UUID.randomUUID() + "_" + fileUpload.getFileName());
The method itself is straightforward; the non-obvious part is where jarDir comes from. Tracing the assignment backwards:
jarDir is set in the JarUploadHandler constructor;
JarUploadHandler is built by WebSubmissionExtension via WebMonitorUtils.loadWebSubmissionExtension, and jarDir comes from the uploadDir field of the parent class RestServerEndpoint;
in RestServerEndpoint, uploadDir is initialized from configuration.getUploadDir();
the ultimate source is in RestServerEndpointConfiguration:
final Path uploadDir = Paths.get(
    config.getString(WebOptions.UPLOAD_DIR, config.getString(WebOptions.TMP_DIR)),
    "flink-web-upload");
Normally nobody overrides WebOptions.UPLOAD_DIR (config key "web.upload.dir"), so JARs end up under "$WebOptions.TMP_DIR/flink-web-upload".
The value of WebOptions.TMP_DIR is also somewhat hidden: judging from the configuration file alone it sits under /tmp, but ClusterEntrypoint.generateClusterConfiguration actually rewrites it:
final String webTmpDir = configuration.getString(WebOptions.TMP_DIR);
final File uniqueWebTmpDir = new File(webTmpDir, "flink-web-" + UUID.randomUUID());
resultConfiguration.setString(WebOptions.TMP_DIR, uniqueWebTmpDir.getAbsolutePath());
The net effect is that uploaded JARs end up in "/tmp/flink-web-UUID/flink-web-upload".
Keeping them under /tmp is risky: the files may be deleted once they expire.
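If that matters for your setup, the upload directory can be pointed at a persistent location by setting web.upload.dir; the path below is only an illustration. A minimal sketch using the same Configuration API seen above:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;

public class UploadDirExample {
    public static void main(String[] args) {
        // Equivalent to putting "web.upload.dir: /data/flink/web-upload" into flink-conf.yaml;
        // the path is an arbitrary example, any directory that survives /tmp cleanup will do.
        Configuration config = new Configuration();
        config.setString(WebOptions.UPLOAD_DIR, "/data/flink/web-upload");
        System.out.println(config.getString(WebOptions.UPLOAD_DIR, "<unset>"));
    }
}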
Run Jar
As with upload, the interesting part is JarRunHandler.handleRequest:
@Override
protected CompletableFuture<JarRunResponseBody> handleRequest(
        @Nonnull final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request,
        @Nonnull final DispatcherGateway gateway) throws RestHandlerException {
    ...
    // generate the JobGraph
    final CompletableFuture<JobGraph> jobGraphFuture = getJobGraphAsync(
        jarFile,
        entryClass,
        programArgs,
        savepointRestoreSettings,
        parallelism);

    CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);

    // upload the JobGraph, the user JAR and the user artifacts (to the BLOB server)
    CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
        final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
        try {
            ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(address, configuration));
        } catch (FlinkException e) {
            throw new CompletionException(e);
        }
        return jobGraph;
    });

    CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> {
        // we have to enable queued scheduling because slots will be allocated lazily
        jobGraph.setAllowQueuedScheduling(true);
        // submit the job
        return gateway.submitJob(jobGraph, timeout);
    });

    return jobSubmissionFuture
        .thenCombine(jarUploadFuture, (ack, jobGraph) -> new JarRunResponseBody(jobGraph.getJobID()))
        .exceptionally(throwable -> {
            throw new CompletionException(new RestHandlerException(
                throwable.getMessage(),
                HttpResponseStatus.INTERNAL_SERVER_ERROR,
                throwable));
        });
}
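The whole handler is CompletableFuture plumbing: thenCombine waits for two independent futures (JobGraph generation and the BLOB server port lookup) and merges their results, while thenCompose chains a dependent step that itself returns a future (the actual submission). A tiny, self-contained illustration of the two combinators with toy values (nothing Flink-specific):

import java.util.concurrent.CompletableFuture;

public class FutureChainingDemo {
    public static void main(String[] args) {
        CompletableFuture<String> jobGraph = CompletableFuture.supplyAsync(() -> "jobGraph");
        CompletableFuture<Integer> blobPort = CompletableFuture.supplyAsync(() -> 6124);

        // thenCombine: wait for both independent futures, then merge their results.
        CompletableFuture<String> uploaded = jobGraph.thenCombine(
            blobPort, (graph, port) -> graph + " uploaded via port " + port);

        // thenCompose: chain a dependent step that itself returns a CompletableFuture.
        CompletableFuture<String> submitted = uploaded.thenCompose(
            g -> CompletableFuture.supplyAsync(() -> g + ", then submitted"));

        System.out.println(submitted.join());
        // prints: jobGraph uploaded via port 6124, then submitted
    }
}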
Generating the JobGraph
/* The PackagedProgram is constructed in JarRunHandler.getJobGraphAsync */
final PackagedProgram packagedProgram = new PackagedProgram(
    jarFile.toFile(),
    entryClass,
    programArgs.toArray(new String[programArgs.size()]));
jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, parallelism);

/* From PackagedProgramUtils.java */
public static JobGraph createJobGraph(
        PackagedProgram packagedProgram,
        Configuration configuration,
        int defaultParallelism) throws ProgramInvocationException {
    ....
    if (packagedProgram.isUsingProgramEntryPoint()) {
        ...
    } else if (packagedProgram.isUsingInteractiveMode()) {
        /* a normally submitted streaming program takes this branch; the criterion is whether
           the user program's main class isAssignableFrom ProgramDescription */
        final OptimizerPlanEnvironment optimizerPlanEnvironment = new OptimizerPlanEnvironment(optimizer);
        optimizerPlanEnvironment.setParallelism(defaultParallelism);
        // this triggers the call to the user's main method
        flinkPlan = optimizerPlanEnvironment.getOptimizedPlan(packagedProgram);
    } else {
        throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
    }

    if (flinkPlan instanceof StreamingPlan) {
        // obtain the JobGraph
        jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
        jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
    } else {
        ...
    }
    ...
    return jobGraph;
}
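The StreamingPlan in the code above is the StreamGraph built by the user program, and getJobGraph() turns it into the JobGraph that gets submitted. The same path can be exercised directly from a client program; a minimal sketch against the Flink 1.6-era DataStream API (the job itself is a throwaway example, used only to show the generated JobGraph's id):

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class JobGraphFromStreamGraph {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();

        // StreamGraph is the StreamingPlan produced by the user program;
        // getJobGraph() is the same call made inside createJobGraph above.
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = streamGraph.getJobGraph();
        System.out.println("Generated JobGraph with id " + jobGraph.getJobID());
    }
}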
Invoking the user program's main method
/* From OptimizerPlanEnvironment.java */
public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {
    ...
    /* make OptimizerPlanEnvironment the environment handed out by the ContextEnvironmentFactory */
    setAsContext();
    try {
        /* invoke the user program's main method */
        prog.invokeInteractiveModeForExecution();
    }
    ...
}
Executing the user program's main method
// a typical main method
public static void main(String[] args) throws Exception {
    /* this returns the OptimizerPlanEnvironment installed by setAsContext in the previous step */
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ...
    /* so this actually runs OptimizerPlanEnvironment.execute */
    env.execute();
}
Executing execute() (much like stub testing)
public JobExecutionResult execute(String jobName) throws Exception {
    /* hand back the compiled FlinkPlan */
    Plan plan = createProgramPlan(jobName);
    this.optimizerPlan = compiler.compile(plan);
    // nothing in the user program after execute() gets to run
    // do not go on with anything now!
    throw new ProgramAbortException();
}
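So OptimizerPlanEnvironment swaps itself in as the execution environment, lets the user's main() assemble the plan, and then aborts before anything actually runs, which is exactly the stub-testing idea. A self-contained toy (deliberately not Flink code) that mimics the trick:

import java.util.ArrayList;
import java.util.List;

public class StubEnvironmentDemo {

    /** Thrown by the stub's execute() so that nothing after it runs. */
    static class AbortException extends RuntimeException {}

    /** Stand-in for OptimizerPlanEnvironment: records the "plan", then aborts. */
    static class StubEnvironment {
        static StubEnvironment current;              // what setAsContext() would install
        final List<String> capturedPlan = new ArrayList<>();

        void addOperator(String op) { capturedPlan.add(op); }

        Object execute() {
            // the plan is complete at this point; abort instead of actually running a job
            throw new AbortException();
        }
    }

    /** Stand-in for the user's main(): builds a plan against whatever environment is current. */
    static void userMain() {
        StubEnvironment env = StubEnvironment.current;   // like getExecutionEnvironment()
        env.addOperator("source");
        env.addOperator("map");
        env.addOperator("sink");
        env.execute();
        System.out.println("never reached");             // code after execute() does not run
    }

    public static void main(String[] args) {
        StubEnvironment env = new StubEnvironment();
        StubEnvironment.current = env;                   // setAsContext()
        try {
            userMain();                                  // invokeInteractiveModeForExecution()
        } catch (AbortException expected) {
            // expected: the stub aborted after capturing the plan
        }
        System.out.println("captured plan: " + env.capturedPlan);  // [source, map, sink]
    }
}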
Submitting the JobGraph
OK, we have the JobGraph; now let's look at how it is submitted.
/* From Dispatcher.java */
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    ...
    if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunnerFutures.containsKey(jobId)) {
        return FutureUtils.completedExceptionally(
            new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
    } else {
        // the interesting part is persistAndRunJob
        final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobId, jobGraph, this::persistAndRunJob)
            .thenApply(ignored -> Acknowledge.get());

        return persistAndRunFuture.exceptionally(
            (Throwable throwable) -> {
                final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
                log.error("Failed to submit job {}.", jobId, strippedThrowable);
                throw new CompletionException(
                    new JobSubmissionException(jobId, "Failed to submit job.", strippedThrowable));
            });
    }
}
Skipping a few intermediate calls, the call sequence is:
Dispatcher.persistAndRunJob
Dispatcher.runJob
Dispatcher.createJobManagerRunner, which creates the JobMaster
JobMaster.createAndRestoreExecutionGraph
and at last we reach the ExecutionGraph.
Deploying the ExecutionGraph
The call chain, continuing from Dispatcher.createJobManagerRunner above:
Dispatcher.createJobManagerRunner
Dispatcher.startJobManagerRunner
JobManagerRunner.start
StandaloneLeaderElectionService.start
JobManagerRunner.grantLeadership
JobManagerRunner.verifyJobSchedulingStatusAndStartJobManager
JobMaster.start
JobMaster.startJobExecution
JobMaster.resetAndScheduleExecutionGraph
JobMaster.scheduleExecutionGraph
ExecutionGraph.scheduleForExecution
ExecutionGraph.scheduleEager
Execution.deploy
Reposted from: https://www.cnblogs.com/davidwang456/articles/10647135.html