Skip to content

Commit

Permalink
feat(tasks): add an outputDirs on Bash files
Browse files Browse the repository at this point in the history
close #703
  • Loading branch information
tchiotludo committed Aug 23, 2022
1 parent 061532e commit 80122f5
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 12 deletions.
12 changes: 12 additions & 0 deletions core/src/main/java/io/kestra/core/runners/RunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,18 @@ public URI putTempFile(File file) throws IOException {
return this.putTempFile(file, this.storageOutputPrefix.toString(), (String) null);
}

/**
* Put the temporary file on storage and delete it after.
*
* @param file the temporary file to upload to storage
* @param name overwrite file name
* @return the {@code StorageObject} created
* @throws IOException If the temporary file can't be read
*/
public URI putTempFile(File file, String name) throws IOException {
return this.putTempFile(file, this.storageOutputPrefix.toString(), name);
}

public URI putTempFile(File file, String executionId, AbstractTrigger trigger) throws IOException {
return this.putTempFile(
file,
Expand Down
58 changes: 51 additions & 7 deletions core/src/main/java/io/kestra/core/tasks/scripts/AbstractBash.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;

import static io.kestra.core.utils.Rethrow.throwBiConsumer;
import static io.kestra.core.utils.Rethrow.*;

@SuperBuilder
@ToString
Expand Down Expand Up @@ -104,6 +103,17 @@ abstract public class AbstractBash extends Task {
@PluginProperty(dynamic = false)
protected List<String> outputFiles;

@Schema(
title = "Output dirs list that will be uploaded to internal storage",
description = "List of key that will generate temporary directories.\n" +
"On the command, just can use with special variable named `outputDirs.key`.\n" +
"If you add a files with `[\"myDir\"]`, you can use the special vars `echo 1 >> {[ outputDirs.myDir }}/file1.txt` " +
"and `echo 2 >> {[ outputDirs.myDir }}/file2.txt` and both files will be uploaded to internal storage." +
" Then you can used them on others tasks using `{{ outputs.taskId.files['myDir/file1.txt'] }}`"
)
@PluginProperty(dynamic = false)
protected List<String> outputDirs;

@Schema(
title = "Input files are extra files supplied by user that make it simpler organize code.",
description = "Describe a files map that will be written and usable in execution context. In python execution " +
Expand Down Expand Up @@ -195,6 +205,19 @@ protected ScriptOutput run(RunContext runContext, Supplier<String> supplier) thr
additionalVars
);

List<String> allOutputDirs = new ArrayList<>();

if (this.outputDirs != null && this.outputDirs.size() > 0) {
allOutputDirs.addAll(this.outputDirs);
}

Map<String, String> outputDirs = BashService.createOutputFiles(
workingDirectory,
allOutputDirs,
additionalVars,
true
);

String commandAsString = supplier.get();

// run
Expand All @@ -210,8 +233,29 @@ protected ScriptOutput run(RunContext runContext, Supplier<String> supplier) thr
// upload output files
Map<String, URI> uploaded = new HashMap<>();

outputFiles.
forEach(throwBiConsumer((k, v) -> uploaded.put(k, runContext.putTempFile(new File(runContext.render(v, additionalVars))))));
// outputFiles
outputFiles
.forEach(throwBiConsumer((k, v) -> uploaded.put(k, runContext.putTempFile(new File(runContext.render(v, additionalVars))))));

// outputDirs
outputDirs
.forEach(throwBiConsumer((k, v) -> {
try (Stream<Path> walk = Files.walk(new File(runContext.render(v, additionalVars)).toPath())) {
walk
.filter(Files::isRegularFile)
.forEach(throwConsumer(path -> {
String filename = Path.of(
k,
Path.of(runContext.render(v, additionalVars)).relativize(path).toString()
).toString();

uploaded.put(
filename,
runContext.putTempFile(path.toFile(), filename)
);
}));
}
}));

Map<String, Object> outputsVars = new HashMap<>();
outputsVars.putAll(runResult.getStdOut().getOutputs());
Expand Down
22 changes: 19 additions & 3 deletions core/src/main/java/io/kestra/core/tasks/scripts/BashService.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ public static Map<String, String> createOutputFiles(
Path tempDirectory,
List<String> outputFiles,
Map<String, Object> additionalVars
) throws IOException {
return BashService.createOutputFiles(tempDirectory, outputFiles, additionalVars, false);
}

public static Map<String, String> createOutputFiles(
Path tempDirectory,
List<String> outputFiles,
Map<String, Object> additionalVars,
Boolean isDir
) throws IOException {
List<String> outputs = new ArrayList<>();

Expand All @@ -63,14 +72,21 @@ public static Map<String, String> createOutputFiles(
outputs
.forEach(throwConsumer(s -> {
BashService.validFilename(s);
File tempFile;

File tempFile = File.createTempFile(s + "_", null, tempDirectory.toFile());
if (isDir) {
tempFile = Files.createTempDirectory(tempDirectory, s + "_").toFile();
} else {
tempFile = File.createTempFile(s + "_", null, tempDirectory.toFile());
}

result.put(s, "{{workingDir}}/" + tempFile.getName());
}));

additionalVars.put("temp", result);
additionalVars.put("outputFiles", result);
if (!isDir) {
additionalVars.put("temp", result);
}
additionalVars.put(isDir ? "outputDirs": "outputFiles", result);
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void bash() {
Map<String, Object> generate = jsonSchemaGenerator.properties(Task.class, Bash.class);

assertThat(generate, is(not(nullValue())));
assertThat(((Map<String, Map<String, Object>>) generate.get("properties")).size(), is(12));
assertThat(((Map<String, Map<String, Object>>) generate.get("properties")).size(), is(13));

generate = jsonSchemaGenerator.outputs(Task.class, Bash.class);

Expand Down
30 changes: 30 additions & 0 deletions core/src/test/java/io/kestra/core/tasks/AbstractBashTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,36 @@ void files() throws Exception {
);
}

@Test
void outputDirs() throws Exception {
Bash bash = configure(Bash.builder()
.outputDirs(Arrays.asList("xml", "csv"))
.inputFiles(ImmutableMap.of("files/in/in.txt", "I'm here"))
.commands(new String[]{
"echo 1 >> {{ outputDirs.xml }}/file1.txt",
"mkdir -p {{ outputDirs.xml }}/sub/sub2",
"echo 2 >> {{ outputDirs.xml }}/sub/sub2/file2.txt",
"echo 3 >> {{ outputDirs.csv }}/file1.txt",
})
).build();

RunContext runContext = TestsUtils.mockRunContext(runContextFactory, bash, ImmutableMap.of());
ScriptOutput run = bash.run(runContext);

assertThat(run.getExitCode(), is(0));
assertThat(run.getStdErrLineCount(), is(0));
assertThat(run.getStdOutLineCount(), is(0));

InputStream get = storageInterface.get(run.getOutputFiles().get("xml/file1.txt"));
assertThat(CharStreams.toString(new InputStreamReader(get)), is("1\n"));

get = storageInterface.get(run.getOutputFiles().get("xml/sub/sub2/file2.txt"));
assertThat(CharStreams.toString(new InputStreamReader(get)), is("2\n"));

get = storageInterface.get(run.getOutputFiles().get("csv/file1.txt"));
assertThat(CharStreams.toString(new InputStreamReader(get)), is("3\n"));
}

@Test
@DisabledIfEnvironmentVariable(named = "GITHUB_WORKFLOW", matches = ".*")
void failed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void bash() throws URISyntaxException {
assertThat(doc.getMarkdown(), containsString("io.kestra.core.tasks.scripts.Bash"));
assertThat(doc.getMarkdown(), containsString("Exit if any non true return value"));
assertThat(doc.getMarkdown(), containsString("The standard output line count"));
assertThat(((Map<String, Object>) doc.getSchema().getProperties().get("properties")).size(), is(12));
assertThat(((Map<String, Object>) doc.getSchema().getProperties().get("properties")).size(), is(13));
assertThat(((Map<String, Object>) doc.getSchema().getOutputs().get("properties")).size(), is(6));
});
}
Expand Down

0 comments on commit 80122f5

Please sign in to comment.