Skip to content

Commit

Permalink
feat(): simplified uploadfiles
Browse files Browse the repository at this point in the history
close #3919
  • Loading branch information
Skraye committed Jul 31, 2024
1 parent dae1f29 commit 11e7aff
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 98 deletions.
201 changes: 109 additions & 92 deletions core/src/main/java/io/kestra/plugin/core/namespace/UploadFiles.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.kestra.plugin.core.namespace;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
Expand All @@ -11,7 +9,6 @@
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.Namespace;
import io.kestra.core.utils.FileUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.Getter;
Expand All @@ -21,14 +18,14 @@

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.file.InvalidPathException;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.kestra.core.utils.PathUtil.checkLeadingSlash;

Expand All @@ -42,29 +39,69 @@
@Plugin(
examples = {
@Example(
title = "Upload a custom Python script to the `dev` namespace and execute it.",
title = "Upload files output from a previous task",
full = true,
code = """
id: upload_inputfile
id: upload2
namespace: company.team
inputs:
- id: my_python_script
type: FILE
tasks:
- id: download
type: io.kestra.plugin.core.http.Download
uri: https://github.com/kestra-io/scripts/archive/refs/heads/main.zip
\s
- id: unzip
type: io.kestra.plugin.compress.ArchiveDecompress
from: "{{ outputs.download.uri }}"
algorithm: ZIP
- id: upload
type: io.kestra.plugin.core.namespace.UploadFiles
filesMap: "{{ outputs.unzip.files }}"
namespace: company.team"""
),
@Example(
title = "Upload a specific file and rename it.",
full = true,
code = """
id: upload-a-file
namespace: dwh
tasks:
- id: upload_and_rename
- id: download
type: io.kestra.plugin.core.http.Download
uri: https://github.com/kestra-io/scripts/archive/refs/heads/main.zip
\s
- id: unzip
type: io.kestra.plugin.compress.ArchiveDecompress
from: "{{ outputs.download.uri }}"
algorithm: ZIP
- id: upload
type: io.kestra.plugin.core.namespace.UploadFiles
files:
/scripts/main.py: "{{ inputs.my_python_script }}"
namespace: dev
- id: python
type: io.kestra.plugin.scripts.python.Commands
namespace:
enabled: true
commands:
- python scripts/main.py"""
filesMap:
LICENCE: "{{ outputs.unzip.files['scripts-main/LICENSE'] }}"
namespace: dwh"""
),
@Example(
title = "Upload a folder",
full = true,
code = """
id: dbt_setup
namespace: dwh
tasks:
- id: wdir
type: io.kestra.plugin.core.flow.WorkingDirectory
tasks:
- id: git_clone
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/dbt-example
branch: master
- id: upload
type: io.kestra.plugin.core.namespace.UploadFiles
files:
- "glob:**/dbt/**"
namespace: dwh"""
)
}
)
Expand All @@ -76,16 +113,22 @@ public class UploadFiles extends Task implements RunnableTask<UploadFiles.Output
@PluginProperty(dynamic = true)
private String namespace;

@NotNull
@Schema(
title = "A list of files.",
description = "This can be a list of strings, where each string can be either a regex glob pattern or a file path. Providing a list requires specifying a `destination` where files will be stored.\n" +
"This can also be a map where you can provide a specific destination path for a URI, which can be useful if you need to rename a file or move it to a different folder.\n" +
"Finally, this can also be a string if its a pebble expression that render to a list or a map, or a single URI (that will be handle like a list of one).",
anyOf = {List.class, Map.class, String.class}
title = "A list of regex that match files in the current directory.",
description = "This should be a list of regex matching **ant-style patterns**." +
"This one is intended to be used with the WorkingDirectory task"
)
@PluginProperty(dynamic = true)
private Object files;
private List<String> files;

@Schema(
title = "A map of URI.",
description = "This should be a map of URI, with the key being the filename that will be upload, and the key the URI." +
"This one is intended to be used with output files of other tasks",
anyOf = {List.class, String.class}
)
@PluginProperty(dynamic = true)
private Object filesMap;

@Schema(
title = "The destination folder.",
Expand All @@ -111,83 +154,57 @@ public UploadFiles.Output run(RunContext runContext) throws Exception {

final Namespace storageNamespace = runContext.storage().namespace(renderedNamespace);

if (files instanceof String filesString) {
// check if rendered filesString is a map, a list or a string
files = renderFilesString(runContext, filesString);
if (files == null && filesMap == null) {
throw new IllegalArgumentException("files or filesMap is required");
}

if (files instanceof List<?> filesList) {
filesList = runContext.render((List<String>) filesList);
if (files != null) {
this.uploadFiles(runContext, files, storageNamespace, renderedDestination);
}

final List<String> regexs = new ArrayList<>();
if (filesMap != null) {
Map<String, Object> readFilesMap = new HashMap<>();
if (filesMap instanceof String) {
String renderedFilesMap = runContext.render((String) filesMap);
readFilesMap = JacksonMapper.ofJson().readValue(renderedFilesMap, Map.class);
}
this.uploadFilesMap(runContext, readFilesMap, storageNamespace, renderedDestination);
}

for (Object file : filesList) {
Optional<URI> uri = FileUtils.getURI(file.toString());
// Immediately handle strings that are full URI
try {
return Output.builder().build();
}

if (uri.isPresent() && runContext.storage().isFileExist(uri.get())) {
Path targetFilePath = Path.of(renderedDestination, FileUtils.getFileName(uri.get()));
storageNamespace.putFile(targetFilePath, runContext.storage().getFile(uri.get()), conflict);
} else {
regexs.add(file.toString());
}
}
// If the string is not a valid URI, try to use it as a regex
catch (InvalidPathException | NullPointerException e) {
runContext.logger().debug("File {} is not a valid URI, using it as a regex", file);
regexs.add(file.toString());
}
}
private void uploadFiles(RunContext runContext, List<String> files, Namespace storageNamespace, String destination) throws IllegalVariableEvaluationException, IOException, URISyntaxException {
files = runContext.render(files);

// Check for files in the current WORKING_DIR that match the expressions
for (Path path : runContext.workingDir().findAllFilesMatching(regexs)) {
File file = path.toFile();
Path resolve = Paths.get("/").resolve(runContext.workingDir().path().relativize(file.toPath()));
for (Path path : runContext.workingDir().findAllFilesMatching(files)) {
File file = path.toFile();
Path resolve = Paths.get("/").resolve(runContext.workingDir().path().relativize(file.toPath()));

Path targetFilePath = Path.of(renderedDestination, resolve.toString());
storageNamespace.putFile(targetFilePath, new FileInputStream(file), conflict);
}
} else if (files instanceof Map map) {
// Using a Map for the `files` property, there must be only URI
Map<String, Object> renderedMap = runContext.render((Map<String, Object>) map);
for (Map.Entry<String, Object> entry : renderedMap.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (key instanceof String targetFilePath && value instanceof String stringSourceFileURI) {
URI sourceFileURI = URI.create(stringSourceFileURI);
if (runContext.storage().isFileExist(sourceFileURI)) {
storageNamespace.putFile(Path.of(targetFilePath), runContext.storage().getFile(sourceFileURI), conflict);
}
} else {
throw new IllegalArgumentException("files must be a List<String> or a Map<String, String>");
Path targetFilePath = Path.of(destination, resolve.toString());
storageNamespace.putFile(targetFilePath, new FileInputStream(file), conflict);
}
}

private void uploadFilesMap(RunContext runContext, Map<String, Object> filesMap, Namespace storageNamespace, String destination) throws IOException, URISyntaxException, IllegalVariableEvaluationException {
Map<String, Object> renderedMap = runContext.render(filesMap);
for (Map.Entry<String, Object> entry : renderedMap.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (key instanceof String targetFilePath && value instanceof String stringSourceFileURI) {
URI sourceFileURI = URI.create(stringSourceFileURI);
if (runContext.storage().isFileExist(sourceFileURI)) {
storageNamespace.putFile(Path.of(destination + targetFilePath), runContext.storage().getFile(sourceFileURI), conflict);
}
} else {
throw new IllegalArgumentException("filesMap must be a Map<String, String>");
}
} else {
throw new IllegalArgumentException("Files must be a List<String> or a Map<String, String>");

}

return Output.builder().build();
}

@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
private final Map<String, URI> files;
}

private Object renderFilesString(RunContext runContext, String filesString) throws IllegalVariableEvaluationException, JsonProcessingException {
String rendered = runContext.render(filesString);
if (runContext.storage().isFileExist(URI.create(rendered))) {
return List.of(rendered);
} else {
try {
return JacksonMapper.ofJson().readValue(rendered, Map.class);
} catch (JsonMappingException e) {
return JacksonMapper.ofJson().readValue(rendered, List.class);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

import com.devskiller.friendly_id.FriendlyId;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.NamespaceFile;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -51,7 +51,7 @@ void shouldThrowExceptionGivenAlreadyExistingFileWhenConflictError() throws Exce
UploadFiles uploadFile = UploadFiles.builder()
.id(UploadFiles.class.getSimpleName())
.type(UploadFiles.class.getName())
.files(Map.of("/path/file.txt", fileStorage.toString()))
.filesMap(Map.of("/path/file.txt", fileStorage.toString()))
.namespace(namespace)
.conflict(Namespace.Conflicts.ERROR)
.destination("/folder")
Expand All @@ -73,7 +73,7 @@ void shouldPutFileGivenAlreadyExistingFileWhenConflictOverwrite() throws Except
UploadFiles uploadFile = UploadFiles.builder()
.id(UploadFiles.class.getSimpleName())
.type(UploadFiles.class.getName())
.files(Map.of("/path/file.txt", fileStorage.toString()))
.filesMap(Map.of("/path/file.txt", fileStorage.toString()))
.namespace("{{ inputs.namespace }}")
.destination("/folder")
.build();
Expand All @@ -89,7 +89,7 @@ void shouldPutFileGivenAlreadyExistingFileWhenConflictOverwrite() throws Except

fileStorage = addToStorage("logback.xml");
uploadFile = uploadFile.toBuilder()
.files(Map.of("/path/file.txt", fileStorage.toString()))
.filesMap(Map.of("/path/file.txt", fileStorage.toString()))
.build();

uploadFile.run(runContext);
Expand All @@ -111,7 +111,7 @@ void shouldPutFileGivenAlreadyExistingFileWhenConflictSkip() throws Exception {
UploadFiles uploadFile = UploadFiles.builder()
.id(UploadFiles.class.getSimpleName())
.type(UploadFiles.class.getName())
.files(Map.of("/path/file.txt", fileStorage.toString()))
.filesMap(Map.of("/path/file.txt", fileStorage.toString()))
.namespace(namespace)
.conflict(Namespace.Conflicts.SKIP)
.destination("/folder")
Expand All @@ -128,7 +128,7 @@ void shouldPutFileGivenAlreadyExistingFileWhenConflictSkip() throws Exception {

fileStorage = addToStorage("logback.xml");
uploadFile = uploadFile.toBuilder()
.files(Map.of("/path/file.txt", fileStorage.toString()))
.filesMap(Map.of("/path/file.txt", fileStorage.toString()))
.build();

uploadFile.run(runContext);
Expand All @@ -141,6 +141,29 @@ void shouldPutFileGivenAlreadyExistingFileWhenConflictSkip() throws Exception {
assertThat(previousFile.equals(newFile), is(true));
}

@Test
void shouldPutFileFromRegex() throws Exception {
String namespace = "io.kestra." + IdUtils.create();


UploadFiles uploadFile = UploadFiles.builder()
.id(UploadFiles.class.getSimpleName())
.type(UploadFiles.class.getName())
.files(List.of("glob:**application**"))
.namespace(namespace)
.conflict(Namespace.Conflicts.SKIP)
.destination("/folder/")
.build();

RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, uploadFile, ImmutableMap.of());
runContext.workingDir().createFile("application-test.yml");
uploadFile.run(runContext);

Namespace namespaceStorage = runContext.storage().namespace(namespace);
List<NamespaceFile> namespaceFiles = namespaceStorage.all();
assertThat(namespaceFiles.size(), is(1));
}

private URI addToStorage(String fileToLoad) throws IOException, URISyntaxException {
File file = new File(Objects.requireNonNull(UploadFilesTest.class.getClassLoader().getResource(fileToLoad)).toURI());

Expand Down

0 comments on commit 11e7aff

Please sign in to comment.