Skip to content

Commit

Permalink
feat(task-runners): modify task runner implementation so that it has …
Browse files Browse the repository at this point in the history
…every files from working directory
  • Loading branch information
brian-mulier-p committed Jul 10, 2024
1 parent 277304f commit f35c979
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -47,7 +46,6 @@ private ScriptService() {
public static String replaceInternalStorage(
RunContext runContext,
@Nullable String command,
BiConsumer<String, String> internalStorageToLocalFileConsumer,
boolean replaceWithRelativePath
) throws IOException {
if (command == null) {
Expand All @@ -59,41 +57,37 @@ public static String replaceInternalStorage(
.replaceAll(throwFunction(matchResult -> {
String localFile = saveOnLocalStorage(runContext, matchResult.group()).replace("\\", "/");

internalStorageToLocalFileConsumer.accept(matchResult.group(), localFile);

if (!replaceWithRelativePath) {
return localFile;
}

return localFile.startsWith("/") ? localFile.substring(1) : localFile;
return runContext.workingDir().path().relativize(Path.of(localFile)).toString();
}));
}

public static String replaceInternalStorage(
RunContext runContext,
Map<String, Object> additionalVars,
String command,
BiConsumer<String, String> internalStorageToLocalFileConsumer,
boolean replaceWithRelativePath
) throws IOException, IllegalVariableEvaluationException {
if (command == null) {
return null;
}

return ScriptService.replaceInternalStorage(runContext, additionalVars, List.of(command), internalStorageToLocalFileConsumer, replaceWithRelativePath).getFirst();
return ScriptService.replaceInternalStorage(runContext, additionalVars, List.of(command), replaceWithRelativePath).getFirst();
}

public static List<String> replaceInternalStorage(
RunContext runContext,
Map<String, Object> additionalVars,
List<String> commands,
BiConsumer<String, String> internalStorageToLocalFileConsumer,
boolean replaceWithRelativePath
) throws IOException, IllegalVariableEvaluationException {
return commands
.stream()
.map(throwFunction(c -> runContext.render(c, additionalVars)))
.map(throwFunction(c -> ScriptService.replaceInternalStorage(runContext, c, internalStorageToLocalFileConsumer, replaceWithRelativePath)))
.map(throwFunction(c -> ScriptService.replaceInternalStorage(runContext, c, replaceWithRelativePath)))
.toList();

}
Expand All @@ -102,8 +96,7 @@ public static List<String> replaceInternalStorage(
RunContext runContext,
List<String> commands
) throws IOException, IllegalVariableEvaluationException {
return ScriptService.replaceInternalStorage(runContext, Collections.emptyMap(), commands, (ignored, file) -> {
}, false);
return ScriptService.replaceInternalStorage(runContext, Collections.emptyMap(), commands, false);
}

private static String saveOnLocalStorage(RunContext runContext, String uri) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package io.kestra.core.models.tasks.runners;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Stream;


/**
Expand Down Expand Up @@ -33,4 +38,23 @@ default boolean outputDirectoryEnabled() {
Duration getTimeout();

TargetOS getTargetOS();

default List<Path> relativeWorkingDirectoryFilesPaths() throws IOException {
Path workingDirectory = this.getWorkingDirectory();
if (workingDirectory == null) {
return Collections.emptyList();
}

try (Stream<Path> walk = Files.walk(workingDirectory)) {
Stream<Path> filtered = walk.filter(Files::isRegularFile);
Path outputDirectory = this.getOutputDirectory();
if (outputDirectory != null) {
filtered = filtered.filter(Predicate.not(path -> path.startsWith(outputDirectory)));
}

return filtered
.map(workingDirectory::relativize)
.toList();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public abstract class TaskRunner implements Plugin, WorkerJobLifecycle {
* For remote task runner (like Kubernetes or in a cloud provider), <code>filesToUpload</code> must be used to upload input and namespace files to the runner,
* and <code>filesToDownload</code> must be used to download output files from the runner.
*/
public abstract RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToUpload, List<String> filesToDownload) throws Exception;
public abstract RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToDownload) throws Exception;

public Map<String, Object> additionalVars(RunContext runContext, TaskCommands taskCommands) throws IllegalVariableEvaluationException {
if (this.additionalVars == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class Process extends TaskRunner {
public static final Process INSTANCE = Process.builder().type(Process.class.getName()).build();

@Override
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToUpload, List<String> filesToDownload) throws Exception {
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToDownload) throws Exception {
Logger logger = runContext.logger();
AbstractLogConsumer defaultLogConsumer = taskCommands.getLogConsumer();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,36 @@
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.runners.ScriptService;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@KestraTest
class ScriptServiceTest {
public static final Pattern COMMAND_PATTERN_CAPTURE_LOCAL_PATH = Pattern.compile("my command with an internal storage file: (.*)");
@Inject private RunContextFactory runContextFactory;

@Test
void replaceInternalStorage() throws IOException {
var runContext = runContextFactory.of();
var command = ScriptService.replaceInternalStorage(runContext, null, (ignored, file) -> {}, false);
var command = ScriptService.replaceInternalStorage(runContext, null, false);
assertThat(command, is(""));

command = ScriptService.replaceInternalStorage(runContext, "my command", (ignored, file) -> {}, false);
command = ScriptService.replaceInternalStorage(runContext, "my command", false);
assertThat(command, is("my command"));

Path path = Path.of("/tmp/unittest/file.txt");
Expand All @@ -45,16 +44,24 @@ void replaceInternalStorage() throws IOException {
}

String internalStorageUri = "kestra://some/file.txt";
AtomicReference<String> localFile = new AtomicReference<>();
File localFile = null;
try {
command = ScriptService.replaceInternalStorage(runContext, "my command with an internal storage file: " + internalStorageUri, (ignored, file) -> localFile.set(file), false);
assertThat(command, is("my command with an internal storage file: " + localFile.get()));
assertThat(Path.of(localFile.get()).toFile().exists(), is(true));

command = ScriptService.replaceInternalStorage(runContext, "my command with an internal storage file: " + internalStorageUri, (ignored, file) -> localFile.set(file), true);
assertThat(command, is("my command with an internal storage file: " + localFile.get().substring(1)));
command = ScriptService.replaceInternalStorage(runContext, "my command with an internal storage file: " + internalStorageUri, false);

Matcher matcher = COMMAND_PATTERN_CAPTURE_LOCAL_PATH.matcher(command);
assertThat(matcher.matches(), is(true));
Path absoluteLocalFilePath = Path.of(matcher.group(1));
localFile = absoluteLocalFilePath.toFile();
assertThat(localFile.exists(), is(true));

command = ScriptService.replaceInternalStorage(runContext, "my command with an internal storage file: " + internalStorageUri, true);
matcher = COMMAND_PATTERN_CAPTURE_LOCAL_PATH.matcher(command);
assertThat(matcher.matches(), is(true));
String relativePath = matcher.group(1);
assertThat(relativePath, not(startsWith("/")));
assertThat(runContext.workingDir().resolve(Path.of(relativePath)).toFile().exists(), is(true));
} finally {
Path.of(localFile.get()).toFile().delete();
localFile.delete();
path.toFile().delete();
}
}
Expand All @@ -68,8 +75,9 @@ void uploadInputFiles() throws IOException {
Files.createFile(path);
}

Map<String, String> localFileByInternalStorage = new HashMap<>();
List<File> filesToDelete = new ArrayList<>();
String internalStorageUri = "kestra://some/file.txt";

try {
String wdir = "/my/wd";
var commands = ScriptService.replaceInternalStorage(
Expand All @@ -79,20 +87,29 @@ void uploadInputFiles() throws IOException {
"my command with an internal storage file: " + internalStorageUri,
"my command with some additional var usage: {{ workingDir }}"
),
localFileByInternalStorage::put,
false
);
assertThat(commands, not(empty()));
assertThat(commands.getFirst(), is("my command with an internal storage file: " + localFileByInternalStorage.get(internalStorageUri)));
assertThat(Path.of(localFileByInternalStorage.get(internalStorageUri)).toFile().exists(), is(true));

assertThat(commands.getFirst(), not(is("my command with an internal storage file: " + internalStorageUri)));
Matcher matcher = COMMAND_PATTERN_CAPTURE_LOCAL_PATH.matcher(commands.getFirst());
assertThat(matcher.matches(), is(true));
File file = Path.of(matcher.group(1)).toFile();
assertThat(file.exists(), is(true));
filesToDelete.add(file);

assertThat(commands.get(1), is("my command with some additional var usage: " + wdir));

commands = ScriptService.replaceInternalStorage(runContext, Collections.emptyMap(), List.of("my command with an internal storage file: " + internalStorageUri), localFileByInternalStorage::put, true);
assertThat(commands.getFirst(), is("my command with an internal storage file: " + localFileByInternalStorage.get(internalStorageUri).substring(1)));
commands = ScriptService.replaceInternalStorage(runContext, Collections.emptyMap(), List.of("my command with an internal storage file: " + internalStorageUri), true);
matcher = COMMAND_PATTERN_CAPTURE_LOCAL_PATH.matcher(commands.getFirst());
assertThat(matcher.matches(), is(true));
file = runContext.workingDir().resolve(Path.of(matcher.group(1))).toFile();
assertThat(file.exists(), is(true));
filesToDelete.add(file);
} catch (IllegalVariableEvaluationException e) {
throw new RuntimeException(e);
} finally {
localFileByInternalStorage.forEach((k, v) -> Path.of(v).toFile().delete());
filesToDelete.forEach(File::delete);
path.toFile().delete();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public TaskRunnerAdditional(boolean overrideEnvValues) {
}

@Override
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToUpload, List<String> filesToDownload) {
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToDownload) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ public CommandsWrapper addEnv(Map<String, String> envs) {
}

public ScriptOutput run() throws Exception {
List<String> filesToUpload = new ArrayList<>();
if (this.namespaceFiles != null && Boolean.TRUE.equals(this.namespaceFiles.getEnabled())) {

List<NamespaceFile> matchedNamespaceFiles = runContext.storage()
Expand All @@ -143,22 +142,19 @@ public ScriptOutput run() throws Exception {
InputStream content = runContext.storage().getFile(namespaceFile.uri());
runContext.workingDir().createFile(namespaceFile.path().toString(), content);
}));

matchedNamespaceFiles.forEach(file -> filesToUpload.add(file.path().toString()));
}

TaskRunner realTaskRunner = this.getTaskRunner();
if (this.inputFiles != null) {
Map<String, String> finalInputFiles = FilesService.inputFiles(runContext, realTaskRunner.additionalVars(runContext, this), this.inputFiles);
filesToUpload.addAll(finalInputFiles.keySet());
FilesService.inputFiles(runContext, realTaskRunner.additionalVars(runContext, this), this.inputFiles);
}

RunContextInitializer initializer = ((DefaultRunContext) runContext).getApplicationContext().getBean(RunContextInitializer.class);

RunContext taskRunnerRunContext = initializer.forPlugin(((DefaultRunContext) runContext).clone(), realTaskRunner);
this.commands = this.render(runContext, commands, filesToUpload);
this.commands = this.render(runContext, commands);

RunnerResult runnerResult = realTaskRunner.run(taskRunnerRunContext, this, filesToUpload, this.outputFiles);
RunnerResult runnerResult = realTaskRunner.run(taskRunnerRunContext, this, this.outputFiles);

Map<String, URI> outputFiles = new HashMap<>();
if (this.outputDirectoryEnabled()) {
Expand Down Expand Up @@ -221,18 +217,16 @@ public String render(RunContext runContext, String command, List<String> interna
this.runContext,
taskRunner.additionalVars(runContext, this),
command,
(ignored, localFilePath) -> internalStorageLocalFiles.add(localFilePath),
taskRunner instanceof RemoteRunnerInterface
);
}

public List<String> render(RunContext runContext, List<String> commands, List<String> internalStorageLocalFiles) throws IllegalVariableEvaluationException, IOException {
public List<String> render(RunContext runContext, List<String> commands) throws IllegalVariableEvaluationException, IOException {
TaskRunner taskRunner = this.getTaskRunner();
return ScriptService.replaceInternalStorage(
this.runContext,
taskRunner.additionalVars(runContext, this),
commands,
(ignored, localFilePath) -> internalStorageLocalFiles.add(localFilePath),
taskRunner instanceof RemoteRunnerInterface
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public static Docker from(DockerOptions dockerOptions) {


@Override
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToUpload, List<String> filesToDownload) throws Exception {
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToDownload) throws Exception {
if (taskCommands.getContainerImage() == null && this.image == null) {
throw new IllegalArgumentException("This task runner needs the `containerImage` property to be set");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public String getType() {
RunnerResult run = Docker.from(DockerOptions.builder().image("alpine").build()).run(
runContext,
taskCommands,
Collections.emptyList(),
Collections.emptyList()
);
Await.until(() -> run.getLogConsumer().getStdOutCount() == 2, null, Duration.ofSeconds(5));
Expand Down Expand Up @@ -89,7 +88,6 @@ public String getType() {
RunnerResult run = Docker.from(DockerOptions.builder().image("alpine").build()).run(
runContext,
taskCommands,
Collections.emptyList(),
Collections.emptyList()
);

Expand Down
Loading

0 comments on commit f35c979

Please sign in to comment.