Skip to content

Commit

Permalink
convert container-orchestrator to micronaut (#19396)
Browse files Browse the repository at this point in the history
* wip; add micronaut

* add additional json deserializer methods

* wip; converting to micronaut

* misc cleanup

* wip; broken

* wip; still broken

* wip

* formatting

* minor code cleanup; no actual changes

* wip; still broken

* removed commented out code; no longer broken

* wip; clean-up micronaut code

* cleanup; format

* fix pmd issues

* remove unused file

* init ApplicationTest

* edited link (#19444)

* move 'Example values' into intl (#19446)

* Revert "Update action.yml (#19416)" (#19450)

This reverts commit 78fb528.

* Notifications Workflow (#18735)

* notification workflow

* Bmoric/remove unused code (#19188)

* Tmp

* Move when the deletion is performed

* Re-enable disable test

* PR comments

* Use cancel

* rename

* Fix test and version check position

* remove unused temporal deletion code

* Remove false todo

* Rm repeated test

* Rm unused import

* Make sure that long running activity are not retried (#19452)

* Parse list of dicts in json_schema_helper.find_nodes() (#19386)

* Get test on nested list/dict passing - use index to query next object for list

* Fix flakecheck

* Test that get_node provides correct value

* Improve test and test cases

* Rewrite method for better comprehension

* Add test for base-level key. Rewrite method for comprehension and handling this case

* adding tests

* fix test

* formatting

* remove unused dependencies

* add missing test resource

* format

* add missing test resource (real)

* format

* add back protocol-models dep

* format

* pr feedback; log stacktrace

Co-authored-by: Sophia Wiley <106352739+sophia-wiley@users.noreply.github.com>
Co-authored-by: Lake Mossman <lake@airbyte.io>
Co-authored-by: Topher Lubaway <asimplechris@gmail.com>
Co-authored-by: Anne <102554163+alovew@users.noreply.github.com>
Co-authored-by: Benoit Moriceau <benoit@airbyte.io>
Co-authored-by: Ella Rohm-Ensing <erohmensing@gmail.com>
  • Loading branch information
7 people authored Nov 30, 2022
1 parent 77112b0 commit 1cdfdbe
Show file tree
Hide file tree
Showing 35 changed files with 1,258 additions and 628 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ public void create(final Map<String, String> allLabels,
copyFilesToKubeConfigVolumeMain(createdPod, updatedFileMap);
}

public static void copyFilesToKubeConfigVolumeMain(final Pod podDefinition, final Map<String, String> files) {
private static void copyFilesToKubeConfigVolumeMain(final Pod podDefinition, final Map<String, String> files) {
final List<Map.Entry<String, String>> fileEntries = new ArrayList<>(files.entrySet());

// copy this file last to indicate that the copy has completed
Expand Down
24 changes: 24 additions & 0 deletions airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,30 @@ public static <T> T deserialize(final String jsonString, final Class<T> klass) {
}
}

public static <T> T deserialize(final String jsonString, final TypeReference<T> valueTypeRef) {
try {
return OBJECT_MAPPER.readValue(jsonString, valueTypeRef);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

public static <T> T deserialize(final File file, final Class<T> klass) {
try {
return OBJECT_MAPPER.readValue(file, klass);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

public static <T> T deserialize(final File file, final TypeReference<T> valueTypeRef) {
try {
return OBJECT_MAPPER.readValue(file, valueTypeRef);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

public static <T> T convertValue(final Object object, final Class<T> klass) {
return OBJECT_MAPPER.convertValue(object, klass);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ public class MdcScope implements AutoCloseable {
public MdcScope(final Map<String, String> keyValuesToAdd) {
originalContextMap = MDC.getCopyOfContextMap();

keyValuesToAdd.forEach(
(key, value) -> MDC.put(key, value));
keyValuesToAdd.forEach(MDC::put);
}

@Override
Expand Down
23 changes: 12 additions & 11 deletions airbyte-container-orchestrator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,30 @@ plugins {
}

dependencies {
annotationProcessor platform(libs.micronaut.bom)
annotationProcessor libs.bundles.micronaut.annotation.processor

implementation platform(libs.micronaut.bom)
implementation libs.bundles.micronaut

implementation 'io.fabric8:kubernetes-client:5.12.2'
implementation 'org.apache.commons:commons-lang3:3.11'
implementation 'org.apache.commons:commons-text:1.10.0'
implementation 'org.eclipse.jetty:jetty-server:9.4.31.v20200723'
implementation 'org.eclipse.jetty:jetty-servlet:9.4.31.v20200723'
implementation libs.bundles.datadog

implementation project(':airbyte-api')
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
implementation project(':airbyte-commons-protocol')
implementation project(':airbyte-commons-temporal')
implementation project(':airbyte-commons-worker')
implementation project(':airbyte-config:init')
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-persistence:job-persistence')
implementation project(':airbyte-metrics:metrics-lib')
implementation project(':airbyte-worker-models')

testAnnotationProcessor platform(libs.micronaut.bom)
testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor

testImplementation libs.bundles.micronaut.test
testImplementation 'org.mockito:mockito-inline:2.13.0'
testImplementation libs.postgresql
testImplementation libs.platform.testcontainers
Expand All @@ -32,11 +35,9 @@ dependencies {
testImplementation project(':airbyte-commons-docker')
}


mainClassName = 'io.airbyte.container_orchestrator.ContainerOrchestratorApp'

application {
mainClass = mainClassName
applicationName = "airbyte-container-orchestrator"
mainClass = "io.airbyte.container_orchestrator.Application"
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.container_orchestrator;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.logging.LoggingHelper;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.container_orchestrator.orchestrator.JobOrchestrator;
import io.airbyte.workers.process.AsyncKubePodStatus;
import io.micronaut.runtime.Micronaut;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.lang.invoke.MethodHandles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Entrypoint for the application responsible for launching containers and handling all message
* passing for replication, normalization, and dbt. Also, the current version relies on a heartbeat
* from a Temporal worker. This will also be removed in the future so this can run fully async.
* <p>
* This application retrieves most of its configuration from copied files from the calling Temporal
* worker.
* <p>
* This app uses default logging which is directly captured by the calling Temporal worker. In the
* future this will need to independently interact with cloud storage.
*/
@SuppressWarnings({"PMD.AvoidCatchingThrowable", "PMD.DoNotTerminateVM", "PMD.AvoidFieldNameMatchingTypeName"})
@Singleton
public class Application {

public static void main(final String[] args) {
// To mimic previous behavior, assume an exit code of 1 unless Application.run returns otherwise.
var exitCode = 1;
try (final var ctx = Micronaut.run(Application.class, args)) {
exitCode = ctx.getBean(Application.class).run();
} catch (final Throwable t) {
log.error("could not run {}", t.getMessage(), t);
} finally {
// this mimics the pre-micronaut code, unsure if there is a better way in micronaut to ensure a
// non-zero exit code
System.exit(exitCode);
}
}

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final String application;
private final JobOrchestrator<?> jobOrchestrator;
private final AsyncStateManager asyncStateManager;

public Application(@Named("application") final String application,
final JobOrchestrator<?> jobOrchestrator,
final AsyncStateManager asyncStateManager) {
this.application = application;
this.jobOrchestrator = jobOrchestrator;
this.asyncStateManager = asyncStateManager;
}

/**
* Configures logging/mdc scope, and creates all objects necessary to handle state updates.
* <p>
* Handles state updates (including writing failures) and running the job orchestrator. As much of
* the initialization as possible should go in here, so it's logged properly and the state storage
* is updated appropriately.
*/
@VisibleForTesting
int run() {
// set mdc scope for the remaining execution
try (final var mdcScope = new MdcScope.Builder()
.setLogPrefix(application)
.setPrefixColor(LoggingHelper.Color.CYAN_BACKGROUND)
.build()) {

asyncStateManager.write(AsyncKubePodStatus.INITIALIZING);
asyncStateManager.write(AsyncKubePodStatus.RUNNING);
asyncStateManager.write(AsyncKubePodStatus.SUCCEEDED, jobOrchestrator.runJob().orElse(""));
} catch (final Throwable t) {
log.error("Killing orchestrator because of an Exception", t);
asyncStateManager.write(AsyncKubePodStatus.FAILED);
return 1;
}

return 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,98 @@

package io.airbyte.container_orchestrator;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.workers.process.AsyncKubePodStatus;
import io.airbyte.workers.process.KubePodInfo;
import io.airbyte.workers.storage.DocumentStoreClient;
import jakarta.inject.Singleton;
import java.lang.invoke.MethodHandles;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The state manager writes the "truth" for states of the async pod process. If the store isn't
* updated by the underlying pod, it will appear as failed.
*
* <p>
* It doesn't have a single value for a state. Instead, in a location on cloud storage or disk, it
* writes every state it's encountered.
*/
public interface AsyncStateManager {
@Singleton
public class AsyncStateManager {

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final List<AsyncKubePodStatus> STATUS_CHECK_ORDER = List.of(
// terminal states first
AsyncKubePodStatus.FAILED,
AsyncKubePodStatus.SUCCEEDED,
// then check in progress state
AsyncKubePodStatus.RUNNING,
// then check for initialization state
AsyncKubePodStatus.INITIALIZING);

private final DocumentStoreClient documentStoreClient;
private final KubePodInfo kubePodInfo;

public AsyncStateManager(final DocumentStoreClient documentStoreClient, final KubePodInfo kubePodInfo) {
this.documentStoreClient = documentStoreClient;
this.kubePodInfo = kubePodInfo;
}

/**
* Writes a file containing a string value to a location designated by the input status.
* Writes an empty file to a location designated by the input status.
*/
void write(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status, final String value);
public void write(final AsyncKubePodStatus status, final String value) {
final var key = getDocumentStoreKey(status);
log.info("Writing async status {} for {}...", status, kubePodInfo);
documentStoreClient.write(key, value);
}

/**
* Writes an empty file to a location designated by the input status.
* Writes a file containing a string value to a location designated by the input status.
*/
void write(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status);
public void write(final AsyncKubePodStatus status) {
write(status, "");
}

/**
* Interprets the state given all written state messages for the pod.
* <p>
* Checks terminal states first, then running, then initialized. Defaults to not started.
* <p>
* The order matters here!
*/
AsyncKubePodStatus getStatus(final KubePodInfo kubePodInfo);
public AsyncKubePodStatus getStatus() {
return STATUS_CHECK_ORDER.stream()
.filter(this::statusFileExists)
.findFirst()
.orElse(AsyncKubePodStatus.NOT_STARTED);
}

/**
* @return the output stored in the success file. This can be an empty string.
* @throws IllegalArgumentException if no success file exists
*/
String getOutput(final KubePodInfo kubePodInfo) throws IllegalArgumentException;
public String getOutput() throws IllegalArgumentException {
final var key = getDocumentStoreKey(AsyncKubePodStatus.SUCCEEDED);
final var output = documentStoreClient.read(key);

return output.orElseThrow(() -> new IllegalArgumentException("Expected to retrieve output from a successfully completed pod!"));

}

/**
* IMPORTANT: Changing the storage location will orphan already existing kube pods when the new
* version is deployed!
*/
@VisibleForTesting
String getDocumentStoreKey(final AsyncKubePodStatus status) {
return kubePodInfo.namespace() + "/" + kubePodInfo.name() + "/" + status.name();
}

private boolean statusFileExists(final AsyncKubePodStatus status) {
final var key = getDocumentStoreKey(status);
return documentStoreClient.read(key).isPresent();
}

}
Loading

0 comments on commit 1cdfdbe

Please sign in to comment.