Skip to content

Commit

Permalink
Merge branch 'dev' into f-project-level-preference
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuangchong authored Jul 19, 2023
2 parents 882c66d + c30cca9 commit 61b4dcf
Show file tree
Hide file tree
Showing 38 changed files with 1,081 additions and 516 deletions.
2 changes: 2 additions & 0 deletions dolphinscheduler-common/src/main/resources/common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ appId.collect=log

# The default env list will be load by Shell task, e.g. /etc/profile,~/.bash_profile
shell.env_source_list=
# The interceptor type of Shell task, e.g. bash, sh, cmd
shell.interceptor.type=bash

# Whether to enable remote logging
remote.logging.enable=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,20 @@
import org.apache.dolphinscheduler.common.constants.TenantConstants;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptor;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ShellUtils;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -114,111 +112,61 @@ public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
}
}

public AbstractCommandExecutor(LinkedBlockingQueue<String> logBuffer) {
this.logBuffer = logBuffer;
}

/**
* build process
*
* @param commandFile command file
* @throws IOException IO Exception
*/
private void buildProcess(String commandFile) throws IOException {
// setting up user to run commands
List<String> command = new LinkedList<>();

// init process builder
ProcessBuilder processBuilder = new ProcessBuilder();
// setting up a working directory
processBuilder.directory(new File(taskRequest.getExecutePath()));
// merge error information to standard output stream
processBuilder.redirectErrorStream(true);

// if sudo.enable=true,setting up user to run commands
// todo: Create a ShellExecuteClass to generate the shell and execute shell commands
if (OSUtils.isSudoEnable() && !TenantConstants.DEFAULT_TENANT_CODE.equals(taskRequest.getTenantCode())) {
if (SystemUtils.IS_OS_LINUX
&& PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE)) {
generateCgroupCommand(command);
} else {
command.add("sudo");
command.add("-u");
command.add(taskRequest.getTenantCode());
command.add("-E");
}
}
command.add(commandInterpreter());
command.add(commandFile);

// setting commands
processBuilder.command(command);
process = processBuilder.start();

printCommand(command);
}

/**
* generate systemd command.
* eg: sudo systemd-run -q --scope -p CPUQuota=100% -p MemoryLimit=200M --uid=root
* @param command command
*/
private void generateCgroupCommand(List<String> command) {
Integer cpuQuota = taskRequest.getCpuQuota();
Integer memoryMax = taskRequest.getMemoryMax();

command.add("sudo");
command.add("systemd-run");
command.add("-q");
command.add("--scope");

if (cpuQuota == -1) {
command.add("-p");
command.add("CPUQuota=");
} else {
command.add("-p");
command.add(String.format("CPUQuota=%s%%", taskRequest.getCpuQuota()));
}

// use `man systemd.resource-control` to find available parameter
if (memoryMax == -1) {
command.add("-p");
command.add(String.format("MemoryLimit=%s", "infinity"));
} else {
command.add("-p");
command.add(String.format("MemoryLimit=%sM", taskRequest.getMemoryMax()));
}

command.add(String.format("--uid=%s", taskRequest.getTenantCode()));
}

public TaskResponse run(String execCommand, TaskCallBack taskCallBack) throws Exception {
// todo: We need to build the IShellActuator in outer class, since different task may have specific logic to build
// the IShellActuator
public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
TaskCallBack taskCallBack) throws Exception {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
logger.warn(
"Cannot find the taskInstance: {} from TaskExecutionContextCacheManager, the task might already been killed",
taskInstanceId);
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}
if (StringUtils.isEmpty(execCommand)) {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
return result;
iShellInterceptorBuilder = iShellInterceptorBuilder
.shellDirectory(taskRequest.getExecutePath())
.shellName(taskRequest.getTaskAppId());
// Set system env
if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
}
// Set custom env
if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());
}
// Set k8s config (This is only work in Linux)
if (taskRequest.getK8sTaskExecutionContext() != null) {
iShellInterceptorBuilder.k8sConfigYaml(taskRequest.getK8sTaskExecutionContext().getConfigYaml());
}
// Set sudo (This is only work in Linux)
iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable());
// Set tenant (This is only work in Linux)
if (TenantConstants.DEFAULT_TENANT_CODE.equals(taskRequest.getTenantCode())) {
iShellInterceptorBuilder.runUser(TenantConstants.BOOTSTRAPT_SYSTEM_USER);
} else {
iShellInterceptorBuilder.runUser(taskRequest.getTenantCode());
}
// Set CPU Quota (This is only work in Linux)
if (taskRequest.getCpuQuota() != null) {
iShellInterceptorBuilder.cpuQuota(taskRequest.getCpuQuota());
}
// Set memory Quota (This is only work in Linux)
if (taskRequest.getMemoryMax() != null) {
iShellInterceptorBuilder.memoryQuota(taskRequest.getMemoryMax());
}

String commandFilePath = buildCommandFilePath();

// create command file if not exists
createCommandFileIfNotExists(execCommand, commandFilePath);

// build process
buildProcess(commandFilePath);
IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();
process = iShellInterceptor.execute();

// parse process output
parseProcessOutput(process);
parseProcessOutput(this.process);

// collect pod log
collectPodLogIfNeeded();

int processId = getProcessId(process);
int processId = getProcessId(this.process);

result.setProcessId(processId);

Expand All @@ -243,7 +191,7 @@ public TaskResponse run(String execCommand, TaskCallBack taskCallBack) throws Ex
}

// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);

TaskExecutionStatus kubernetesStatus =
ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());
Expand Down Expand Up @@ -272,15 +220,15 @@ public TaskResponse run(String execCommand, TaskCallBack taskCallBack) throws Ex
if (status && kubernetesStatus.isSuccess()) {

// SHELL task state
result.setExitStatusCode(process.exitValue());
result.setExitStatusCode(this.process.exitValue());

} else {
logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
taskRequest.getTaskTimeout());
result.setExitStatusCode(EXIT_CODE_FAILURE);
cancelApplication();
}
int exitCode = process.exitValue();
int exitCode = this.process.exitValue();
String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
logger.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
Expand Down Expand Up @@ -446,9 +394,4 @@ private int getProcessId(Process process) {
return processId;
}

protected abstract String buildCommandFilePath();

protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;

protected abstract String commandInterpreter();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,17 @@

import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;

import java.util.List;
import java.util.regex.Pattern;
import java.util.Map;

/**
* abstract yarn task
*/
public abstract class AbstractYarnTask extends AbstractRemoteTask {

/**
* process task
*/
private ShellCommandExecutor shellCommandExecutor;

/**
* rules for extracting application ID
*/
protected static final Pattern YARN_APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);

/**
* Abstract Yarn Task
*
* @param taskRequest taskRequest
*/
public AbstractYarnTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
Expand All @@ -58,8 +44,12 @@ public AbstractYarnTask(TaskExecutionContext taskRequest) {
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.properties(getProperties())
// todo: do we need to move the replace to subclass?
.appendScript(getScript().replaceAll("\\r\\n", System.lineSeparator()));
// SHELL task exit code
TaskResponse response = shellCommandExecutor.run(buildCommand(), taskCallBack);
TaskResponse response = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(response.getExitStatusCode());
// set appIds
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
Expand Down Expand Up @@ -115,10 +105,12 @@ public List<String> getApplicationIds() throws TaskException {
}

/**
* create command
*
* @return String
* Get the script used to bootstrap the task
*/
protected abstract String buildCommand();
protected abstract String getScript();

/**
* Get the properties of the task used to replace the placeholders in the script.
*/
protected abstract Map<String, String> getProperties();
}
Loading

0 comments on commit 61b4dcf

Please sign in to comment.