diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java index 1ebe38fdc224..7c4b97a0a78f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -25,6 +25,7 @@ package io.airbyte.workers; import com.google.common.annotations.VisibleForTesting; +import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.EnvConfigs; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.StandardSyncInput; @@ -41,6 +42,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +// TODO:(Issue-4824): Figure out how to log Docker process information. public class WorkerUtils { public static final ResourceRequirements DEFAULT_RESOURCE_REQUIREMENTS = initResourceRequirements(); @@ -52,8 +54,14 @@ public static void gentleClose(final Process process, final long timeout, final return; } + if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) { + LOGGER.debug("Gently closing process {}", process.info().commandLine().get()); + } + try { - process.waitFor(timeout, timeUnit); + if (process.isAlive()) { + process.waitFor(timeout, timeUnit); + } } catch (InterruptedException e) { LOGGER.error("Exception while while waiting for process to finish", e); } @@ -103,6 +111,9 @@ static void gentleCloseWithHeartbeat(final Process process, while (process.isAlive() && heartbeatMonitor.isBeating()) { try { + if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) { + LOGGER.debug("Gently closing process {} with heartbeat..", process.info().commandLine().get()); + } process.waitFor(checkHeartbeatDuration.toMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOGGER.error("Exception while waiting for process to finish", e); @@ -111,6 +122,9 @@ static void gentleCloseWithHeartbeat(final Process process, if (process.isAlive()) { try { + if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) { + LOGGER.debug("Gently closing process {} without heartbeat..", process.info().commandLine().get()); + } process.waitFor(gracefulShutdownDuration.toMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOGGER.error("Exception during grace period for process to finish. This can happen when cancelling jobs."); @@ -119,6 +133,9 @@ static void gentleCloseWithHeartbeat(final Process process, // if we were unable to exist gracefully, force shutdown... if (process.isAlive()) { + if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) { + LOGGER.debug("Force shutdown process {}..", process.info().commandLine().get()); + } forceShutdown.accept(process, forcedShutdownDuration); } } @@ -133,7 +150,7 @@ static void forceShutdown(Process process, Duration lastChanceDuration) { LOGGER.error("Exception while while killing the process", e); } if (process.isAlive()) { - LOGGER.error("Couldn't kill the process. You might have a zombie ({})", process.info().commandLine()); + LOGGER.error("Couldn't kill the process. You might have a zombie process."); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index a0d271de259a..cd39f015d17f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -47,6 +47,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.ProcessHandle.Info; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; @@ -489,15 +490,11 @@ public InputStream getErrorStream() { */ @Override public int waitFor() throws InterruptedException { - try { - Pod refreshedPod = - fabricClient.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()).get(); - fabricClient.resource(refreshedPod).waitUntilCondition(this::isTerminal, 10, TimeUnit.DAYS); - wasKilled.set(true); - return exitValue(); - } finally { - close(); - } + Pod refreshedPod = + fabricClient.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()).get(); + fabricClient.resource(refreshedPod).waitUntilCondition(this::isTerminal, 10, TimeUnit.DAYS); + wasKilled.set(true); + return exitValue(); } /** @@ -506,11 +503,7 @@ public int waitFor() throws InterruptedException { */ @Override public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException { - try { - return super.waitFor(timeout, unit); - } finally { - close(); - } + return super.waitFor(timeout, unit); } /** @@ -528,6 +521,11 @@ public void destroy() { } } + @Override + public Info info() { + return new KubePodProcessInfo(podDefinition.getMetadata().getName()); + } + /** * Close all open resource in the opposite order of resource creation. */ @@ -538,8 +536,13 @@ private void close() { Exceptions.swallow(this.stdoutServerSocket::close); Exceptions.swallow(this.stderrServerSocket::close); Exceptions.swallow(this.executorService::shutdownNow); - Exceptions.swallow(() -> portReleaser.accept(stdoutLocalPort)); - Exceptions.swallow(() -> portReleaser.accept(stderrLocalPort)); + try { + portReleaser.accept(stdoutLocalPort); + portReleaser.accept(stderrLocalPort); + } catch (Exception e) { + LOGGER.error("Error releasing ports ", e); + } + LOGGER.debug("Closed {}", podDefinition.getMetadata().getName()); } private boolean isTerminal(Pod pod) { @@ -600,7 +603,17 @@ private int getReturnCode(Pod pod) { @Override public int exitValue() { - return getReturnCode(podDefinition); + // getReturnCode throws IllegalThreadException if the Kube pod has not exited; + // close() is only called if the Kube pod has terminated. + var returnCode = getReturnCode(podDefinition); + // The OS traditionally handles process resource clean up. Therefore an exit code of 0, also + // indicates that all kernel resources were shut down. + // Because this is a custom implementation, manually close all the resources. + // Further, since the local resources are used to talk to Kubernetes resources, shut local resources + // down after Kubernetes resources are shut down, regardless of Kube termination status. + close(); + LOGGER.info("Closed all resources for pod {}", podDefinition.getMetadata().getName()); + return returnCode; } private static ResourceRequirementsBuilder getResourceRequirementsBuilder(ResourceRequirements resourceRequirements) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcessInfo.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcessInfo.java new file mode 100644 index 000000000000..62f879cb51b9 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcessInfo.java @@ -0,0 +1,75 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.workers.process; + +import java.lang.ProcessHandle.Info; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; + +/** + * Minimal Process info implementation to assist with debug logging. + * + * Current implement only logs out the Kubernetes pod corresponding to the JVM process. + */ +public class KubePodProcessInfo implements Info { + + private final String podName; + + public KubePodProcessInfo(String podname) { + this.podName = podname; + } + + @Override + public Optional command() { + return Optional.of(podName); + } + + @Override + public Optional commandLine() { + return Optional.of(podName); + } + + @Override + public Optional arguments() { + return Optional.empty(); + } + + @Override + public Optional startInstant() { + return Optional.empty(); + } + + @Override + public Optional totalCpuDuration() { + return Optional.empty(); + } + + @Override + public Optional user() { + return Optional.empty(); + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index aade29d61e9d..c5f7a502957b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -24,6 +24,7 @@ package io.airbyte.workers.process; +import com.google.common.annotations.VisibleForTesting; import io.airbyte.config.ResourceRequirements; import io.airbyte.workers.WorkerException; import io.fabric8.kubernetes.client.KubernetesClient; @@ -79,21 +80,20 @@ public Process create(String jobId, throws WorkerException { try { // used to differentiate source and destination processes with the same id and attempt - final String suffix = RandomStringUtils.randomAlphabetic(5).toLowerCase(); - final String podName = "airbyte-worker-" + jobId + "-" + attempt + "-" + suffix; + final String podName = createPodName(imageName, jobId, attempt); final int stdoutLocalPort = workerPorts.take(); - LOGGER.info("stdoutLocalPort = " + stdoutLocalPort); + LOGGER.info("{} stdoutLocalPort = {}", podName, stdoutLocalPort); final int stderrLocalPort = workerPorts.take(); - LOGGER.info("stderrLocalPort = " + stderrLocalPort); + LOGGER.info("{} stderrLocalPort = {}", podName, stderrLocalPort); final Consumer portReleaser = port -> { if (!workerPorts.contains(port)) { workerPorts.add(port); - LOGGER.info("Port consumer releasing: " + port); + LOGGER.info("{} releasing: {}", podName, port); } else { - LOGGER.info("Port consumer skipping releasing: " + port); + LOGGER.info("{} skipping releasing: {}", podName, port); } }; @@ -117,4 +117,38 @@ public Process create(String jobId, } } + /** + * Docker image names are by convention separated by slashes. The last portion is the image's name. + * This is followed by a colon and a version number. e.g. airbyte/scheduler:v1 or + * gcr.io/my-project/image-name:v2. + * + * Kubernetes has a maximum pod name length of 63 characters. + * + * With these two facts, attempt to construct a unique Pod name with the image name present for + * easier operations. + */ + @VisibleForTesting + protected static String createPodName(String fullImagePath, String jobId, int attempt) { + var versionDelimiter = ":"; + var noVersion = fullImagePath.split(versionDelimiter)[0]; + + var dockerDelimiter = "/"; + var nameParts = noVersion.split(dockerDelimiter); + var imageName = nameParts[nameParts.length - 1]; + + var randSuffix = RandomStringUtils.randomAlphabetic(5).toLowerCase(); + final String suffix = "worker-" + jobId + "-" + attempt + "-" + randSuffix; + + var podName = imageName + "-" + suffix; + + var podNameLenLimit = 63; + if (podName.length() > podNameLenLimit) { + var extra = podName.length() - podNameLenLimit; + imageName = imageName.substring(extra); + podName = imageName + "-" + suffix; + } + + return podName; + } + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java index 195f8ec3bf26..a2d88a4be32c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java @@ -112,6 +112,7 @@ public void notifyEndOfStream() throws IOException { @Override public void close() throws Exception { if (destinationProcess == null) { + LOGGER.debug("Destination process already exited"); return; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java index cc6a0fb919e7..05a0ab614b44 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java @@ -118,6 +118,7 @@ public Optional attemptRead() { @Override public void close() throws Exception { if (sourceProcess == null) { + LOGGER.debug("Source process already exited"); return; } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultDiscoverCatalogWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultDiscoverCatalogWorkerTest.java index 40fb64b257b3..01d87b11ecfc 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultDiscoverCatalogWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultDiscoverCatalogWorkerTest.java @@ -26,8 +26,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -107,7 +105,7 @@ public void testDiscoverSchema() throws Exception { } }); - verify(process).waitFor(anyLong(), any()); + verify(process).exitValue(); } @SuppressWarnings("BusyWait") @@ -124,7 +122,7 @@ public void testDiscoverSchemaProcessFail() throws Exception { } }); - verify(process).waitFor(anyLong(), any()); + verify(process).exitValue(); } @Test diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java index 95d510fe2d19..c89bbabb0f76 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java @@ -99,7 +99,7 @@ public void testClose() throws Exception { runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS); runner.close(); - verify(process).destroy(); + verify(process).waitFor(); } @Test diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/process/KubeProcessFactoryTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/process/KubeProcessFactoryTest.java new file mode 100644 index 000000000000..bea8b469794e --- /dev/null +++ b/airbyte-workers/src/test/java/io/airbyte/workers/process/KubeProcessFactoryTest.java @@ -0,0 +1,46 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.workers.process; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class KubeProcessFactoryTest { + + @Test + void getPodNameNormal() { + var name = KubeProcessFactory.createPodName("airbyte/tester:1", "1", 10); + var withoutRandSuffix = name.substring(0, name.length() - 5); + Assertions.assertEquals("tester-worker-1-10-", withoutRandSuffix); + } + + @Test + void getPodNameTruncated() { + var name = KubeProcessFactory.createPodName("airbyte/very-very-very-long-name-longer-than-63-chars:2", "1", 10); + var withoutRandSuffix = name.substring(0, name.length() - 5); + Assertions.assertEquals("very-very-very-long-name-longer-than-63-chars-worker-1-10-", withoutRandSuffix); + } + +} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java index 72c793b9bef3..d9ad2e823365 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java @@ -26,8 +26,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -139,7 +137,7 @@ public void testSuccessfulLifecycle() throws Exception { } }); - verify(process).waitFor(anyLong(), any()); + verify(process).exitValue(); } @Test