From c16221256d583824f176afea7a0d7e8b1216c6f3 Mon Sep 17 00:00:00 2001 From: Tyrantlucifer Date: Wed, 16 Oct 2024 17:30:53 +0800 Subject: [PATCH] [Hotfix][Core][Flink] SeaTunnel flink engine support application mode on yarn (#7762) --- .github/workflows/backend.yml | 2 +- .../seatunnel/common/config/Common.java | 6 +++++- .../start-seatunnel-flink-13-connector-v2.sh | 19 +++++++++++++++++++ .../core/starter/flink/FlinkStarter.java | 17 +++++++++++++++++ .../start-seatunnel-flink-15-connector-v2.sh | 19 +++++++++++++++++++ .../core/starter/flink/FlinkStarter.java | 17 +++++++++++++++++ 6 files changed, 78 insertions(+), 2 deletions(-) diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 6afc981bed0e..a5165c85baad 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -683,7 +683,7 @@ jobs: matrix: java: [ '8', '11' ] os: [ 'ubuntu-latest' ] - timeout-minutes: 150 + timeout-minutes: 180 steps: - uses: actions/checkout@v2 - name: Set up JDK ${{ matrix.java }} diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java index 95928d1e4ccd..0ebdc341fac8 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java @@ -39,6 +39,8 @@ public class Common { + private static final String FLINK_YARN_APPLICATION_PATH = "runtime.tar.gz"; + private Common() { throw new IllegalStateException("Utility class"); } @@ -113,8 +115,10 @@ public static Path appRootDir() { } catch (URISyntaxException e) { throw new RuntimeException(e); } - } else if (DeployMode.CLUSTER == MODE || DeployMode.RUN_APPLICATION == MODE) { + } else if (DeployMode.CLUSTER == MODE) { return Paths.get(""); + } else if (DeployMode.RUN_APPLICATION == MODE) { + return Paths.get(FLINK_YARN_APPLICATION_PATH); } else { throw new IllegalStateException("deploy mode not support : " + MODE); } diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/bin/start-seatunnel-flink-13-connector-v2.sh b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/bin/start-seatunnel-flink-13-connector-v2.sh index f2c61f2193f1..4bd9905354e3 100755 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/bin/start-seatunnel-flink-13-connector-v2.sh +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/bin/start-seatunnel-flink-13-connector-v2.sh @@ -43,6 +43,25 @@ if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then . "${CONF_DIR}/seatunnel-env.sh" fi +if [ ! -f "${APP_DIR}/runtime.tar.gz" ];then + + directories=("connectors" "lib" "plugins") + + existing_dirs=() + + for dir in "${directories[@]}"; do + if [ -d "$dir" ]; then + existing_dirs+=("$dir") + fi + done + + if [ ${#existing_dirs[@]} -eq 0 ]; then + echo "[connectors,lib,plugins] not existed, skip generate runtime.tar.gz" + else + tar -zcvf runtime.tar.gz "${existing_dirs[@]}" + fi +fi + if [ $# == 0 ] then args="-h" diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java index c244f2ff3332..e9d0ba7df289 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.core.starter.Starter; import org.apache.seatunnel.core.starter.enums.EngineType; +import org.apache.seatunnel.core.starter.enums.MasterType; import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; @@ -32,6 +33,7 @@ public class FlinkStarter implements Starter { private static final String APP_NAME = SeaTunnelFlink.class.getName(); public static final String APP_JAR_NAME = EngineType.FLINK13.getStarterJarName(); public static final String SHELL_NAME = EngineType.FLINK13.getStarterShellName(); + public static final String RUNTIME_FILE = "runtime.tar.gz"; private final FlinkCommandArgs flinkCommandArgs; private final String appJar; @@ -61,6 +63,18 @@ public List buildCommands() { command.add("--target"); command.add(flinkCommandArgs.getMasterType().getMaster()); } + // set yarn application mode parameters + if (flinkCommandArgs.getMasterType() == MasterType.YARN_APPLICATION) { + command.add( + String.format("-Dyarn.ship-files=\"%s\"", flinkCommandArgs.getConfigFile())); + command.add(String.format("-Dyarn.ship-archives=%s", RUNTIME_FILE)); + } + // set yarn application name + if (flinkCommandArgs.getMasterType() == MasterType.YARN_APPLICATION + || flinkCommandArgs.getMasterType() == MasterType.YARN_PER_JOB + || flinkCommandArgs.getMasterType() == MasterType.YARN_SESSION) { + command.add(String.format("-Dyarn.application.name=%s", flinkCommandArgs.getJobName())); + } // set flink original parameters command.addAll(flinkCommandArgs.getOriginalParameters()); // set main class name @@ -86,6 +100,9 @@ public List buildCommands() { if (flinkCommandArgs.isDecrypt()) { command.add("--decrypt"); } + // set deploy mode + command.add("--deploy-mode"); + command.add(flinkCommandArgs.getDeployMode().getDeployMode()); // set extra system properties flinkCommandArgs.getVariables().stream() .filter(Objects::nonNull) diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/bin/start-seatunnel-flink-15-connector-v2.sh b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/bin/start-seatunnel-flink-15-connector-v2.sh index 137b8c043b18..5698a340dab0 100755 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/bin/start-seatunnel-flink-15-connector-v2.sh +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/bin/start-seatunnel-flink-15-connector-v2.sh @@ -43,6 +43,25 @@ if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then . "${CONF_DIR}/seatunnel-env.sh" fi +if [ ! -f "${APP_DIR}/runtime.tar.gz" ];then + + directories=("connectors" "lib" "plugins") + + existing_dirs=() + + for dir in "${directories[@]}"; do + if [ -d "$dir" ]; then + existing_dirs+=("$dir") + fi + done + + if [ ${#existing_dirs[@]} -eq 0 ]; then + echo "[connectors,lib,plugins] not existed, skip generate runtime.tar.gz" + else + tar -zcvf runtime.tar.gz "${existing_dirs[@]}" + fi +fi + if [ $# == 0 ] then args="-h" diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java index e74bbd402fc5..06cfd5f4495a 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.core.starter.Starter; import org.apache.seatunnel.core.starter.enums.EngineType; +import org.apache.seatunnel.core.starter.enums.MasterType; import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; @@ -32,6 +33,7 @@ public class FlinkStarter implements Starter { private static final String APP_NAME = SeaTunnelFlink.class.getName(); public static final String APP_JAR_NAME = EngineType.FLINK15.getStarterJarName(); public static final String SHELL_NAME = EngineType.FLINK15.getStarterShellName(); + public static final String RUNTIME_FILE = "runtime.tar.gz"; private final FlinkCommandArgs flinkCommandArgs; private final String appJar; @@ -61,6 +63,18 @@ public List buildCommands() { command.add("--target"); command.add(flinkCommandArgs.getMasterType().getMaster()); } + // set yarn application mode parameters + if (flinkCommandArgs.getMasterType() == MasterType.YARN_APPLICATION) { + command.add( + String.format("-Dyarn.ship-files=\"%s\"", flinkCommandArgs.getConfigFile())); + command.add(String.format("-Dyarn.ship-archives=%s", RUNTIME_FILE)); + } + // set yarn application name + if (flinkCommandArgs.getMasterType() == MasterType.YARN_APPLICATION + || flinkCommandArgs.getMasterType() == MasterType.YARN_PER_JOB + || flinkCommandArgs.getMasterType() == MasterType.YARN_SESSION) { + command.add(String.format("-Dyarn.application.name=%s", flinkCommandArgs.getJobName())); + } // set flink original parameters command.addAll(flinkCommandArgs.getOriginalParameters()); // set main class name @@ -86,6 +100,9 @@ public List buildCommands() { if (flinkCommandArgs.isDecrypt()) { command.add("--decrypt"); } + // set deploy mode + command.add("--deploy-mode"); + command.add(flinkCommandArgs.getDeployMode().getDeployMode()); // set extra system properties flinkCommandArgs.getVariables().stream() .filter(Objects::nonNull)