Skip to content

Commit

Permalink
feat(core): add NamespaceFiles props on WorkingDirectory
Browse files Browse the repository at this point in the history
close #2405
  • Loading branch information
tchiotludo committed Nov 7, 2023
1 parent 45f5a3c commit 419a9c2
Show file tree
Hide file tree
Showing 9 changed files with 391 additions and 11 deletions.
23 changes: 23 additions & 0 deletions core/src/main/java/io/kestra/core/models/tasks/NamespaceFiles.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.kestra.core.models.tasks;

import io.micronaut.core.annotation.Introspected;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.util.List;
import javax.validation.Valid;

@Builder
@Getter
@NoArgsConstructor
@AllArgsConstructor
@Introspected
public class NamespaceFiles {
@Valid
private List<String> include;

@Valid
private List<String> exclude;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.kestra.core.models.tasks;

import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;

public interface NamespaceFilesInterface {
@Schema(
title = "Inject namespace files",
description = "Inject namespace file on this task. If true, inject all namespaces files or use filter to limit injected files.",
anyOf = {Boolean.class, NamespaceFiles.class}
)
@PluginProperty
Object getNamespaceFiles();
}
127 changes: 127 additions & 0 deletions core/src/main/java/io/kestra/core/runners/NamespaceFilesService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package io.kestra.core.runners;

import io.kestra.core.models.tasks.NamespaceFiles;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageInterface;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static io.kestra.core.utils.Rethrow.*;

@Singleton
@Slf4j
public class NamespaceFilesService {
@Inject
private StorageInterface storageInterface;

public List<URI> inject(RunContext runContext, String tenantId, String namespace, Path basePath, Object namespaceFiles) throws Exception {
if (namespaceFiles instanceof Boolean && !((Boolean) namespaceFiles)) {
return null;
}

List<URI> list = new ArrayList<>();
list.addAll(recursiveList(tenantId, namespace, null));

if (namespaceFiles instanceof Boolean && ((Boolean) namespaceFiles)) {
copy(tenantId, namespace, basePath, list);
return list;
}

NamespaceFiles filter = JacksonMapper.cast(namespaceFiles, NamespaceFiles.class);

list = list
.stream()
.filter(throwPredicate(f -> {
var file = f.getPath();

if (filter.getExclude() != null) {
boolean b = match(runContext.render(filter.getExclude()), file);

if (b) {
return false;
}
}

if (filter.getInclude() != null) {
boolean b = match(filter.getInclude(), file);

if (!b) {
return false;
}
}

return true;
}))
.collect(Collectors.toList());

copy(tenantId, namespace, basePath, list);

return list;
}

private URI uri(String namespace, @Nullable URI path) {
return URI.create(storageInterface.namespaceFilePrefix(namespace) + Optional.ofNullable(path)
.map(URI::getPath)
.orElse("")
);
}

private List<URI> recursiveList(String tenantId, String namespace, @Nullable URI path) throws IOException {
URI uri = uri(namespace, path);

List<URI> result = new ArrayList<>();
List<FileAttributes> list = storageInterface.list(tenantId, uri);

for (var file: list) {
URI current = URI.create((path != null ? path.getPath() : "") + "/" + file.getFileName());

if (file.getType() == FileAttributes.FileType.Directory) {
result.addAll(this.recursiveList(tenantId, namespace, current));
} else {
result.add(current);
}
}

return result;
}

private static boolean match(List<String> patterns, String file) {
return patterns
.stream()
.anyMatch(s -> FileSystems
.getDefault()
.getPathMatcher("glob:" + s)
.matches(Paths.get(file))
);
}

private void copy(String tenantId, String namespace, Path basePath, List<URI> files) throws IOException {
files
.parallelStream()
.forEach(throwConsumer(f -> {
InputStream inputStream = storageInterface.get(tenantId, uri(namespace, f));
Path destination = Paths.get(basePath.toString(), f.getPath());

if (!destination.getParent().toFile().exists()) {
destination.getParent().toFile().mkdirs();
}

Files.copy(inputStream, destination);
}));
}
}
13 changes: 12 additions & 1 deletion core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,19 @@ private void handleTask(WorkerTask workerTask) {
RunContext runContext = workerTask.getRunContext().forWorkerDirectory(applicationContext, workerTask);

try {
workingDirectory.preExecuteTasks(runContext, workerTask.getTaskRun());
// preExecuteTasks
try {
workingDirectory.preExecuteTasks(runContext, workerTask.getTaskRun());
} catch (Exception e) {
runContext.logger().error("Failed preExecuteTasks on WorkingDirectory: {}", e.getMessage(), e);
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(State.Type.FAILED));
this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask));
this.logTerminated(workerTask);

return;
}

// execute all tasks
for (Task currentTask : workingDirectory.getTasks()) {
if (Boolean.TRUE.equals(currentTask.getDisabled())) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.NamespaceFiles;
import io.kestra.core.models.tasks.NamespaceFilesInterface;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.NamespaceFilesService;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.utils.IdUtils;
Expand Down Expand Up @@ -186,7 +189,7 @@ with open('output.json', 'w') as output_file:
}
)
@WorkingDirectoryTaskValidation
public class WorkingDirectory extends Sequential {
public class WorkingDirectory extends Sequential implements NamespaceFilesInterface {

@Schema(
title = "Cache configuration",
Expand All @@ -198,6 +201,8 @@ public class WorkingDirectory extends Sequential {
@PluginProperty
private Cache cache;

private Object namespaceFiles;

@Getter(AccessLevel.PRIVATE)
private transient long cacheDownloadedTime = 0L;

Expand Down Expand Up @@ -230,12 +235,8 @@ public WorkerTask workerTask(TaskRun parent, Task task, RunContext runContext) {
.build();
}

public void preExecuteTasks(RunContext runContext, TaskRun taskRun) {
if (cache == null) {
return;
}

try {
public void preExecuteTasks(RunContext runContext, TaskRun taskRun) throws Exception {
if (cache != null) {
// first, check if we need to delete the file
if (cache.ttl != null) {
var maybeLastModifiedTime = runContext.getTaskCacheFileLastModifiedTime(taskRun.getNamespace(), taskRun.getFlowId(), this.getId(), taskRun.getValue());
Expand Down Expand Up @@ -271,8 +272,11 @@ public void preExecuteTasks(RunContext runContext, TaskRun taskRun) {
// Set the cacheDownloadedTime so that we can check if files has been updated later
cacheDownloadedTime = System.currentTimeMillis();
}
} catch (IOException e) {
runContext.logger().error("Unable to execute WorkingDirectory pre actions", e);
}

if (this.namespaceFiles != null ) {
NamespaceFilesService namespaceFilesService = runContext.getApplicationContext().getBean(NamespaceFilesService.class);
namespaceFilesService.inject(runContext, taskRun.getTenantId(), taskRun.getNamespace(), runContext.tempDir(), this.namespaceFiles);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package io.kestra.core.runners;

import io.kestra.core.models.tasks.NamespaceFiles;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

import static io.kestra.core.utils.Rethrow.throwFunction;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;

@MicronautTest
class NamespaceFilesServiceTest {
@Inject
StorageInterface storageInterface;

@Inject
NamespaceFilesService namespaceFilesService;

@Inject
RunContextFactory runContextFactory;

@Test
public void noFilter() throws Exception {
Path basePath = Files.createTempDirectory("unit");
String namespace = "io.kestra." + IdUtils.create();

put(null, namespace, "/a/b/c/1.sql", "1");
put(null, namespace, "/a/1.sql", "2");
put(null, namespace, "/b/c/d/1.sql", "3");

List<URI> injected = namespaceFilesService.inject(runContextFactory.of(),null, namespace, basePath, true);

assertThat(injected.size(), is(3));

List<Path> tempDir = Files.walk(basePath).filter(path -> path.toFile().isFile()).toList();
assertThat(tempDir.size(), is(3));
}

@Test
public void filter() throws Exception {
Path basePath = Files.createTempDirectory("unit");
String namespace = "io.kestra." + IdUtils.create();

put(null, namespace, "/a/b/c/1.sql", "1");
put(null, namespace, "/a/3.sql", "2");
put(null, namespace, "/b/c/d/1.sql", "3");

List<URI> injected = namespaceFilesService.inject(
runContextFactory.of(),
null,
namespace,
basePath,
NamespaceFiles.builder()
.include(List.of("/a/**"))
.exclude(List.of("**/3.sql"))
.build()
);

assertThat(injected.size(), is(1));
assertThat(injected.get(0).getPath(), containsString("c/1.sql"));
List<Path> tempDir = Files.walk(basePath).filter(path -> path.toFile().isFile()).toList();
assertThat(tempDir.size(), is(1));
assertThat(tempDir.get(0).toString(), is(Paths.get(basePath.toString(), "/a/b/c/1.sql").toString()));
}

@Test
public void tenant() throws Exception {
String namespace = "io.kestra." + IdUtils.create();

put("tenant1", namespace, "/a/b/c/1.sql", "1");
put("tenant2", namespace, "/a/b/c/1.sql", "2");

RunContext runContext = runContextFactory.of();
List<URI> injected = namespaceFilesService.inject(runContextFactory.of(),"tenant1", namespace, runContext.tempDir(), true);
assertThat(injected.size(), is(1));

String content = Files.walk(runContext.tempDir()).filter(path -> path.toFile().isFile()).findFirst().map(throwFunction(Files::readString)).orElseThrow();
assertThat(content, is("1"));

runContext = runContextFactory.of();
injected = namespaceFilesService.inject(runContextFactory.of(),"tenant2", namespace, runContext.tempDir(), true);
assertThat(injected.size(), is(1));

content = Files.walk(runContext.tempDir()).filter(path -> path.toFile().isFile()).findFirst().map(throwFunction(Files::readString)).orElseThrow();
assertThat(content, is("2"));
}

private void put(@Nullable String tenantId, String namespace, String path, String content) throws IOException {
storageInterface.put(
tenantId,
URI.create(storageInterface.namespaceFilePrefix(namespace) + path),
new ByteArrayInputStream(content.getBytes())
);
}
}
Loading

0 comments on commit 419a9c2

Please sign in to comment.