Skip to content

Commit

Permalink
[feature][task-flink] Support Flink application mode
Browse files Browse the repository at this point in the history
  • Loading branch information
paul8263 committed May 9, 2022
1 parent 15a15de commit c3b7321
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 79 deletions.
2 changes: 1 addition & 1 deletion docs/docs/en/guide/task/flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Flink task type for executing Flink programs. For Flink nodes, the worker submit
- **Program type**: Supports Java, Scala and Python.
- **The class of main function**: The **full path** of Main Class, the entry point of the Flink program.
- **Main jar package**: The jar package of the Flink program (upload by Resource Center).
- **Deployment mode**: Support 2 deployment modes: cluster and local.
- **Deployment mode**: Support 3 deployment modes: cluster, local and application (Flink 1.11 and later. See also [Run an application in Application Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode)).
- **Flink version**: Select version according to the execution env.
- **Task name** (optional): Flink task name.
- **JobManager memory size**: Used to set the size of jobManager memories, which can be set according to the actual production environment.
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/zh/guide/task/flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点,worker
- 程序类型:支持 Java、Scala 和 Python 三种语言。
- 主函数的 Class:Flink 程序的入口 Main Class 的**全路径**
- 主程序包:执行 Flink 程序的 jar 包(通过资源中心上传)。
- 部署方式:支持 cluster 和 local 两种模式的部署
- 部署方式:支持 cluster、 local 和 application (Flink 1.11和之后的版本支持,参见 [Run an application in Application Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode)) 三种模式的部署
- Flink 版本:根据所需环境选择对应的版本即可。
- 任务名称(选填):Flink 程序的名称。
- jobManager 内存数:用于设置 jobManager 内存数,可根据实际生产环境设置对应的内存数。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
* flink args utils
Expand All @@ -35,67 +36,96 @@ private FlinkArgsUtils() {

private static final String LOCAL_DEPLOY_MODE = "local";
private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
private static final String FLINK_VERSION_AFTER_OR_EQUALS_1_12 = ">=1.12";

/**
* build args
* default flink deploy mode
*/
public static final FlinkDeployMode DEFAULT_DEPLOY_MODE = FlinkDeployMode.CLUSTER;

/**
* build flink command line
*
* @param param flink parameters
* @return argument list
*/
public static List<String> buildArgs(FlinkParameters param) {
public static List<String> buildCommandLine(FlinkParameters param) {
List<String> args = new ArrayList<>();

String deployMode = "cluster";
String tmpDeployMode = param.getDeployMode();
if (StringUtils.isNotEmpty(tmpDeployMode)) {
deployMode = tmpDeployMode;
args.add(FlinkConstants.FLINK_COMMAND);
FlinkDeployMode deployMode = Optional.ofNullable(param.getDeployMode()).orElse(DEFAULT_DEPLOY_MODE);
String flinkVersion = param.getFlinkVersion();
// build run command
switch (deployMode) {
case CLUSTER:
if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion)) {
args.add(FlinkConstants.FLINK_RUN); //run
args.add(FlinkConstants.FLINK_EXECUTION_TARGET); //-t
args.add(FlinkConstants.FLINK_YARN_PER_JOB); //yarn-per-job
} else {
args.add(FlinkConstants.FLINK_RUN); //run
args.add(FlinkConstants.FLINK_RUN_MODE); //-m
args.add(FlinkConstants.FLINK_YARN_CLUSTER); //yarn-cluster
}
break;
case APPLICATION:
args.add(FlinkConstants.FLINK_RUN_APPLICATION); //run-application
args.add(FlinkConstants.FLINK_EXECUTION_TARGET); //-t
args.add(FlinkConstants.FLINK_YARN_APPLICATION); //yarn-application
break;
case LOCAL:
args.add(FlinkConstants.FLINK_RUN); //run
break;
}

String others = param.getOthers();
if (!LOCAL_DEPLOY_MODE.equals(deployMode)) {
args.add(FlinkConstants.FLINK_RUN_MODE); //-m

args.add(FlinkConstants.FLINK_YARN_CLUSTER); //yarn-cluster

int slot = param.getSlot();
if (slot > 0) {
args.add(FlinkConstants.FLINK_YARN_SLOT);
args.add(String.format("%d", slot)); //-ys
}

String appName = param.getAppName();
if (StringUtils.isNotEmpty(appName)) { //-ynm
args.add(FlinkConstants.FLINK_APP_NAME);
args.add(ArgsUtils.escape(appName));
}

// judge flink version, the parameter -yn has removed from flink 1.10
String flinkVersion = param.getFlinkVersion();
if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
int taskManager = param.getTaskManager();
if (taskManager > 0) { //-yn
args.add(FlinkConstants.FLINK_TASK_MANAGE);
args.add(String.format("%d", taskManager));

// build args
switch (deployMode) {
case CLUSTER:
case APPLICATION:
int slot = param.getSlot();
if (slot > 0) {
args.add(FlinkConstants.FLINK_YARN_SLOT);
args.add(String.format("%d", slot)); //-ys
}

String appName = param.getAppName();
if (StringUtils.isNotEmpty(appName)) { //-ynm
args.add(FlinkConstants.FLINK_APP_NAME);
args.add(ArgsUtils.escape(appName));
}

// judge flink version, the parameter -yn has removed from flink 1.10
if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
int taskManager = param.getTaskManager();
if (taskManager > 0) { //-yn
args.add(FlinkConstants.FLINK_TASK_MANAGE);
args.add(String.format("%d", taskManager));
}
}
String jobManagerMemory = param.getJobManagerMemory();
if (StringUtils.isNotEmpty(jobManagerMemory)) {
args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
args.add(jobManagerMemory); //-yjm
}
}
String jobManagerMemory = param.getJobManagerMemory();
if (StringUtils.isNotEmpty(jobManagerMemory)) {
args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
args.add(jobManagerMemory); //-yjm
}

String taskManagerMemory = param.getTaskManagerMemory();
if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm
args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
args.add(taskManagerMemory);
}

if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(queue)) { // -yqu
args.add(FlinkConstants.FLINK_QUEUE);
args.add(queue);

String taskManagerMemory = param.getTaskManagerMemory();
if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm
args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
args.add(taskManagerMemory);
}

if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(queue)) { // -yqu
args.add(FlinkConstants.FLINK_QUEUE);
args.add(queue);
}
}
}
break;
case LOCAL:
break;
}

int parallelism = param.getParallelism();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,18 @@ private FlinkConstants() {
/**
* flink
*/
public static final String FLINK_COMMAND = "flink";
public static final String FLINK_RUN = "run";
public static final String FLINK_RUN_APPLICATION = "run-application";
public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
public static final String FLINK_YARN_APPLICATION = "yarn-application";
public static final String FLINK_YARN_PER_JOB = "yarn-per-job";
public static final String FLINK_RUN_MODE = "-m";
public static final String FLINK_EXECUTION_TARGET = "-t";
public static final String FLINK_YARN_SLOT = "-ys";
public static final String FLINK_APP_NAME = "-ynm";
public static final String FLINK_QUEUE = "-yqu";
public static final String FLINK_TASK_MANAGE = "-yn";

public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
public static final String FLINK_TASK_MANAGE_MEM = "-ytm";
public static final String FLINK_MAIN_CLASS = "-c";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.task.flink;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Flink deploy mode
*/
public enum FlinkDeployMode {
@JsonProperty("local")
LOCAL,
@JsonProperty("cluster")
CLUSTER,
@JsonProperty("application")
APPLICATION
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public class FlinkParameters extends AbstractParameters {
private String mainClass;

/**
* deploy mode yarn-cluster yarn-local
* deploy mode yarn-cluster yarn-local yarn-application
*/
private String deployMode;
private FlinkDeployMode deployMode;

/**
* arguments
Expand Down Expand Up @@ -120,11 +120,11 @@ public void setMainClass(String mainClass) {
this.mainClass = mainClass;
}

public String getDeployMode() {
public FlinkDeployMode getDeployMode() {
return deployMode;
}

public void setDeployMode(String deployMode) {
public void setDeployMode(FlinkDeployMode deployMode) {
this.deployMode = deployMode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@

public class FlinkTask extends AbstractYarnTask {

/**
* flink command
* usage: flink run [OPTIONS] <jar-file> <arguments>
*/
private static final String FLINK_COMMAND = "flink";
private static final String FLINK_RUN = "run";

/**
* flink parameters
*/
Expand Down Expand Up @@ -95,14 +88,8 @@ public void init() {
*/
@Override
protected String buildCommand() {
// flink run [OPTIONS] <jar-file> <arguments>
List<String> args = new ArrayList<>();

args.add(FLINK_COMMAND);
args.add(FLINK_RUN);
logger.info("flink task args : {}", args);
// other parameters
args.addAll(FlinkArgsUtils.buildArgs(flinkParameters));
// flink run/run-application [OPTIONS] <jar-file> <arguments>
List<String> args = FlinkArgsUtils.buildCommandLine(flinkParameters);

String command = ParameterUtils
.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.task.flink;

import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;

public class FlinkArgsUtilsTest {

private String joinStringListWithSpace(List<String> stringList) {
return String.join(" ", stringList);
}

private FlinkParameters buildTestFlinkParametersWithDeployMode(FlinkDeployMode flinkDeployMode) {
FlinkParameters flinkParameters = new FlinkParameters();
flinkParameters.setProgramType(ProgramType.SCALA);
flinkParameters.setDeployMode(flinkDeployMode);
flinkParameters.setParallelism(4);
ResourceInfo resourceInfo = new ResourceInfo();
resourceInfo.setId(1);
resourceInfo.setResourceName("job");
resourceInfo.setRes("/opt/job.jar");
flinkParameters.setMainJar(resourceInfo);
flinkParameters.setMainClass("org.example.Main");

return flinkParameters;
}

@Test
public void testApplicationMode() throws Exception {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
List<String> commandLine = FlinkArgsUtils.buildCommandLine(flinkParameters);

Assert.assertEquals(
"flink run-application -t yarn-application -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}

@Test
public void testClusterMode() throws Exception {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER);
flinkParameters.setFlinkVersion("1.11");
List<String> commandLine1 = FlinkArgsUtils.buildCommandLine(flinkParameters);

Assert.assertEquals(
"flink run -m yarn-cluster -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine1));

flinkParameters.setFlinkVersion("<1.10");
List<String> commandLine2 = FlinkArgsUtils.buildCommandLine(flinkParameters);

Assert.assertEquals(
"flink run -m yarn-cluster -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine2));

flinkParameters.setFlinkVersion(">=1.12");
List<String> commandLine3 = FlinkArgsUtils.buildCommandLine(flinkParameters);

Assert.assertEquals(
"flink run -t yarn-per-job -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine3));
}

@Test
public void testLocalMode() throws Exception {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL);
List<String> commandLine = FlinkArgsUtils.buildCommandLine(flinkParameters);

Assert.assertEquals(
"flink run -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}
}
Loading

0 comments on commit c3b7321

Please sign in to comment.