diff --git a/airbyte-container-orchestrator/build.gradle b/airbyte-container-orchestrator/build.gradle index 32ef6c7d17aa..6356b42c3bee 100644 --- a/airbyte-container-orchestrator/build.gradle +++ b/airbyte-container-orchestrator/build.gradle @@ -3,7 +3,7 @@ plugins { } dependencies { - implementation 'io.fabric8:kubernetes-client:5.3.1' + implementation 'io.fabric8:kubernetes-client:5.12.2' implementation 'org.apache.commons:commons-lang3:3.11' implementation 'org.apache.commons:commons-text:1.9' implementation 'org.eclipse.jetty:jetty-server:9.4.31.v20200723' diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java index 64149a0c5a04..c6da05a7ecbd 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java @@ -74,14 +74,14 @@ public ContainerOrchestratorApp( } private void configureLogging() { - for (String envVar : OrchestratorConstants.ENV_VARS_TO_TRANSFER) { + for (final String envVar : OrchestratorConstants.ENV_VARS_TO_TRANSFER) { if (envMap.containsKey(envVar)) { System.setProperty(envVar, envMap.get(envVar)); } } // make sure the new configuration is picked up - LoggerContext ctx = (LoggerContext) LogManager.getContext(false); + final LoggerContext ctx = (LoggerContext) LogManager.getContext(false); ctx.reconfigure(); final var logClient = LogClientSingleton.getInstance(); @@ -119,7 +119,7 @@ private void runInternal(final DefaultAsyncStateManager asyncStateManager) { // required to kill clients with thread pools System.exit(0); - } catch (Throwable t) { + } catch (final Throwable t) { asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.FAILED); System.exit(1); } @@ -177,7 +177,7 @@ public static void main(final String[] args) { final var app = new ContainerOrchestratorApp(applicationName, envMap, jobRunConfig, kubePodInfo); app.run(); - } catch (Throwable t) { + } catch (final Throwable t) { log.error("Orchestrator failed...", t); // otherwise the pod hangs on closing System.exit(1); @@ -212,7 +212,11 @@ private static ProcessFactory getProcessBuilderFactory(final Configs configs, fi // exposed) KubePortManagerSingleton.init(OrchestratorConstants.PORTS); - return new KubeProcessFactory(workerConfigs, configs.getJobKubeNamespace(), fabricClient, kubeHeartbeatUrl, false); + return new KubeProcessFactory(workerConfigs, + configs.getJobKubeNamespace(), + fabricClient, + kubeHeartbeatUrl, + false); } else { return new DockerProcessFactory( workerConfigs, diff --git a/airbyte-scheduler/app/build.gradle b/airbyte-scheduler/app/build.gradle index 13953e969907..656dab8ecc6a 100644 --- a/airbyte-scheduler/app/build.gradle +++ b/airbyte-scheduler/app/build.gradle @@ -3,7 +3,7 @@ plugins { } dependencies { - implementation 'io.fabric8:kubernetes-client:5.9.0' + implementation 'io.fabric8:kubernetes-client:5.12.2' implementation 'io.temporal:temporal-sdk:1.8.1' implementation project(':airbyte-analytics') diff --git a/airbyte-tests/build.gradle b/airbyte-tests/build.gradle index b162ce62ac17..7c65475095cf 100644 --- a/airbyte-tests/build.gradle +++ b/airbyte-tests/build.gradle @@ -38,7 +38,7 @@ dependencies { implementation project(':airbyte-api') implementation project(':airbyte-container-orchestrator') - implementation 'io.fabric8:kubernetes-client:5.3.1' + implementation 'io.fabric8:kubernetes-client:5.12.2' implementation 'org.testcontainers:testcontainers:1.15.3' acceptanceTestsImplementation project(':airbyte-api') diff --git a/airbyte-workers/build.gradle b/airbyte-workers/build.gradle index 0091663a364a..3830e60097e3 100644 --- a/airbyte-workers/build.gradle +++ b/airbyte-workers/build.gradle @@ -11,7 +11,7 @@ configurations { } dependencies { - implementation 'io.fabric8:kubernetes-client:5.3.1' + implementation 'io.fabric8:kubernetes-client:5.12.2' implementation 'io.temporal:temporal-sdk:1.8.1' implementation 'org.apache.ant:ant:1.10.10' implementation 'org.apache.commons:commons-lang3:3.11' diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index ab077bf5d570..d87ffd380b5c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -281,7 +281,11 @@ private static ProcessFactory getJobProcessFactory(final Configs configs, final final String localIp = InetAddress.getLocalHost().getHostAddress(); final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT; LOGGER.info("Using Kubernetes namespace: {}", configs.getJobKubeNamespace()); - return new KubeProcessFactory(workerConfigs, configs.getJobKubeNamespace(), fabricClient, kubeHeartbeatUrl, false); + return new KubeProcessFactory(workerConfigs, + configs.getJobKubeNamespace(), + fabricClient, + kubeHeartbeatUrl, + false); } else { return new DockerProcessFactory( workerConfigs, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java index 36c384a9f4a8..504dcb6b985f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java @@ -310,16 +310,12 @@ public void create(final Map allLabels, .createOrReplace(podToCreate); log.info("Waiting for pod to be running..."); - try { - kubernetesClient.pods() - .inNamespace(kubePodInfo.namespace()) - .withName(kubePodInfo.name()) - .waitUntilCondition(p -> { - return !p.getStatus().getContainerStatuses().isEmpty() && p.getStatus().getContainerStatuses().get(0).getState().getWaiting() == null; - }, 5, TimeUnit.MINUTES); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } + kubernetesClient.pods() + .inNamespace(kubePodInfo.namespace()) + .withName(kubePodInfo.name()) + .waitUntilCondition(p -> { + return !p.getStatus().getContainerStatuses().isEmpty() && p.getStatus().getContainerStatuses().get(0).getState().getWaiting() == null; + }, 5, TimeUnit.MINUTES); final var podStatus = kubernetesClient.pods() .inNamespace(kubePodInfo.namespace()) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/ExitCodeWatcher.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/ExitCodeWatcher.java index 24fb200ca8fe..f92ff88e7279 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/ExitCodeWatcher.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/ExitCodeWatcher.java @@ -7,8 +7,9 @@ import com.google.common.collect.MoreCollectors; import io.fabric8.kubernetes.api.model.ContainerStatus; import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; @@ -20,52 +21,111 @@ * be able to retrieve the exit code). */ @Slf4j -public class ExitCodeWatcher implements Watcher { +public class ExitCodeWatcher implements ResourceEventHandler { + private final String podName; + private final String podNamespace; private final Consumer onExitCode; - private final Consumer onWatchFailure; - private boolean exitCodeRetrieved = false; + private final Runnable onWatchFailure; + /** + * This flag is set to false when we either (a) find the pod's exit code, or (b) when the pod is + * deleted. This is so that we call exactly one of onExitCode and onWatchFailure, and we make that + * call exactly once. + *

+ * We rely on this class being side-effect-free, outside of persistExitCode() and persistFailure(). + * Those two methods use compareAndSet to prevent race conditions. Everywhere else, we can be sloppy + * because we won't actually emit any output. + */ + private final AtomicBoolean active = new AtomicBoolean(true); /** - * * @param onExitCode callback used to store the exit code * @param onWatchFailure callback that's triggered when the watch fails. should be some failed exit * code. */ - public ExitCodeWatcher(final Consumer onExitCode, final Consumer onWatchFailure) { + public ExitCodeWatcher(final String podName, + final String podNamespace, + final Consumer onExitCode, + final Runnable onWatchFailure) { + this.podName = podName; + this.podNamespace = podNamespace; this.onExitCode = onExitCode; this.onWatchFailure = onWatchFailure; } @Override - public void eventReceived(Action action, Pod resource) { - try { - if (!exitCodeRetrieved && KubePodResourceHelper.isTerminal(resource)) { - final ContainerStatus mainContainerStatus = resource.getStatus().getContainerStatuses() - .stream() - .filter(containerStatus -> containerStatus.getName().equals(KubePodProcess.MAIN_CONTAINER_NAME)) - .collect(MoreCollectors.onlyElement()); + public void onAdd(final Pod pod) { + if (shouldCheckPod(pod)) { + final Optional exitCode = getExitCode(pod); + exitCode.ifPresent(this::persistExitCode); + } + } - if (mainContainerStatus.getState() != null && mainContainerStatus.getState().getTerminated() != null) { - final int exitCode = mainContainerStatus.getState().getTerminated().getExitCode(); - log.info("Processing event with exit code " + exitCode + " for pod: " + resource.getMetadata().getName()); - onExitCode.accept(exitCode); - exitCodeRetrieved = true; - } - } - } catch (Exception e) { - String podName = ""; - if (resource.getMetadata() != null) { - podName = resource.getMetadata().getName(); + @Override + public void onUpdate(final Pod oldPod, final Pod newPod) { + if (shouldCheckPod(newPod)) { + final Optional exitCode = getExitCode(newPod); + exitCode.ifPresent(this::persistExitCode); + } + } + + @Override + public void onDelete(final Pod pod, final boolean deletedFinalStateUnknown) { + if (shouldCheckPod(pod)) { + if (!deletedFinalStateUnknown) { + final Optional exitCode = getExitCode(pod); + exitCode.ifPresentOrElse( + this::persistExitCode, + this::persistFailure); + } else { + persistFailure(); } + } + } + + /** + * Informers without an OperationContext will monitor ALL pods in ALL namespaces; filter down to the + * one pod that we care about. If it's still running, then we obviously can't fetch its exit code. + *

+ * Also, if we've already found the exit code, or the pod has been deleted, then stop doing anything + * at all. + */ + private boolean shouldCheckPod(final Pod pod) { + final boolean correctName = podName.equals(pod.getMetadata().getName()); + final boolean correctNamespace = podNamespace.equals(pod.getMetadata().getNamespace()); + return active.get() && correctName && correctNamespace && KubePodResourceHelper.isTerminal(pod); + } + + private Optional getExitCode(final Pod pod) { + final ContainerStatus mainContainerStatus = pod.getStatus().getContainerStatuses() + .stream() + .filter(containerStatus -> containerStatus.getName().equals(KubePodProcess.MAIN_CONTAINER_NAME)) + .collect(MoreCollectors.onlyElement()); - log.error("ExitCodeWatcher event handling failed for pod: " + podName, e); + if (mainContainerStatus.getState() != null && mainContainerStatus.getState().getTerminated() != null) { + return Optional.of(mainContainerStatus.getState().getTerminated().getExitCode()); } + return Optional.empty(); } - @Override - public void onClose(WatcherException cause) { - onWatchFailure.accept(cause); + private void persistExitCode(final int exitCode) { + if (active.compareAndSet(true, false)) { + log.info("Received exit code {} for pod {}", exitCode, podName); + onExitCode.accept(exitCode); + } + } + + private void persistFailure() { + if (active.compareAndSet(true, false)) { + // Log an error. The pod is completely gone, and we have no way to retrieve its exit code + // In theory, this shouldn't really happen. From + // https://pkg.go.dev/k8s.io/client-go/tools/cache#DeletedFinalStateUnknown: + // "in the case where an object was deleted but the watch deletion event was missed while + // disconnected from apiserver" + // But we have this handler just in case. + log.error("Pod {} was deleted before we could retrieve its exit code", podName); + onWatchFailure.run(); + } } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index 60d26c1d1556..86d576215725 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -30,8 +30,8 @@ import io.fabric8.kubernetes.api.model.VolumeMount; import io.fabric8.kubernetes.api.model.VolumeMountBuilder; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.dsl.PodResource; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.internal.readiness.Readiness; import java.io.IOException; import java.io.InputStream; @@ -147,7 +147,7 @@ public class KubePodProcess extends Process implements KubePod { private final int stderrLocalPort; private final ExecutorService executorService; private final CompletableFuture exitCodeFuture; - private final Watch podWatch; + private final SharedIndexInformer podInformer; public static String getPodIP(final KubernetesClient client, final String podName, final String podNamespace) { final var pod = client.pods().inNamespace(podNamespace).withName(podName).get(); @@ -305,16 +305,7 @@ private static void waitForInitPodToRun(final KubernetesClient client, final Pod LOGGER.info("Waiting for init container to be ready before copying files..."); final PodResource pod = client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()); - try { - pod.waitUntilCondition(p -> p.getStatus().getInitContainerStatuses().size() != 0, 5, TimeUnit.MINUTES); - } catch (final InterruptedException e) { - LOGGER.error("Init pod not found after 5 minutes"); - LOGGER.error("Pod search executed in namespace {} for pod name {} resulted in: {}", - podDefinition.getMetadata().getNamespace(), - podDefinition.getMetadata().getName(), - pod.get().toString()); - throw e; - } + pod.waitUntilCondition(p -> p.getStatus().getInitContainerStatuses().size() != 0, 5, TimeUnit.MINUTES); LOGGER.info("Init container present.."); client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()) .waitUntilCondition(p -> p.getStatus().getInitContainerStatuses().get(0).getState().getRunning() != null, 5, TimeUnit.MINUTES); @@ -528,15 +519,21 @@ public KubePodProcess(final boolean isOrchestrator, // This is safe only because we are blocking the init pod until we copy files onto it. // See the ExitCodeWatcher comments for more info. exitCodeFuture = new CompletableFuture<>(); - podWatch = fabricClient.resource(podDefinition).watch(new ExitCodeWatcher( + podInformer = fabricClient.pods() + .inNamespace(namespace) + .withName(pod.getMetadata().getName()) + .inform(); + podInformer.addEventHandler(new ExitCodeWatcher( + pod.getMetadata().getName(), + namespace, exitCodeFuture::complete, - exception -> { + () -> { LOGGER.info(prependPodInfo( String.format( - "Exit code watcher failed to retrieve the exit code. Defaulting to %s. This is expected if the job was cancelled. Error: %s", - KILLED_EXIT_CODE, - exception.getMessage()), - namespace, podName)); + "Exit code watcher failed to retrieve the exit code. Defaulting to %s. This is expected if the job was cancelled.", + KILLED_EXIT_CODE), + namespace, + podName)); exitCodeFuture.complete(KILLED_EXIT_CODE); })); @@ -631,7 +628,7 @@ public InputStream getErrorStream() { public int waitFor() throws InterruptedException { try { exitCodeFuture.get(); - } catch (ExecutionException e) { + } catch (final ExecutionException e) { throw new RuntimeException(e); } @@ -704,7 +701,7 @@ private void close() { Exceptions.swallow(this.stdoutServerSocket::close); Exceptions.swallow(this.stderrServerSocket::close); - Exceptions.swallow(this.podWatch::close); + Exceptions.swallow(this.podInformer::close); Exceptions.swallow(this.executorService::shutdownNow); KubePortManagerSingleton.getInstance().offer(stdoutLocalPort); @@ -717,7 +714,7 @@ private int getReturnCode() { if (exitCodeFuture.isDone()) { try { return exitCodeFuture.get(); - } catch (InterruptedException | ExecutionException e) { + } catch (final InterruptedException | ExecutionException e) { throw new RuntimeException( prependPodInfo("Cannot find pod %s : %s while trying to retrieve exit code. This probably means the pod was not correctly created.", podDefinition.getMetadata().getNamespace(), diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index 2ba00c266276..39e46f6b6afd 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -39,7 +39,7 @@ public class KubeProcessFactory implements ProcessFactory { public static final String NORMALISE_STEP = "normalise"; public static final String CUSTOM_STEP = "custom"; - private static final Pattern ALPHABETIC = Pattern.compile("[a-zA-Z]+");; + private static final Pattern ALPHABETIC = Pattern.compile("[a-zA-Z]+"); private static final String JOB_LABEL_KEY = "job_id"; private static final String ATTEMPT_LABEL_KEY = "attempt_id"; private static final String WORKER_POD_LABEL_KEY = "airbyte"; diff --git a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java index ce87d899d1c3..f76559ed0922 100644 --- a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java +++ b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java @@ -18,8 +18,10 @@ import io.airbyte.config.ResourceRequirements; import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.WorkerException; +import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import java.io.IOException; import java.net.Inet4Address; import java.net.ServerSocket; @@ -297,11 +299,17 @@ public void testDeletingPodImmediatelyAfterCompletion() throws Exception { .filter(p -> p.getMetadata() != null && p.getMetadata().getLabels() != null) .filter(p -> p.getMetadata().getLabels().containsKey("uuid") && p.getMetadata().getLabels().get("uuid").equals(uuid.toString())) .collect(Collectors.toList()).get(0); - fabricClient.resource(pod).watch(new ExitCodeWatcher( + final SharedIndexInformer podInformer = fabricClient.pods() + .inNamespace(pod.getMetadata().getNamespace()) + .withName(pod.getMetadata().getName()) + .inform(); + podInformer.addEventHandler(new ExitCodeWatcher( + pod.getMetadata().getName(), + pod.getMetadata().getNamespace(), exitCode -> { fabricClient.pods().delete(pod); }, - exception -> {})); + () -> {})); process.waitFor();