Skip to content

Commit

Permalink
Revert "Revert Convert the server to micronaut" (#21133)
Browse files Browse the repository at this point in the history
* Revert "Revert "Convert the server to micronaut (#19194)" (#21132)"

This reverts commit 31c65f8.

* Fix the cors

* Fix cloud
  • Loading branch information
benmoriceau authored Jan 9, 2023
1 parent bc93198 commit 9adb63c
Show file tree
Hide file tree
Showing 126 changed files with 1,585 additions and 1,612 deletions.
5 changes: 2 additions & 3 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ DATABASE_PORT=5432
DATABASE_DB=airbyte
# translate manually DATABASE_URL=jdbc:postgresql://${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_DB} (do not include the username or password here)
DATABASE_URL=jdbc:postgresql://db:5432/airbyte
JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.29.15.001
JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.40.26.001

# Airbyte Internal Config Database, defaults to Job Database if empty. Explicitly left empty to mute docker compose warnings.
CONFIG_DATABASE_USER=
CONFIG_DATABASE_PASSWORD=
CONFIG_DATABASE_URL=
CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.35.15.001
CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.40.23.002

### AIRBYTE SERVICES ###
TEMPORAL_HOST=airbyte-temporal:7233
Expand Down Expand Up @@ -115,4 +115,3 @@ OTEL_COLLECTOR_ENDPOINT="http://host.docker.internal:4317"

USE_STREAM_CAPABLE_STATE=true
AUTO_DETECT_SCHEMA=false

Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
* This class is meant to consolidate all our API endpoints into a fluent-ish client. Currently, all
* open API generators create a separate class per API "root-route". For example, if our API has two
* routes "/v1/First/get" and "/v1/Second/get", OpenAPI generates (essentially) the following files:
*
* <p>
* ApiClient.java, FirstApi.java, SecondApi.java
*
* <p>
* To call the API type-safely, we'd do new FirstApi(new ApiClient()).get() or new SecondApi(new
* ApiClient()).get(), which can get cumbersome if we're interacting with many pieces of the API.
*
* <p>
* This is currently manually maintained. We could look into autogenerating it if needed.
*/
public class AirbyteApiClient {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.DeletedWorkflowException;
import io.airbyte.commons.temporal.exception.UnreachableWorkflowException;
import io.airbyte.commons.temporal.scheduling.CheckConnectionWorkflow;
Expand All @@ -30,7 +29,6 @@
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.protocol.models.StreamDescriptor;
import io.micronaut.context.annotation.Requires;
import io.temporal.api.common.v1.WorkflowType;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest;
Expand Down Expand Up @@ -62,7 +60,6 @@

@Slf4j
@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public class TemporalClient {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ public class ApiClientBeanFactory {
private static final int JWT_TTL_MINUTES = 5;

@Singleton
public ApiClient apiClient(@Value("${airbyte.internal.api.auth-header.name}") final String airbyteApiAuthHeaderName,
@Named("apiClient")
public ApiClient apiClient(
@Value("${airbyte.internal.api.auth-header.name}") final String airbyteApiAuthHeaderName,
@Value("${airbyte.internal.api.host}") final String airbyteApiHost,
@Named("internalApiAuthToken") final BeanProvider<String> internalApiAuthToken,
@Named("internalApiScheme") final String internalApiScheme) {
return new io.airbyte.api.client.invoker.generated.ApiClient()
return new ApiClient()
.setScheme(internalApiScheme)
.setHost(parseHostName(airbyteApiHost))
.setPort(parsePort(airbyteApiHost))
Expand All @@ -66,7 +68,7 @@ public AirbyteApiClient airbyteApiClient(final ApiClient apiClient) {
}

@Singleton
public SourceApi sourceApi(final ApiClient apiClient) {
public SourceApi sourceApi(@Named("apiClient") final ApiClient apiClient) {
return new SourceApi(apiClient);
}

Expand All @@ -87,7 +89,7 @@ public WorkspaceApi workspaceApi(final ApiClient apiClient) {

@Singleton
public HttpClient httpClient() {
return HttpClient.newHttpClient();
return HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.google.common.base.Preconditions;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Schedule;
Expand All @@ -18,7 +17,6 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.validation.json.JsonValidationException;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.List;
Expand All @@ -28,7 +26,6 @@
// todo (cgardens) - we are not getting any value out of instantiating this class. we should just
// use it as statics. not doing it now, because already in the middle of another refactor.
@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public class ConnectionHelper {

private final ConfigRepository configRepository;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
package io.airbyte.commons.features;

import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slf4j
public class EnvVariableFeatureFlags implements FeatureFlags {

private static final Logger log = LoggerFactory.getLogger(EnvVariableFeatureFlags.class);

public static final String USE_STREAM_CAPABLE_STATE = "USE_STREAM_CAPABLE_STATE";
public static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";
// Set this value to true to see all messages from the source to destination, set to one second
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public File getServerLogFile(final Path workspaceRoot, final WorkerEnvironment w
}
final var cloudLogPath = sanitisePath(APP_LOGGING_CLOUD_PREFIX, getServerLogsRoot(workspaceRoot));
try {
createCloudClientIfNull(logConfigs);
return logClient.downloadCloudLog(logConfigs, cloudLogPath);
} catch (final IOException e) {
throw new RuntimeException("Error retrieving log file: " + cloudLogPath + " from S3", e);
Expand All @@ -95,6 +96,7 @@ public File getSchedulerLogFile(final Path workspaceRoot, final WorkerEnvironmen

final var cloudLogPath = APP_LOGGING_CLOUD_PREFIX + getSchedulerLogsRoot(workspaceRoot);
try {
createCloudClientIfNull(logConfigs);
return logClient.downloadCloudLog(logConfigs, cloudLogPath);
} catch (final IOException e) {
throw new RuntimeException("Error retrieving log file: " + cloudLogPath + " from S3", e);
Expand All @@ -111,6 +113,7 @@ public List<String> getJobLogFile(final WorkerEnvironment workerEnvironment, fin
}

final var cloudLogPath = sanitisePath(JOB_LOGGING_CLOUD_PREFIX, logPath);
createCloudClientIfNull(logConfigs);
return logClient.tailCloudLog(logConfigs, cloudLogPath, LOG_TAIL_SIZE);
}

Expand All @@ -127,6 +130,7 @@ public void deleteLogs(final WorkerEnvironment workerEnvironment, final LogConfi
throw new NotImplementedException("Local log deletes not supported.");
}
final var cloudLogPath = sanitisePath(JOB_LOGGING_CLOUD_PREFIX, Path.of(logPath));
createCloudClientIfNull(logConfigs);
logClient.deleteLogs(logConfigs, cloudLogPath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public ConfigRepository(final Database database) {
*/
public boolean healthCheck() {
try {
database.query(ctx -> ctx.select(WORKSPACE.ID).from(WORKSPACE).limit(1).fetch());
database.query(ctx -> ctx.select(WORKSPACE.ID).from(WORKSPACE).limit(1).fetch()).stream().count();
} catch (final Exception e) {
LOGGER.error("Health check error: ", e);
return false;
Expand Down Expand Up @@ -294,7 +294,8 @@ private Stream<StandardSourceDefinition> sourceDefQuery(final Optional<UUID> sou
.where(ACTOR_DEFINITION.ACTOR_TYPE.eq(ActorType.source))
.and(sourceDefId.map(ACTOR_DEFINITION.ID::eq).orElse(noCondition()))
.and(includeTombstone ? noCondition() : ACTOR_DEFINITION.TOMBSTONE.notEqual(true))
.fetchStream())
.fetch())
.stream()
.map(DbConverter::buildStandardSourceDefinition)
// Ensure version is set. Needed for connectors not upgraded since we added versioning.
.map(def -> def.withProtocolVersion(AirbyteProtocolVersion.getWithDefault(def.getProtocolVersion()).serialize()));
Expand Down Expand Up @@ -356,7 +357,8 @@ private Stream<StandardDestinationDefinition> destDefQuery(final Optional<UUID>
.where(ACTOR_DEFINITION.ACTOR_TYPE.eq(ActorType.destination))
.and(destDefId.map(ACTOR_DEFINITION.ID::eq).orElse(noCondition()))
.and(includeTombstone ? noCondition() : ACTOR_DEFINITION.TOMBSTONE.notEqual(true))
.fetchStream())
.fetch())
.stream()
.map(DbConverter::buildStandardDestinationDefinition)
// Ensure version is set. Needed for connectors not upgraded since we added versioning.
.map(def -> def.withProtocolVersion(AirbyteProtocolVersion.getWithDefault(def.getProtocolVersion()).serialize()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ private static Stream<Record4<UUID, String, ActorType, String>> getActorDefiniti
return ctx.select(ACTOR_DEFINITION.ID, ACTOR_DEFINITION.DOCKER_REPOSITORY, ACTOR_DEFINITION.ACTOR_TYPE, ACTOR_DEFINITION.PROTOCOL_VERSION)
.from(ACTOR_DEFINITION)
.join(ACTOR).on(ACTOR.ACTOR_DEFINITION_ID.equal(ACTOR_DEFINITION.ID))
.fetchStream();
.fetch()
.stream();
}

static void writeStandardSourceDefinition(final List<StandardSourceDefinition> configs, final DSLContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ private Stream<StandardSyncIdsWithProtocolVersions> findDisabledSyncs(final DSLC
.where(
CONNECTION.UNSUPPORTED_PROTOCOL_VERSION.eq(true).and(
(actorType == ActorType.DESTINATION ? destDef : sourceDef).ID.eq(actorDefId)))
.fetchStream()
.fetch()
.stream()
.map(r -> new StandardSyncIdsWithProtocolVersions(
r.get(CONNECTION.ID),
r.get(sourceDef.ID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ private Set<StandardSyncProtocolVersionFlag> getProtocolVersionFlagForSyncs(fina
.select(CONNECTION.ID, CONNECTION.UNSUPPORTED_PROTOCOL_VERSION)
.from(CONNECTION)
.where(CONNECTION.ID.in(standardSync.stream().map(StandardSync::getConnectionId).toList()))
.fetchStream())
.fetch())
.stream()
.map(r -> new StandardSyncProtocolVersionFlag(r.get(CONNECTION.ID), r.get(CONNECTION.UNSUPPORTED_PROTOCOL_VERSION)))
.collect(Collectors.toSet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.validation.json.JsonValidationException;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Objects;
import java.util.UUID;
Expand All @@ -30,6 +31,7 @@
// scheduler:persistence in order to get workspace ids for configs (e.g. source). Our options are to
// split this helper by database or put it in a new module.
@SuppressWarnings("PMD.AvoidCatchingThrowable")
@Singleton
public class WorkspaceHelper {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkspaceHelper.class);
Expand Down Expand Up @@ -106,7 +108,7 @@ public UUID load(@NonNull final Long jobId) throws ConfigNotFoundException, IOEx
* There are generally two kinds of helper methods present here. The first kind propagate exceptions
* for the method backing the cache. The second ignores them. The former is meant to be used with
* proper api calls, while the latter is meant to be use with asserts and precondtions checks.
*
* <p>
* In API calls, distinguishing between various exceptions helps return the correct status code.
*/

Expand Down
3 changes: 2 additions & 1 deletion airbyte-proxy/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ function start_container () {
}

function start_container_with_proxy () {
CMD="docker run -d -p $PORT:8000 --env PROXY_PASS_WEB=$1 --env PROXY_PASS_API=$1 --name $NAME airbyte/proxy:$VERSION"
CMD="docker run -d -p $PORT:8000 --env PROXY_PASS_WEB=$1 --env PROXY_PASS_API=$1 --name $NAME
airbyte/proxy:$VERSION"
echo $CMD
eval $CMD
wait_for_docker;
Expand Down
23 changes: 20 additions & 3 deletions airbyte-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ plugins {

configurations.all {
exclude group: 'io.micronaut.jaxrs'
exclude group: 'io.micronaut.sql'
}

dependencies {
Expand All @@ -25,7 +24,6 @@ dependencies {
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-persistence:job-persistence')

implementation libs.flyway.core
implementation 'com.github.slugify:slugify:2.4'
implementation 'commons-cli:commons-cli:1.4'
implementation libs.temporal.sdk
Expand All @@ -38,12 +36,28 @@ dependencies {
implementation 'org.glassfish.jersey.media:jersey-media-json-jackson'
implementation 'org.glassfish.jersey.ext:jersey-bean-validation'
implementation 'org.quartz-scheduler:quartz:2.3.2'
implementation 'io.sentry:sentry:6.3.1'
implementation 'io.swagger:swagger-annotations:1.6.2'

annotationProcessor platform(libs.micronaut.bom)
annotationProcessor libs.bundles.micronaut.annotation.processor
annotationProcessor libs.micronaut.jaxrs.processor

implementation platform(libs.micronaut.bom)
implementation libs.bundles.micronaut
implementation libs.micronaut.jaxrs.server

// Ensure that the versions defined in deps.toml are used
// instead of versions from transitive dependencies
implementation(libs.flyway.core) {
force = true
}

testImplementation project(':airbyte-test-utils')
testImplementation libs.postgresql
testImplementation libs.platform.testcontainers.postgresql
testImplementation 'com.squareup.okhttp3:mockwebserver:4.9.1'
testImplementation 'org.mockito:mockito-inline:4.7.0'
}

// we want to be able to access the generated db files from config/init when we build the server docker image.
Expand All @@ -57,9 +71,10 @@ task copySeed(type: Copy, dependsOn: [project(':airbyte-config:init').processRes
test.dependsOn(project.tasks.copySeed)
assemble.dependsOn(project.tasks.copySeed)

mainClassName = 'io.airbyte.server.ServerApp'
mainClassName = 'io.airbyte.server.Application'

application {
applicationName = project.name
mainClass = mainClassName
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}
Expand Down Expand Up @@ -87,6 +102,8 @@ run {
environment "AIRBYTE_VERSION", env.VERSION
environment "AIRBYTE_ROLE", System.getenv('AIRBYTE_ROLE')
environment "TEMPORAL_HOST", "localhost:7233"

environment 'MICRONAUT_ENVIRONMENTS', 'control-plane'
}

// produce reproducible archives
Expand Down
15 changes: 15 additions & 0 deletions airbyte-server/src/main/java/io/airbyte/server/Application.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server;

import io.micronaut.runtime.Micronaut;

public class Application {

public static void main(final String[] args) {
Micronaut.run(Application.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server;

import io.airbyte.db.check.DatabaseCheckException;
import io.airbyte.db.check.DatabaseMigrationCheck;
import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.discovery.event.ServiceReadyEvent;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.lang.invoke.MethodHandles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class DatabaseEventListener implements ApplicationEventListener<ServiceReadyEvent> {

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final DatabaseMigrationCheck configsMigrationCheck;

private final DatabaseMigrationCheck jobsMigrationCheck;

public DatabaseEventListener(
@Named("configsDatabaseMigrationCheck") final DatabaseMigrationCheck configsMigrationCheck,
@Named("jobsDatabaseMigrationCheck") final DatabaseMigrationCheck jobsMigrationCheck) {
this.configsMigrationCheck = configsMigrationCheck;
this.jobsMigrationCheck = jobsMigrationCheck;
}

@Override
public void onApplicationEvent(final ServiceReadyEvent event) {
log.info("Checking configs database flyway migration version...");
try {
configsMigrationCheck.check();
} catch (final DatabaseCheckException e) {
throw new RuntimeException(e);
}

log.info("Checking jobs database flyway migration version...");
try {
jobsMigrationCheck.check();
} catch (final DatabaseCheckException e) {
throw new RuntimeException(e);
}
}

}
Loading

0 comments on commit 9adb63c

Please sign in to comment.