Skip to content

Commit

Permalink
🐛 Connector exit code should still be detected if resourceVersion is …
Browse files Browse the repository at this point in the history
…updated (#11861)
  • Loading branch information
edgao authored and suhomud committed May 23, 2022
1 parent ffeaa29 commit 721c4cd
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 73 deletions.
2 changes: 1 addition & 1 deletion airbyte-container-orchestrator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

dependencies {
implementation 'io.fabric8:kubernetes-client:5.3.1'
implementation 'io.fabric8:kubernetes-client:5.12.2'
implementation 'org.apache.commons:commons-lang3:3.11'
implementation 'org.apache.commons:commons-text:1.9'
implementation 'org.eclipse.jetty:jetty-server:9.4.31.v20200723'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ public ContainerOrchestratorApp(
}

private void configureLogging() {
for (String envVar : OrchestratorConstants.ENV_VARS_TO_TRANSFER) {
for (final String envVar : OrchestratorConstants.ENV_VARS_TO_TRANSFER) {
if (envMap.containsKey(envVar)) {
System.setProperty(envVar, envMap.get(envVar));
}
}

// make sure the new configuration is picked up
LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
ctx.reconfigure();

final var logClient = LogClientSingleton.getInstance();
Expand Down Expand Up @@ -119,7 +119,7 @@ private void runInternal(final DefaultAsyncStateManager asyncStateManager) {

// required to kill clients with thread pools
System.exit(0);
} catch (Throwable t) {
} catch (final Throwable t) {
asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.FAILED);
System.exit(1);
}
Expand Down Expand Up @@ -177,7 +177,7 @@ public static void main(final String[] args) {

final var app = new ContainerOrchestratorApp(applicationName, envMap, jobRunConfig, kubePodInfo);
app.run();
} catch (Throwable t) {
} catch (final Throwable t) {
log.error("Orchestrator failed...", t);
// otherwise the pod hangs on closing
System.exit(1);
Expand Down Expand Up @@ -212,7 +212,11 @@ private static ProcessFactory getProcessBuilderFactory(final Configs configs, fi
// exposed)
KubePortManagerSingleton.init(OrchestratorConstants.PORTS);

return new KubeProcessFactory(workerConfigs, configs.getJobKubeNamespace(), fabricClient, kubeHeartbeatUrl, false);
return new KubeProcessFactory(workerConfigs,
configs.getJobKubeNamespace(),
fabricClient,
kubeHeartbeatUrl,
false);
} else {
return new DockerProcessFactory(
workerConfigs,
Expand Down
2 changes: 1 addition & 1 deletion airbyte-scheduler/app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

dependencies {
implementation 'io.fabric8:kubernetes-client:5.9.0'
implementation 'io.fabric8:kubernetes-client:5.12.2'
implementation 'io.temporal:temporal-sdk:1.8.1'

implementation project(':airbyte-analytics')
Expand Down
2 changes: 1 addition & 1 deletion airbyte-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ dependencies {
implementation project(':airbyte-api')
implementation project(':airbyte-container-orchestrator')

implementation 'io.fabric8:kubernetes-client:5.3.1'
implementation 'io.fabric8:kubernetes-client:5.12.2'
implementation 'org.testcontainers:testcontainers:1.15.3'

acceptanceTestsImplementation project(':airbyte-api')
Expand Down
2 changes: 1 addition & 1 deletion airbyte-workers/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ configurations {
}

dependencies {
implementation 'io.fabric8:kubernetes-client:5.3.1'
implementation 'io.fabric8:kubernetes-client:5.12.2'
implementation 'io.temporal:temporal-sdk:1.8.1'
implementation 'org.apache.ant:ant:1.10.10'
implementation 'org.apache.commons:commons-lang3:3.11'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,11 @@ private static ProcessFactory getJobProcessFactory(final Configs configs, final
final String localIp = InetAddress.getLocalHost().getHostAddress();
final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT;
LOGGER.info("Using Kubernetes namespace: {}", configs.getJobKubeNamespace());
return new KubeProcessFactory(workerConfigs, configs.getJobKubeNamespace(), fabricClient, kubeHeartbeatUrl, false);
return new KubeProcessFactory(workerConfigs,
configs.getJobKubeNamespace(),
fabricClient,
kubeHeartbeatUrl,
false);
} else {
return new DockerProcessFactory(
workerConfigs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,16 +310,12 @@ public void create(final Map<String, String> allLabels,
.createOrReplace(podToCreate);

log.info("Waiting for pod to be running...");
try {
kubernetesClient.pods()
.inNamespace(kubePodInfo.namespace())
.withName(kubePodInfo.name())
.waitUntilCondition(p -> {
return !p.getStatus().getContainerStatuses().isEmpty() && p.getStatus().getContainerStatuses().get(0).getState().getWaiting() == null;
}, 5, TimeUnit.MINUTES);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
kubernetesClient.pods()
.inNamespace(kubePodInfo.namespace())
.withName(kubePodInfo.name())
.waitUntilCondition(p -> {
return !p.getStatus().getContainerStatuses().isEmpty() && p.getStatus().getContainerStatuses().get(0).getState().getWaiting() == null;
}, 5, TimeUnit.MINUTES);

final var podStatus = kubernetesClient.pods()
.inNamespace(kubePodInfo.namespace())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
import com.google.common.collect.MoreCollectors;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -20,52 +21,111 @@
* be able to retrieve the exit code).
*/
@Slf4j
public class ExitCodeWatcher implements Watcher<Pod> {
public class ExitCodeWatcher implements ResourceEventHandler<Pod> {

private final String podName;
private final String podNamespace;
private final Consumer<Integer> onExitCode;
private final Consumer<WatcherException> onWatchFailure;
private boolean exitCodeRetrieved = false;
private final Runnable onWatchFailure;
/**
* This flag is set to false when we either (a) find the pod's exit code, or (b) when the pod is
* deleted. This is so that we call exactly one of onExitCode and onWatchFailure, and we make that
* call exactly once.
* <p>
* We rely on this class being side-effect-free, outside of persistExitCode() and persistFailure().
* Those two methods use compareAndSet to prevent race conditions. Everywhere else, we can be sloppy
* because we won't actually emit any output.
*/
private final AtomicBoolean active = new AtomicBoolean(true);

/**
*
* @param onExitCode callback used to store the exit code
* @param onWatchFailure callback that's triggered when the watch fails. should be some failed exit
* code.
*/
public ExitCodeWatcher(final Consumer<Integer> onExitCode, final Consumer<WatcherException> onWatchFailure) {
public ExitCodeWatcher(final String podName,
final String podNamespace,
final Consumer<Integer> onExitCode,
final Runnable onWatchFailure) {
this.podName = podName;
this.podNamespace = podNamespace;
this.onExitCode = onExitCode;
this.onWatchFailure = onWatchFailure;
}

@Override
public void eventReceived(Action action, Pod resource) {
try {
if (!exitCodeRetrieved && KubePodResourceHelper.isTerminal(resource)) {
final ContainerStatus mainContainerStatus = resource.getStatus().getContainerStatuses()
.stream()
.filter(containerStatus -> containerStatus.getName().equals(KubePodProcess.MAIN_CONTAINER_NAME))
.collect(MoreCollectors.onlyElement());
public void onAdd(final Pod pod) {
if (shouldCheckPod(pod)) {
final Optional<Integer> exitCode = getExitCode(pod);
exitCode.ifPresent(this::persistExitCode);
}
}

if (mainContainerStatus.getState() != null && mainContainerStatus.getState().getTerminated() != null) {
final int exitCode = mainContainerStatus.getState().getTerminated().getExitCode();
log.info("Processing event with exit code " + exitCode + " for pod: " + resource.getMetadata().getName());
onExitCode.accept(exitCode);
exitCodeRetrieved = true;
}
}
} catch (Exception e) {
String podName = "<unknown_name>";
if (resource.getMetadata() != null) {
podName = resource.getMetadata().getName();
@Override
public void onUpdate(final Pod oldPod, final Pod newPod) {
if (shouldCheckPod(newPod)) {
final Optional<Integer> exitCode = getExitCode(newPod);
exitCode.ifPresent(this::persistExitCode);
}
}

@Override
public void onDelete(final Pod pod, final boolean deletedFinalStateUnknown) {
if (shouldCheckPod(pod)) {
if (!deletedFinalStateUnknown) {
final Optional<Integer> exitCode = getExitCode(pod);
exitCode.ifPresentOrElse(
this::persistExitCode,
this::persistFailure);
} else {
persistFailure();
}
}
}

/**
* Informers without an OperationContext will monitor ALL pods in ALL namespaces; filter down to the
* one pod that we care about. If it's still running, then we obviously can't fetch its exit code.
* <p>
* Also, if we've already found the exit code, or the pod has been deleted, then stop doing anything
* at all.
*/
private boolean shouldCheckPod(final Pod pod) {
final boolean correctName = podName.equals(pod.getMetadata().getName());
final boolean correctNamespace = podNamespace.equals(pod.getMetadata().getNamespace());
return active.get() && correctName && correctNamespace && KubePodResourceHelper.isTerminal(pod);
}

private Optional<Integer> getExitCode(final Pod pod) {
final ContainerStatus mainContainerStatus = pod.getStatus().getContainerStatuses()
.stream()
.filter(containerStatus -> containerStatus.getName().equals(KubePodProcess.MAIN_CONTAINER_NAME))
.collect(MoreCollectors.onlyElement());

log.error("ExitCodeWatcher event handling failed for pod: " + podName, e);
if (mainContainerStatus.getState() != null && mainContainerStatus.getState().getTerminated() != null) {
return Optional.of(mainContainerStatus.getState().getTerminated().getExitCode());
}
return Optional.empty();
}

@Override
public void onClose(WatcherException cause) {
onWatchFailure.accept(cause);
private void persistExitCode(final int exitCode) {
if (active.compareAndSet(true, false)) {
log.info("Received exit code {} for pod {}", exitCode, podName);
onExitCode.accept(exitCode);
}
}

private void persistFailure() {
if (active.compareAndSet(true, false)) {
// Log an error. The pod is completely gone, and we have no way to retrieve its exit code
// In theory, this shouldn't really happen. From
// https://pkg.go.dev/k8s.io/client-go/tools/cache#DeletedFinalStateUnknown:
// "in the case where an object was deleted but the watch deletion event was missed while
// disconnected from apiserver"
// But we have this handler just in case.
log.error("Pod {} was deleted before we could retrieve its exit code", podName);
onWatchFailure.run();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import io.fabric8.kubernetes.api.model.VolumeMount;
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.internal.readiness.Readiness;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -147,7 +147,7 @@ public class KubePodProcess extends Process implements KubePod {
private final int stderrLocalPort;
private final ExecutorService executorService;
private final CompletableFuture<Integer> exitCodeFuture;
private final Watch podWatch;
private final SharedIndexInformer<Pod> podInformer;

public static String getPodIP(final KubernetesClient client, final String podName, final String podNamespace) {
final var pod = client.pods().inNamespace(podNamespace).withName(podName).get();
Expand Down Expand Up @@ -305,16 +305,7 @@ private static void waitForInitPodToRun(final KubernetesClient client, final Pod
LOGGER.info("Waiting for init container to be ready before copying files...");
final PodResource<Pod> pod =
client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName());
try {
pod.waitUntilCondition(p -> p.getStatus().getInitContainerStatuses().size() != 0, 5, TimeUnit.MINUTES);
} catch (final InterruptedException e) {
LOGGER.error("Init pod not found after 5 minutes");
LOGGER.error("Pod search executed in namespace {} for pod name {} resulted in: {}",
podDefinition.getMetadata().getNamespace(),
podDefinition.getMetadata().getName(),
pod.get().toString());
throw e;
}
pod.waitUntilCondition(p -> p.getStatus().getInitContainerStatuses().size() != 0, 5, TimeUnit.MINUTES);
LOGGER.info("Init container present..");
client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName())
.waitUntilCondition(p -> p.getStatus().getInitContainerStatuses().get(0).getState().getRunning() != null, 5, TimeUnit.MINUTES);
Expand Down Expand Up @@ -528,15 +519,21 @@ public KubePodProcess(final boolean isOrchestrator,
// This is safe only because we are blocking the init pod until we copy files onto it.
// See the ExitCodeWatcher comments for more info.
exitCodeFuture = new CompletableFuture<>();
podWatch = fabricClient.resource(podDefinition).watch(new ExitCodeWatcher(
podInformer = fabricClient.pods()
.inNamespace(namespace)
.withName(pod.getMetadata().getName())
.inform();
podInformer.addEventHandler(new ExitCodeWatcher(
pod.getMetadata().getName(),
namespace,
exitCodeFuture::complete,
exception -> {
() -> {
LOGGER.info(prependPodInfo(
String.format(
"Exit code watcher failed to retrieve the exit code. Defaulting to %s. This is expected if the job was cancelled. Error: %s",
KILLED_EXIT_CODE,
exception.getMessage()),
namespace, podName));
"Exit code watcher failed to retrieve the exit code. Defaulting to %s. This is expected if the job was cancelled.",
KILLED_EXIT_CODE),
namespace,
podName));

exitCodeFuture.complete(KILLED_EXIT_CODE);
}));
Expand Down Expand Up @@ -631,7 +628,7 @@ public InputStream getErrorStream() {
public int waitFor() throws InterruptedException {
try {
exitCodeFuture.get();
} catch (ExecutionException e) {
} catch (final ExecutionException e) {
throw new RuntimeException(e);
}

Expand Down Expand Up @@ -704,7 +701,7 @@ private void close() {

Exceptions.swallow(this.stdoutServerSocket::close);
Exceptions.swallow(this.stderrServerSocket::close);
Exceptions.swallow(this.podWatch::close);
Exceptions.swallow(this.podInformer::close);
Exceptions.swallow(this.executorService::shutdownNow);

KubePortManagerSingleton.getInstance().offer(stdoutLocalPort);
Expand All @@ -717,7 +714,7 @@ private int getReturnCode() {
if (exitCodeFuture.isDone()) {
try {
return exitCodeFuture.get();
} catch (InterruptedException | ExecutionException e) {
} catch (final InterruptedException | ExecutionException e) {
throw new RuntimeException(
prependPodInfo("Cannot find pod %s : %s while trying to retrieve exit code. This probably means the pod was not correctly created.",
podDefinition.getMetadata().getNamespace(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class KubeProcessFactory implements ProcessFactory {
public static final String NORMALISE_STEP = "normalise";
public static final String CUSTOM_STEP = "custom";

private static final Pattern ALPHABETIC = Pattern.compile("[a-zA-Z]+");;
private static final Pattern ALPHABETIC = Pattern.compile("[a-zA-Z]+");
private static final String JOB_LABEL_KEY = "job_id";
private static final String ATTEMPT_LABEL_KEY = "attempt_id";
private static final String WORKER_POD_LABEL_KEY = "airbyte";
Expand Down
Loading

0 comments on commit 721c4cd

Please sign in to comment.