diff --git a/.env b/.env index 41c8d7eec145..dde8138871be 100644 --- a/.env +++ b/.env @@ -8,12 +8,14 @@ DATABASE_PORT=5432 DATABASE_DB=airbyte # translate manually DATABASE_URL=jdbc:postgresql://${DATABASE_HOST}:${DATABASE_PORT/${DATABASE_DB} (do not include the username or password here) DATABASE_URL=jdbc:postgresql://db:5432/airbyte +JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.29.15.001 # Airbyte Internal Config Database, default to reuse the Job Database when they are empty # Usually you do not need to set them; they are explicitly left empty to mute docker compose warnings CONFIG_DATABASE_USER= CONFIG_DATABASE_PASSWORD= CONFIG_DATABASE_URL= +CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.30.22.001 RUN_DATABASE_MIGRATION_ON_STARTUP=true diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index c275b2d57770..7d22d1bb62a1 100644 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -262,7 +262,7 @@ jobs: run: ./tools/bin/acceptance_test.sh - name: Automatic Migration Acceptance Test - run: MIGRATION_TEST_VERSION=$(grep VERSION .env | tr -d "VERSION=") SUB_BUILD=PLATFORM ./gradlew :airbyte-tests:automaticMigrationAcceptanceTest --scan -i + run: SUB_BUILD=PLATFORM ./gradlew :airbyte-tests:automaticMigrationAcceptanceTest --scan -i - name: Slack Notification - Failure if: failure() && github.ref == 'refs/heads/master' diff --git a/airbyte-bootloader/build.gradle b/airbyte-bootloader/build.gradle index 4c074fb2e88c..c057ab6d7fac 100644 --- a/airbyte-bootloader/build.gradle +++ b/airbyte-bootloader/build.gradle @@ -21,3 +21,17 @@ application { mainClass = 'io.airbyte.bootloader.BootloaderApp' applicationDefaultJvmArgs = ['-XX:MaxRAMPercentage=75.0'] } + +task copyGeneratedTar(type: Copy) { + dependsOn copyDocker + dependsOn distTar + + from('build/distributions') { + include 'airbyte-bootloader-*.tar' + } + into 'build/docker/bin' +} + +Task dockerBuildTask = getDockerBuildTask("bootloader", "$project.projectDir") +dockerBuildTask.dependsOn(copyGeneratedTar) +assemble.dependsOn(dockerBuildTask) diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java index bf035bc61840..346a3537d27f 100644 --- a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java @@ -57,37 +57,39 @@ public BootloaderApp() { } public void load() throws Exception { - final Database configDatabase = new ConfigsDatabaseInstance( - configs.getConfigDatabaseUser(), - configs.getConfigDatabasePassword(), - configs.getConfigDatabaseUrl()) - .getAndInitialize(); - final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase); - final ConfigRepository configRepository = - new ConfigRepository(configPersistence.withValidation(), null, Optional.empty(), Optional.empty()); - createWorkspaceIfNoneExists(configRepository); - LOGGER.info("Set up config database and default workspace.."); - - final Database jobDatabase = new JobsDatabaseInstance( - configs.getDatabaseUser(), - configs.getDatabasePassword(), - configs.getDatabaseUrl()) - .getAndInitialize(); - final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); - createDeploymentIfNoneExists(jobPersistence); - LOGGER.info("Set up job database and default deployment.."); - - final AirbyteVersion currAirbyteVersion = configs.getAirbyteVersion(); - assertNonBreakingMigration(jobPersistence, currAirbyteVersion); - - runFlywayMigration(configs, configDatabase, jobDatabase); - LOGGER.info("Ran Flyway migrations..."); - - jobPersistence.setVersion(currAirbyteVersion.serialize()); - LOGGER.info("Set version to {}", currAirbyteVersion); - - configPersistence.loadData(YamlSeedConfigPersistence.getDefault()); - LOGGER.info("Loaded seed data..."); + LOGGER.info("Setting up config database and default workspace.."); + + try ( + final Database configDatabase = + new ConfigsDatabaseInstance(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl()) + .getAndInitialize(); + final Database jobDatabase = + new JobsDatabaseInstance(configs.getDatabaseUser(), configs.getDatabasePassword(), configs.getDatabaseUrl()).getAndInitialize()) { + LOGGER.info("Created initial jobs and configs database..."); + + final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); + final AirbyteVersion currAirbyteVersion = configs.getAirbyteVersion(); + assertNonBreakingMigration(jobPersistence, currAirbyteVersion); + + runFlywayMigration(configs, configDatabase, jobDatabase); + LOGGER.info("Ran Flyway migrations..."); + + final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase); + final ConfigRepository configRepository = + new ConfigRepository(configPersistence.withValidation(), null, Optional.empty(), Optional.empty()); + + createWorkspaceIfNoneExists(configRepository); + LOGGER.info("Default workspace created.."); + + createDeploymentIfNoneExists(jobPersistence); + LOGGER.info("Default deployment created.."); + + jobPersistence.setVersion(currAirbyteVersion.serialize()); + LOGGER.info("Set version to {}", currAirbyteVersion); + + configPersistence.loadData(YamlSeedConfigPersistence.getDefault()); + LOGGER.info("Loaded seed data..."); + } LOGGER.info("Finished bootstrapping Airbyte environment.."); } @@ -129,6 +131,7 @@ private static void createWorkspaceIfNoneExists(final ConfigRepository configRep private static void assertNonBreakingMigration(JobPersistence jobPersistence, AirbyteVersion airbyteVersion) throws IOException { // version in the database when the server main method is called. may be empty if this is the first // time the server is started. + LOGGER.info("Checking illegal upgrade.."); final Optional initialAirbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new); if (!isLegalUpgrade(initialAirbyteDatabaseVersion.orElse(null), airbyteVersion)) { final String attentionBanner = MoreResources.readResource("banner/attention-banner.txt"); @@ -148,10 +151,14 @@ private static void assertNonBreakingMigration(JobPersistence jobPersistence, Ai static boolean isLegalUpgrade(final AirbyteVersion airbyteDatabaseVersion, final AirbyteVersion airbyteVersion) { // means there was no previous version so upgrade even needs to happen. always legal. if (airbyteDatabaseVersion == null) { + LOGGER.info("No previous Airbyte Version set.."); return true; } - final var isUpgradingThroughVersionBreak = airbyteDatabaseVersion.lessThan(VERSION_BREAK) && airbyteVersion.greaterThan(VERSION_BREAK); + LOGGER.info("Current Airbyte version: {}", airbyteDatabaseVersion); + LOGGER.info("Future Airbyte version: {}", airbyteVersion); + final var futureVersionIsAfterVersionBreak = airbyteVersion.greaterThan(VERSION_BREAK) || airbyteVersion.isDev(); + final var isUpgradingThroughVersionBreak = airbyteDatabaseVersion.lessThan(VERSION_BREAK) && futureVersionIsAfterVersionBreak; return !isUpgradingThroughVersionBreak; } diff --git a/airbyte-server/src/main/resources/banner/attention-banner.txt b/airbyte-bootloader/src/main/resources/banner/attention-banner.txt similarity index 100% rename from airbyte-server/src/main/resources/banner/attention-banner.txt rename to airbyte-bootloader/src/main/resources/banner/attention-banner.txt diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index f4ab8374db8d..9cca6cb67c19 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -55,12 +55,20 @@ public interface Configs { String getDatabaseUrl(); + String getJobsDatabaseMinimumFlywayMigrationVersion(); + + long getJobsDatabaseInitializationTimeoutMs(); + String getConfigDatabaseUser(); String getConfigDatabasePassword(); String getConfigDatabaseUrl(); + String getConfigsDatabaseMinimumFlywayMigrationVersion(); + + long getConfigsDatabaseInitializationTimeoutMs(); + boolean runDatabaseMigrationOnStartup(); // Airbyte Services diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index df15331864ad..a3abbd0e1d4c 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -80,6 +80,10 @@ public class EnvConfigs implements Configs { private static final String SECRET_PERSISTENCE = "SECRET_PERSISTENCE"; private static final String JOB_POD_MAIN_CONTAINER_IMAGE_PULL_SECRET = "JOB_POD_MAIN_CONTAINER_IMAGE_PULL_SECRET"; private static final String PUBLISH_METRICS = "PUBLISH_METRICS"; + private static final String CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION = "CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION"; + private static final String CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS = "CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS"; + private static final String JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION = "JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION"; + private static final String JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS = "JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS"; private static final String STATE_STORAGE_S3_BUCKET_NAME = "STATE_STORAGE_S3_BUCKET_NAME"; private static final String STATE_STORAGE_S3_REGION = "STATE_STORAGE_S3_REGION"; @@ -106,6 +110,7 @@ public class EnvConfigs implements Configs { private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1; private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60; private static final long DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB = 5000; + private static final int DEFAULT_DATABASE_INTILIZATION_TIMEOUT_MS = 60 * 1000; public static final long DEFAULT_MAX_SPEC_WORKERS = 5; public static final long DEFAULT_MAX_CHECK_WORKERS = 5; @@ -275,6 +280,16 @@ public String getDatabaseUrl() { return getEnsureEnv(DATABASE_URL); } + @Override + public String getJobsDatabaseMinimumFlywayMigrationVersion() { + return getEnsureEnv(JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION); + } + + @Override + public long getJobsDatabaseInitializationTimeoutMs() { + return getEnvOrDefault(JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS, DEFAULT_DATABASE_INTILIZATION_TIMEOUT_MS); + } + @Override public String getConfigDatabaseUser() { // Default to reuse the job database @@ -293,6 +308,16 @@ public String getConfigDatabaseUrl() { return getEnvOrDefault(CONFIG_DATABASE_URL, getDatabaseUrl()); } + @Override + public String getConfigsDatabaseMinimumFlywayMigrationVersion() { + return getEnsureEnv(CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION); + } + + @Override + public long getConfigsDatabaseInitializationTimeoutMs() { + return getEnvOrDefault(JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS, DEFAULT_DATABASE_INTILIZATION_TIMEOUT_MS); + } + @Override public boolean runDatabaseMigrationOnStartup() { return getEnvOrDefault(RUN_DATABASE_MIGRATION_ON_STARTUP, true); diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java b/airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java index e8cde92455ba..da37d040a9a9 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java @@ -13,8 +13,10 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.jdbc.StreamingJdbcDatabase; import io.airbyte.db.mongodb.MongoDatabase; +import java.io.IOException; import java.util.Optional; import java.util.function.Function; +import lombok.val; import org.apache.commons.dbcp2.BasicDataSource; import org.jooq.SQLDialect; import org.slf4j.Logger; @@ -23,6 +25,7 @@ public class Databases { private static final Logger LOGGER = LoggerFactory.getLogger(Databases.class); + private static final long DEFAULT_WAIT_MS = 5 * 1000; public static Database createPostgresDatabase(final String username, final String password, final String jdbcConnectionString) { return createDatabase(username, password, jdbcConnectionString, "org.postgresql.Driver", SQLDialect.POSTGRES); @@ -33,25 +36,54 @@ public static Database createPostgresDatabaseWithRetry(final String username, final String jdbcConnectionString, final Function isDbReady) { Database database = null; + while (database == null) { + try { + val infinity = Integer.MAX_VALUE; + database = createPostgresDatabaseWithRetryTimeout(username, password, jdbcConnectionString, isDbReady, infinity); + } catch (IOException e) { + // This should theoretically never happen since we set the timeout to be a very high number. + } + } + + LOGGER.info("Database available!"); + return database; + } + + public static Database createPostgresDatabaseWithRetryTimeout(final String username, + final String password, + final String jdbcConnectionString, + final Function isDbReady, + final long timeoutMs) + throws IOException { + Database database = null; if (jdbcConnectionString == null || jdbcConnectionString.trim().equals("")) { throw new IllegalArgumentException("Using a null or empty jdbc url will hang database creation; aborting."); } + var totalTime = 0; while (database == null) { LOGGER.warn("Waiting for database to become available..."); + if (totalTime >= timeoutMs) { + final var error = String.format("Unable to connection to database at %s..", jdbcConnectionString); + throw new IOException(error); + } try { database = createPostgresDatabase(username, password, jdbcConnectionString); if (!isDbReady.apply(database)) { LOGGER.info("Database is not ready yet. Please wait a moment, it might still be initializing..."); + database.close(); + database = null; - Exceptions.toRuntime(() -> Thread.sleep(5000)); + Exceptions.toRuntime(() -> Thread.sleep(DEFAULT_WAIT_MS)); + totalTime += DEFAULT_WAIT_MS; } } catch (final Exception e) { // Ignore the exception because this likely means that the database server is still initializing. LOGGER.warn("Ignoring exception while trying to request database:", e); database = null; - Exceptions.toRuntime(() -> Thread.sleep(5000)); + Exceptions.toRuntime(() -> Thread.sleep(DEFAULT_WAIT_MS)); + totalTime += DEFAULT_WAIT_MS; } } @@ -175,6 +207,8 @@ private static BasicDataSource createBasicDataSource(final String username, connectionPool.setDriverClassName(driverClassName); connectionPool.setUsername(username); connectionPool.setPassword(password); + connectionPool.setInitialSize(0); + connectionPool.setMaxTotal(5); connectionPool.setUrl(jdbcConnectionString); connectionProperties.ifPresent(connectionPool::setConnectionProperties); return connectionPool; diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/BaseDatabaseInstance.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/BaseDatabaseInstance.java index c3a855fa9e81..f2d81056a1b1 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/BaseDatabaseInstance.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/BaseDatabaseInstance.java @@ -19,6 +19,9 @@ public abstract class BaseDatabaseInstance implements DatabaseInstance { + // Public so classes consuming the getInitialized method have a sense of the time taken. + public static final long DEFAULT_CONNECTION_TIMEOUT_MS = 30 * 1000; + private static final Logger LOGGER = LoggerFactory.getLogger(BaseDatabaseInstance.class); protected final String username; @@ -56,11 +59,12 @@ protected BaseDatabaseInstance(final String username, @Override public boolean isInitialized() throws IOException { - final Database database = Databases.createPostgresDatabaseWithRetry( + final Database database = Databases.createPostgresDatabaseWithRetryTimeout( username, password, connectionString, - isDatabaseConnected(databaseName)); + isDatabaseConnected(databaseName), + DEFAULT_CONNECTION_TIMEOUT_MS); return new ExceptionWrappingDatabase(database).transaction(ctx -> tableNames.stream().allMatch(tableName -> hasTable(ctx, tableName))); } diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/MinimumFlywayMigrationVersionCheck.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/MinimumFlywayMigrationVersionCheck.java new file mode 100644 index 000000000000..e211e5ecad13 --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/MinimumFlywayMigrationVersionCheck.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance; + +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Contains reusable methods asserting if a database is ready. + *

+ * This is intended to be used by applications in combination with the bootloader, and the minimum + * migration env vars from {@link io.airbyte.config.Configs}, so application know it is safe to + * start interacting with the database. + *

+ * Both methods here poll every {@link #DEFAULT_POLL_PERIOD_MS} and have configurable timeouts. + */ +public class MinimumFlywayMigrationVersionCheck { + + public static final long DEFAULT_ASSERT_DATABASE_TIMEOUT_MS = 2 * BaseDatabaseInstance.DEFAULT_CONNECTION_TIMEOUT_MS; + + private static final Logger LOGGER = LoggerFactory.getLogger(MinimumFlywayMigrationVersionCheck.class); + private static final long DEFAULT_POLL_PERIOD_MS = 2000; + + /** + * Assert the given database can be connected to. + * + * @param db + * @param timeoutMs + */ + public static void assertDatabase(DatabaseInstance db, long timeoutMs) { + var currWaitingTime = 0; + var initialized = false; + while (!initialized) { + if (currWaitingTime >= timeoutMs) { + throw new RuntimeException("Timeout while connecting to the database.."); + } + + try { + initialized = db.isInitialized(); + } catch (IOException e) { + currWaitingTime += BaseDatabaseInstance.DEFAULT_CONNECTION_TIMEOUT_MS; + } + } + } + + /** + * Assert the given database contains the minimum flyway migrations needed to run the application. + * + * @param migrator + * @param minimumFlywayVersion + * @param timeoutMs + * @throws InterruptedException + */ + public static void assertMigrations(DatabaseMigrator migrator, String minimumFlywayVersion, long timeoutMs) throws InterruptedException { + var currWaitingTime = 0; + var currDatabaseMigrationVersion = migrator.getLatestMigration().getVersion().getVersion(); + + while (currDatabaseMigrationVersion.compareTo(minimumFlywayVersion) < 0) { + if (currWaitingTime >= timeoutMs) { + throw new RuntimeException("Timeout while waiting for database to fulfill minimum flyway migration version.."); + } + + Thread.sleep(DEFAULT_POLL_PERIOD_MS); + currWaitingTime += DEFAULT_POLL_PERIOD_MS; + currDatabaseMigrationVersion = migrator.getLatestMigration().getVersion().getVersion(); + } + } + +} diff --git a/airbyte-db/lib/src/main/resources/init.sql b/airbyte-db/lib/src/main/resources/init.sql index f32874a59ba2..088db3a8e97f 100644 --- a/airbyte-db/lib/src/main/resources/init.sql +++ b/airbyte-db/lib/src/main/resources/init.sql @@ -1,3 +1,8 @@ +-- Default is 100. Give this slightly more to accommodate the multiple setup applications running at the start. +ALTER SYSTEM +SET +max_connections = 150; + CREATE DATABASE airbyte; diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/MinimumFlywayMigrationVersionCheckTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/MinimumFlywayMigrationVersionCheckTest.java new file mode 100644 index 000000000000..7ce8c0b62c61 --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/MinimumFlywayMigrationVersionCheckTest.java @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Date; +import lombok.val; +import org.flywaydb.core.api.MigrationInfo; +import org.flywaydb.core.api.MigrationState; +import org.flywaydb.core.api.MigrationType; +import org.flywaydb.core.api.MigrationVersion; +import org.junit.jupiter.api.Test; + +public class MinimumFlywayMigrationVersionCheckTest { + + private static final long DEFAULT_TIMEOUT_MS = 10 * 1000; + + @Test + void testDatabaseNotSetupFails() throws IOException { + val database = mock(DatabaseInstance.class); + when(database.isInitialized()).thenThrow(new IOException()).thenReturn(false); + + assertThrows(RuntimeException.class, () -> MinimumFlywayMigrationVersionCheck.assertDatabase(database, DEFAULT_TIMEOUT_MS)); + } + + @Test + void testDatabaseSetupSucceeds() throws IOException { + val database = mock(DatabaseInstance.class); + when(database.isInitialized()) + .thenReturn(false) + .thenReturn(false) + .thenReturn(true); + + assertDoesNotThrow(() -> MinimumFlywayMigrationVersionCheck.assertDatabase(database, DEFAULT_TIMEOUT_MS)); + } + + @Test + void testMatchesMinimum() { + val version = "0.22.0.1"; + val migrator = mock(DatabaseMigrator.class); + when(migrator.getLatestMigration()).thenReturn(new StubMigrationInfo(version)); + + assertDoesNotThrow(() -> MinimumFlywayMigrationVersionCheck.assertMigrations(migrator, version, DEFAULT_TIMEOUT_MS)); + } + + @Test + void testExceedsMinimum() { + val minVersion = "0.22.0.1"; + val latestVersion = "0.30.0"; + val migrator = mock(DatabaseMigrator.class); + when(migrator.getLatestMigration()).thenReturn(new StubMigrationInfo(latestVersion)); + + assertDoesNotThrow(() -> MinimumFlywayMigrationVersionCheck.assertMigrations(migrator, minVersion, DEFAULT_TIMEOUT_MS)); + } + + @Test + void testFulfilledAfter() { + val startVersion = "0.22.0.1"; + val minVersion = "0.30.0"; + val latestVersion = "0.33.0.1"; + + val migrator = mock(DatabaseMigrator.class); + when(migrator.getLatestMigration()) + .thenReturn(new StubMigrationInfo(startVersion)) + .thenReturn(new StubMigrationInfo(startVersion)) + .thenReturn(new StubMigrationInfo(startVersion)) + .thenReturn(new StubMigrationInfo(latestVersion)); + + assertDoesNotThrow(() -> MinimumFlywayMigrationVersionCheck.assertMigrations(migrator, minVersion, DEFAULT_TIMEOUT_MS)); + } + + @Test + void testTimeout() { + val startVersion = "0.22.0.1"; + val minVersion = "0.30.0"; + + val migrator = mock(DatabaseMigrator.class); + when(migrator.getLatestMigration()).thenReturn(new StubMigrationInfo(startVersion)); + + assertThrows(RuntimeException.class, () -> MinimumFlywayMigrationVersionCheck.assertMigrations(migrator, minVersion, DEFAULT_TIMEOUT_MS)); + } + + /** + * For testing purposes. + */ + private static class StubMigrationInfo implements MigrationInfo { + + private final String version; + + public StubMigrationInfo(String version) { + this.version = version; + } + + @Override + public MigrationType getType() { + return null; + } + + @Override + public Integer getChecksum() { + return null; + } + + @Override + public MigrationVersion getVersion() { + return MigrationVersion.fromVersion(version); + } + + @Override + public String getDescription() { + return null; + } + + @Override + public String getScript() { + return null; + } + + @Override + public MigrationState getState() { + return null; + } + + @Override + public Date getInstalledOn() { + return null; + } + + @Override + public String getInstalledBy() { + return null; + } + + @Override + public Integer getInstalledRank() { + return null; + } + + @Override + public Integer getExecutionTime() { + return null; + } + + @Override + public String getPhysicalLocation() { + return null; + } + + @Override + public int compareVersion(MigrationInfo o) { + return 0; + } + + @Override + public int compareTo(MigrationInfo o) { + return 0; + } + + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 6f8489dab1c1..f8a39c6db8d6 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -4,7 +4,6 @@ package io.airbyte.server; -import com.google.common.annotations.VisibleForTesting; import io.airbyte.analytics.Deployment; import io.airbyte.analytics.TrackingClient; import io.airbyte.analytics.TrackingClientSingleton; @@ -12,7 +11,6 @@ import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs; import io.airbyte.config.EnvConfigs; -import io.airbyte.config.StandardWorkspace; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.init.YamlSeedConfigPersistence; import io.airbyte.config.persistence.ConfigPersistence; @@ -21,7 +19,8 @@ import io.airbyte.config.persistence.split_secrets.SecretPersistence; import io.airbyte.config.persistence.split_secrets.SecretsHydrator; import io.airbyte.db.Database; -import io.airbyte.db.instance.DatabaseMigrator; +import io.airbyte.db.instance.DatabaseInstance; +import io.airbyte.db.instance.MinimumFlywayMigrationVersionCheck; import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; import io.airbyte.db.instance.jobs.JobsDatabaseInstance; @@ -40,16 +39,14 @@ import io.airbyte.server.errors.KnownExceptionMapper; import io.airbyte.server.errors.NotFoundExceptionMapper; import io.airbyte.server.errors.UncaughtExceptionMapper; -import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.temporal.TemporalClient; import io.airbyte.workers.temporal.TemporalUtils; import io.temporal.serviceclient.WorkflowServiceStubs; -import java.io.IOException; import java.net.http.HttpClient; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.UUID; +import lombok.val; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; @@ -64,7 +61,6 @@ public class ServerApp implements ServerRunnable { private static final Logger LOGGER = LoggerFactory.getLogger(ServerApp.class); private static final int PORT = 8001; - private static final AirbyteVersion VERSION_BREAK = new AirbyteVersion("0.32.0-alpha"); private final AirbyteVersion airbyteVersion; private final Set> customComponentClasses; @@ -115,86 +111,63 @@ public void start() throws Exception { server.join(); } - private static void createDeploymentIfNoneExists(final JobPersistence jobPersistence) throws IOException { - final Optional deploymentOptional = jobPersistence.getDeployment(); - if (deploymentOptional.isPresent()) { - LOGGER.info("running deployment: {}", deploymentOptional.get()); - } else { - final UUID deploymentId = UUID.randomUUID(); - jobPersistence.setDeployment(deploymentId); - LOGGER.info("created deployment: {}", deploymentId); - } - } + private static void assertDatabasesReady(Configs configs, DatabaseInstance configsDatabaseInstance, DatabaseInstance jobsDatabaseInstance) + throws InterruptedException { + LOGGER.info("Checking configs database flyway migration version.."); + MinimumFlywayMigrationVersionCheck.assertDatabase(configsDatabaseInstance, MinimumFlywayMigrationVersionCheck.DEFAULT_ASSERT_DATABASE_TIMEOUT_MS); + val configsMigrator = new ConfigsDatabaseMigrator(configsDatabaseInstance.getInitialized(), ServerApp.class.getName()); + MinimumFlywayMigrationVersionCheck.assertMigrations(configsMigrator, configs.getConfigsDatabaseMinimumFlywayMigrationVersion(), + configs.getConfigsDatabaseInitializationTimeoutMs()); - private static void createWorkspaceIfNoneExists(final ConfigRepository configRepository) throws JsonValidationException, IOException { - if (!configRepository.listStandardWorkspaces(true).isEmpty()) { - LOGGER.info("workspace already exists for the deployment."); - return; - } + LOGGER.info("Checking jobs database flyway migration version.."); + MinimumFlywayMigrationVersionCheck.assertDatabase(jobsDatabaseInstance, MinimumFlywayMigrationVersionCheck.DEFAULT_ASSERT_DATABASE_TIMEOUT_MS); + val jobsMigrator = new JobsDatabaseMigrator(jobsDatabaseInstance.getInitialized(), ServerApp.class.getName()); + MinimumFlywayMigrationVersionCheck.assertMigrations(jobsMigrator, configs.getJobsDatabaseMinimumFlywayMigrationVersion(), + configs.getJobsDatabaseInitializationTimeoutMs()); - final UUID workspaceId = UUID.randomUUID(); - final StandardWorkspace workspace = new StandardWorkspace() - .withWorkspaceId(workspaceId) - .withCustomerId(UUID.randomUUID()) - .withName(workspaceId.toString()) - .withSlug(workspaceId.toString()) - .withInitialSetupComplete(false) - .withDisplaySetupWizard(true) - .withTombstone(false); - configRepository.writeStandardWorkspace(workspace); - TrackingClientSingleton.get().identify(workspaceId); } public static ServerRunnable getServer(final ServerFactory apiFactory, final ConfigPersistence seed) throws Exception { final Configs configs = new EnvConfigs(); - LogClientSingleton.getInstance().setWorkspaceMdc(configs.getWorkerEnvironment(), configs.getLogConfigs(), + LogClientSingleton.getInstance().setWorkspaceMdc( + configs.getWorkerEnvironment(), + configs.getLogConfigs(), LogClientSingleton.getInstance().getServerLogsRoot(configs.getWorkspaceRoot())); + LOGGER.info("Checking databases.."); + final DatabaseInstance configsDatabaseInstance = + new ConfigsDatabaseInstance(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl()); + final DatabaseInstance jobsDatabaseInstance = + new JobsDatabaseInstance(configs.getDatabaseUser(), configs.getDatabasePassword(), configs.getDatabaseUrl()); + assertDatabasesReady(configs, configsDatabaseInstance, jobsDatabaseInstance); + LOGGER.info("Creating Staged Resource folder..."); ConfigDumpImporter.initStagedResourceFolder(); LOGGER.info("Creating config repository..."); - // all these should be converted to get initialise calls - // insert the migration version check here - final Database configDatabase = new ConfigsDatabaseInstance( - configs.getConfigDatabaseUser(), - configs.getConfigDatabasePassword(), - configs.getConfigDatabaseUrl()) - .getAndInitialize(); - final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase).migrateFileConfigs(configs); - + final Database configDatabase = configsDatabaseInstance.getInitialized(); + final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase); final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configs); final Optional secretPersistence = SecretPersistence.getLongLived(configs); final Optional ephemeralSecretPersistence = SecretPersistence.getEphemeral(configs); - final ConfigRepository configRepository = new ConfigRepository(configPersistence.withValidation(), secretsHydrator, secretPersistence, ephemeralSecretPersistence); - LOGGER.info("Creating Scheduler persistence..."); - final Database jobDatabase = new JobsDatabaseInstance( - configs.getDatabaseUser(), - configs.getDatabasePassword(), - configs.getDatabaseUrl()) - .getAndInitialize(); + LOGGER.info("Creating jobs persistence..."); + final Database jobDatabase = jobsDatabaseInstance.getInitialized(); final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); - // this should be moved to the controller - createDeploymentIfNoneExists(jobPersistence); - - // must happen after deployment id is set TrackingClientSingleton.initialize( configs.getTrackingStrategy(), new Deployment(configs.getDeploymentMode(), jobPersistence.getDeployment().orElseThrow(), configs.getWorkerEnvironment()), configs.getAirbyteRole(), configs.getAirbyteVersion(), configRepository); - final TrackingClient trackingClient = TrackingClientSingleton.get(); - // must happen after the tracking client is initialized. - // if no workspace exists, we create one so the user starts out with a place to add configuration. - createWorkspaceIfNoneExists(configRepository); + final TrackingClient trackingClient = TrackingClientSingleton.get(); final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient); + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(configs.getTemporalHost()); final TemporalClient temporalClient = TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot()); final OAuthConfigSupplier oAuthConfigSupplier = new OAuthConfigSupplier(configRepository, trackingClient); @@ -204,34 +177,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier); final HttpClient httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build(); - // version in the database when the server main method is called. may be empty if this is the first - // time the server is started. - final AirbyteVersion airbyteVersion = configs.getAirbyteVersion(); - final Optional initialAirbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new); - if (!isLegalUpgrade(initialAirbyteDatabaseVersion.orElse(null), airbyteVersion)) { - final String attentionBanner = MoreResources.readResource("banner/attention-banner.txt"); - LOGGER.error(attentionBanner); - final String message = String.format( - "Cannot upgrade from version %s to version %s directly. First you must upgrade to version %s. After that upgrade is complete, you may upgrade to version %s", - initialAirbyteDatabaseVersion.get().serialize(), - airbyteVersion.serialize(), - VERSION_BREAK.serialize(), - airbyteVersion.serialize()); - - LOGGER.error(message); - throw new RuntimeException(message); - } - LOGGER.info("Starting server..."); - runFlywayMigration(configs, configDatabase, jobDatabase); - LOGGER.info("Ran Flyway migrations..."); - - jobPersistence.setVersion(airbyteVersion.serialize()); - - configPersistence.loadData(seed); - LOGGER.info("Loaded seed data..."); - return apiFactory.create( schedulerJobClient, syncSchedulerClient, @@ -250,47 +197,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con httpClient); } - @VisibleForTesting - static boolean isLegalUpgrade(final AirbyteVersion airbyteDatabaseVersion, final AirbyteVersion airbyteVersion) { - // means there was no previous version so upgrade even needs to happen. always legal. - if (airbyteDatabaseVersion == null) { - return true; - } - - return !isUpgradingThroughVersionBreak(airbyteDatabaseVersion, airbyteVersion); - } - - /** - * Check to see if given the current version of the app and the version we are trying to upgrade if - * it passes through a version break (i.e. a major version bump). - * - * @param airbyteDatabaseVersion - current version of the app - * @param airbyteVersion - version we are trying to upgrade to - * @return true if upgrading through a major version, otherwise false. - */ - private static boolean isUpgradingThroughVersionBreak(final AirbyteVersion airbyteDatabaseVersion, final AirbyteVersion airbyteVersion) { - return airbyteDatabaseVersion.lessThan(VERSION_BREAK) && airbyteVersion.greaterThan(VERSION_BREAK); - } - public static void main(final String[] args) throws Exception { getServer(new ServerFactory.Api(), YamlSeedConfigPersistence.getDefault()).start(); } - private static void runFlywayMigration(final Configs configs, final Database configDatabase, final Database jobDatabase) { - final DatabaseMigrator configDbMigrator = new ConfigsDatabaseMigrator(configDatabase, ServerApp.class.getSimpleName()); - final DatabaseMigrator jobDbMigrator = new JobsDatabaseMigrator(jobDatabase, ServerApp.class.getSimpleName()); - - configDbMigrator.createBaseline(); - jobDbMigrator.createBaseline(); - - if (configs.runDatabaseMigrationOnStartup()) { - LOGGER.info("Migrating configs database"); - configDbMigrator.migrate(); - LOGGER.info("Migrating jobs database"); - jobDbMigrator.migrate(); - } else { - LOGGER.info("Auto database migration is skipped"); - } - } - } diff --git a/airbyte-server/src/test/java/io/airbyte/server/ServerAppTest.java b/airbyte-server/src/test/java/io/airbyte/server/ServerAppTest.java deleted file mode 100644 index 3c9f7dcdde9a..000000000000 --- a/airbyte-server/src/test/java/io/airbyte/server/ServerAppTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.server; - -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import io.airbyte.commons.version.AirbyteVersion; -import org.junit.jupiter.api.Test; - -class ServerAppTest { - - @Test - void testIsLegalUpgradePredicate() { - // starting from no previous version is always legal. - assertTrue(ServerApp.isLegalUpgrade(null, new AirbyteVersion("0.17.1-alpha"))); - assertTrue(ServerApp.isLegalUpgrade(null, new AirbyteVersion("0.32.0-alpha"))); - assertTrue(ServerApp.isLegalUpgrade(null, new AirbyteVersion("0.32.1-alpha"))); - assertTrue(ServerApp.isLegalUpgrade(null, new AirbyteVersion("0.33.1-alpha"))); - // starting from a version that is pre-breaking migration cannot go past the breaking migration. - assertTrue(ServerApp.isLegalUpgrade(new AirbyteVersion("0.17.0-alpha"), new AirbyteVersion("0.17.1-alpha"))); - assertTrue(ServerApp.isLegalUpgrade(new AirbyteVersion("0.17.0-alpha"), new AirbyteVersion("0.18.0-alpha"))); - assertTrue(ServerApp.isLegalUpgrade(new AirbyteVersion("0.17.0-alpha"), new AirbyteVersion("0.32.0-alpha"))); - assertFalse(ServerApp.isLegalUpgrade(new AirbyteVersion("0.17.0-alpha"), new AirbyteVersion("0.32.1-alpha"))); - assertFalse(ServerApp.isLegalUpgrade(new AirbyteVersion("0.17.0-alpha"), new AirbyteVersion("0.33.0-alpha"))); - // any migration starting at the breaking migration or after it can upgrade to anything. - assertTrue(ServerApp.isLegalUpgrade(new AirbyteVersion("0.32.0-alpha"), new AirbyteVersion("0.32.1-alpha"))); - assertTrue(ServerApp.isLegalUpgrade(new AirbyteVersion("0.32.0-alpha"), new AirbyteVersion("0.33.0-alpha"))); - assertTrue(ServerApp.isLegalUpgrade(new AirbyteVersion("0.32.1-alpha"), new AirbyteVersion("0.32.1-alpha"))); - assertTrue(ServerApp.isLegalUpgrade(new AirbyteVersion("0.32.1-alpha"), new AirbyteVersion("0.33.0-alpha"))); - assertTrue(ServerApp.isLegalUpgrade(new AirbyteVersion("0.33.0-alpha"), new AirbyteVersion("0.33.1-alpha"))); - assertTrue(ServerApp.isLegalUpgrade(new AirbyteVersion("0.33.0-alpha"), new AirbyteVersion("0.34.0-alpha"))); - } - -} diff --git a/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java b/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java index 146ad70686df..cae778f191ee 100644 --- a/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java +++ b/airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java @@ -47,8 +47,21 @@ import org.slf4j.LoggerFactory; /** - * In order to run this test from intellij, build the docker images via SUB_BUILD=PLATFORM ./gradlew - * composeBuild and set VERSION in .env to the pertinent version. + * This class contains an e2e test simulating what a user encounter when trying to upgrade Airybte. + *

+ * Three invariants are tested: + *

+ * - upgrading pass 0.32.0 without first upgrading to 0.32.0 should error. + *

+ * - upgrading pass 0.32.0 without first upgrading to 0.32.0 should not put the db in a bad state. + *

+ * - upgrading from 0.32.0 to the latest version should work. + *

+ * This test runs on the current code version and expects local images with the `dev` tag to be + * available. To do so, run SUB_BUILD=PLATFORM ./gradlew build. + *

+ * When running this test consecutively locally, it might be necessary to run `docker volume prune` + * to remove hanging volumes. */ public class MigrationAcceptanceTest { @@ -74,15 +87,21 @@ public void testAutomaticMigration() throws Exception { healthCheck(getApiClient()); }); + LOGGER.info("Finish initial 0.17.0-alpha start.."); + // attempt to run from pre-version bump version to post-version bump version. expect failure. final File currentDockerComposeFile = MoreResources.readResourceAsFile("docker-compose.yaml"); // piggybacks off of whatever the existing .env file is, so override default filesystem values in to // point at test paths. final Properties envFileProperties = overrideDirectoriesForTest(MoreProperties.envFileToProperties(ENV_FILE)); + // use the dev version so the test is run on the current code version. + envFileProperties.setProperty("VERSION", "dev"); runAirbyteAndWaitForUpgradeException(currentDockerComposeFile, envFileProperties); + LOGGER.info("Finished testing upgrade exception.."); // run "faux" major version bump version final File version32DockerComposeFile = MoreResources.readResourceAsFile("docker-compose-migration-test-0-32-0-alpha.yaml"); + final Properties version32EnvFileProperties = MoreProperties .envFileToProperties(MoreResources.readResourceAsFile("env-file-migration-test-0-32-0.env")); runAirbyte(version32DockerComposeFile, version32EnvFileProperties, MigrationAcceptanceTest::assertHealthy); @@ -129,7 +148,7 @@ private void runAirbyteAndWaitForUpgradeException(final File dockerComposeFile, LOGGER.info("Start up Airbyte at version {}", env.get("VERSION")); final AirbyteTestContainer airbyteTestContainer = new AirbyteTestContainer.Builder(dockerComposeFile) .setEnv(env) - .setLogListener("server", waitForLogLine.getListener("After that upgrade is complete, you may upgrade to version")) + .setLogListener("bootloader", waitForLogLine.getListener("After that upgrade is complete, you may upgrade to version")) .build(); airbyteTestContainer.startAsync(); diff --git a/airbyte-tests/src/automaticMigrationAcceptanceTest/resources/env-file-migration-test-0-32-0.env b/airbyte-tests/src/automaticMigrationAcceptanceTest/resources/env-file-migration-test-0-32-0.env index b8e12fa301cd..968a1f65b695 100644 --- a/airbyte-tests/src/automaticMigrationAcceptanceTest/resources/env-file-migration-test-0-32-0.env +++ b/airbyte-tests/src/automaticMigrationAcceptanceTest/resources/env-file-migration-test-0-32-0.env @@ -1,4 +1,4 @@ -VERSION=0.32.0-alpha +VERSION=0.32.0-alpha-patch-1 # Airbyte Internal Job Database, see https://docs.airbyte.io/operator-guides/configuring-airbyte-db DATABASE_USER=docker diff --git a/airbyte-tests/src/main/java/io/airbyte/test/airbyte_test_container/AirbyteTestContainer.java b/airbyte-tests/src/main/java/io/airbyte/test/airbyte_test_container/AirbyteTestContainer.java index 965288e55d8e..78b11aa2cf8b 100644 --- a/airbyte-tests/src/main/java/io/airbyte/test/airbyte_test_container/AirbyteTestContainer.java +++ b/airbyte-tests/src/main/java/io/airbyte/test/airbyte_test_container/AirbyteTestContainer.java @@ -72,14 +72,12 @@ public void startBlocking() throws IOException, InterruptedException { public void startAsync() throws IOException, InterruptedException { final File cleanedDockerComposeFile = prepareDockerComposeFile(dockerComposeFile); dockerComposeContainer = new DockerComposeContainer(cleanedDockerComposeFile).withEnv(env); + // Only expose logs related to db migrations. serviceLogConsumer(dockerComposeContainer, "init"); + serviceLogConsumer(dockerComposeContainer, "bootloader"); serviceLogConsumer(dockerComposeContainer, "db"); serviceLogConsumer(dockerComposeContainer, "seed"); - serviceLogConsumer(dockerComposeContainer, "scheduler"); serviceLogConsumer(dockerComposeContainer, "server"); - serviceLogConsumer(dockerComposeContainer, "webapp"); - serviceLogConsumer(dockerComposeContainer, "worker"); - serviceLogConsumer(dockerComposeContainer, "airbyte-temporal"); dockerComposeContainer.start(); } diff --git a/docker-compose.build-m1.yaml b/docker-compose.build-m1.yaml index 5aa778e6ec97..41a21f0aefc0 100644 --- a/docker-compose.build-m1.yaml +++ b/docker-compose.build-m1.yaml @@ -17,6 +17,14 @@ services: context: airbyte-config/init labels: io.airbyte.git-revision: ${GIT_REVISION} + bootloader: + platform: linux/amd64 + image: airbyte/bootloader:${VERSION} + build: + dockerfile: Dockerfile + context: airbyte-bootloader + labels: + io.airbyte.git-revision: ${GIT_REVISION} db: platform: linux/amd64 image: airbyte/db:${VERSION} diff --git a/docker-compose.build.yaml b/docker-compose.build.yaml index 17770daee246..21d03bf16722 100644 --- a/docker-compose.build.yaml +++ b/docker-compose.build.yaml @@ -8,6 +8,13 @@ services: context: airbyte-config/init labels: io.airbyte.git-revision: ${GIT_REVISION} + bootloader: + image: airbyte/bootloader:${VERSION} + build: + dockerfile: Dockerfile + context: airbyte-bootloader + labels: + io.airbyte.git-revision: ${GIT_REVISION} db: image: airbyte/db:${VERSION} build: diff --git a/docker-compose.yaml b/docker-compose.yaml index 5ee4f40c60b6..cfab010b50eb 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -18,6 +18,18 @@ services: - HACK_LOCAL_ROOT_PARENT=${HACK_LOCAL_ROOT_PARENT} volumes: - ${HACK_LOCAL_ROOT_PARENT}:/local_parent + bootloader: + image: airbyte/bootloader:${VERSION} + logging: *default-logging + container_name: airbyte-bootloader + environment: + - AIRBYTE_VERSION=${VERSION} + - DATABASE_USER=${DATABASE_USER} + - DATABASE_PASSWORD=${DATABASE_PASSWORD} + - DATABASE_URL=${DATABASE_URL} + - CONFIG_DATABASE_USER=${CONFIG_DATABASE_USER:-} + - CONFIG_DATABASE_PASSWORD=${CONFIG_DATABASE_PASSWORD:-} + - CONFIG_DATABASE_URL=${CONFIG_DATABASE_URL:-} db: image: airbyte/db:${VERSION} logging: *default-logging @@ -152,6 +164,8 @@ services: - JOB_POD_MAIN_CONTAINER_MEMORY_REQUEST=${JOB_POD_MAIN_CONTAINER_MEMORY_REQUEST} - JOB_POD_MAIN_CONTAINER_MEMORY_LIMIT=${JOB_POD_MAIN_CONTAINER_MEMORY_LIMIT} - SECRET_PERSISTENCE=${SECRET_PERSISTENCE} + - CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=${CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION:-} + - JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=${JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION:-} ports: - 8001:8001 volumes: diff --git a/kube/overlays/dev-integration-test/.env b/kube/overlays/dev-integration-test/.env index 6e3f81a6cadf..4bd0da6c1647 100644 --- a/kube/overlays/dev-integration-test/.env +++ b/kube/overlays/dev-integration-test/.env @@ -6,6 +6,8 @@ DATABASE_PORT=5432 DATABASE_DB=airbyte # translate manually DATABASE_URL=jdbc:postgresql://${DATABASE_HOST}:${DATABASE_PORT/${DATABASE_DB} DATABASE_URL=jdbc:postgresql://airbyte-db-svc:5432/airbyte +JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.29.15.001 +CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.30.22.001 # When using the airbyte-db via default docker image: CONFIG_ROOT=/configs diff --git a/kube/overlays/dev-integration-test/kustomization.yaml b/kube/overlays/dev-integration-test/kustomization.yaml index 8862c110d093..04f0725377a9 100644 --- a/kube/overlays/dev-integration-test/kustomization.yaml +++ b/kube/overlays/dev-integration-test/kustomization.yaml @@ -9,6 +9,8 @@ bases: images: - name: airbyte/db newTag: dev + - name: airbyte/bootloader + newTag: dev - name: airbyte/scheduler newTag: dev - name: airbyte/server diff --git a/kube/overlays/dev/.env b/kube/overlays/dev/.env index e52b05f042dd..7766364a0edf 100644 --- a/kube/overlays/dev/.env +++ b/kube/overlays/dev/.env @@ -6,6 +6,8 @@ DATABASE_PORT=5432 DATABASE_DB=airbyte # translate manually DATABASE_URL=jdbc:postgresql://${DATABASE_HOST}:${DATABASE_PORT/${DATABASE_DB} DATABASE_URL=jdbc:postgresql://airbyte-db-svc:5432/airbyte +JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.29.15.001 +CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.30.22.001 # When using the airbyte-db via default docker image: CONFIG_ROOT=/configs diff --git a/kube/overlays/dev/kustomization.yaml b/kube/overlays/dev/kustomization.yaml index 262271ad352a..5842f9ccb052 100644 --- a/kube/overlays/dev/kustomization.yaml +++ b/kube/overlays/dev/kustomization.yaml @@ -9,6 +9,8 @@ bases: images: - name: airbyte/db newTag: dev + - name: airbyte/bootloader + newTag: dev - name: airbyte/scheduler newTag: dev - name: airbyte/server diff --git a/kube/overlays/stable-with-resource-limits/.env b/kube/overlays/stable-with-resource-limits/.env index 1a8b3ac0e0cf..71c87d8bb407 100644 --- a/kube/overlays/stable-with-resource-limits/.env +++ b/kube/overlays/stable-with-resource-limits/.env @@ -6,6 +6,8 @@ DATABASE_PORT=5432 DATABASE_DB=airbyte # translate manually DATABASE_URL=jdbc:postgresql://${DATABASE_HOST}:${DATABASE_PORT/${DATABASE_DB} DATABASE_URL=jdbc:postgresql://airbyte-db-svc:5432/airbyte +JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.29.15.001 +CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.30.22.001 # When using the airbyte-db via default docker image: CONFIG_ROOT=/configs diff --git a/kube/overlays/stable-with-resource-limits/kustomization.yaml b/kube/overlays/stable-with-resource-limits/kustomization.yaml index 6da67d49b576..89e09a457923 100644 --- a/kube/overlays/stable-with-resource-limits/kustomization.yaml +++ b/kube/overlays/stable-with-resource-limits/kustomization.yaml @@ -9,6 +9,8 @@ bases: images: - name: airbyte/db newTag: 0.33.12-alpha + - name: airbyte/bootloader + newTag: 0.33.12-alpha - name: airbyte/scheduler newTag: 0.33.12-alpha - name: airbyte/server diff --git a/kube/overlays/stable/.env b/kube/overlays/stable/.env index 1a8b3ac0e0cf..71c87d8bb407 100644 --- a/kube/overlays/stable/.env +++ b/kube/overlays/stable/.env @@ -6,6 +6,8 @@ DATABASE_PORT=5432 DATABASE_DB=airbyte # translate manually DATABASE_URL=jdbc:postgresql://${DATABASE_HOST}:${DATABASE_PORT/${DATABASE_DB} DATABASE_URL=jdbc:postgresql://airbyte-db-svc:5432/airbyte +JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.29.15.001 +CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.30.22.001 # When using the airbyte-db via default docker image: CONFIG_ROOT=/configs diff --git a/kube/overlays/stable/kustomization.yaml b/kube/overlays/stable/kustomization.yaml index 7b50c532c6d7..941eef696bd0 100644 --- a/kube/overlays/stable/kustomization.yaml +++ b/kube/overlays/stable/kustomization.yaml @@ -9,6 +9,8 @@ bases: images: - name: airbyte/db newTag: 0.33.12-alpha + - name: airbyte/bootloader + newTag: 0.33.12-alpha - name: airbyte/scheduler newTag: 0.33.12-alpha - name: airbyte/server diff --git a/kube/resources/bootloader.yaml b/kube/resources/bootloader.yaml new file mode 100644 index 000000000000..0db7fd787b68 --- /dev/null +++ b/kube/resources/bootloader.yaml @@ -0,0 +1,40 @@ +apiVersion: v1 +kind: Pod +metadata: + name: airbyte-bootloader +spec: + restartPolicy: Never + containers: + - name: airbyte-bootloader-container + image: airbyte/bootloader + env: + - name: AIRBYTE_VERSION + valueFrom: + configMapKeyRef: + name: airbyte-env + key: AIRBYTE_VERSION + - name: DATABASE_HOST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: DATABASE_HOST + - name: DATABASE_PORT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: DATABASE_PORT + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: DATABASE_PASSWORD + - name: DATABASE_URL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: DATABASE_URL + - name: DATABASE_USER + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: DATABASE_USER diff --git a/kube/resources/kustomization.yaml b/kube/resources/kustomization.yaml index f83707d3f39c..b00b02668c65 100644 --- a/kube/resources/kustomization.yaml +++ b/kube/resources/kustomization.yaml @@ -3,6 +3,7 @@ kind: Kustomization resources: - airbyte-minio.yaml + - bootloader.yaml - db.yaml - pod-sweeper.yaml - scheduler.yaml diff --git a/kube/resources/server.yaml b/kube/resources/server.yaml index 779db78b2336..9c42d48f4def 100644 --- a/kube/resources/server.yaml +++ b/kube/resources/server.yaml @@ -143,6 +143,16 @@ spec: configMapKeyRef: name: airbyte-env key: GCS_LOG_BUCKET + - name: CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION + valueFrom: + configMapKeyRef: + name: airbyte-env + key: CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION + - name: JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION ports: - containerPort: 8001 volumeMounts: diff --git a/tools/bin/acceptance_test.sh b/tools/bin/acceptance_test.sh index e2d5e652f7ed..c01b58d1d523 100755 --- a/tools/bin/acceptance_test.sh +++ b/tools/bin/acceptance_test.sh @@ -13,7 +13,7 @@ VERSION=dev TRACKING_STRATEGY=logging docker-compose up -d trap "echo 'docker-compose logs:' && docker-compose logs -t --tail 1000 && docker-compose down -v" EXIT echo "Waiting for services to begin" -sleep 10 # TODO need a better way to wait +while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' localhost:8000/api/v1/health)" != "200" ]]; do echo "Waiting for docker deployment.."; sleep 5; done echo "Running e2e tests via gradle" SUB_BUILD=PLATFORM USE_EXTERNAL_DEPLOYMENT=true ./gradlew :airbyte-tests:acceptanceTests --rerun-tasks --scan diff --git a/tools/bin/acceptance_test_kube.sh b/tools/bin/acceptance_test_kube.sh index 08cce16bcb79..15c7a262139f 100755 --- a/tools/bin/acceptance_test_kube.sh +++ b/tools/bin/acceptance_test_kube.sh @@ -14,6 +14,7 @@ kind load docker-image airbyte/scheduler:dev --name chart-testing & kind load docker-image airbyte/webapp:dev --name chart-testing & kind load docker-image airbyte/worker:dev --name chart-testing & kind load docker-image airbyte/db:dev --name chart-testing & +kind load docker-image airbyte/bootloader:dev --name chart-testing & wait echo "Starting app..."