diff --git a/docs/docs/en/guide/task/flink.md b/docs/docs/en/guide/task/flink.md index a8205b1851929..5f857542bf74b 100644 --- a/docs/docs/en/guide/task/flink.md +++ b/docs/docs/en/guide/task/flink.md @@ -24,7 +24,7 @@ Flink task type for executing Flink programs. For Flink nodes, the worker submit - **Program type**: Supports Java, Scala and Python. - **The class of main function**: The **full path** of Main Class, the entry point of the Flink program. - **Main jar package**: The jar package of the Flink program (upload by Resource Center). -- **Deployment mode**: Support 2 deployment modes: cluster and local. +- **Deployment mode**: Support 3 deployment modes: cluster, local and application (Flink 1.11 and later. See also [Run an application in Application Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode)). - **Flink version**: Select version according to the execution env. - **Task name** (optional): Flink task name. - **JobManager memory size**: Used to set the size of jobManager memories, which can be set according to the actual production environment. diff --git a/docs/docs/zh/guide/task/flink.md b/docs/docs/zh/guide/task/flink.md index 2aa34691c4a0d..5ee5f6850b7b2 100644 --- a/docs/docs/zh/guide/task/flink.md +++ b/docs/docs/zh/guide/task/flink.md @@ -24,7 +24,7 @@ Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点,worker - 程序类型:支持 Java、Scala 和 Python 三种语言。 - 主函数的 Class:Flink 程序的入口 Main Class 的**全路径**。 - 主程序包:执行 Flink 程序的 jar 包(通过资源中心上传)。 -- 部署方式:支持 cluster 和 local 两种模式的部署。 +- 部署方式:支持 cluster、 local 和 application (Flink 1.11和之后的版本支持,参见 [Run an application in Application Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode)) 三种模式的部署。 - Flink 版本:根据所需环境选择对应的版本即可。 - 任务名称(选填):Flink 程序的名称。 - jobManager 内存数:用于设置 jobManager 内存数,可根据实际生产环境设置对应的内存数。 diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java index ea047bea6ca09..2bb574d633635 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; /** * flink args utils @@ -35,67 +36,96 @@ private FlinkArgsUtils() { private static final String LOCAL_DEPLOY_MODE = "local"; private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10"; + private static final String FLINK_VERSION_AFTER_OR_EQUALS_1_12 = ">=1.12"; /** - * build args + * default flink deploy mode + */ + public static final FlinkDeployMode DEFAULT_DEPLOY_MODE = FlinkDeployMode.CLUSTER; + + /** + * build flink command line * * @param param flink parameters * @return argument list */ - public static List buildArgs(FlinkParameters param) { + public static List buildCommandLine(FlinkParameters param) { List args = new ArrayList<>(); - String deployMode = "cluster"; - String tmpDeployMode = param.getDeployMode(); - if (StringUtils.isNotEmpty(tmpDeployMode)) { - deployMode = tmpDeployMode; + args.add(FlinkConstants.FLINK_COMMAND); + FlinkDeployMode deployMode = Optional.ofNullable(param.getDeployMode()).orElse(DEFAULT_DEPLOY_MODE); + String flinkVersion = param.getFlinkVersion(); + // build run command + switch (deployMode) { + case CLUSTER: + if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion)) { + args.add(FlinkConstants.FLINK_RUN); //run + args.add(FlinkConstants.FLINK_EXECUTION_TARGET); //-t + args.add(FlinkConstants.FLINK_YARN_PER_JOB); //yarn-per-job + } else { + args.add(FlinkConstants.FLINK_RUN); //run + args.add(FlinkConstants.FLINK_RUN_MODE); //-m + args.add(FlinkConstants.FLINK_YARN_CLUSTER); //yarn-cluster + } + break; + case APPLICATION: + args.add(FlinkConstants.FLINK_RUN_APPLICATION); //run-application + args.add(FlinkConstants.FLINK_EXECUTION_TARGET); //-t + args.add(FlinkConstants.FLINK_YARN_APPLICATION); //yarn-application + break; + case LOCAL: + args.add(FlinkConstants.FLINK_RUN); //run + break; } + String others = param.getOthers(); - if (!LOCAL_DEPLOY_MODE.equals(deployMode)) { - args.add(FlinkConstants.FLINK_RUN_MODE); //-m - - args.add(FlinkConstants.FLINK_YARN_CLUSTER); //yarn-cluster - - int slot = param.getSlot(); - if (slot > 0) { - args.add(FlinkConstants.FLINK_YARN_SLOT); - args.add(String.format("%d", slot)); //-ys - } - - String appName = param.getAppName(); - if (StringUtils.isNotEmpty(appName)) { //-ynm - args.add(FlinkConstants.FLINK_APP_NAME); - args.add(ArgsUtils.escape(appName)); - } - - // judge flink version, the parameter -yn has removed from flink 1.10 - String flinkVersion = param.getFlinkVersion(); - if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) { - int taskManager = param.getTaskManager(); - if (taskManager > 0) { //-yn - args.add(FlinkConstants.FLINK_TASK_MANAGE); - args.add(String.format("%d", taskManager)); + + // build args + switch (deployMode) { + case CLUSTER: + case APPLICATION: + int slot = param.getSlot(); + if (slot > 0) { + args.add(FlinkConstants.FLINK_YARN_SLOT); + args.add(String.format("%d", slot)); //-ys + } + + String appName = param.getAppName(); + if (StringUtils.isNotEmpty(appName)) { //-ynm + args.add(FlinkConstants.FLINK_APP_NAME); + args.add(ArgsUtils.escape(appName)); + } + + // judge flink version, the parameter -yn has removed from flink 1.10 + if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) { + int taskManager = param.getTaskManager(); + if (taskManager > 0) { //-yn + args.add(FlinkConstants.FLINK_TASK_MANAGE); + args.add(String.format("%d", taskManager)); + } + } + String jobManagerMemory = param.getJobManagerMemory(); + if (StringUtils.isNotEmpty(jobManagerMemory)) { + args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM); + args.add(jobManagerMemory); //-yjm } - } - String jobManagerMemory = param.getJobManagerMemory(); - if (StringUtils.isNotEmpty(jobManagerMemory)) { - args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM); - args.add(jobManagerMemory); //-yjm - } - - String taskManagerMemory = param.getTaskManagerMemory(); - if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm - args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM); - args.add(taskManagerMemory); - } - - if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) { - String queue = param.getQueue(); - if (StringUtils.isNotEmpty(queue)) { // -yqu - args.add(FlinkConstants.FLINK_QUEUE); - args.add(queue); + + String taskManagerMemory = param.getTaskManagerMemory(); + if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm + args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM); + args.add(taskManagerMemory); + } + + if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) { + String queue = param.getQueue(); + if (StringUtils.isNotEmpty(queue)) { // -yqu + args.add(FlinkConstants.FLINK_QUEUE); + args.add(queue); + } } - } + break; + case LOCAL: + break; } int parallelism = param.getParallelism(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java index 7a3c1c6fc9bf6..6c695163a28ba 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java @@ -26,13 +26,18 @@ private FlinkConstants() { /** * flink */ + public static final String FLINK_COMMAND = "flink"; + public static final String FLINK_RUN = "run"; + public static final String FLINK_RUN_APPLICATION = "run-application"; public static final String FLINK_YARN_CLUSTER = "yarn-cluster"; + public static final String FLINK_YARN_APPLICATION = "yarn-application"; + public static final String FLINK_YARN_PER_JOB = "yarn-per-job"; public static final String FLINK_RUN_MODE = "-m"; + public static final String FLINK_EXECUTION_TARGET = "-t"; public static final String FLINK_YARN_SLOT = "-ys"; public static final String FLINK_APP_NAME = "-ynm"; public static final String FLINK_QUEUE = "-yqu"; public static final String FLINK_TASK_MANAGE = "-yn"; - public static final String FLINK_JOB_MANAGE_MEM = "-yjm"; public static final String FLINK_TASK_MANAGE_MEM = "-ytm"; public static final String FLINK_MAIN_CLASS = "-c"; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkDeployMode.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkDeployMode.java new file mode 100644 index 0000000000000..b02cd40f928f8 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkDeployMode.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.flink; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Flink deploy mode + */ +public enum FlinkDeployMode { + @JsonProperty("local") + LOCAL, + @JsonProperty("cluster") + CLUSTER, + @JsonProperty("application") + APPLICATION +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java index c458c24f904bf..daa28794d5f8f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java @@ -39,9 +39,9 @@ public class FlinkParameters extends AbstractParameters { private String mainClass; /** - * deploy mode yarn-cluster yarn-local + * deploy mode yarn-cluster yarn-local yarn-application */ - private String deployMode; + private FlinkDeployMode deployMode; /** * arguments @@ -120,11 +120,11 @@ public void setMainClass(String mainClass) { this.mainClass = mainClass; } - public String getDeployMode() { + public FlinkDeployMode getDeployMode() { return deployMode; } - public void setDeployMode(String deployMode) { + public void setDeployMode(FlinkDeployMode deployMode) { this.deployMode = deployMode; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java index 5e1734b7bfaba..cd5e16bac04a2 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java @@ -35,13 +35,6 @@ public class FlinkTask extends AbstractYarnTask { - /** - * flink command - * usage: flink run [OPTIONS] - */ - private static final String FLINK_COMMAND = "flink"; - private static final String FLINK_RUN = "run"; - /** * flink parameters */ @@ -95,14 +88,8 @@ public void init() { */ @Override protected String buildCommand() { - // flink run [OPTIONS] - List args = new ArrayList<>(); - - args.add(FLINK_COMMAND); - args.add(FLINK_RUN); - logger.info("flink task args : {}", args); - // other parameters - args.addAll(FlinkArgsUtils.buildArgs(flinkParameters)); + // flink run/run-application [OPTIONS] + List args = FlinkArgsUtils.buildCommandLine(flinkParameters); String command = ParameterUtils .convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java new file mode 100644 index 0000000000000..f8771baed4ab9 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.flink; + +import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class FlinkArgsUtilsTest { + + private String joinStringListWithSpace(List stringList) { + return String.join(" ", stringList); + } + + private FlinkParameters buildTestFlinkParametersWithDeployMode(FlinkDeployMode flinkDeployMode) { + FlinkParameters flinkParameters = new FlinkParameters(); + flinkParameters.setProgramType(ProgramType.SCALA); + flinkParameters.setDeployMode(flinkDeployMode); + flinkParameters.setParallelism(4); + ResourceInfo resourceInfo = new ResourceInfo(); + resourceInfo.setId(1); + resourceInfo.setResourceName("job"); + resourceInfo.setRes("/opt/job.jar"); + flinkParameters.setMainJar(resourceInfo); + flinkParameters.setMainClass("org.example.Main"); + + return flinkParameters; + } + + @Test + public void testApplicationMode() throws Exception { + FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION); + List commandLine = FlinkArgsUtils.buildCommandLine(flinkParameters); + + Assert.assertEquals( + "flink run-application -t yarn-application -p 4 -sae -c org.example.Main /opt/job.jar", + joinStringListWithSpace(commandLine)); + } + + @Test + public void testClusterMode() throws Exception { + FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER); + flinkParameters.setFlinkVersion("1.11"); + List commandLine1 = FlinkArgsUtils.buildCommandLine(flinkParameters); + + Assert.assertEquals( + "flink run -m yarn-cluster -p 4 -sae -c org.example.Main /opt/job.jar", + joinStringListWithSpace(commandLine1)); + + flinkParameters.setFlinkVersion("<1.10"); + List commandLine2 = FlinkArgsUtils.buildCommandLine(flinkParameters); + + Assert.assertEquals( + "flink run -m yarn-cluster -p 4 -sae -c org.example.Main /opt/job.jar", + joinStringListWithSpace(commandLine2)); + + flinkParameters.setFlinkVersion(">=1.12"); + List commandLine3 = FlinkArgsUtils.buildCommandLine(flinkParameters); + + Assert.assertEquals( + "flink run -t yarn-per-job -p 4 -sae -c org.example.Main /opt/job.jar", + joinStringListWithSpace(commandLine3)); + } + + @Test + public void testLocalMode() throws Exception { + FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL); + List commandLine = FlinkArgsUtils.buildCommandLine(flinkParameters); + + Assert.assertEquals( + "flink run -p 4 -sae -c org.example.Main /opt/job.jar", + joinStringListWithSpace(commandLine)); + } +} diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts index 9b2170ec90494..cca17b08d92b8 100644 --- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts +++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { computed, ref } from 'vue' +import { computed, watch } from 'vue' import { useI18n } from 'vue-i18n' -import { useCustomParams, useDeployMode, useMainJar, useResources } from '.' +import { useCustomParams, useMainJar, useResources } from '.' import type { IJsonItem } from '../types' export function useFlink(model: { [field: string]: any }): IJsonItem[] { @@ -26,14 +26,51 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] { ) const taskManagerNumberSpan = computed(() => - model.flinkVersion === '<1.10' && model.deployMode === 'cluster' ? 12 : 0 + model.flinkVersion === '<1.10' && model.deployMode !== 'local' ? 12 : 0 ) const deployModeSpan = computed(() => - model.deployMode === 'cluster' ? 12 : 0 + model.deployMode !== 'local' ? 12 : 0 ) - const appNameSpan = computed(() => (model.deployMode === 'cluster' ? 24 : 0)) + const appNameSpan = computed(() => (model.deployMode !== 'local' ? 24 : 0)) + + const deployModeOptions = computed(() => { + if (model.flinkVersion === '<1.10') { + return [ + { + label: 'cluster', + value: 'cluster' + }, + { + label: 'local', + value: 'local' + } + ]; + } else { + return [ + { + label: 'per-job/cluster', + value: 'cluster' + }, + { + label: 'application', + value: 'application' + }, + { + label: 'local', + value: 'local' + } + ]; + } + }) + + watch( + () => model.flinkVersion, + () => { + model.deployMode = 'cluster' + } + ) return [ { @@ -68,7 +105,13 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] { } }, useMainJar(model), - useDeployMode(24, ref(false)), + { + type: 'radio', + field: 'deployMode', + name: t('project.node.deploy_mode'), + options: deployModeOptions, + span: 24 + }, { type: 'select', field: 'flinkVersion', @@ -223,7 +266,11 @@ const FLINK_VERSIONS = [ value: '<1.10' }, { - label: '>=1.10', - value: '>=1.10' + label: '1.11', + value: '1.11' + }, + { + label: '>=1.12', + value: '>=1.12' } ]