Skip to content

Commit

Permalink
[Hotfix][Core][Flink] SeaTunnel flink engine support application mode…
Browse files Browse the repository at this point in the history
… on yarn
  • Loading branch information
TyrantLucifer committed Sep 29, 2024
1 parent fb89033 commit 85c66f8
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@

public class Common {

private static final String FLINK_YARN_APPLICATION_PATH = "runtime.tar.gz";

private Common() {
throw new IllegalStateException("Utility class");
}
Expand Down Expand Up @@ -113,8 +115,10 @@ public static Path appRootDir() {
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
} else if (DeployMode.CLUSTER == MODE || DeployMode.RUN_APPLICATION == MODE) {
} else if (DeployMode.CLUSTER == MODE) {
return Paths.get("");
} else if (DeployMode.RUN_APPLICATION == MODE) {
return Paths.get(FLINK_YARN_APPLICATION_PATH);
} else {
throw new IllegalStateException("deploy mode not support : " + MODE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,25 @@ if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
. "${CONF_DIR}/seatunnel-env.sh"
fi

if [ ! -f "${APP_DIR}/runtime.tar.gz" ];then

directories=("connectors" "lib" "plugins")

existing_dirs=()

for dir in "${directories[@]}"; do
if [ -d "$dir" ]; then
existing_dirs+=("$dir")
fi
done

if [ ${#existing_dirs[@]} -eq 0 ]; then
echo "[connectors,lib,plugins] not existed, skip generate runtime.tar.gz"
else
tar -zcvf runtime.tar.gz "${existing_dirs[@]}"
fi
fi

if [ $# == 0 ]
then
args="-h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.starter.Starter;
import org.apache.seatunnel.core.starter.enums.EngineType;
import org.apache.seatunnel.core.starter.enums.MasterType;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;

Expand All @@ -32,6 +33,7 @@ public class FlinkStarter implements Starter {
private static final String APP_NAME = SeaTunnelFlink.class.getName();
public static final String APP_JAR_NAME = EngineType.FLINK13.getStarterJarName();
public static final String SHELL_NAME = EngineType.FLINK13.getStarterShellName();
public static final String RUNTIME_FILE = "runtime.tar.gz";
private final FlinkCommandArgs flinkCommandArgs;
private final String appJar;

Expand Down Expand Up @@ -61,6 +63,18 @@ public List<String> buildCommands() {
command.add("--target");
command.add(flinkCommandArgs.getMasterType().getMaster());
}
// set yarn application mode parameters
if (flinkCommandArgs.getMasterType() == MasterType.YARN_APPLICATION) {
command.add(
String.format("-Dyarn.ship-files=\"%s\"", flinkCommandArgs.getConfigFile()));
command.add(String.format("-Dyarn.ship-archives=%s", RUNTIME_FILE));
}
// set yarn application name
if (flinkCommandArgs.getMasterType() == MasterType.YARN_APPLICATION
|| flinkCommandArgs.getMasterType() == MasterType.YARN_PER_JOB
|| flinkCommandArgs.getMasterType() == MasterType.YARN_SESSION) {
command.add(String.format("-Dyarn.application.name=%s", flinkCommandArgs.getJobName()));
}
// set flink original parameters
command.addAll(flinkCommandArgs.getOriginalParameters());
// set main class name
Expand All @@ -86,6 +100,9 @@ public List<String> buildCommands() {
if (flinkCommandArgs.isDecrypt()) {
command.add("--decrypt");
}
// set deploy mode
command.add("--deploy-mode");
command.add(flinkCommandArgs.getDeployMode().getDeployMode());
// set extra system properties
flinkCommandArgs.getVariables().stream()
.filter(Objects::nonNull)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,25 @@ if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
. "${CONF_DIR}/seatunnel-env.sh"
fi

if [ ! -f "${APP_DIR}/runtime.tar.gz" ];then

directories=("connectors" "lib" "plugins")

existing_dirs=()

for dir in "${directories[@]}"; do
if [ -d "$dir" ]; then
existing_dirs+=("$dir")
fi
done

if [ ${#existing_dirs[@]} -eq 0 ]; then
echo "[connectors,lib,plugins] not existed, skip generate runtime.tar.gz"
else
tar -zcvf runtime.tar.gz "${existing_dirs[@]}"
fi
fi

if [ $# == 0 ]
then
args="-h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.starter.Starter;
import org.apache.seatunnel.core.starter.enums.EngineType;
import org.apache.seatunnel.core.starter.enums.MasterType;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;

Expand All @@ -32,6 +33,7 @@ public class FlinkStarter implements Starter {
private static final String APP_NAME = SeaTunnelFlink.class.getName();
public static final String APP_JAR_NAME = EngineType.FLINK15.getStarterJarName();
public static final String SHELL_NAME = EngineType.FLINK15.getStarterShellName();
public static final String RUNTIME_FILE = "runtime.tar.gz";
private final FlinkCommandArgs flinkCommandArgs;
private final String appJar;

Expand Down Expand Up @@ -61,6 +63,18 @@ public List<String> buildCommands() {
command.add("--target");
command.add(flinkCommandArgs.getMasterType().getMaster());
}
// set yarn application mode parameters
if (flinkCommandArgs.getMasterType() == MasterType.YARN_APPLICATION) {
command.add(
String.format("-Dyarn.ship-files=\"%s\"", flinkCommandArgs.getConfigFile()));
command.add(String.format("-Dyarn.ship-archives=%s", RUNTIME_FILE));
}
// set yarn application name
if (flinkCommandArgs.getMasterType() == MasterType.YARN_APPLICATION
|| flinkCommandArgs.getMasterType() == MasterType.YARN_PER_JOB
|| flinkCommandArgs.getMasterType() == MasterType.YARN_SESSION) {
command.add(String.format("-Dyarn.application.name=%s", flinkCommandArgs.getJobName()));
}
// set flink original parameters
command.addAll(flinkCommandArgs.getOriginalParameters());
// set main class name
Expand All @@ -86,6 +100,9 @@ public List<String> buildCommands() {
if (flinkCommandArgs.isDecrypt()) {
command.add("--decrypt");
}
// set deploy mode
command.add("--deploy-mode");
command.add(flinkCommandArgs.getDeployMode().getDeployMode());
// set extra system properties
flinkCommandArgs.getVariables().stream()
.filter(Objects::nonNull)
Expand Down

0 comments on commit 85c66f8

Please sign in to comment.