Skip to content

Commit

Permalink
feat(core, script): use a volume instead of a bind for handling files…
Browse files Browse the repository at this point in the history
… in the Docker task runner

Fixes #3857
  • Loading branch information
loicmathieu committed Jul 17, 2024
1 parent c508682 commit a5dc8e4
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
public class LocalWorkingDir implements WorkingDir {

private final Path workingDirPath;
private final String workingDirId;

/**
* Creates a new {@link LocalWorkingDir} instance.
Expand All @@ -43,6 +44,7 @@ public LocalWorkingDir(final Path tmpdirBasePath) {
* @param workingDirId The working directory id.
*/
public LocalWorkingDir(final Path tmpdirBasePath, final String workingDirId) {
this.workingDirId = workingDirId;
this.workingDirPath = tmpdirBasePath.resolve(workingDirId);
}

Expand All @@ -54,6 +56,11 @@ public Path path() {
return path(true);
}

/**
 * {@inheritDoc}
 **/
@Override
public String id() {
    // Returns the identifier exactly as it was supplied to the constructor.
    return this.workingDirId;
}

/**
* {@inheritDoc}
**/
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/io/kestra/core/runners/WorkingDir.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ public interface WorkingDir {
*/
Path path(boolean create);

/**
* Gets the working directory identifier.
* <p>
* For {@code LocalWorkingDir} this is the name that is resolved against the temporary base path
* to obtain the working directory path.
*
* @return The identifier.
*/
String id();

/**
* Resolve a path inside the working directory (a.k.a. the tempDir).
* <p>
Expand Down
148 changes: 125 additions & 23 deletions script/src/main/java/io/kestra/plugin/scripts/runner/docker/Docker.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,31 @@
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.RetryUtils;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
import io.micronaut.core.convert.format.ReadableBytesTypeConverter;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.slf4j.Logger;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -204,6 +218,18 @@ public class Docker extends TaskRunner {
@PluginProperty(dynamic = true)
private String shmSize;

// Selects how task files are exchanged with the container:
// - VOLUME (default): run() creates a Docker volume and copies files to and from the container as tar archives.
// - MOUNT: configure() bind-mounts the working directory into the container (read-write).
@Schema(
title = "File handling strategy.",
description = """
How to handle local files (input files, output files, namespace files, ...).
By default, we create a volume and copy the file into the volume bind path.
Configuring it to `MOUNT` will mount the working directory instead."""
)
@NotNull
@Builder.Default
@PluginProperty
private FileHandlingStrategy fileHandlingStrategy = FileHandlingStrategy.VOLUME;

public static Docker from(DockerOptions dockerOptions) {
if (dockerOptions == null) {
return Docker.builder().build();
Expand Down Expand Up @@ -246,16 +272,69 @@ public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<S
String image = runContext.render(this.image, additionalVars);

try (DockerClient dockerClient = dockerClient(runContext, image)) {
// create container
CreateContainerCmd container = configure(taskCommands, dockerClient, runContext, additionalVars);

// pull image
if (this.getPullPolicy() != PullPolicy.NEVER) {
pullImage(dockerClient, image, this.getPullPolicy(), logger);
}

// start container
// create container
CreateContainerCmd container = configure(taskCommands, dockerClient, runContext, additionalVars);
CreateContainerResponse exec = container.exec();
logger.debug("Container created: {}", exec.getId());

List<Path> relativeWorkingDirectoryFilesPaths = taskCommands.relativeWorkingDirectoryFilesPaths();
boolean hasFilesToUpload = !ListUtils.isEmpty(relativeWorkingDirectoryFilesPaths);
boolean hasFilesToDownload = !ListUtils.isEmpty(filesToDownload);
boolean outputDirectoryEnabled = taskCommands.outputDirectoryEnabled();
boolean needVolume = hasFilesToDownload || hasFilesToUpload || outputDirectoryEnabled;
String filesVolumeName = null;

// create a volume if we need to handle files
if (needVolume && this.fileHandlingStrategy == FileHandlingStrategy.VOLUME) {
CreateVolumeCmd files = dockerClient.createVolumeCmd()
.withLabels(ScriptService.labels(runContext, "kestra.io/"));
filesVolumeName = files.exec().getName();
logger.debug("Volume created: {}", filesVolumeName);
String remotePath = windowsToUnixPath(taskCommands.getWorkingDirectory().toString());

// first, create an archive
Path fileArchive = runContext.workingDir().createFile("inputFiles.tart");
try (FileOutputStream fos = new FileOutputStream(fileArchive.toString());
ArchiveOutputStream<TarArchiveEntry> out = new TarArchiveOutputStream(fos)) {
for (Path file: relativeWorkingDirectoryFilesPaths) {
Path resolvedFile = runContext.workingDir().resolve(file);
TarArchiveEntry entry = out.createArchiveEntry(resolvedFile.toFile(), file.toString());
out.putArchiveEntry(entry);
if (!Files.isDirectory(resolvedFile)) {
try (InputStream fis = Files.newInputStream(resolvedFile)) {
IOUtils.copy(fis, out);
}
}
out.closeArchiveEntry();
}
out.finish();
}

// then send it to the container
try (InputStream is = new FileInputStream(fileArchive.toString())) {
CopyArchiveToContainerCmd copyArchiveToContainerCmd = dockerClient.copyArchiveToContainerCmd(exec.getId())
.withTarInputStream(is)
.withRemotePath(remotePath);
copyArchiveToContainerCmd.exec();
}

Files.delete(fileArchive);

// create the outputDir if needed
if (taskCommands.outputDirectoryEnabled()) {
CopyArchiveToContainerCmd copyArchiveToContainerCmd = dockerClient.copyArchiveToContainerCmd(exec.getId())
.withHostResource(taskCommands.getOutputDirectory().toString())
.withRemotePath(remotePath);
copyArchiveToContainerCmd.exec();
}
}

// start container
dockerClient.startContainerCmd(exec.getId()).exec();
logger.debug(
"Starting command with container id {} [{}]",
Expand Down Expand Up @@ -336,13 +415,38 @@ public void onComplete() {
logger.debug("Command succeed with code " + exitCode);
}

// download output files
if (needVolume && this.fileHandlingStrategy == FileHandlingStrategy.VOLUME && filesVolumeName != null) {
CopyArchiveFromContainerCmd copyArchiveFromContainerCmd = dockerClient.copyArchiveFromContainerCmd(exec.getId(), windowsToUnixPath(taskCommands.getWorkingDirectory().toString()));
try (InputStream is = copyArchiveFromContainerCmd.exec();
TarArchiveInputStream tar = new TarArchiveInputStream(is)) {
ArchiveEntry entry;
while ((entry = tar.getNextEntry()) != null) {
// each entry contains the working directory as the first part, we need to remove it
Path extractTo = runContext.workingDir().resolve(Path.of(entry.getName().substring(runContext.workingDir().id().length() +1)));
if (entry.isDirectory()) {
if (!Files.exists(extractTo)) {
Files.createDirectories(extractTo);
}
} else {
Files.copy(tar, extractTo, StandardCopyOption.REPLACE_EXISTING);
}
}
}
}

return new RunnerResult(exitCode, defaultLogConsumer);
} finally {
try {
// kill container if it's still running, this means there was an exception and the container didn't
// come to a normal end.
kill();
dockerClient.removeContainerCmd(exec.getId()).exec();
logger.debug("Container deleted: {}", exec.getId());
if (needVolume && this.fileHandlingStrategy == FileHandlingStrategy.VOLUME && filesVolumeName != null) {
dockerClient.removeVolumeCmd(filesVolumeName).exec();
logger.debug("Volume deleted: {}", filesVolumeName);
}
} catch (Exception ignored) {

}
Expand Down Expand Up @@ -418,8 +522,8 @@ private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient doc
Path workingDirectory = taskCommands.getWorkingDirectory();
String image = runContext.render(this.image, additionalVars);

CreateContainerCmd container = dockerClient.createContainerCmd(image);
addMetadata(runContext, container);
CreateContainerCmd container = dockerClient.createContainerCmd(image)
.withLabels(ScriptService.labels(runContext, "kestra.io/"));

HostConfig hostConfig = new HostConfig();

Expand All @@ -434,16 +538,6 @@ private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient doc
container.withWorkingDir(windowsToUnixPath(workingDirectory.toAbsolutePath().toString()));
}

List<Bind> binds = new ArrayList<>();

if (workingDirectory != null) {
String bindPath = windowsToUnixPath(workingDirectory.toString());
binds.add(new Bind(
bindPath,
new Volume(bindPath),
AccessMode.rw
));
}

if (this.getUser() != null) {
container.withUser(runContext.render(this.getUser(), additionalVars));
Expand All @@ -458,14 +552,21 @@ private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient doc
.toArray(String[]::new));
}

List<Bind> binds = new ArrayList<>();
if (this.fileHandlingStrategy == FileHandlingStrategy.MOUNT && workingDirectory != null) {
String bindPath = windowsToUnixPath(workingDirectory.toString());
binds.add(new Bind(
bindPath,
new Volume(bindPath),
AccessMode.rw
));
}
if (volumesEnabled && this.getVolumes() != null) {
binds.addAll(runContext.render(this.getVolumes())
.stream()
.map(Bind::parse)
.toList()
);
.toList());
}

if (!binds.isEmpty()) {
hostConfig.withBinds(binds);
}
Expand Down Expand Up @@ -533,10 +634,6 @@ private CreateContainerCmd configure(TaskCommands taskCommands, DockerClient doc
.withAttachStdout(true);
}

/**
 * Attaches the Kestra labels computed from the run context to the container being created.
 *
 * @param runContext the run context the labels are derived from
 * @param container  the container creation command to label
 */
private static void addMetadata(RunContext runContext, CreateContainerCmd container) {
    // All labels share the "kestra.io/" prefix.
    final var kestraLabels = ScriptService.labels(runContext, "kestra.io/");
    container.withLabels(kestraLabels);
}

private static Long convertBytes(String bytes) {
return READABLE_BYTES_TYPE_CONVERTER.convert(bytes, Number.class)
.orElseThrow(() -> new IllegalArgumentException("Invalid size with value '" + bytes + "'"))
Expand Down Expand Up @@ -581,4 +678,9 @@ private void pullImage(DockerClient dockerClient, String image, PullPolicy polic
);
}
}

/**
* Strategies for making the task's files available to the container.
*/
public enum FileHandlingStrategy {
// Bind-mount the working directory into the container.
MOUNT,
// Create a Docker volume and copy files to and from the container as archives.
VOLUME
}
}

0 comments on commit a5dc8e4

Please sign in to comment.