Skip to content

Commit

Permalink
fix(core): fix docker container name (#952)
Browse files Browse the repository at this point in the history
Co-authored-by: Loïc Mathieu <loikeseke@gmail.com>
  • Loading branch information
tchiotludo and loicmathieu authored Feb 3, 2023
1 parent 0726811 commit e484234
Showing 1 changed file with 43 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
import io.kestra.core.tasks.scripts.AbstractLogThread;
import io.kestra.core.tasks.scripts.RunResult;
import io.kestra.core.utils.RetryUtils;
import io.kestra.core.utils.Slugify;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.convert.format.ReadableBytesTypeConverter;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.slf4j.Logger;

Expand Down Expand Up @@ -100,20 +98,6 @@ private static void metadata(RunContext runContext, CreateContainerCmd container
Map<String, String> execution = (Map<String, String>) runContext.getVariables().get("execution");
Map<String, String> taskrun = (Map<String, String>) runContext.getVariables().get("taskrun");

String name = Slugify.of(String.join(
"-",
taskrun.get("id"),
flow.get("id"),
task.get("id")
));

if (name.length() > 63) {
name = name.substring(0, 63);
}

name = StringUtils.stripEnd(name, "-");

container.withName(name);
container.withLabels(ImmutableMap.of(
"flow.kestra.io/id", flow.get("id"),
"flow.kestra.io/namespace", flow.get("namespace"),
Expand All @@ -133,8 +117,6 @@ public RunResult run(
AbstractBash.LogSupplier logSupplier,
Map<String, Object> additionalVars
) throws Exception {
DockerClient dockerClient = getDockerClient(abstractBash, runContext, workingDirectory);

if (abstractBash.getDockerOptions() == null) {
throw new IllegalArgumentException("Missing required dockerOptions properties");
}
Expand All @@ -143,13 +125,13 @@ public RunResult run(
NameParser.ReposTag imageParse = NameParser.parseRepositoryTag(image);

try (
CreateContainerCmd container = dockerClient.createContainerCmd(image);
PullImageCmd pull = dockerClient.pullImageCmd(image);
DockerClient dockerClient = getDockerClient(abstractBash, runContext, workingDirectory);
PipedInputStream stdOutInputStream = new PipedInputStream();
PipedOutputStream stdOutOutputStream = new PipedOutputStream(stdOutInputStream);
PipedInputStream stdErrInputStream = new PipedInputStream();
PipedOutputStream stdErrOutputStream = new PipedOutputStream(stdErrInputStream);
) {
CreateContainerCmd container = dockerClient.createContainerCmd(image);
// properties
metadata(runContext, container);
HostConfig hostConfig = new HostConfig();
Expand Down Expand Up @@ -263,28 +245,30 @@ public RunResult run(

// pull image
if (abstractBash.getDockerOptions().getPullImage()) {
retryUtils.<Boolean, InternalServerErrorException>of(
Exponential.builder()
.delayFactor(2.0)
.interval(Duration.ofSeconds(5))
.maxInterval(Duration.ofSeconds(120))
.maxAttempt(5)
.build()
).run(
(bool, throwable) -> throwable instanceof InternalServerErrorException ||
throwable.getCause() instanceof ConnectionClosedException,
() -> {
String tag = !imageParse.tag.isEmpty() ? imageParse.tag : "latest";
String repository = pull.getRepository().contains(":")
? pull.getRepository().split(":")[0] : pull.getRepository();
pull
.withTag(tag)
.exec(new PullImageResultCallback())
.awaitCompletion();
logger.debug("Image pulled [{}:{}]", repository, tag);
return true;
}
);
try (PullImageCmd pull = dockerClient.pullImageCmd(image)) {
retryUtils.<Boolean, InternalServerErrorException>of(
Exponential.builder()
.delayFactor(2.0)
.interval(Duration.ofSeconds(5))
.maxInterval(Duration.ofSeconds(120))
.maxAttempt(5)
.build()
).run(
(bool, throwable) -> throwable instanceof InternalServerErrorException ||
throwable.getCause() instanceof ConnectionClosedException,
() -> {
String tag = !imageParse.tag.isEmpty() ? imageParse.tag : "latest";
String repository = pull.getRepository().contains(":")
? pull.getRepository().split(":")[0] : pull.getRepository();
pull
.withTag(tag)
.exec(new PullImageResultCallback())
.awaitCompletion();
logger.debug("Image pulled [{}:{}]", repository, tag);
return true;
}
);
}
}

// start container
Expand Down Expand Up @@ -325,8 +309,6 @@ public void onNext(Frame item) {
stdOut.join();
stdErr.join();

dockerClient.removeContainerCmd(exec.getId()).exec();

if (exitCode != 0) {
throw new AbstractBash.BashException(exitCode, stdOut.getLogsCount(), stdErr.getLogsCount());
} else {
Expand All @@ -337,9 +319,24 @@ public void onNext(Frame item) {
} catch (InterruptedException e) {
logger.warn("Killing process {} for InterruptedException", exec.getId());

dockerClient.killContainerCmd(exec.getId()).exec();
dockerClient.removeContainerCmd(exec.getId()).exec();
throw e;
} finally {
try {
var inspect = dockerClient.inspectContainerCmd(exec.getId()).exec();
if (Boolean.TRUE.equals(inspect.getState().getRunning())) {
// kill container as it's still running, this means there was an exception and the container didn't
// come to a normal end.
try {
dockerClient.killContainerCmd(exec.getId()).exec();
} catch (Exception e) {
logger.error("Unable to kill a running container", e);
}
}
dockerClient.removeContainerCmd(exec.getId()).exec();
} catch (Exception ignored) {

}

}
}
}
Expand Down

0 comments on commit e484234

Please sign in to comment.