Skip to content

Commit

Permalink
add initContainer to container-orchestrator pod definition (#19088)
Browse files Browse the repository at this point in the history
* init attempt at initcontainer

* wait for init container to be up instead of main container

* copy files to init container

* Revert "Bmoric/extract webbackend api (#18988)"

This reverts commit b05a5b2.

* Revert "Revert "Bmoric/extract webbackend api (#18988)""

This reverts commit ebef6e4.

* block on initContainer status; cleanup init script

* add log messages

* add quotes to log messages

* pr feedback, add comment to bash script
  • Loading branch information
colesnodgrass authored Nov 10, 2022
1 parent 9125ccb commit cc93c46
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
* application. Unlike {@link KubePodProcess} there is no heartbeat mechanism that requires the
* launching pod and the launched pod to co-exist for the duration of execution for the launched
* pod.
*
* <p>
* Instead, this process creates the pod and interacts with a document store on cloud storage to
* understand the state of the created pod.
*
* <p>
* The document store is considered to be the truth when retrieving the status for an async pod
* process. If the store isn't updated by the underlying pod, it will appear as failed.
*/
Expand Down Expand Up @@ -190,10 +190,12 @@ public boolean hasExited() {
public boolean waitFor(final long timeout, final TimeUnit unit) throws InterruptedException {
// implementation copied from Process.java since this isn't a real Process
long remainingNanos = unit.toNanos(timeout);
if (hasExited())
if (hasExited()) {
return true;
if (timeout <= 0)
}
if (timeout <= 0) {
return false;
}

final long deadline = System.nanoTime() + remainingNanos;
do {
Expand All @@ -202,8 +204,9 @@ public boolean waitFor(final long timeout, final TimeUnit unit) throws Interrupt
// We poll for status every 500ms. The trade-off here is between how often
// we poll our status storage (GCS) and how quickly we detect that a process is done.
Thread.sleep(Math.min(TimeUnit.NANOSECONDS.toMillis(remainingNanos) + 1, 500));
if (hasExited())
if (hasExited()) {
return true;
}
remainingNanos = deadline - System.nanoTime();
} while (remainingNanos > 0);

Expand Down Expand Up @@ -236,7 +239,7 @@ private boolean checkStatus(final AsyncKubePodStatus status) {

/**
* Checks terminal states first, then running, then initialized. Defaults to not started.
*
* <p>
* The order matters here!
*/
public AsyncKubePodStatus getDocStoreStatus() {
Expand Down Expand Up @@ -298,6 +301,33 @@ public void create(final Map<String, String> allLabels,
final List<ContainerPort> containerPorts = KubePodProcess.createContainerPortList(portMap);
containerPorts.add(new ContainerPort(serverPort, null, null, null, null));

final var initContainer = new ContainerBuilder()
.withName(KubePodProcess.INIT_CONTAINER_NAME)
.withImage("busybox:1.35")
.withVolumeMounts(volumeMounts)
.withCommand(List.of(
"sh",
"-c",
String.format("""
i=0
until [ $i -gt 60 ]
do
echo "$i - waiting for config file transfer to complete..."
# check if the upload-complete file exists, if so exit without error
if [ -f "%s/%s" ]; then
exit 0
fi
i=$((i+1))
sleep 1
done
echo "config files did not transfer in time"
# no upload-complete file was created in time, exit with error
exit 1
""",
KubePodProcess.CONFIG_DIR,
KubePodProcess.SUCCESS_FILE_NAME)))
.build();

final var mainContainer = new ContainerBuilder()
.withName(KubePodProcess.MAIN_CONTAINER_NAME)
.withImage(kubePodInfo.mainContainerInfo().image())
Expand All @@ -316,9 +346,11 @@ public void create(final Map<String, String> allLabels,
.withLabels(allLabels)
.endMetadata()
.withNewSpec()
.withServiceAccount("airbyte-admin").withAutomountServiceAccountToken(true)
.withServiceAccount("airbyte-admin")
.withAutomountServiceAccountToken(true)
.withRestartPolicy("Never")
.withContainers(mainContainer)
.withInitContainers(initContainer)
.withVolumes(volumes)
.endSpec()
.build();
Expand All @@ -332,9 +364,9 @@ public void create(final Map<String, String> allLabels,
kubernetesClient.pods()
.inNamespace(kubePodInfo.namespace())
.withName(kubePodInfo.name())
.waitUntilCondition(p -> {
return !p.getStatus().getContainerStatuses().isEmpty() && p.getStatus().getContainerStatuses().get(0).getState().getWaiting() == null;
}, 5, TimeUnit.MINUTES);
.waitUntilCondition(p -> !p.getStatus().getInitContainerStatuses().isEmpty()
&& p.getStatus().getInitContainerStatuses().get(0).getState().getWaiting() == null,
5, TimeUnit.MINUTES);

final var podStatus = kubernetesClient.pods()
.inNamespace(kubePodInfo.namespace())
Expand All @@ -343,7 +375,7 @@ public void create(final Map<String, String> allLabels,
.getStatus();

final var containerState = podStatus
.getContainerStatuses()
.getInitContainerStatuses()
.get(0)
.getState();

Expand Down Expand Up @@ -378,7 +410,7 @@ public static void copyFilesToKubeConfigVolumeMain(final Pod podDefinition, fina
// several issues with copying files. See https://github.com/airbytehq/airbyte/issues/8643 for
// details.
final String command = String.format("kubectl cp %s %s/%s:%s -c %s", tmpFile, podDefinition.getMetadata().getNamespace(),
podDefinition.getMetadata().getName(), containerPath, "main");
podDefinition.getMetadata().getName(), containerPath, KubePodProcess.INIT_CONTAINER_NAME);
log.info(command);

proc = Runtime.getRuntime().exec(command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
* parent process starting a Kube Pod Process needs to exist within the Kube networking space. This
* is so the parent process can forward data into the child's stdin and read the child's stdout and
* stderr streams and copy configuration files over.
*
* <p>
* This is made possible by:
* <ul>
* <li>1) An init container that creates 3 named pipes corresponding to stdin, stdout and stderr on
Expand All @@ -91,7 +91,7 @@
* </ul>
* The docker image used for this pod process must expose an AIRBYTE_ENTRYPOINT which contains the
* entrypoint we will wrap when creating the main container in the pod.
*
* <p>
* See the constructor for more information.
*/

Expand All @@ -104,7 +104,7 @@ public class KubePodProcess extends Process implements KubePod {
private static final Logger LOGGER = LoggerFactory.getLogger(KubePodProcess.class);

public static final String MAIN_CONTAINER_NAME = "main";
private static final String INIT_CONTAINER_NAME = "init";
public static final String INIT_CONTAINER_NAME = "init";
private static final String DEFAULT_MEMORY_REQUEST = "25Mi";
private static final String DEFAULT_MEMORY_LIMIT = "50Mi";
private static final String DEFAULT_CPU_REQUEST = "0.1";
Expand Down Expand Up @@ -701,7 +701,7 @@ public KubePodInfo getInfo() {

/**
* Close all open resources in the opposite order of resource creation.
*
* <p>
* Null checks exist because certain local Kube clusters (e.g. Docker for Desktop) back this
* implementation with OS processes and resources, which are automatically reaped by the OS.
*/
Expand Down

0 comments on commit cc93c46

Please sign in to comment.