-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(core): add NamespaceFiles props on WorkingDirectory (#2467)
close #2405 Co-authored-by: Anna Geller <anna.m.geller@gmail.com>
- Loading branch information
1 parent
c918444
commit 3874d1d
Showing
9 changed files
with
429 additions
and
11 deletions.
There are no files selected for viewing
40 changes: 40 additions & 0 deletions
40
core/src/main/java/io/kestra/core/models/tasks/NamespaceFiles.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package io.kestra.core.models.tasks; | ||
|
||
import io.kestra.core.models.annotations.PluginProperty; | ||
import io.micronaut.core.annotation.Introspected; | ||
import io.swagger.v3.oas.annotations.media.Schema; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Builder; | ||
import lombok.Getter; | ||
import lombok.NoArgsConstructor; | ||
|
||
import java.util.List; | ||
import javax.validation.Valid; | ||
|
||
@Builder | ||
@Getter | ||
@NoArgsConstructor | ||
@AllArgsConstructor | ||
@Introspected | ||
public class NamespaceFiles { | ||
@Schema( | ||
title = "Whether to enable namespace files to be loaded into the working directory" | ||
) | ||
@PluginProperty | ||
@Builder.Default | ||
private Boolean enabled = true; | ||
|
||
@Schema( | ||
title = "A list of filters to include only matching glob patterns" | ||
) | ||
@PluginProperty | ||
@Valid | ||
private List<String> include; | ||
|
||
@Schema( | ||
title = "A list of filters to exclude matching glob patterns" | ||
) | ||
@PluginProperty | ||
@Valid | ||
private List<String> exclude; | ||
} |
13 changes: 13 additions & 0 deletions
13
core/src/main/java/io/kestra/core/models/tasks/NamespaceFilesInterface.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package io.kestra.core.models.tasks; | ||
|
||
import io.kestra.core.models.annotations.PluginProperty; | ||
import io.swagger.v3.oas.annotations.media.Schema; | ||
|
||
public interface NamespaceFilesInterface { | ||
@Schema( | ||
title = "Inject namespace files", | ||
description = "Inject namespace files to this task. When enabled, it will, by default, load all namespace files into the working directory. However, you can use the `include` or `exclude` properties to limit which namespace files will be injected." | ||
) | ||
@PluginProperty | ||
NamespaceFiles getNamespaceFiles(); | ||
} |
121 changes: 121 additions & 0 deletions
121
core/src/main/java/io/kestra/core/runners/NamespaceFilesService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
package io.kestra.core.runners; | ||
|
||
import io.kestra.core.models.tasks.NamespaceFiles; | ||
import io.kestra.core.storages.FileAttributes; | ||
import io.kestra.core.storages.StorageInterface; | ||
import io.micronaut.core.annotation.Nullable; | ||
import jakarta.inject.Inject; | ||
import jakarta.inject.Singleton; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.net.URI; | ||
import java.nio.file.FileSystems; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.nio.file.Paths; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.stream.Collectors; | ||
|
||
import static io.kestra.core.utils.Rethrow.*; | ||
|
||
@Singleton | ||
@Slf4j | ||
public class NamespaceFilesService { | ||
@Inject | ||
private StorageInterface storageInterface; | ||
|
||
public List<URI> inject(RunContext runContext, String tenantId, String namespace, Path basePath, NamespaceFiles namespaceFiles) throws Exception { | ||
if (!namespaceFiles.getEnabled()) { | ||
return Collections.emptyList(); | ||
} | ||
|
||
List<URI> list = new ArrayList<>(); | ||
list.addAll(recursiveList(tenantId, namespace, null)); | ||
|
||
|
||
list = list | ||
.stream() | ||
.filter(throwPredicate(f -> { | ||
var file = f.getPath(); | ||
|
||
if (namespaceFiles.getExclude() != null) { | ||
boolean b = match(runContext.render(namespaceFiles.getExclude()), file); | ||
|
||
if (b) { | ||
return false; | ||
} | ||
} | ||
|
||
if (namespaceFiles.getInclude() != null) { | ||
boolean b = match(namespaceFiles.getInclude(), file); | ||
|
||
if (!b) { | ||
return false; | ||
} | ||
} | ||
|
||
return true; | ||
})) | ||
.collect(Collectors.toList()); | ||
|
||
copy(tenantId, namespace, basePath, list); | ||
|
||
return list; | ||
} | ||
|
||
private URI uri(String namespace, @Nullable URI path) { | ||
return URI.create(storageInterface.namespaceFilePrefix(namespace) + Optional.ofNullable(path) | ||
.map(URI::getPath) | ||
.orElse("") | ||
); | ||
} | ||
|
||
private List<URI> recursiveList(String tenantId, String namespace, @Nullable URI path) throws IOException { | ||
URI uri = uri(namespace, path); | ||
|
||
List<URI> result = new ArrayList<>(); | ||
List<FileAttributes> list = storageInterface.list(tenantId, uri); | ||
|
||
for (var file: list) { | ||
URI current = URI.create((path != null ? path.getPath() : "") + "/" + file.getFileName()); | ||
|
||
if (file.getType() == FileAttributes.FileType.Directory) { | ||
result.addAll(this.recursiveList(tenantId, namespace, current)); | ||
} else { | ||
result.add(current); | ||
} | ||
} | ||
|
||
return result; | ||
} | ||
|
||
private static boolean match(List<String> patterns, String file) { | ||
return patterns | ||
.stream() | ||
.anyMatch(s -> FileSystems | ||
.getDefault() | ||
.getPathMatcher("glob:" + s) | ||
.matches(Paths.get(file)) | ||
); | ||
} | ||
|
||
private void copy(String tenantId, String namespace, Path basePath, List<URI> files) throws IOException { | ||
files | ||
.forEach(throwConsumer(f -> { | ||
InputStream inputStream = storageInterface.get(tenantId, uri(namespace, f)); | ||
Path destination = Paths.get(basePath.toString(), f.getPath()); | ||
|
||
if (!destination.getParent().toFile().exists()) { | ||
//noinspection ResultOfMethodCallIgnored | ||
destination.getParent().toFile().mkdirs(); | ||
} | ||
|
||
Files.copy(inputStream, destination); | ||
})); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
135 changes: 135 additions & 0 deletions
135
core/src/test/java/io/kestra/core/runners/NamespaceFilesServiceTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
package io.kestra.core.runners; | ||
|
||
import io.kestra.core.models.tasks.NamespaceFiles; | ||
import io.kestra.core.storages.StorageInterface; | ||
import io.kestra.core.utils.IdUtils; | ||
import io.micronaut.core.annotation.Nullable; | ||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest; | ||
import jakarta.inject.Inject; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import java.io.ByteArrayInputStream; | ||
import java.io.IOException; | ||
import java.net.URI; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.nio.file.Paths; | ||
import java.util.List; | ||
|
||
import static io.kestra.core.utils.Rethrow.throwFunction; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.containsString; | ||
import static org.hamcrest.Matchers.is; | ||
|
||
@MicronautTest | ||
class NamespaceFilesServiceTest { | ||
@Inject | ||
StorageInterface storageInterface; | ||
|
||
@Inject | ||
NamespaceFilesService namespaceFilesService; | ||
|
||
@Inject | ||
RunContextFactory runContextFactory; | ||
|
||
@Test | ||
public void noFilter() throws Exception { | ||
Path basePath = Files.createTempDirectory("unit"); | ||
String namespace = "io.kestra." + IdUtils.create(); | ||
|
||
put(null, namespace, "/a/b/c/1.sql", "1"); | ||
put(null, namespace, "/a/1.sql", "2"); | ||
put(null, namespace, "/b/c/d/1.sql", "3"); | ||
|
||
List<URI> injected = namespaceFilesService.inject( | ||
runContextFactory.of(), | ||
null, | ||
namespace, | ||
basePath, | ||
NamespaceFiles | ||
.builder() | ||
.enabled(true) | ||
.build() | ||
); | ||
|
||
assertThat(injected.size(), is(3)); | ||
|
||
List<Path> tempDir = Files.walk(basePath).filter(path -> path.toFile().isFile()).toList(); | ||
assertThat(tempDir.size(), is(3)); | ||
} | ||
|
||
@Test | ||
public void filter() throws Exception { | ||
Path basePath = Files.createTempDirectory("unit"); | ||
String namespace = "io.kestra." + IdUtils.create(); | ||
|
||
put(null, namespace, "/a/b/c/1.sql", "1"); | ||
put(null, namespace, "/a/3.sql", "2"); | ||
put(null, namespace, "/b/c/d/1.sql", "3"); | ||
|
||
List<URI> injected = namespaceFilesService.inject( | ||
runContextFactory.of(), | ||
null, | ||
namespace, | ||
basePath, | ||
NamespaceFiles.builder() | ||
.include(List.of("/a/**")) | ||
.exclude(List.of("**/3.sql")) | ||
.build() | ||
); | ||
|
||
assertThat(injected.size(), is(1)); | ||
assertThat(injected.get(0).getPath(), containsString("c/1.sql")); | ||
List<Path> tempDir = Files.walk(basePath).filter(path -> path.toFile().isFile()).toList(); | ||
assertThat(tempDir.size(), is(1)); | ||
assertThat(tempDir.get(0).toString(), is(Paths.get(basePath.toString(), "/a/b/c/1.sql").toString())); | ||
} | ||
|
||
@Test | ||
public void tenant() throws Exception { | ||
String namespace = "io.kestra." + IdUtils.create(); | ||
|
||
put("tenant1", namespace, "/a/b/c/1.sql", "1"); | ||
put("tenant2", namespace, "/a/b/c/1.sql", "2"); | ||
|
||
RunContext runContext = runContextFactory.of(); | ||
List<URI> injected = namespaceFilesService.inject( | ||
runContextFactory.of(), | ||
"tenant1", | ||
namespace, | ||
runContext.tempDir(), | ||
NamespaceFiles | ||
.builder() | ||
.enabled(true) | ||
.build() | ||
); | ||
assertThat(injected.size(), is(1)); | ||
|
||
String content = Files.walk(runContext.tempDir()).filter(path -> path.toFile().isFile()).findFirst().map(throwFunction(Files::readString)).orElseThrow(); | ||
assertThat(content, is("1")); | ||
|
||
runContext = runContextFactory.of(); | ||
injected = namespaceFilesService.inject( | ||
runContextFactory.of(), | ||
"tenant2", | ||
namespace, | ||
runContext.tempDir(), | ||
NamespaceFiles | ||
.builder() | ||
.enabled(true) | ||
.build() | ||
); | ||
assertThat(injected.size(), is(1)); | ||
|
||
content = Files.walk(runContext.tempDir()).filter(path -> path.toFile().isFile()).findFirst().map(throwFunction(Files::readString)).orElseThrow(); | ||
assertThat(content, is("2")); | ||
} | ||
|
||
private void put(@Nullable String tenantId, String namespace, String path, String content) throws IOException { | ||
storageInterface.put( | ||
tenantId, | ||
URI.create(storageInterface.namespaceFilePrefix(namespace) + path), | ||
new ByteArrayInputStream(content.getBytes()) | ||
); | ||
} | ||
} |
Oops, something went wrong.