diff --git a/core/src/main/java/io/kestra/core/runners/LocalWorkingDir.java b/core/src/main/java/io/kestra/core/runners/LocalWorkingDir.java index 03c79a8bae..4c199efd91 100644 --- a/core/src/main/java/io/kestra/core/runners/LocalWorkingDir.java +++ b/core/src/main/java/io/kestra/core/runners/LocalWorkingDir.java @@ -26,6 +26,7 @@ public class LocalWorkingDir implements WorkingDir { private final Path workingDirPath; + private final String workingDirId; /** * Creates a new {@link LocalWorkingDir} instance. @@ -43,6 +44,7 @@ public LocalWorkingDir(final Path tmpdirBasePath) { * @param workingDirId The working directory id. */ public LocalWorkingDir(final Path tmpdirBasePath, final String workingDirId) { + this.workingDirId = workingDirId; this.workingDirPath = tmpdirBasePath.resolve(workingDirId); } @@ -54,6 +56,11 @@ public Path path() { return path(true); } + @Override + public String id() { + return workingDirId; + } + /** * {@inheritDoc} **/ diff --git a/core/src/main/java/io/kestra/core/runners/WorkingDir.java b/core/src/main/java/io/kestra/core/runners/WorkingDir.java index 1cef21383a..b5d3e89f23 100644 --- a/core/src/main/java/io/kestra/core/runners/WorkingDir.java +++ b/core/src/main/java/io/kestra/core/runners/WorkingDir.java @@ -34,6 +34,13 @@ public interface WorkingDir { */ Path path(boolean create); + /** + * Gets the working directory identifier. + * + * @return The identifier. + */ + String id(); + /** * Resolve a path inside the working directory (a.k.a. the tempDir). *

diff --git a/script/src/main/java/io/kestra/plugin/scripts/runner/docker/Docker.java b/script/src/main/java/io/kestra/plugin/scripts/runner/docker/Docker.java index 960819da5c..37d840e1e5 100644 --- a/script/src/main/java/io/kestra/plugin/scripts/runner/docker/Docker.java +++ b/script/src/main/java/io/kestra/plugin/scripts/runner/docker/Docker.java @@ -18,17 +18,31 @@ import io.kestra.core.runners.DefaultRunContext; import io.kestra.core.runners.RunContext; import io.kestra.core.utils.Await; +import io.kestra.core.utils.ListUtils; import io.kestra.core.utils.RetryUtils; import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions; import io.micronaut.core.convert.format.ReadableBytesTypeConverter; import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SuperBuilder; +import org.apache.commons.compress.archivers.ArchiveEntry; +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.commons.io.IOUtils; import org.apache.hc.core5.http.ConnectionClosedException; import org.slf4j.Logger; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.LinkOption; import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.time.Duration; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -204,6 +218,18 @@ public class Docker extends TaskRunner { @PluginProperty(dynamic = true) private String shmSize; + @Schema( + title = "File handling strategy.", + description = """ + How to handle local files (input files, output files, namespace files, ...). + By default, we create a volume and copy the file into the volume bind path. + Configuring it to `MOUNT` will mount the working directory instead.""" + ) + @NotNull + @Builder.Default + @PluginProperty + private FileHandlingStrategy fileHandlingStrategy = FileHandlingStrategy.VOLUME; + public static Docker from(DockerOptions dockerOptions) { if (dockerOptions == null) { return Docker.builder().build(); @@ -246,16 +272,69 @@ public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List relativeWorkingDirectoryFilesPaths = taskCommands.relativeWorkingDirectoryFilesPaths(); + boolean hasFilesToUpload = !ListUtils.isEmpty(relativeWorkingDirectoryFilesPaths); + boolean hasFilesToDownload = !ListUtils.isEmpty(filesToDownload); + boolean outputDirectoryEnabled = taskCommands.outputDirectoryEnabled(); + boolean needVolume = hasFilesToDownload || hasFilesToUpload || outputDirectoryEnabled; + String filesVolumeName = null; + + // create a volume if we need to handle files + if (needVolume && this.fileHandlingStrategy == FileHandlingStrategy.VOLUME) { + CreateVolumeCmd files = dockerClient.createVolumeCmd() + .withLabels(ScriptService.labels(runContext, "kestra.io/")); + filesVolumeName = files.exec().getName(); + logger.debug("Volume created: {}", filesVolumeName); + String remotePath = windowsToUnixPath(taskCommands.getWorkingDirectory().toString()); + + // first, create an archive + Path fileArchive = runContext.workingDir().createFile("inputFiles.tart"); + try (FileOutputStream fos = new FileOutputStream(fileArchive.toString()); + ArchiveOutputStream out = new TarArchiveOutputStream(fos)) { + for (Path file: relativeWorkingDirectoryFilesPaths) { + Path resolvedFile = runContext.workingDir().resolve(file); + TarArchiveEntry entry = out.createArchiveEntry(resolvedFile.toFile(), file.toString()); + out.putArchiveEntry(entry); + if (!Files.isDirectory(resolvedFile)) { + try (InputStream fis = Files.newInputStream(resolvedFile)) { + IOUtils.copy(fis, out); + } + } + out.closeArchiveEntry(); + } + out.finish(); + } + + // then send it to the container + try (InputStream is = new FileInputStream(fileArchive.toString())) { + CopyArchiveToContainerCmd copyArchiveToContainerCmd = dockerClient.copyArchiveToContainerCmd(exec.getId()) + .withTarInputStream(is) + .withRemotePath(remotePath); + copyArchiveToContainerCmd.exec(); + } + + Files.delete(fileArchive); + + // create the outputDir if needed + if (taskCommands.outputDirectoryEnabled()) { + CopyArchiveToContainerCmd copyArchiveToContainerCmd = dockerClient.copyArchiveToContainerCmd(exec.getId()) + .withHostResource(taskCommands.getOutputDirectory().toString()) + .withRemotePath(remotePath); + copyArchiveToContainerCmd.exec(); + } + } + + // start container dockerClient.startContainerCmd(exec.getId()).exec(); logger.debug( "Starting command with container id {} [{}]", @@ -336,6 +415,26 @@ public void onComplete() { logger.debug("Command succeed with code " + exitCode); } + // download output files + if (needVolume && this.fileHandlingStrategy == FileHandlingStrategy.VOLUME && filesVolumeName != null) { + CopyArchiveFromContainerCmd copyArchiveFromContainerCmd = dockerClient.copyArchiveFromContainerCmd(exec.getId(), windowsToUnixPath(taskCommands.getWorkingDirectory().toString())); + try (InputStream is = copyArchiveFromContainerCmd.exec(); + TarArchiveInputStream tar = new TarArchiveInputStream(is)) { + ArchiveEntry entry; + while ((entry = tar.getNextEntry()) != null) { + // each entry contains the working directory as the first part, we need to remove it + Path extractTo = runContext.workingDir().resolve(Path.of(entry.getName().substring(runContext.workingDir().id().length() +1))); + if (entry.isDirectory()) { + if (!Files.exists(extractTo)) { + Files.createDirectories(extractTo); + } + } else { + Files.copy(tar, extractTo, StandardCopyOption.REPLACE_EXISTING); + } + } + } + } + return new RunnerResult(exitCode, defaultLogConsumer); } finally { try { @@ -343,6 +442,11 @@ public void onComplete() { // come to a normal end. kill(); dockerClient.removeContainerCmd(exec.getId()).exec(); + logger.debug("Container deleted: {}", exec.getId()); + if (needVolume && this.fileHandlingStrategy == FileHandlingStrategy.VOLUME && filesVolumeName != null) { + dockerClient.removeVolumeCmd(filesVolumeName).exec(); + logger.debug("Volume deleted: {}", filesVolumeName); + } } catch (Exception ignored) { } @@ -418,8 +522,8 @@ private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient doc Path workingDirectory = taskCommands.getWorkingDirectory(); String image = runContext.render(this.image, additionalVars); - CreateContainerCmd container = dockerClient.createContainerCmd(image); - addMetadata(runContext, container); + CreateContainerCmd container = dockerClient.createContainerCmd(image) + .withLabels(ScriptService.labels(runContext, "kestra.io/")); HostConfig hostConfig = new HostConfig(); @@ -434,16 +538,6 @@ private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient doc container.withWorkingDir(windowsToUnixPath(workingDirectory.toAbsolutePath().toString())); } - List binds = new ArrayList<>(); - - if (workingDirectory != null) { - String bindPath = windowsToUnixPath(workingDirectory.toString()); - binds.add(new Bind( - bindPath, - new Volume(bindPath), - AccessMode.rw - )); - } if (this.getUser() != null) { container.withUser(runContext.render(this.getUser(), additionalVars)); @@ -458,14 +552,21 @@ private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient doc .toArray(String[]::new)); } + List binds = new ArrayList<>(); + if (this.fileHandlingStrategy == FileHandlingStrategy.MOUNT && workingDirectory != null) { + String bindPath = windowsToUnixPath(workingDirectory.toString()); + binds.add(new Bind( + bindPath, + new Volume(bindPath), + AccessMode.rw + )); + } if (volumesEnabled && this.getVolumes() != null) { binds.addAll(runContext.render(this.getVolumes()) .stream() .map(Bind::parse) - .toList() - ); + .toList()); } - if (!binds.isEmpty()) { hostConfig.withBinds(binds); } @@ -533,10 +634,6 @@ private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient doc .withAttachStdout(true); } - private static void addMetadata(RunContext runContext, CreateContainerCmd container) { - container.withLabels(ScriptService.labels(runContext, "kestra.io/")); - } - private static Long convertBytes(String bytes) { return READABLE_BYTES_TYPE_CONVERTER.convert(bytes, Number.class) .orElseThrow(() -> new IllegalArgumentException("Invalid size with value '" + bytes + "'")) @@ -581,4 +678,9 @@ private void pullImage(DockerClient dockerClient, String image, PullPolicy polic ); } } + + public enum FileHandlingStrategy { + MOUNT, + VOLUME + } }