diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/ConfigSchema.java b/airbyte-config/models/src/main/java/io/airbyte/config/ConfigSchema.java index 81626ced0c2e..2a11f656ebe8 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/ConfigSchema.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/ConfigSchema.java @@ -37,7 +37,7 @@ public enum ConfigSchema implements AirbyteConfig { destinationConnection -> destinationConnection.getDestinationId().toString(), "destinationId"), - // sync + // sync (i.e. connection) STANDARD_SYNC("StandardSync.yaml", StandardSync.class, standardSync -> standardSync.getConnectionId().toString(), @@ -46,6 +46,10 @@ public enum ConfigSchema implements AirbyteConfig { StandardSyncOperation.class, standardSyncOperation -> standardSyncOperation.getOperationId().toString(), "operationId"), + STANDARD_SYNC_STATE("StandardSyncState.yaml", + StandardSyncState.class, + standardSyncState -> standardSyncState.getConnectionId().toString(), + "connectionId"), SOURCE_OAUTH_PARAM("SourceOAuthParameter.yaml", SourceOAuthParameter.class, sourceOAuthParameter -> sourceOAuthParameter.getOauthParameterId().toString(), diff --git a/airbyte-config/models/src/main/resources/types/StandardSyncState.yaml b/airbyte-config/models/src/main/resources/types/StandardSyncState.yaml new file mode 100644 index 000000000000..964e084bc696 --- /dev/null +++ b/airbyte-config/models/src/main/resources/types/StandardSyncState.yaml @@ -0,0 +1,17 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StandardSyncState.yaml +title: StandardSyncState +description: The current state of a connection (i.e. StandardSync). +type: object +additionalProperties: false +required: + - connectionId +properties: + connectionId: + type: string + format: uuid + description: This is a foreign key that references a connection (i.e. StandardSync). 
+ state: + "$ref": State.yaml + description: The current (latest) connection state. diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index c03d7b44fca6..d7d47242df73 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -5,6 +5,7 @@ package io.airbyte.config.persistence; import com.fasterxml.jackson.databind.JsonNode; +import com.google.api.client.util.Preconditions; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; @@ -19,7 +20,9 @@ import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOperation; +import io.airbyte.config.StandardSyncState; import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.State; import io.airbyte.config.persistence.split_secrets.SecretPersistence; import io.airbyte.config.persistence.split_secrets.SecretsHelpers; import io.airbyte.config.persistence.split_secrets.SecretsHydrator; @@ -42,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@SuppressWarnings("OptionalUsedAsFieldOrParameterType") public class ConfigRepository { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRepository.class); @@ -431,6 +435,30 @@ public List listDestinationOAuthParam() throws JsonVa return persistence.listConfigs(ConfigSchema.DESTINATION_OAUTH_PARAM, DestinationOAuthParameter.class); } + public Optional getConnectionState(final UUID connectionId) throws IOException { + try { + final StandardSyncState connectionState = persistence.getConfig( + ConfigSchema.STANDARD_SYNC_STATE, + connectionId.toString(), + StandardSyncState.class); + return 
Optional.of(connectionState.getState()); + } catch (final ConfigNotFoundException e) { + return Optional.empty(); + } catch (final JsonValidationException e) { + throw new IllegalStateException(e); + } + } + + public void updateConnectionState(final UUID connectionId, final State state) throws IOException { + LOGGER.info("Updating connection {} state: {}", connectionId, state); + final StandardSyncState connectionState = new StandardSyncState().withConnectionId(connectionId).withState(state); + try { + persistence.writeConfig(ConfigSchema.STANDARD_SYNC_STATE, connectionId.toString(), connectionState); + } catch (final JsonValidationException e) { + throw new IllegalStateException(e); + } + } + /** * Converts between a dumpConfig() output and a replaceAllConfigs() input, by deserializing the * string/jsonnode into the AirbyteConfig, Stream @@ -453,6 +481,7 @@ public void replaceAllConfigsDeserializing(final Map> c public void replaceAllConfigs(final Map> configs, final boolean dryRun) throws IOException { if (longLivedSecretPersistence.isPresent()) { + Preconditions.checkNotNull(specFetcherFn); final var augmentedMap = new HashMap<>(configs); // get all source defs so that we can use their specs when storing secrets. diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java index b4efefc427a1..19035ed42295 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java @@ -61,7 +61,7 @@ public DatabaseConfigPersistence(final Database database) { * If this is a migration deployment from an old version that relies on file system config * persistence, copy the existing configs from local files. 
*/ - public void migrateFileConfigs(final Configs serverConfigs) throws IOException { + public DatabaseConfigPersistence migrateFileConfigs(final Configs serverConfigs) throws IOException { database.transaction(ctx -> { final boolean isInitialized = ctx.fetchExists(AIRBYTE_CONFIGS); if (isInitialized) { @@ -77,6 +77,8 @@ public void migrateFileConfigs(final Configs serverConfigs) throws IOException { return null; }); + + return this; } @Override diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java index 9a1c9befbf29..8681788cb6f8 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java @@ -11,7 +11,6 @@ import io.airbyte.config.AirbyteConfig; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -186,7 +185,7 @@ public void replaceAllConfigs(final Map> configs, final @Override public void loadData(final ConfigPersistence seedPersistence) throws IOException { - throw new UnsupportedEncodingException("This method is not supported in this implementation"); + // this method is not supported in this implementation, but needed in tests; do nothing } private T getConfigInternal(final AirbyteConfig configType, final String configId, final Class clazz) diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java index 6818f4e13128..7580216f12e4 100644 --- 
a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java @@ -6,16 +6,23 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.airbyte.commons.json.Jsons; import io.airbyte.config.ConfigSchema; +import io.airbyte.config.StandardSyncState; import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.State; import io.airbyte.config.persistence.split_secrets.MemorySecretPersistence; import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.util.Optional; import java.util.UUID; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -34,6 +41,11 @@ void setup() { new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.of(secretPersistence), Optional.of(secretPersistence)); } + @AfterEach + void cleanUp() { + reset(configPersistence); + } + @Test void testWorkspaceWithNullTombstone() throws ConfigNotFoundException, IOException, JsonValidationException { assertReturnsWorkspace(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID)); @@ -55,4 +67,34 @@ void assertReturnsWorkspace(final StandardWorkspace workspace) throws ConfigNotF assertEquals(workspace, configRepository.getStandardWorkspace(WORKSPACE_ID, true)); } + @Test + void testGetConnectionState() throws Exception { + final UUID connectionId = UUID.randomUUID(); + final State state = new State().withState(Jsons.deserialize("{ \"cursor\": 1000 }")); + final StandardSyncState connectionState = new 
StandardSyncState().withConnectionId(connectionId).withState(state); + + when(configPersistence.getConfig(ConfigSchema.STANDARD_SYNC_STATE, connectionId.toString(), StandardSyncState.class)) + .thenThrow(new ConfigNotFoundException(ConfigSchema.STANDARD_SYNC_STATE, connectionId)); + assertEquals(Optional.empty(), configRepository.getConnectionState(connectionId)); + + reset(configPersistence); + when(configPersistence.getConfig(ConfigSchema.STANDARD_SYNC_STATE, connectionId.toString(), StandardSyncState.class)) + .thenReturn(connectionState); + assertEquals(Optional.of(state), configRepository.getConnectionState(connectionId)); + } + + @Test + void testUpdateConnectionState() throws Exception { + final UUID connectionId = UUID.randomUUID(); + final State state1 = new State().withState(Jsons.deserialize("{ \"cursor\": 1 }")); + final StandardSyncState connectionState1 = new StandardSyncState().withConnectionId(connectionId).withState(state1); + final State state2 = new State().withState(Jsons.deserialize("{ \"cursor\": 2 }")); + final StandardSyncState connectionState2 = new StandardSyncState().withConnectionId(connectionId).withState(state2); + + configRepository.updateConnectionState(connectionId, state1); + verify(configPersistence, times(1)).writeConfig(ConfigSchema.STANDARD_SYNC_STATE, connectionId.toString(), connectionState1); + configRepository.updateConnectionState(connectionId, state2); + verify(configPersistence, times(1)).writeConfig(ConfigSchema.STANDARD_SYNC_STATE, connectionId.toString(), connectionState2); + } + } diff --git a/airbyte-db/lib/build.gradle b/airbyte-db/lib/build.gradle index 0d9d22059bf2..1d3d5dd2d331 100644 --- a/airbyte-db/lib/build.gradle +++ b/airbyte-db/lib/build.gradle @@ -10,6 +10,7 @@ dependencies { implementation project(':airbyte-protocol:models') implementation project(':airbyte-json-validation') + implementation project(':airbyte-config:models') implementation "org.flywaydb:flyway-core:7.14.0" implementation 
"org.testcontainers:postgresql:1.15.3" // These are required because gradle might be using lower version of Jna from other diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseTestProvider.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseTestProvider.java new file mode 100644 index 000000000000..99d4fbfd1c1c --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/ConfigsDatabaseTestProvider.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.configs; + +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.table; + +import io.airbyte.config.ConfigSchema; +import io.airbyte.db.Database; +import io.airbyte.db.ExceptionWrappingDatabase; +import io.airbyte.db.instance.DatabaseMigrator; +import io.airbyte.db.instance.test.TestDatabaseProvider; +import java.io.IOException; +import java.time.OffsetDateTime; +import java.util.UUID; +import org.jooq.JSONB; + +public class ConfigsDatabaseTestProvider implements TestDatabaseProvider { + + private final String user; + private final String password; + private final String jdbcUrl; + + public ConfigsDatabaseTestProvider(final String user, final String password, final String jdbcUrl) { + this.user = user; + this.password = password; + this.jdbcUrl = jdbcUrl; + } + + @Override + public Database create(final boolean runMigration) throws IOException { + final Database database = new ConfigsDatabaseInstance(user, password, jdbcUrl) + .getAndInitialize(); + + if (runMigration) { + final DatabaseMigrator migrator = new ConfigsDatabaseMigrator( + database, + ConfigsDatabaseTestProvider.class.getSimpleName()); + migrator.createBaseline(); + migrator.migrate(); + } + + // The configs database is considered ready only if there are some seed records. + // So we need to create at least one record here. 
+ final OffsetDateTime timestamp = OffsetDateTime.now(); + new ExceptionWrappingDatabase(database).transaction(ctx -> ctx.insertInto(table("airbyte_configs")) + .set(field("config_id"), UUID.randomUUID().toString()) + .set(field("config_type"), ConfigSchema.STATE.name()) + .set(field("config_blob"), JSONB.valueOf("{}")) + .set(field("created_at"), timestamp) + .set(field("updated_at"), timestamp) + .execute()); + + return database; + } + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state.java new file mode 100644 index 000000000000..79567bb34c5c --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state.java @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.configs.migrations; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.Configs; +import io.airbyte.config.EnvConfigs; +import io.airbyte.config.StandardSyncState; +import io.airbyte.config.State; +import io.airbyte.db.Database; +import io.airbyte.db.instance.jobs.JobsDatabaseInstance; +import java.io.IOException; +import java.sql.SQLException; +import java.time.OffsetDateTime; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.flywaydb.core.api.migration.BaseJavaMigration; +import org.flywaydb.core.api.migration.Context; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.JSONB; +import org.jooq.Table; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; + +/** + * Copy the latest job state for each standard sync to the config database. + */ +public class V0_30_22_001__Store_last_sync_state extends BaseJavaMigration { + + private static final ObjectMapper MAPPER = MoreMappers.initMapper(); + private static final String MIGRATION_NAME = "Configs db migration 0.30.22.001"; + private static final Logger LOGGER = LoggerFactory.getLogger(V0_30_22_001__Store_last_sync_state.class); + + // airbyte configs table + // (we cannot use the jooq generated code here to avoid circular dependency) + static final Table TABLE_AIRBYTE_CONFIGS = DSL.table("airbyte_configs"); + static final Field COLUMN_CONFIG_TYPE = DSL.field("config_type", SQLDataType.VARCHAR(60).nullable(false)); + static final Field COLUMN_CONFIG_ID = DSL.field("config_id", SQLDataType.VARCHAR(36).nullable(false)); + static final Field COLUMN_CONFIG_BLOB = DSL.field("config_blob", SQLDataType.JSONB.nullable(false)); + static final Field COLUMN_CREATED_AT = DSL.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE); + static final Field COLUMN_UPDATED_AT = DSL.field("updated_at", SQLDataType.TIMESTAMPWITHTIMEZONE); + + private final Configs configs; + + public V0_30_22_001__Store_last_sync_state() { + this.configs = new EnvConfigs(); + } + + @VisibleForTesting + V0_30_22_001__Store_last_sync_state(final Configs configs) { + this.configs = configs; + } + + @Override + public void migrate(final Context context) throws Exception { + LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); + final DSLContext ctx = DSL.using(context.getConnection()); + + final Optional jobsDatabase = getJobsDatabase(configs); + if (jobsDatabase.isPresent()) { + copyData(ctx, getStandardSyncStates(jobsDatabase.get()), OffsetDateTime.now()); + } + } + + @VisibleForTesting + static void copyData(final DSLContext ctx, final Set standardSyncStates, final OffsetDateTime timestamp) { + LOGGER.info("[{}] Number of connection states to copy: {}", 
MIGRATION_NAME, standardSyncStates.size()); + + for (final StandardSyncState standardSyncState : standardSyncStates) { + ctx.insertInto(TABLE_AIRBYTE_CONFIGS) + .set(COLUMN_CONFIG_TYPE, ConfigSchema.STANDARD_SYNC_STATE.name()) + .set(COLUMN_CONFIG_ID, standardSyncState.getConnectionId().toString()) + .set(COLUMN_CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(standardSyncState))) + .set(COLUMN_CREATED_AT, timestamp) + .set(COLUMN_UPDATED_AT, timestamp) + // This migration is idempotent. If the record for a sync_id already exists, + // it means that the migration has already been run before. Abort insertion. + .onDuplicateKeyIgnore() + .execute(); + } + } + + /** + * This migration requires a connection to the job database, which may be a separate database from + * the config database. However, the job database only exists in production, not in development or + * test. We use the job database environment variables to determine how to connect to the job + * database. This approach is not 100% reliable. However, it is better than doing half of the + * migration here (creating the table), and the rest of the work during server start up (copying the + * data from the job database). + */ + @VisibleForTesting + static Optional getJobsDatabase(final Configs configs) { + try { + // If the environment variables exist, it means the migration is run in production. + // Connect to the official job database. + final Database jobsDatabase = new JobsDatabaseInstance( + configs.getDatabaseUser(), + configs.getDatabasePassword(), + configs.getDatabaseUrl()) + .getInitialized(); + LOGGER.info("[{}] Connected to jobs database: {}", MIGRATION_NAME, configs.getDatabaseUrl()); + return Optional.of(jobsDatabase); + } catch (final IllegalArgumentException e) { + // If the environment variables do not exist, it means the migration is run in development. 
+ LOGGER.info("[{}] This is the dev environment; there is no jobs database", MIGRATION_NAME); + return Optional.empty(); + } catch (final IOException e) { + throw new RuntimeException("Cannot connect to jobs database", e); + } + } + + /** + * @return a set of StandardSyncStates from the latest attempt for each connection. + */ + @VisibleForTesting + static Set getStandardSyncStates(final Database jobsDatabase) throws SQLException { + final Table jobsTable = DSL.table("jobs"); + final Field jobId = DSL.field("jobs.id", SQLDataType.BIGINT); + final Field connectionId = DSL.field("jobs.scope", SQLDataType.VARCHAR); + + final Table attemptsTable = DSL.table("attempts"); + final Field attemptJobId = DSL.field("attempts.job_id", SQLDataType.BIGINT); + final Field attemptNumber = DSL.field("attempts.attempt_number", SQLDataType.INTEGER); + final Field attemptCreatedAt = DSL.field("attempts.created_at", SQLDataType.TIMESTAMPWITHTIMEZONE); + + // output schema: JobOutput.yaml + // sync schema: StandardSyncOutput.yaml + // state schema: State.yaml, e.g. 
{ "state": { "cursor": 1000 } } + final Field attemptState = DSL.field("attempts.output -> 'sync' -> 'state'", SQLDataType.JSONB); + + return jobsDatabase.query(ctx -> ctx + .select(connectionId, attemptState) + .distinctOn(connectionId) + .from(attemptsTable) + .innerJoin(jobsTable) + .on(jobId.eq(attemptJobId)) + .where(attemptState.isNotNull()) + // this query assumes that an attempt with larger created_at field is always a newer attempt + .orderBy(connectionId, attemptCreatedAt.desc()) + .fetch() + .stream() + .map(r -> getStandardSyncState(UUID.fromString(r.value1()), Jsons.deserialize(r.value2().data(), State.class)))) + .collect(Collectors.toSet()); + } + + @VisibleForTesting + static StandardSyncState getStandardSyncState(final UUID connectionId, final State state) { + return new StandardSyncState().withConnectionId(connectionId).withState(state); + } + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/development/MigrationDevHelper.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/development/MigrationDevHelper.java index 3b75359695be..076d446d8729 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/development/MigrationDevHelper.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/development/MigrationDevHelper.java @@ -65,8 +65,8 @@ public static void createNextMigrationFile(final String dbIdentifier, final Flyw final String template = MoreResources.readResource("migration_template.txt"); final String newMigration = template.replace("", dbIdentifier) - .replace("", versionId) - .replace("", description) + .replaceAll("", versionId) + .replaceAll("", description) .strip(); final String fileName = String.format("V%s__%s.java", versionId, description); diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/jobs/JobsDatabaseTestProvider.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/jobs/JobsDatabaseTestProvider.java new file mode 100644 index 000000000000..133b9c290807 --- /dev/null +++ 
b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/jobs/JobsDatabaseTestProvider.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.jobs; + +import io.airbyte.db.Database; +import io.airbyte.db.instance.DatabaseMigrator; +import io.airbyte.db.instance.test.TestDatabaseProvider; +import java.io.IOException; + +public class JobsDatabaseTestProvider implements TestDatabaseProvider { + + private final String user; + private final String password; + private final String jdbcUrl; + + public JobsDatabaseTestProvider(String user, String password, String jdbcUrl) { + this.user = user; + this.password = password; + this.jdbcUrl = jdbcUrl; + } + + @Override + public Database create(final boolean runMigration) throws IOException { + final Database jobsDatabase = new JobsDatabaseInstance(user, password, jdbcUrl) + .getAndInitialize(); + + if (runMigration) { + final DatabaseMigrator migrator = new JobsDatabaseMigrator( + jobsDatabase, + JobsDatabaseTestProvider.class.getSimpleName()); + migrator.createBaseline(); + migrator.migrate(); + } + + return jobsDatabase; + } + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_29_15_001__Add_temporalWorkflowId_col_to_Attempts.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_29_15_001__Add_temporalWorkflowId_col_to_Attempts.java index be9cc2b2cb4d..d188ba6fe662 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_29_15_001__Add_temporalWorkflowId_col_to_Attempts.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_29_15_001__Add_temporalWorkflowId_col_to_Attempts.java @@ -9,14 +9,17 @@ import org.jooq.DSLContext; import org.jooq.impl.DSL; import org.jooq.impl.SQLDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class V0_29_15_001__Add_temporalWorkflowId_col_to_Attempts extends BaseJavaMigration { + private static 
final Logger LOGGER = LoggerFactory.getLogger(V0_29_15_001__Add_temporalWorkflowId_col_to_Attempts.class); + @Override public void migrate(final Context context) throws Exception { - // Warning: please do not use any jOOQ generated code to write a migration. - // As database schema changes, the generated jOOQ code can be deprecated. So - // old migration may not compile if there is any generated code. + LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); + final DSLContext ctx = DSL.using(context.getConnection()); ctx.alterTable("attempts") .addColumnIfNotExists(DSL.field("temporal_workflow_id", SQLDataType.VARCHAR(256).nullable(true))) diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/test/TestDatabaseProvider.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/test/TestDatabaseProvider.java new file mode 100644 index 000000000000..a5f0b9abe474 --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/test/TestDatabaseProvider.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.test; + +import io.airbyte.db.Database; +import java.io.IOException; + +/** + * Create mock database in unit tests. The implementation will be responsible for: 1) constructing + * and preparing the database, and 2) running the Flyway migration. + */ +public interface TestDatabaseProvider { + + /** + * @param runMigration Whether the mock database should run Flyway migration before it is used in + * unit test. Usually this parameter should be false only when the migration itself is being + * tested. 
+ */ + Database create(final boolean runMigration) throws IOException; + +} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/test/TestDatabaseProviders.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/test/TestDatabaseProviders.java new file mode 100644 index 000000000000..7bb34b7dc588 --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/test/TestDatabaseProviders.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.test; + +import com.google.api.client.util.Preconditions; +import io.airbyte.config.Configs; +import io.airbyte.db.Database; +import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider; +import io.airbyte.db.instance.jobs.JobsDatabaseTestProvider; +import java.io.IOException; +import java.util.Optional; +import org.testcontainers.containers.PostgreSQLContainer; + +/** + * Use this class to create mock databases in unit tests. This class takes care of database + * initialization and migration. + */ +@SuppressWarnings("OptionalUsedAsFieldOrParameterType") +public class TestDatabaseProviders { + + private final Optional configs; + private final Optional> container; + private boolean runMigration = true; + + public TestDatabaseProviders(final Configs configs) { + this.configs = Optional.of(configs); + this.container = Optional.empty(); + } + + public TestDatabaseProviders(final PostgreSQLContainer container) { + this.configs = Optional.empty(); + this.container = Optional.of(container); + } + + /** + * When creating mock databases in unit tests, migration should be run by default. Call this method + * to turn migration off, which is needed when unit testing migration code. 
+ */ + public TestDatabaseProviders turnOffMigration() { + this.runMigration = false; + return this; + } + + public Database createNewConfigsDatabase() throws IOException { + Preconditions.checkArgument(configs.isPresent() || container.isPresent()); + if (configs.isPresent()) { + final Configs c = configs.get(); + return new ConfigsDatabaseTestProvider( + c.getConfigDatabaseUser(), + c.getConfigDatabasePassword(), + c.getConfigDatabaseUrl()) + .create(runMigration); + } else { + final PostgreSQLContainer c = container.get(); + return new ConfigsDatabaseTestProvider( + c.getUsername(), + c.getPassword(), + c.getJdbcUrl()) + .create(runMigration); + } + } + + public Database createNewJobsDatabase() throws IOException { + Preconditions.checkArgument(configs.isPresent() || container.isPresent()); + if (configs.isPresent()) { + final Configs c = configs.get(); + return new JobsDatabaseTestProvider( + c.getDatabaseUser(), + c.getDatabasePassword(), + c.getDatabaseUrl()) + .create(runMigration); + } else { + final PostgreSQLContainer c = container.get(); + return new JobsDatabaseTestProvider( + c.getUsername(), + c.getPassword(), + c.getJdbcUrl()) + .create(runMigration); + } + } + +} diff --git a/airbyte-db/lib/src/main/resources/migration_template.txt b/airbyte-db/lib/src/main/resources/migration_template.txt index 4398aba69439..074c2d97b9be 100644 --- a/airbyte-db/lib/src/main/resources/migration_template.txt +++ b/airbyte-db/lib/src/main/resources/migration_template.txt @@ -4,16 +4,22 @@ import org.flywaydb.core.api.migration.BaseJavaMigration; import org.flywaydb.core.api.migration.Context; import org.jooq.DSLContext; import org.jooq.impl.DSL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; // TODO: update migration description in the class name public class V__ extends BaseJavaMigration { + private static final Logger LOGGER = LoggerFactory.getLogger(V__.class); + @Override - public void migrate(Context context) throws Exception { + public void 
migrate(final Context context) throws Exception { + LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); + // Warning: please do not use any jOOQ generated code to write a migration. // As database schema changes, the generated jOOQ code can be deprecated. So // old migration may not compile if there is any generated code. - DSLContext ctx = DSL.using(context.getConnection()); + final DSLContext ctx = DSL.using(context.getConnection()); } } diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/AbstractDatabaseTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/AbstractDatabaseTest.java index 1fae9c7f320c..de040b91c344 100644 --- a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/AbstractDatabaseTest.java +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/AbstractDatabaseTest.java @@ -34,7 +34,7 @@ public static void dbDown() { @BeforeEach public void setup() throws Exception { - database = getAndInitializeDatabase(container.getUsername(), container.getPassword(), container.getJdbcUrl()); + database = getDatabase(); } @AfterEach @@ -44,8 +44,9 @@ void tearDown() throws Exception { /** * Create an initialized database. The downstream implementation should do it by calling - * {@link DatabaseInstance#getAndInitialize}. + * {@link DatabaseInstance#getAndInitialize} or {@link DatabaseInstance#getInitialized}, and + * {@link DatabaseMigrator#migrate} if necessary. 
*/ - public abstract Database getAndInitializeDatabase(String username, String password, String connectionString) throws IOException; + public abstract Database getDatabase() throws IOException; } diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/AbstractConfigsDatabaseTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/AbstractConfigsDatabaseTest.java index bd7b1695145d..edb6a454c66a 100644 --- a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/AbstractConfigsDatabaseTest.java +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/AbstractConfigsDatabaseTest.java @@ -8,11 +8,10 @@ import static org.jooq.impl.DSL.table; import io.airbyte.db.Database; -import io.airbyte.db.ExceptionWrappingDatabase; import io.airbyte.db.instance.AbstractDatabaseTest; +import io.airbyte.db.instance.test.TestDatabaseProviders; import java.io.IOException; import java.time.OffsetDateTime; -import java.util.UUID; import org.jooq.Field; import org.jooq.JSONB; import org.jooq.Record; @@ -27,21 +26,9 @@ public abstract class AbstractConfigsDatabaseTest extends AbstractDatabaseTest { public static final Field CREATED_AT = field("created_at", OffsetDateTime.class); public static final Field UPDATED_AT = field("updated_at", OffsetDateTime.class); - public Database getAndInitializeDatabase(final String username, final String password, final String connectionString) throws IOException { - final Database database = - new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); - - // The configs database is considered ready only if there are some seed records. - // So we need to create at least one record here. 
- final OffsetDateTime timestamp = OffsetDateTime.now(); - new ExceptionWrappingDatabase(database).transaction(ctx -> ctx.insertInto(AIRBYTE_CONFIGS) - .set(CONFIG_ID, UUID.randomUUID().toString()) - .set(CONFIG_TYPE, "STANDARD_SOURCE_DEFINITION") - .set(CONFIG_BLOB, JSONB.valueOf("{}")) - .set(CREATED_AT, timestamp) - .set(UPDATED_AT, timestamp) - .execute()); - return database; + @Override + public Database getDatabase() throws IOException { + return new TestDatabaseProviders(container).turnOffMigration().createNewConfigsDatabase(); } } diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java new file mode 100644 index 000000000000..e7fbd7e371a7 --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java @@ -0,0 +1,288 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.db.instance.configs.migrations; + +import static io.airbyte.db.instance.configs.migrations.V0_30_22_001__Store_last_sync_state.COLUMN_CONFIG_BLOB; +import static io.airbyte.db.instance.configs.migrations.V0_30_22_001__Store_last_sync_state.COLUMN_CONFIG_ID; +import static io.airbyte.db.instance.configs.migrations.V0_30_22_001__Store_last_sync_state.COLUMN_CONFIG_TYPE; +import static io.airbyte.db.instance.configs.migrations.V0_30_22_001__Store_last_sync_state.COLUMN_CREATED_AT; +import static io.airbyte.db.instance.configs.migrations.V0_30_22_001__Store_last_sync_state.COLUMN_UPDATED_AT; +import static io.airbyte.db.instance.configs.migrations.V0_30_22_001__Store_last_sync_state.TABLE_AIRBYTE_CONFIGS; +import static io.airbyte.db.instance.configs.migrations.V0_30_22_001__Store_last_sync_state.getStandardSyncState; +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.table; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.Configs; +import io.airbyte.config.EnvConfigs; +import io.airbyte.config.JobOutput; +import io.airbyte.config.JobOutput.OutputType; +import io.airbyte.config.StandardSyncOutput; +import io.airbyte.config.StandardSyncState; +import io.airbyte.config.State; +import io.airbyte.db.Database; +import io.airbyte.db.instance.configs.AbstractConfigsDatabaseTest; +import io.airbyte.db.instance.jobs.JobsDatabaseInstance; +import java.sql.Connection; +import java.sql.SQLException; +import java.time.OffsetDateTime; +import java.util.Collections; +import java.util.Set; +import java.util.UUID; 
+import javax.annotation.Nullable; +import org.flywaydb.core.api.configuration.Configuration; +import org.flywaydb.core.api.migration.Context; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.JSONB; +import org.jooq.Table; +import org.jooq.impl.SQLDataType; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class V0_30_22_001__Store_last_sync_state_test extends AbstractConfigsDatabaseTest { + + private static final ObjectMapper OBJECT_MAPPER = MoreMappers.initMapper(); + private static final OffsetDateTime TIMESTAMP = OffsetDateTime.now(); + + private static final Table JOBS_TABLE = table("jobs"); + private static final Field JOB_ID_FIELD = field("id", SQLDataType.BIGINT); + private static final Field JOB_SCOPE_FIELD = field("scope", SQLDataType.VARCHAR); + private static final Field JOB_CREATED_AT_FIELD = field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE); + + private static final Table ATTEMPTS_TABLE = table("attempts"); + private static final Field ATTEMPT_ID_FIELD = field("id", SQLDataType.BIGINT); + private static final Field ATTEMPT_JOB_ID_FIELD = field("job_id", SQLDataType.BIGINT); + private static final Field ATTEMPT_NUMBER_FIELD = field("attempt_number", SQLDataType.INTEGER); + private static final Field ATTEMPT_OUTPUT_FIELD = field("output", SQLDataType.JSONB); + private static final Field ATTEMPT_CREATED_AT_FIELD = field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE); + + private static final UUID CONNECTION_1_ID = UUID.randomUUID(); + private static final UUID CONNECTION_2_ID = UUID.randomUUID(); + private static final UUID CONNECTION_3_ID = UUID.randomUUID(); + + private static final State CONNECTION_2_STATE = Jsons.deserialize("{ \"state\": { \"cursor\": 2222 } }", State.class); + private static final State 
CONNECTION_3_STATE = Jsons.deserialize("{ \"state\": { \"cursor\": 3333 } }", State.class); + private static final State CONNECTION_OLD_STATE = Jsons.deserialize("{ \"state\": { \"cursor\": -1 } }", State.class); + + private static final StandardSyncState STD_CONNECTION_STATE_2 = getStandardSyncState(CONNECTION_2_ID, CONNECTION_2_STATE); + private static final StandardSyncState STD_CONNECTION_STATE_3 = getStandardSyncState(CONNECTION_3_ID, CONNECTION_3_STATE); + private static final Set STD_CONNECTION_STATES = Set.of(STD_CONNECTION_STATE_2, STD_CONNECTION_STATE_3); + + private static Database jobDatabase; + + @BeforeAll + public static void setupJobDatabase() throws Exception { + jobDatabase = new JobsDatabaseInstance( + container.getUsername(), + container.getPassword(), + container.getJdbcUrl()) + .getAndInitialize(); + } + + @Test + @Order(10) + public void testGetJobsDatabase() { + // when there is no database environment variable, the return value is empty + assertTrue(V0_30_22_001__Store_last_sync_state.getJobsDatabase(new EnvConfigs()).isEmpty()); + + // when there is database environment variable, return the database + final Configs configs = mock(Configs.class); + when(configs.getDatabaseUser()).thenReturn(container.getUsername()); + when(configs.getDatabasePassword()).thenReturn(container.getPassword()); + when(configs.getDatabaseUrl()).thenReturn(container.getJdbcUrl()); + + assertTrue(V0_30_22_001__Store_last_sync_state.getJobsDatabase(configs).isPresent()); + } + + @Test + @Order(20) + public void testGetStandardSyncStates() throws Exception { + jobDatabase.query(ctx -> { + // Connection 1 has 1 job, no attempt. + // This is to test that a connection with no state is not returned. + createJob(ctx, CONNECTION_1_ID, 30); + + // Connection 2 has two jobs, each has one attempt. + // This is to test that only the state from the latest job is returned.
+ final long job21 = createJob(ctx, CONNECTION_2_ID, 10); + final long job22 = createJob(ctx, CONNECTION_2_ID, 20); + assertNotEquals(job21, job22); + createAttempt(ctx, job21, 1, createAttemptOutput(CONNECTION_OLD_STATE), 11); + createAttempt(ctx, job22, 1, createAttemptOutput(CONNECTION_2_STATE), 21); + + // Connection 3 has two jobs. + // The first job has multiple attempts. Its third attempt has the latest state. + // The second job has two attempts with no state. + // This is to test that only the state from the latest attempt is returned. + final long job31 = createJob(ctx, CONNECTION_3_ID, 5); + final long job32 = createJob(ctx, CONNECTION_3_ID, 15); + assertNotEquals(job31, job32); + createAttempt(ctx, job31, 1, createAttemptOutput(CONNECTION_OLD_STATE), 6); + createAttempt(ctx, job31, 2, null, 7); + createAttempt(ctx, job31, 3, createAttemptOutput(CONNECTION_3_STATE), 8); + createAttempt(ctx, job31, 4, null, 9); + createAttempt(ctx, job31, 5, null, 10); + createAttempt(ctx, job32, 1, null, 20); + createAttempt(ctx, job32, 2, null, 25); + + assertEquals(STD_CONNECTION_STATES, V0_30_22_001__Store_last_sync_state.getStandardSyncStates(jobDatabase)); + + return null; + }); + } + + @Test + @Order(30) + public void testCopyData() throws SQLException { + + final Set newConnectionStates = Collections.singleton( + new StandardSyncState() + .withConnectionId(CONNECTION_2_ID) + .withState(new State().withState(Jsons.deserialize("{ \"cursor\": 3 }")))); + + final OffsetDateTime timestamp = OffsetDateTime.now(); + + database.query(ctx -> { + V0_30_22_001__Store_last_sync_state.copyData(ctx, STD_CONNECTION_STATES, timestamp); + checkSyncStates(ctx, STD_CONNECTION_STATES, timestamp); + + // call the copyData method again with different data will not affect existing records + V0_30_22_001__Store_last_sync_state.copyData(ctx, newConnectionStates, OffsetDateTime.now()); + // the states remain the same as those in STD_CONNECTION_STATES + checkSyncStates(ctx, 
STD_CONNECTION_STATES, timestamp); + + return null; + }); + } + + /** + * Clear the table and test the migration end-to-end. + */ + @Test + @Order(40) + public void testMigration() throws Exception { + database.query(ctx -> ctx.deleteFrom(TABLE_AIRBYTE_CONFIGS) + .where(COLUMN_CONFIG_TYPE.eq(ConfigSchema.STANDARD_SYNC_STATE.name())) + .execute()); + + final Configs configs = mock(Configs.class); + when(configs.getDatabaseUser()).thenReturn(container.getUsername()); + when(configs.getDatabasePassword()).thenReturn(container.getPassword()); + when(configs.getDatabaseUrl()).thenReturn(container.getJdbcUrl()); + + final var migration = new V0_30_22_001__Store_last_sync_state(configs); + // this context is a flyway class; only the getConnection method is needed to run the migration + final Context context = new Context() { + + @Override + public Configuration getConfiguration() { + return null; + } + + @Override + public Connection getConnection() { + try { + return database.getDataSource().getConnection(); + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + + }; + migration.migrate(context); + database.query(ctx -> { + checkSyncStates(ctx, STD_CONNECTION_STATES, null); + return null; + }); + } + + /** + * Create a job record whose scope equals to the passed in connection id, and return the job id. + * + * @param creationOffset Set the creation timestamp to {@code TIMESTAMP} + this passed in offset. 
 + */ + private static long createJob(final DSLContext ctx, final UUID connectionId, final long creationOffset) { + final int insertCount = ctx.insertInto(JOBS_TABLE) + .set(JOB_SCOPE_FIELD, connectionId.toString()) + .set(JOB_CREATED_AT_FIELD, TIMESTAMP.plusDays(creationOffset)) + .execute(); + assertEquals(1, insertCount); + + return ctx.select(JOB_ID_FIELD) + .from(JOBS_TABLE) + .where(JOB_SCOPE_FIELD.eq(connectionId.toString())) + .orderBy(JOB_CREATED_AT_FIELD.desc()) + .limit(1) + .fetchOne() + .get(JOB_ID_FIELD); + } + + /** + * @param creationOffset Set the creation timestamp to {@code TIMESTAMP} + this passed in offset. + */ + private static void createAttempt(final DSLContext ctx, + final long jobId, + final int attemptNumber, + final JobOutput attemptOutput, + final long creationOffset) { + final int insertCount = ctx.insertInto(ATTEMPTS_TABLE) + .set(ATTEMPT_JOB_ID_FIELD, jobId) + .set(ATTEMPT_NUMBER_FIELD, attemptNumber) + .set(ATTEMPT_OUTPUT_FIELD, JSONB.valueOf(Jsons.serialize(attemptOutput))) + .set(ATTEMPT_CREATED_AT_FIELD, TIMESTAMP.plusDays(creationOffset)) + .execute(); + assertEquals(1, insertCount); + + ctx.select(ATTEMPT_ID_FIELD) + .from(ATTEMPTS_TABLE) + .where(ATTEMPT_JOB_ID_FIELD.eq(jobId), ATTEMPT_NUMBER_FIELD.eq(attemptNumber)) + .fetchOne() + .get(ATTEMPT_ID_FIELD); + } + + /** + * Create a JobOutput object whose output type is StandardSyncOutput. + * + * @param state The state object within a StandardSyncOutput. 
+ */ + private static JobOutput createAttemptOutput(final State state) { + final StandardSyncOutput standardSyncOutput = new StandardSyncOutput().withState(state); + return new JobOutput().withOutputType(OutputType.SYNC).withSync(standardSyncOutput); + } + + private static void checkSyncStates(final DSLContext ctx, + final Set standardSyncStates, + @Nullable final OffsetDateTime expectedTimestamp) { + for (final StandardSyncState standardSyncState : standardSyncStates) { + final var record = ctx + .select(COLUMN_CONFIG_BLOB, + COLUMN_CREATED_AT, + COLUMN_UPDATED_AT) + .from(TABLE_AIRBYTE_CONFIGS) + .where(COLUMN_CONFIG_ID.eq(standardSyncState.getConnectionId().toString()), + COLUMN_CONFIG_TYPE.eq(ConfigSchema.STANDARD_SYNC_STATE.name())) + .fetchOne(); + assertEquals(standardSyncState, Jsons.deserialize(record.value1().data(), StandardSyncState.class)); + if (expectedTimestamp != null) { + assertEquals(expectedTimestamp, record.value2()); + assertEquals(expectedTimestamp, record.value3()); + } + } + } + +} diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/jobs/AbstractJobsDatabaseTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/jobs/AbstractJobsDatabaseTest.java index a7c49167e73e..b14fc53b8a63 100644 --- a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/jobs/AbstractJobsDatabaseTest.java +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/jobs/AbstractJobsDatabaseTest.java @@ -6,13 +6,14 @@ import io.airbyte.db.Database; import io.airbyte.db.instance.AbstractDatabaseTest; +import io.airbyte.db.instance.test.TestDatabaseProviders; import java.io.IOException; public abstract class AbstractJobsDatabaseTest extends AbstractDatabaseTest { @Override - public Database getAndInitializeDatabase(final String username, final String password, final String connectionString) throws IOException { - return new JobsDatabaseInstance(username, password, connectionString).getAndInitialize(); + public Database getDatabase() throws IOException { 
+ return new TestDatabaseProviders(container).turnOffMigration().createNewJobsDatabase(); } } diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/toys/AbstractToysDatabaseTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/toys/AbstractToysDatabaseTest.java deleted file mode 100644 index cccf8919c4e8..000000000000 --- a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/toys/AbstractToysDatabaseTest.java +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.db.instance.toys; - -import io.airbyte.db.Database; -import io.airbyte.db.instance.AbstractDatabaseTest; -import java.io.IOException; - -public abstract class AbstractToysDatabaseTest extends AbstractDatabaseTest { - - public Database getAndInitializeDatabase(final String username, final String password, final String connectionString) throws IOException { - return new ToysDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); - } - -} diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/toys/ToysDatabaseMigratorTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/toys/ToysDatabaseMigratorTest.java index 8733a37a716c..4c432e844262 100644 --- a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/toys/ToysDatabaseMigratorTest.java +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/toys/ToysDatabaseMigratorTest.java @@ -7,14 +7,22 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.db.Database; +import io.airbyte.db.instance.AbstractDatabaseTest; import io.airbyte.db.instance.DatabaseMigrator; +import java.io.IOException; import org.junit.jupiter.api.Test; -class ToysDatabaseMigratorTest extends AbstractToysDatabaseTest { +class ToysDatabaseMigratorTest extends AbstractDatabaseTest { private static final String PRE_MIGRATION_SCHEMA_DUMP = 
"toys_database/pre_migration_schema.txt"; private static final String POST_MIGRATION_SCHEMA_DUMP = "toys_database/schema_dump.txt"; + @Override + public Database getDatabase() throws IOException { + return new ToysDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); + } + @Test public void testMigration() throws Exception { final DatabaseMigrator migrator = new ToysDatabaseMigrator(database, ToysDatabaseMigratorTest.class.getSimpleName()); diff --git a/airbyte-migration/src/main/java/io/airbyte/migrate/Migrations.java b/airbyte-migration/src/main/java/io/airbyte/migrate/Migrations.java index d7629b278d11..1725ec346b1b 100644 --- a/airbyte-migration/src/main/java/io/airbyte/migrate/Migrations.java +++ b/airbyte-migration/src/main/java/io/airbyte/migrate/Migrations.java @@ -17,6 +17,7 @@ import io.airbyte.migrate.migrations.MigrationV0_27_0; import io.airbyte.migrate.migrations.MigrationV0_28_0; import io.airbyte.migrate.migrations.MigrationV0_29_0; +import io.airbyte.migrate.migrations.MigrationV0_30_0; import io.airbyte.migrate.migrations.NoOpMigration; import java.util.List; @@ -39,7 +40,7 @@ public class Migrations { private static final Migration MIGRATION_V_0_27_0 = new MigrationV0_27_0(MIGRATION_V_0_26_0); public static final Migration MIGRATION_V_0_28_0 = new MigrationV0_28_0(MIGRATION_V_0_27_0); public static final Migration MIGRATION_V_0_29_0 = new MigrationV0_29_0(MIGRATION_V_0_28_0); - public static final Migration MIGRATION_V_0_30_0 = new NoOpMigration(MIGRATION_V_0_29_0, "0.30.0-alpha"); + public static final Migration MIGRATION_V_0_30_0 = new MigrationV0_30_0(MIGRATION_V_0_29_0); // all migrations must be added to the list in the order that they should be applied. 
public static final List MIGRATIONS = ImmutableList.of( diff --git a/airbyte-migration/src/main/java/io/airbyte/migrate/migrations/MigrationV0_30_0.java b/airbyte-migration/src/main/java/io/airbyte/migrate/migrations/MigrationV0_30_0.java new file mode 100644 index 000000000000..3ed383e32b35 --- /dev/null +++ b/airbyte-migration/src/main/java/io/airbyte/migrate/migrations/MigrationV0_30_0.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.migrate.migrations; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.migrate.Migration; +import io.airbyte.migrate.MigrationUtils; +import io.airbyte.migrate.ResourceId; +import io.airbyte.migrate.ResourceType; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This migration adds the StandardSyncState in the migration resource. 
+ */ +public class MigrationV0_30_0 extends BaseMigration implements Migration { + + private static final Logger LOGGER = LoggerFactory.getLogger(MigrationV0_30_0.class); + private static final String MIGRATION_VERSION = "0.30.0-alpha"; + + private static final Path RESOURCE_PATH = Path.of("migrations/migrationV0_30_0/airbyte_config"); + + @VisibleForTesting + protected static final ResourceId STANDARD_SYNC_STATE_RESOURCE_ID = ResourceId + .fromConstantCase(ResourceType.CONFIG, "STANDARD_SYNC_STATE"); + + private final Migration previousMigration; + + public MigrationV0_30_0(final Migration previousMigration) { + super(previousMigration); + this.previousMigration = previousMigration; + } + + @Override + public String getVersion() { + return MIGRATION_VERSION; + } + + @Override + public Map getOutputSchema() { + final Map outputSchema = new HashMap<>(previousMigration.getOutputSchema()); + outputSchema.put(STANDARD_SYNC_STATE_RESOURCE_ID, + MigrationUtils.getSchemaFromResourcePath(RESOURCE_PATH, STANDARD_SYNC_STATE_RESOURCE_ID)); + return outputSchema; + } + + // no op migration. 
+ @Override + public void migrate(final Map> inputData, final Map> outputData) { + for (final Map.Entry> entry : inputData.entrySet()) { + final Consumer recordConsumer = outputData.get(entry.getKey()); + entry.getValue().forEach(recordConsumer); + } + } + +} diff --git a/airbyte-migration/src/main/resources/migrations/migrationV0_30_0/airbyte_config/StandardSyncState.yaml b/airbyte-migration/src/main/resources/migrations/migrationV0_30_0/airbyte_config/StandardSyncState.yaml new file mode 100644 index 000000000000..964e084bc696 --- /dev/null +++ b/airbyte-migration/src/main/resources/migrations/migrationV0_30_0/airbyte_config/StandardSyncState.yaml @@ -0,0 +1,17 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StandardSyncState.yaml +title: StandardSyncState +description: The current state of a connection (i.e. StandardSync). +type: object +additionalProperties: false +required: + - connectionId +properties: + connectionId: + type: string + format: uuid + description: This is a foreign key that references a connection (i.e. StandardSync). + state: + "$ref": State.yaml + description: The current (latest) connection state. diff --git a/airbyte-migration/src/main/resources/migrations/migrationV0_30_0/airbyte_config/State.yaml b/airbyte-migration/src/main/resources/migrations/migrationV0_30_0/airbyte_config/State.yaml new file mode 100644 index 000000000000..666941a7e3bb --- /dev/null +++ b/airbyte-migration/src/main/resources/migrations/migrationV0_30_0/airbyte_config/State.yaml @@ -0,0 +1,14 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/State.yaml +title: State +description: information output by the connection. +type: object +required: + - state +additionalProperties: false +properties: + state: + description: Integration specific blob. 
Must be a valid JSON string. + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode diff --git a/airbyte-migration/src/test/java/io/airbyte/migrate/migrations/MigrationV0_30_0Test.java b/airbyte-migration/src/test/java/io/airbyte/migrate/migrations/MigrationV0_30_0Test.java new file mode 100644 index 000000000000..688a7d0df410 --- /dev/null +++ b/airbyte-migration/src/test/java/io/airbyte/migrate/migrations/MigrationV0_30_0Test.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.migrate.migrations; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.airbyte.migrate.Migrations; +import org.junit.jupiter.api.Test; + +class MigrationV0_30_0Test { + + @Test + void testMigration() { + // standard sync state does not exist before migration v0.30.0 + assertFalse(Migrations.MIGRATION_V_0_29_0.getOutputSchema().containsKey(MigrationV0_30_0.STANDARD_SYNC_STATE_RESOURCE_ID)); + assertTrue(Migrations.MIGRATION_V_0_30_0.getOutputSchema().containsKey(MigrationV0_30_0.STANDARD_SYNC_STATE_RESOURCE_ID)); + } + +} diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java index ddd943bb8c36..7e2db6e7b900 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java @@ -54,7 +54,7 @@ public JobScheduler(final JobPersistence jobPersistence, configRepository, new ScheduleJobPredicate(Instant::now), new DefaultSyncJobFactory( - new DefaultJobCreator(jobPersistence), + new DefaultJobCreator(jobPersistence, configRepository), configRepository, new OAuthConfigSupplier(configRepository, false, trackingClient))); } diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java 
b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index 835440ec30de..d02b1cd275c9 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -204,7 +204,6 @@ public static void main(final String[] args) throws IOException, InterruptedExce configs.getDatabaseUrl()) .getInitialized(); - final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); final Database configDatabase = new ConfigsDatabaseInstance( configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), @@ -215,11 +214,13 @@ public static void main(final String[] args) throws IOException, InterruptedExce final Optional ephemeralSecretPersistence = SecretPersistence.getEphemeral(configs); final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configs); final ConfigRepository configRepository = new ConfigRepository(configPersistence, secretsHydrator, secretPersistence, ephemeralSecretPersistence); + + final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); final JobCleaner jobCleaner = new JobCleaner( configs.getWorkspaceRetentionConfig(), workspaceRoot, jobPersistence); - AirbyteVersion.assertIsCompatible(configs.getAirbyteVersion(), jobPersistence.getVersion().get()); + AirbyteVersion.assertIsCompatible(configs.getAirbyteVersion(), jobPersistence.getVersion().orElseThrow()); TrackingClientSingleton.initialize( configs.getTrackingStrategy(), @@ -238,8 +239,16 @@ public static void main(final String[] args) throws IOException, InterruptedExce MetricSingleton.initializeMonitoringServiceDaemon("8082", mdc, configs.getPublishMetrics()); LOGGER.info("Launching scheduler..."); - new SchedulerApp(workspaceRoot, jobPersistence, configRepository, jobCleaner, jobNotifier, temporalClient, - Integer.parseInt(configs.getSubmitterNumThreads()), configs.getMaxSyncJobAttempts(), 
configs.getAirbyteVersionOrWarning()) + new SchedulerApp( + workspaceRoot, + jobPersistence, + configRepository, + jobCleaner, + jobNotifier, + temporalClient, + Integer.parseInt(configs.getSubmitterNumThreads()), + configs.getMaxSyncJobAttempts(), + configs.getAirbyteVersionOrWarning()) .start(); } diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactory.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactory.java index 579e7b5e9266..65aed379df31 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactory.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactory.java @@ -20,6 +20,7 @@ import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.TemporalResponse; import java.nio.file.Path; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,9 +45,14 @@ public WorkerRun create(final Job job) { public CheckedSupplier, Exception> createSupplier(final Job job, final int attemptId) { final TemporalJobType temporalJobType = toTemporalJobType(job.getConfigType()); + final UUID connectionId = UUID.fromString(job.getScope()); return switch (job.getConfigType()) { case SYNC -> () -> { - final TemporalResponse output = temporalClient.submitSync(job.getId(), attemptId, job.getConfig().getSync()); + final TemporalResponse output = temporalClient.submitSync( + job.getId(), + attemptId, + job.getConfig().getSync(), + connectionId); return toOutputAndStatus(output); }; case RESET_CONNECTION -> () -> { @@ -63,7 +69,7 @@ public CheckedSupplier, Exception> createSupplier(fin .withOperationSequence(resetConnection.getOperationSequence()) .withResourceRequirements(resetConnection.getResourceRequirements()); - final TemporalResponse output = temporalClient.submitSync(job.getId(), attemptId, config); + final TemporalResponse 
output = temporalClient.submitSync(job.getId(), attemptId, config, connectionId); return toOutputAndStatus(output); }; default -> throw new IllegalArgumentException("Does not support job type: " + temporalJobType); @@ -84,7 +90,7 @@ private OutputAndStatus toOutputAndStatus(final TemporalResponse mockResponse = mock(TemporalResponse.class); - when(temporalClient.submitSync(JOB_ID, ATTEMPT_ID, job.getConfig().getSync())).thenReturn(mockResponse); + when(temporalClient.submitSync(JOB_ID, ATTEMPT_ID, job.getConfig().getSync(), CONNECTION_ID)).thenReturn(mockResponse); final WorkerRun workerRun = workerRunFactory.create(job); workerRun.call(); - verify(temporalClient).submitSync(JOB_ID, ATTEMPT_ID, job.getConfig().getSync()); + verify(temporalClient).submitSync(JOB_ID, ATTEMPT_ID, job.getConfig().getSync(), CONNECTION_ID); assertEquals(jobRoot, workerRun.getJobRoot()); } @@ -83,13 +86,13 @@ void testResetConnection() throws Exception { when(job.getConfigType()).thenReturn(ConfigType.RESET_CONNECTION); when(job.getConfig().getResetConnection()).thenReturn(resetConfig); final TemporalResponse mockResponse = mock(TemporalResponse.class); - when(temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig)).thenReturn(mockResponse); + when(temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID)).thenReturn(mockResponse); final WorkerRun workerRun = workerRunFactory.create(job); workerRun.call(); final ArgumentCaptor argument = ArgumentCaptor.forClass(JobSyncConfig.class); - verify(temporalClient).submitSync(eq(JOB_ID), eq(ATTEMPT_ID), argument.capture()); + verify(temporalClient).submitSync(eq(JOB_ID), eq(ATTEMPT_ID), argument.capture(), eq(CONNECTION_ID)); assertEquals(syncConfig, argument.getValue()); assertEquals(jobRoot, workerRun.getJobRoot()); } diff --git a/airbyte-scheduler/persistence/build.gradle b/airbyte-scheduler/persistence/build.gradle index fa9abd9512a3..73ae03582082 100644 --- a/airbyte-scheduler/persistence/build.gradle +++ 
b/airbyte-scheduler/persistence/build.gradle @@ -8,6 +8,7 @@ dependencies { implementation project(':airbyte-config:models') implementation project(':airbyte-config:persistence') implementation project(':airbyte-db:lib') + implementation project(':airbyte-db:jooq') implementation project(':airbyte-json-validation') implementation project(':airbyte-notification') implementation project(':airbyte-oauth') diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobCreator.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobCreator.java index 64b2162740cf..dd2194cad4dc 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobCreator.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobCreator.java @@ -12,6 +12,7 @@ import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOperation; +import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.DestinationSyncMode; import io.airbyte.protocol.models.SyncMode; @@ -22,9 +23,11 @@ public class DefaultJobCreator implements JobCreator { private final JobPersistence jobPersistence; + private final ConfigRepository configRepository; - public DefaultJobCreator(final JobPersistence jobPersistence) { + public DefaultJobCreator(final JobPersistence jobPersistence, final ConfigRepository configRepository) { this.jobPersistence = jobPersistence; + this.configRepository = configRepository; } @Override @@ -49,7 +52,7 @@ public Optional createSyncJob(final SourceConnection source, .withState(null) .withResourceRequirements(standardSync.getResourceRequirements()); - jobPersistence.getCurrentState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState); + 
configRepository.getConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState); final JobConfig jobConfig = new JobConfig() .withConfigType(ConfigType.SYNC) diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index 13c9379b9768..8feceddbda9a 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -4,6 +4,8 @@ package io.airbyte.scheduler.persistence; +import static io.airbyte.db.instance.jobs.jooq.Tables.ATTEMPTS; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeType; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -18,8 +20,6 @@ import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; -import io.airbyte.config.StandardSyncOutput; -import io.airbyte.config.State; import io.airbyte.db.Database; import io.airbyte.db.ExceptionWrappingDatabase; import io.airbyte.db.instance.jobs.JobsDatabaseSchema; @@ -32,6 +32,7 @@ import java.nio.file.Path; import java.time.Instant; import java.time.LocalDateTime; +import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; @@ -51,6 +52,7 @@ import org.jooq.DSLContext; import org.jooq.Field; import org.jooq.InsertValuesStepN; +import org.jooq.JSONB; import org.jooq.JSONFormat; import org.jooq.JSONFormat.RecordFormat; import org.jooq.Named; @@ -105,26 +107,29 @@ public class DefaultJobPersistence implements JobPersistence { public static final String ORDER_BY_JOB_TIME_ATTEMPT_TIME = "ORDER BY jobs.created_at DESC, jobs.id DESC, attempts.created_at ASC, attempts.id ASC "; - 
private final ExceptionWrappingDatabase database; + private final ExceptionWrappingDatabase jobDatabase; private final Supplier timeSupplier; @VisibleForTesting - DefaultJobPersistence(final Database database, + DefaultJobPersistence(final Database jobDatabase, final Supplier timeSupplier, final int minimumAgeInDays, final int excessiveNumberOfJobs, final int minimumRecencyCount) { - this.database = new ExceptionWrappingDatabase(database); + this.jobDatabase = new ExceptionWrappingDatabase(jobDatabase); this.timeSupplier = timeSupplier; JOB_HISTORY_MINIMUM_AGE_IN_DAYS = minimumAgeInDays; JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS = excessiveNumberOfJobs; JOB_HISTORY_MINIMUM_RECENCY = minimumRecencyCount; } - public DefaultJobPersistence(final Database database) { - this(database, Instant::now, 30, 500, 10); + public DefaultJobPersistence(final Database jobDatabase) { + this(jobDatabase, Instant::now, 30, 500, 10); } + /** + * @param scope This is the primary id of a standard sync (StandardSync#connectionId). + */ @Override public Optional enqueueJob(final String scope, final JobConfig jobConfig) throws IOException { LOGGER.info("enqueuing pending job for scope: {}", scope); @@ -137,7 +142,7 @@ public Optional enqueueJob(final String scope, final JobConfig jobConfig) JobStatus.TERMINAL_STATUSES.stream().map(Sqls::toSqlName).map(Names::singleQuote).collect(Collectors.joining(","))) : ""; - return database.query( + return jobDatabase.query( ctx -> ctx.fetch( "INSERT INTO jobs(config_type, scope, created_at, updated_at, status, config) " + "SELECT CAST(? AS JOB_CONFIG_TYPE), ?, ?, ?, CAST(? AS JOB_STATUS), CAST(? 
as JSONB) " + @@ -157,7 +162,7 @@ public Optional enqueueJob(final String scope, final JobConfig jobConfig) @Override public void resetJob(final long jobId) throws IOException { final LocalDateTime now = LocalDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); - database.query(ctx -> { + jobDatabase.query(ctx -> { updateJobStatusIfNotInTerminalState(ctx, jobId, JobStatus.PENDING, now, new IllegalStateException(String.format("Attempt to reset a job that is in a terminal state. job id: %s", jobId))); return null; @@ -167,7 +172,7 @@ public void resetJob(final long jobId) throws IOException { @Override public void cancelJob(final long jobId) throws IOException { final LocalDateTime now = LocalDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); - database.query(ctx -> { + jobDatabase.query(ctx -> { updateJobStatusIfNotInTerminalState(ctx, jobId, JobStatus.CANCELLED, now); return null; }); @@ -176,14 +181,13 @@ public void cancelJob(final long jobId) throws IOException { @Override public void failJob(final long jobId) throws IOException { final LocalDateTime now = LocalDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); - database.query(ctx -> { + jobDatabase.query(ctx -> { updateJobStatusIfNotInTerminalState(ctx, jobId, JobStatus.FAILED, now); return null; }); } - private void updateJobStatusIfNotInTerminalState( - final DSLContext ctx, + private void updateJobStatusIfNotInTerminalState(final DSLContext ctx, final long jobId, final JobStatus newStatus, final LocalDateTime now, @@ -212,19 +216,19 @@ private void updateJobStatus(final DSLContext ctx, final long jobId, final JobSt public int createAttempt(final long jobId, final Path logPath) throws IOException { final LocalDateTime now = LocalDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); - return database.transaction(ctx -> { + return jobDatabase.transaction(ctx -> { final Job job = getJob(ctx, jobId); if (job.isJobInTerminalState()) { - final var errMsg = - String.format("Cannot create an attempt 
for a job id: %s that is in a terminal state: %s for connection id: %s", job.getId(), - job.getStatus(), job.getScope()); + final var errMsg = String.format( + "Cannot create an attempt for a job id: %s that is in a terminal state: %s for connection id: %s", + job.getId(), job.getStatus(), job.getScope()); throw new IllegalStateException(errMsg); } if (job.hasRunningAttempt()) { - final var errMsg = - String.format("Cannot create an attempt for a job id: %s that has a running attempt: %s for connection id: %s", job.getId(), - job.getStatus(), job.getScope()); + final var errMsg = String.format( + "Cannot create an attempt for a job id: %s that has a running attempt: %s for connection id: %s", + job.getId(), job.getStatus(), job.getScope()); throw new IllegalStateException(errMsg); } @@ -250,7 +254,7 @@ public int createAttempt(final long jobId, final Path logPath) throws IOExceptio @Override public void failAttempt(final long jobId, final int attemptNumber) throws IOException { final LocalDateTime now = LocalDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); - database.transaction(ctx -> { + jobDatabase.transaction(ctx -> { // do not overwrite terminal states. updateJobStatusIfNotInTerminalState(ctx, jobId, JobStatus.INCOMPLETE, now); @@ -268,7 +272,7 @@ public void failAttempt(final long jobId, final int attemptNumber) throws IOExce @Override public void succeedAttempt(final long jobId, final int attemptNumber) throws IOException { final LocalDateTime now = LocalDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); - database.transaction(ctx -> { + jobDatabase.transaction(ctx -> { // override any other terminal statuses if we are now succeeded. 
updateJobStatus(ctx, jobId, JobStatus.SUCCEEDED, now); @@ -285,7 +289,7 @@ public void succeedAttempt(final long jobId, final int attemptNumber) throws IOE @Override public void setAttemptTemporalWorkflowId(final long jobId, final int attemptNumber, final String temporalWorkflowId) throws IOException { - database.query(ctx -> ctx.execute( + jobDatabase.query(ctx -> ctx.execute( " UPDATE attempts SET temporal_workflow_id = ? WHERE job_id = ? AND attempt_number = ?", temporalWorkflowId, jobId, @@ -294,7 +298,7 @@ public void setAttemptTemporalWorkflowId(final long jobId, final int attemptNumb @Override public Optional getAttemptTemporalWorkflowId(final long jobId, final int attemptNumber) throws IOException { - final var result = database.query(ctx -> ctx.fetch( + final var result = jobDatabase.query(ctx -> ctx.fetch( " SELECT temporal_workflow_id from attempts WHERE job_id = ? AND attempt_number = ?", jobId, attemptNumber)).stream().findFirst(); @@ -308,20 +312,18 @@ public Optional getAttemptTemporalWorkflowId(final long jobId, final int @Override public void writeOutput(final long jobId, final int attemptNumber, final T output) throws IOException { - final LocalDateTime now = LocalDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); - - database.query( - ctx -> ctx.execute( - "UPDATE attempts SET output = CAST(? as JSONB), updated_at = ? WHERE job_id = ? 
AND attempt_number = ?", - Jsons.serialize(output), - now, - jobId, - attemptNumber)); + final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); + jobDatabase.transaction( + ctx -> ctx.update(ATTEMPTS) + .set(ATTEMPTS.OUTPUT, JSONB.valueOf(Jsons.serialize(output))) + .set(ATTEMPTS.UPDATED_AT, now) + .where(ATTEMPTS.JOB_ID.eq(jobId), ATTEMPTS.ATTEMPT_NUMBER.eq(attemptNumber)) + .execute()); } @Override public Job getJob(final long jobId) throws IOException { - return database.query(ctx -> getJob(ctx, jobId)); + return jobDatabase.query(ctx -> getJob(ctx, jobId)); } private Job getJob(final DSLContext ctx, final long jobId) { @@ -339,7 +341,7 @@ public List listJobs(final ConfigType configType, final String configId, fi @Override public List listJobs(final Set configTypes, final String configId, final int pagesize, final int offset) throws IOException { - return database.query(ctx -> getJobsFromResult(ctx.fetch( + return jobDatabase.query(ctx -> getJobsFromResult(ctx.fetch( BASE_JOB_SELECT_AND_JOIN + "WHERE " + "CAST(config_type AS VARCHAR) in " + Sqls.toSqlInFragment(configTypes) + " " + "AND scope = ? " + @@ -355,7 +357,7 @@ public List listJobsWithStatus(final JobStatus status) throws IOException { @Override public List listJobsWithStatus(final Set configTypes, final JobStatus status) throws IOException { - return database.query(ctx -> getJobsFromResult(ctx + return jobDatabase.query(ctx -> getJobsFromResult(ctx .fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE " + "CAST(config_type AS VARCHAR) IN " + Sqls.toSqlInFragment(configTypes) + " AND " + "CAST(jobs.status AS VARCHAR) = ? 
" + @@ -370,7 +372,7 @@ public List listJobsWithStatus(final ConfigType configType, final JobStatus @Override public Optional getLastReplicationJob(final UUID connectionId) throws IOException { - return database.query(ctx -> ctx + return jobDatabase.query(ctx -> ctx .fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE " + "CAST(jobs.config_type AS VARCHAR) in " + Sqls.toSqlInFragment(Job.REPLICATION_TYPES) + " AND " + "scope = ? AND " + @@ -383,31 +385,13 @@ public Optional getLastReplicationJob(final UUID connectionId) throws IOExc .flatMap(r -> getJobOptional(ctx, r.get("job_id", Long.class)))); } - @Override - public Optional getCurrentState(final UUID connectionId) throws IOException { - return database.query(ctx -> ctx - .fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE " + - "CAST(jobs.config_type AS VARCHAR) in " + Sqls.toSqlInFragment(Job.REPLICATION_TYPES) + " AND " + - "scope = ? AND " + - "output->'sync'->'state' IS NOT NULL " + - "ORDER BY attempts.created_at DESC LIMIT 1", - connectionId.toString()) - .stream() - .findFirst() - .flatMap(r -> getJobOptional(ctx, r.get("job_id", Long.class))) - .flatMap(Job::getLastAttemptWithOutput) - .flatMap(Attempt::getOutput) - .map(JobOutput::getSync) - .map(StandardSyncOutput::getState)); - } - @Override public Optional getNextJob() throws IOException { // rules: // 1. get oldest, pending job // 2. job is excluded if another job of the same scope is already running // 3. 
job is excluded if another job of the same scope is already incomplete - return database.query(ctx -> ctx + return jobDatabase.query(ctx -> ctx .fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE " + "CAST(jobs.status AS VARCHAR) = 'pending' AND " + "jobs.scope NOT IN ( SELECT scope FROM jobs WHERE status = 'running' OR status = 'incomplete' ) " + @@ -420,7 +404,7 @@ public Optional getNextJob() throws IOException { @Override public List listJobs(final ConfigType configType, final Instant attemptEndedAtTimestamp) throws IOException { final LocalDateTime timeConvertedIntoLocalDateTime = LocalDateTime.ofInstant(attemptEndedAtTimestamp, ZoneOffset.UTC); - return database.query(ctx -> getJobsFromResult(ctx + return jobDatabase.query(ctx -> getJobsFromResult(ctx .fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE " + "CAST(config_type AS VARCHAR) = ? AND " + " attempts.ended_at > ? ORDER BY jobs.created_at ASC, attempts.created_at ASC", Sqls.toSqlName(configType), @@ -473,7 +457,7 @@ private static long getEpoch(final Record record, final String fieldName) { @Override public Optional getVersion() throws IOException { - final Result result = database.query(ctx -> ctx.select() + final Result result = jobDatabase.query(ctx -> ctx.select() .from(AIRBYTE_METADATA_TABLE) .where(DSL.field(METADATA_KEY_COL).eq(AirbyteVersion.AIRBYTE_VERSION_KEY_NAME)) .fetch()); @@ -482,7 +466,7 @@ public Optional getVersion() throws IOException { @Override public void setVersion(final String airbyteVersion) throws IOException { - database.query(ctx -> ctx.execute(String.format( + jobDatabase.query(ctx -> ctx.execute(String.format( "INSERT INTO %s(%s, %s) VALUES('%s', '%s'), ('%s_init_db', '%s') ON CONFLICT (%s) DO UPDATE SET %s = '%s'", AIRBYTE_METADATA_TABLE, METADATA_KEY_COL, @@ -498,7 +482,7 @@ public void setVersion(final String airbyteVersion) throws IOException { @Override public Optional getDeployment() throws IOException { - final Result result = database.query(ctx -> ctx.select() + final Result result = 
jobDatabase.query(ctx -> ctx.select() .from(AIRBYTE_METADATA_TABLE) .where(DSL.field(METADATA_KEY_COL).eq(DEPLOYMENT_ID_KEY)) .fetch()); @@ -508,7 +492,7 @@ public Optional getDeployment() throws IOException { @Override public void setDeployment(final UUID deployment) throws IOException { // if an existing deployment id already exists, on conflict, return it so we can log it. - final UUID committedDeploymentId = database.query(ctx -> ctx.fetch(String.format( + final UUID committedDeploymentId = jobDatabase.query(ctx -> ctx.fetch(String.format( "INSERT INTO %s(%s, %s) VALUES('%s', '%s') ON CONFLICT (%s) DO NOTHING RETURNING (SELECT %s FROM %s WHERE %s='%s') as existing_deployment_id", AIRBYTE_METADATA_TABLE, METADATA_KEY_COL, @@ -577,7 +561,7 @@ private Map> exportDatabase(final String sc */ private List listTables(final String schema) throws IOException { if (schema != null) { - return database.query(context -> context.meta().getSchemas(schema).stream() + return jobDatabase.query(context -> context.meta().getSchemas(schema).stream() .flatMap(s -> context.meta(s).getTables().stream()) .map(Named::getName) .filter(table -> JobsDatabaseSchema.getTableNames().contains(table.toLowerCase())) @@ -598,7 +582,7 @@ public void purgeJobHistory(final LocalDateTime asOfDate) { final String JOB_HISTORY_PURGE_SQL = MoreResources.readResource("job_history_purge.sql"); // interval '?' days cannot use a ? bind, so we're using %d instead. 
final String sql = String.format(JOB_HISTORY_PURGE_SQL, (JOB_HISTORY_MINIMUM_AGE_IN_DAYS - 1)); - final Integer rows = database.query(ctx -> ctx.execute(sql, + final Integer rows = jobDatabase.query(ctx -> ctx.execute(sql, asOfDate.format(DateTimeFormatter.ofPattern("YYYY-MM-dd")), JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS, JOB_HISTORY_MINIMUM_RECENCY)); @@ -609,7 +593,7 @@ public void purgeJobHistory(final LocalDateTime asOfDate) { private List listAllTables(final String schema) throws IOException { if (schema != null) { - return database.query(context -> context.meta().getSchemas(schema).stream() + return jobDatabase.query(context -> context.meta().getSchemas(schema).stream() .flatMap(s -> context.meta(s).getTables().stream()) .map(Named::getName) .collect(Collectors.toList())); @@ -619,7 +603,7 @@ private List listAllTables(final String schema) throws IOException { } private List listSchemas() throws IOException { - return database.query(context -> context.meta().getSchemas().stream() + return jobDatabase.query(context -> context.meta().getSchemas().stream() .map(Named::getName) .filter(c -> !SYSTEM_SCHEMA.contains(c)) .collect(Collectors.toList())); @@ -628,7 +612,7 @@ private List listSchemas() throws IOException { private Stream exportTable(final String schema, final String tableName) throws IOException { final Table tableSql = getTable(schema, tableName); - try (final Stream records = database.query(ctx -> ctx.select(DSL.asterisk()).from(tableSql).fetchStream())) { + try (final Stream records = jobDatabase.query(ctx -> ctx.select(DSL.asterisk()).from(tableSql).fetchStream())) { return records.map(record -> { final Set jsonFieldNames = Arrays.stream(record.fields()) .filter(f -> f.getDataType().getTypeName().equals("jsonb")) @@ -655,7 +639,7 @@ private void importDatabase(final String airbyteVersion, throws IOException { if (!data.isEmpty()) { createSchema(BACKUP_SCHEMA); - database.transaction(ctx -> { + jobDatabase.transaction(ctx -> { for (final 
JobsDatabaseSchema tableType : data.keySet()) { if (!incrementalImport) { truncateTable(ctx, targetSchema, tableType.name(), BACKUP_SCHEMA); @@ -670,7 +654,7 @@ private void importDatabase(final String airbyteVersion, } private void createSchema(final String schema) throws IOException { - database.query(ctx -> ctx.createSchemaIfNotExists(schema).execute()); + jobDatabase.query(ctx -> ctx.createSchemaIfNotExists(schema).execute()); } /** diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java index a059e9dc47d5..d784c1f3f51c 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java @@ -7,7 +7,6 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; -import io.airbyte.config.State; import io.airbyte.db.instance.jobs.JobsDatabaseSchema; import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.models.JobStatus; @@ -36,7 +35,8 @@ public interface JobPersistence { /** * Enqueue a new job. Its initial status will be pending. * - * @param scope key that will be used to determine if two jobs should not be run at the same time. 
+ * @param scope key that will be used to determine if two jobs should not be run at the same time; + * it is the primary id of the standard sync (StandardSync#connectionId) * @param jobConfig configuration for the job * @return job id * @throws IOException exception due to interaction with persistence @@ -117,6 +117,11 @@ public interface JobPersistence { */ Optional getAttemptTemporalWorkflowId(long jobId, int attemptNumber) throws IOException; + /** + * When the output is a StandardSyncOutput, caller of this method should persist + * StandardSyncOutput#state in the configs database by calling + * ConfigRepository#updateConnectionState, which takes care of persisting the connection state. + */ void writeOutput(long jobId, int attemptNumber, T output) throws IOException; /** @@ -146,19 +151,6 @@ public interface JobPersistence { Optional getLastReplicationJob(UUID connectionId) throws IOException; - /** - * if a job does not succeed, we assume that it synced nothing. that is the most conservative - * assumption we can make. as long as all destinations write the final data output in a - * transactional way, this will be true. if this changes, then we may end up writing duplicate data - * with our incremental append only. this is preferable to failing to send data at all. our - * incremental append only most closely resembles a deliver at least once strategy anyway. - * - * @param connectionId - id of the connection whose state we want to fetch. 
- * @return the current state, if any of, the connection - * @throws IOException exception due to interaction with persistence - */ - Optional getCurrentState(UUID connectionId) throws IOException; - Optional getNextJob() throws IOException; /// ARCHIVE diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobCreatorTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobCreatorTest.java index fd0acbea3704..e47d14d9d79b 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobCreatorTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobCreatorTest.java @@ -24,6 +24,7 @@ import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.StandardSyncOperation.OperatorType; +import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -52,6 +53,7 @@ public class DefaultJobCreatorTest { private static final long JOB_ID = 12L; private JobPersistence jobPersistence; + private ConfigRepository configRepository; private JobCreator jobCreator; static { @@ -109,9 +111,10 @@ public class DefaultJobCreatorTest { } @BeforeEach - void setup() throws IOException { + void setup() { jobPersistence = mock(JobPersistence.class); - jobCreator = new DefaultJobCreator(jobPersistence); + configRepository = mock(ConfigRepository.class); + jobCreator = new DefaultJobCreator(jobPersistence, configRepository); } @Test diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java index 84c3379f8e7b..cb886bb5db55 100644 --- 
a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java @@ -4,6 +4,9 @@ package io.airbyte.scheduler.persistence; +import static io.airbyte.db.instance.jobs.jooq.Tables.AIRBYTE_METADATA; +import static io.airbyte.db.instance.jobs.jooq.Tables.ATTEMPTS; +import static io.airbyte.db.instance.jobs.jooq.Tables.JOBS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -14,7 +17,6 @@ import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.airbyte.commons.json.Jsons; @@ -23,15 +25,10 @@ import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobGetSpecConfig; import io.airbyte.config.JobOutput; -import io.airbyte.config.JobOutput.OutputType; import io.airbyte.config.JobSyncConfig; -import io.airbyte.config.StandardSyncOutput; -import io.airbyte.config.State; import io.airbyte.db.Database; -import io.airbyte.db.instance.DatabaseMigrator; -import io.airbyte.db.instance.jobs.JobsDatabaseInstance; -import io.airbyte.db.instance.jobs.JobsDatabaseMigrator; import io.airbyte.db.instance.jobs.JobsDatabaseSchema; +import io.airbyte.db.instance.test.TestDatabaseProviders; import io.airbyte.scheduler.models.Attempt; import io.airbyte.scheduler.models.AttemptStatus; import io.airbyte.scheduler.models.Job; @@ -90,7 +87,8 @@ class DefaultJobPersistenceTest { .withSync(new JobSyncConfig()); private static PostgreSQLContainer container; - private Database database; + private Database jobDatabase; + private Database configDatabase; private Supplier timeSupplier; private JobPersistence jobPersistence; @@ 
-158,33 +156,30 @@ private static Job createJob( @SuppressWarnings("unchecked") @BeforeEach public void setup() throws Exception { - database = new JobsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); + final TestDatabaseProviders databaseProviders = new TestDatabaseProviders(container); + jobDatabase = databaseProviders.createNewJobsDatabase(); resetDb(); - final DatabaseMigrator jobDbMigrator = new JobsDatabaseMigrator(database, "test"); - jobDbMigrator.createBaseline(); - jobDbMigrator.migrate(); - timeSupplier = mock(Supplier.class); when(timeSupplier.get()).thenReturn(NOW); - jobPersistence = new DefaultJobPersistence(database, timeSupplier, 30, 500, 10); + jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, 30, 500, 10); } @AfterEach void tearDown() throws Exception { - database.close(); + jobDatabase.close(); } private void resetDb() throws SQLException { // todo (cgardens) - truncate whole db. - database.query(ctx -> ctx.execute("TRUNCATE TABLE jobs")); - database.query(ctx -> ctx.execute("TRUNCATE TABLE attempts")); - database.query(ctx -> ctx.execute("TRUNCATE TABLE airbyte_metadata")); + jobDatabase.query(ctx -> ctx.truncateTable(JOBS).execute()); + jobDatabase.query(ctx -> ctx.truncateTable(ATTEMPTS).execute()); + jobDatabase.query(ctx -> ctx.truncateTable(AIRBYTE_METADATA).execute()); } private Result getJobRecord(final long jobId) throws SQLException { - return database.query(ctx -> ctx.fetch(DefaultJobPersistence.BASE_JOB_SELECT_AND_JOIN + "WHERE jobs.id = ?", jobId)); + return jobDatabase.query(ctx -> ctx.fetch(DefaultJobPersistence.BASE_JOB_SELECT_AND_JOIN + "WHERE jobs.id = ?", jobId)); } @Test @@ -336,7 +331,7 @@ void testListJobsWithTimestamp() throws IOException { now.plusSeconds(14), now.plusSeconds(15), now.plusSeconds(16)); - jobPersistence = new DefaultJobPersistence(database, timeSupplier, 30, 500, 10); + jobPersistence = new DefaultJobPersistence(jobDatabase, 
timeSupplier, 30, 500, 10); final long syncJobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow(); final int syncJobAttemptNumber0 = jobPersistence.createAttempt(syncJobId, LOG_PATH); jobPersistence.failAttempt(syncJobId, syncJobAttemptNumber0); @@ -439,7 +434,7 @@ void testSuccessfulGet() throws IOException, SQLException { final var defaultWorkflowId = jobPersistence.getAttemptTemporalWorkflowId(jobId, attemptNumber); assertTrue(defaultWorkflowId.isEmpty()); - database.query(ctx -> ctx.execute( + jobDatabase.query(ctx -> ctx.execute( "UPDATE attempts SET temporal_workflow_id = '56a81f3a-006c-42d7-bce2-29d675d08ea4' WHERE job_id = ? AND attempt_number =?", jobId, attemptNumber)); final var workflowId = jobPersistence.getAttemptTemporalWorkflowId(jobId, attemptNumber).get(); @@ -724,107 +719,6 @@ public void testGetLastSyncJobForConnectionId() throws IOException { } - @Nested - @DisplayName("When getting current state") - class GetCurrentState { - - @Test - @DisplayName("Should take latest state (regardless of attempt status)") - void testGetCurrentStateWithMultipleAttempts() throws IOException { - // just have the time monotonically increase. 
- when(timeSupplier.get()).thenAnswer(a -> Instant.now()); - - // create a job - final long jobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow(); - - // fail the first attempt - jobPersistence.failAttempt(jobId, jobPersistence.createAttempt(jobId, LOG_PATH)); - assertTrue(jobPersistence.getCurrentState(UUID.fromString(SCOPE)).isEmpty()); - - // succeed the second attempt - final State state = new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", 4))); - final JobOutput jobOutput = new JobOutput().withOutputType(OutputType.SYNC).withSync(new StandardSyncOutput().withState(state)); - final int attemptId = jobPersistence.createAttempt(jobId, LOG_PATH); - jobPersistence.writeOutput(jobId, attemptId, jobOutput); - jobPersistence.succeedAttempt(jobId, attemptId); - - // verify we get that state back. - assertEquals(Optional.of(state), jobPersistence.getCurrentState(UUID.fromString(SCOPE))); - - // create a second job - final long jobId2 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow(); - - // check that when we have a failed attempt with no state, we still use old state. - jobPersistence.failAttempt(jobId2, jobPersistence.createAttempt(jobId2, LOG_PATH)); - assertEquals(Optional.of(state), jobPersistence.getCurrentState(UUID.fromString(SCOPE))); - - // check that when we have a failed attempt, if we have a state we use it. 
- final State state2 = new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", 5))); - final JobOutput jobOutput2 = new JobOutput().withSync(new StandardSyncOutput().withState(state2)); - final int attemptId2 = jobPersistence.createAttempt(jobId2, LOG_PATH); - jobPersistence.writeOutput(jobId2, attemptId2, jobOutput2); - jobPersistence.failAttempt(jobId2, attemptId2); - assertEquals(Optional.of(state2), jobPersistence.getCurrentState(UUID.fromString(SCOPE))); - } - - @Test - @DisplayName("Should not have state if the latest attempt did not succeeded and have state otherwise") - public void testGetCurrentStateForConnectionIdNoState() throws IOException { - // no state when the connection has never had a job. - assertEquals(Optional.empty(), jobPersistence.getCurrentState(CONNECTION_ID)); - - final long jobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow(); - final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); - - // no state when connection has a job but it has not completed that has not completed - assertEquals(Optional.empty(), jobPersistence.getCurrentState(CONNECTION_ID)); - - jobPersistence.failJob(jobId); - - // no state when connection has a job but it is failed. - assertEquals(Optional.empty(), jobPersistence.getCurrentState(CONNECTION_ID)); - - jobPersistence.cancelJob(jobId); - - // no state when connection has a job but it is cancelled. - assertEquals(Optional.empty(), jobPersistence.getCurrentState(CONNECTION_ID)); - - final JobOutput jobOutput1 = new JobOutput() - .withSync(new StandardSyncOutput().withState(new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", "1"))))); - jobPersistence.writeOutput(jobId, attemptNumber, jobOutput1); - jobPersistence.succeedAttempt(jobId, attemptNumber); - - // job 1 state, after first success. 
- assertEquals(Optional.of(jobOutput1.getSync().getState()), jobPersistence.getCurrentState(CONNECTION_ID)); - - when(timeSupplier.get()).thenReturn(NOW.plusSeconds(1000)); - final long jobId2 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow(); - final int attemptNumber2 = jobPersistence.createAttempt(jobId2, LOG_PATH); - - // job 1 state, second job created. - assertEquals(Optional.of(jobOutput1.getSync().getState()), jobPersistence.getCurrentState(CONNECTION_ID)); - - jobPersistence.failJob(jobId2); - - // job 1 state, second job failed. - assertEquals(Optional.of(jobOutput1.getSync().getState()), jobPersistence.getCurrentState(CONNECTION_ID)); - - jobPersistence.cancelJob(jobId2); - - // job 1 state, second job cancelled - assertEquals(Optional.of(jobOutput1.getSync().getState()), jobPersistence.getCurrentState(CONNECTION_ID)); - - final JobOutput jobOutput2 = new JobOutput() - .withSync(new StandardSyncOutput().withState(new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", "2"))))); - jobPersistence.writeOutput(jobId2, attemptNumber2, jobOutput2); - jobPersistence.succeedAttempt(jobId2, attemptNumber2); - - // job 2 state, after second job success. - assertEquals(Optional.of(jobOutput2.getSync().getState()), jobPersistence.getCurrentState(CONNECTION_ID)); - } - - } - @Nested @DisplayName("When getting next job") class GetNextJob { @@ -1181,7 +1075,7 @@ class PurgeJobHistory { private Job persistJobForJobHistoryTesting(final String scope, final JobConfig jobConfig, final JobStatus status, final LocalDateTime runDate) throws IOException, SQLException { final String when = runDate.toString(); - final Optional id = database.query( + final Optional id = jobDatabase.query( ctx -> ctx.fetch( "INSERT INTO jobs(config_type, scope, created_at, updated_at, status, config) " + "SELECT CAST(? AS JOB_CONFIG_TYPE), ?, ?, ?, CAST(? AS JOB_STATUS), CAST(? 
as JSONB) " + @@ -1210,7 +1104,7 @@ private void persistAttemptForJobHistoryTesting(final Job job, final String logP + " \"sync\": {\n" + " \"output_catalog\": {" + "}}}"; - final Integer attemptNumber = database.query(ctx -> ctx.fetch( + final Integer attemptNumber = jobDatabase.query(ctx -> ctx.fetch( "INSERT INTO attempts(job_id, attempt_number, log_path, status, created_at, updated_at, output) " + "VALUES(?, ?, ?, CAST(? AS ATTEMPT_STATUS), ?, ?, CAST(? as JSONB)) RETURNING attempt_number", job.getId(), @@ -1295,7 +1189,8 @@ void testPurgeJobHistory(final int numJobs, final String DECOY_SCOPE = UUID.randomUUID().toString(); // Reconfigure constants to test various combinations of tuning knobs and make sure all work. - final DefaultJobPersistence jobPersistence = new DefaultJobPersistence(database, timeSupplier, ageCutoff, tooManyJobs, recencyCutoff); + final DefaultJobPersistence jobPersistence = + new DefaultJobPersistence(jobDatabase, timeSupplier, ageCutoff, tooManyJobs, recencyCutoff); final LocalDateTime fakeNow = LocalDateTime.of(2021, 6, 20, 0, 0); diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 6d03bd2c040e..f12273be2df9 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -168,8 +168,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl()) .getAndInitialize(); - final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase); - configPersistence.migrateFileConfigs(configs); + final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase).migrateFileConfigs(configs); final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configs); final Optional secretPersistence = 
SecretPersistence.getLongLived(configs); @@ -210,7 +209,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(configs.getTemporalHost()); final TemporalClient temporalClient = TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot()); final OAuthConfigSupplier oAuthConfigSupplier = new OAuthConfigSupplier(configRepository, false, trackingClient); - final SchedulerJobClient schedulerJobClient = new DefaultSchedulerJobClient(jobPersistence, new DefaultJobCreator(jobPersistence)); + final SchedulerJobClient schedulerJobClient = + new DefaultSchedulerJobClient(jobPersistence, new DefaultJobCreator(jobPersistence, configRepository)); final DefaultSynchronousSchedulerClient syncSchedulerClient = new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier); final SynchronousSchedulerClient bucketSpecCacheSchedulerClient = diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index b9709985d165..1f025d29672a 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -68,7 +68,6 @@ public class SchedulerHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerHandler.class); - private static final UUID NO_WORKSPACE = UUID.fromString("00000000-0000-0000-0000-000000000000"); private final ConfigRepository configRepository; private final SchedulerJobClient schedulerJobClient; @@ -354,7 +353,7 @@ public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdReq } public ConnectionState getState(final ConnectionIdRequestBody connectionIdRequestBody) throws IOException { - final Optional currentState = 
jobPersistence.getCurrentState(connectionIdRequestBody.getConnectionId()); + final Optional currentState = configRepository.getConnectionState(connectionIdRequestBody.getConnectionId()); LOGGER.info("currentState server: {}", currentState); final ConnectionState connectionState = new ConnectionState() @@ -365,6 +364,7 @@ public ConnectionState getState(final ConnectionIdRequestBody connectionIdReques return connectionState; } + // todo (cgardens) - this method needs a test. public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOException { final long jobId = jobIdRequestBody.getId(); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java index 2aa2546a191d..971778031dae 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java @@ -31,8 +31,7 @@ import io.airbyte.config.persistence.YamlSeedConfigPersistence; import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator; import io.airbyte.db.Database; -import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; -import io.airbyte.db.instance.jobs.JobsDatabaseInstance; +import io.airbyte.db.instance.test.TestDatabaseProviders; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.scheduler.persistence.DefaultJobPersistence; import io.airbyte.scheduler.persistence.JobPersistence; @@ -68,7 +67,8 @@ public class ArchiveHandlerTest { private static final String VERSION = "0.6.8"; private static PostgreSQLContainer container; - private Database database; + private Database jobDatabase; + private Database configDatabase; private JobPersistence jobPersistence; private DatabaseConfigPersistence configPersistence; private ConfigPersistence seedPersistence; @@ -102,11 +102,12 @@ public static void dbDown() { @BeforeEach public void 
setup() throws Exception { - database = new JobsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); - jobPersistence = new DefaultJobPersistence(database); - database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); + final TestDatabaseProviders databaseProviders = new TestDatabaseProviders(container); + jobDatabase = databaseProviders.createNewJobsDatabase(); + configDatabase = databaseProviders.createNewConfigsDatabase(); + jobPersistence = new DefaultJobPersistence(jobDatabase); seedPersistence = YamlSeedConfigPersistence.getDefault(); - configPersistence = new DatabaseConfigPersistence(database); + configPersistence = new DatabaseConfigPersistence(jobDatabase); configPersistence.replaceAllConfigs(Collections.emptyMap(), false); configPersistence.loadData(seedPersistence); configRepository = new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.empty(), Optional.empty()); @@ -131,7 +132,8 @@ public void setup() throws Exception { @AfterEach void tearDown() throws Exception { - database.close(); + jobDatabase.close(); + configDatabase.close(); } /** diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 85c1b0b76ba5..24681797ab91 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -557,7 +557,7 @@ void testResetConnection() throws JsonValidationException, IOException, ConfigNo void testGetCurrentState() throws IOException { final UUID connectionId = UUID.randomUUID(); final State state = new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", 1))); - when(jobPersistence.getCurrentState(connectionId)).thenReturn(Optional.of(state)); + 
when(configRepository.getConnectionState(connectionId)).thenReturn(Optional.of(state)); final ConnectionState connectionState = schedulerHandler.getState(new ConnectionIdRequestBody().connectionId(connectionId)); assertEquals(new ConnectionState().connectionId(connectionId).state(state.getState()), connectionState); @@ -566,7 +566,7 @@ void testGetCurrentState() throws IOException { @Test void testGetCurrentStateEmpty() throws IOException { final UUID connectionId = UUID.randomUUID(); - when(jobPersistence.getCurrentState(connectionId)).thenReturn(Optional.empty()); + when(configRepository.getConnectionState(connectionId)).thenReturn(Optional.empty()); final ConnectionState connectionState = schedulerHandler.getState(new ConnectionIdRequestBody().connectionId(connectionId)); assertEquals(new ConnectionState().connectionId(connectionId), connectionState); diff --git a/airbyte-server/src/test/java/io/airbyte/server/migration/DatabaseArchiverTest.java b/airbyte-server/src/test/java/io/airbyte/server/migration/DatabaseArchiverTest.java index 3bef88fc15c6..d67f98fd73f9 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/migration/DatabaseArchiverTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/migration/DatabaseArchiverTest.java @@ -9,8 +9,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import io.airbyte.db.Database; -import io.airbyte.db.instance.jobs.JobsDatabaseInstance; import io.airbyte.db.instance.jobs.JobsDatabaseSchema; +import io.airbyte.db.instance.test.TestDatabaseProviders; import io.airbyte.scheduler.persistence.DefaultJobPersistence; import io.airbyte.scheduler.persistence.JobPersistence; import java.io.IOException; @@ -28,7 +28,8 @@ public class DatabaseArchiverTest { private static final String TEMP_PREFIX = "testDatabaseArchive"; private PostgreSQLContainer container; - private Database database; + private Database jobDatabase; + private Database configDatabase; private DatabaseArchiver databaseArchiver; @BeforeEach 
@@ -39,21 +40,24 @@ void setUp() throws IOException { .withPassword("docker"); container.start(); - database = new JobsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); - final JobPersistence persistence = new DefaultJobPersistence(database); + final TestDatabaseProviders databaseProviders = new TestDatabaseProviders(container); + jobDatabase = databaseProviders.createNewJobsDatabase(); + configDatabase = databaseProviders.createNewConfigsDatabase(); + final JobPersistence persistence = new DefaultJobPersistence(jobDatabase); databaseArchiver = new DatabaseArchiver(persistence); } @AfterEach void tearDown() throws Exception { - database.close(); + jobDatabase.close(); + configDatabase.close(); container.close(); } @Test void testUnknownTableExport() throws Exception { // Create a table that is not declared in JobsDatabaseSchema - database.query(ctx -> { + jobDatabase.query(ctx -> { ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200), updated_at DATE);"); ctx.fetch( "INSERT INTO id_and_name (id, name, updated_at) VALUES (1,'picard', '2004-10-19'), (2, 'crusher', '2005-10-19'), (3, 'vash', '2006-10-19');"); @@ -74,7 +78,7 @@ void testUnknownTableExport() throws Exception { @Test void testDatabaseExportImport() throws Exception { - database.query(ctx -> { + jobDatabase.query(ctx -> { ctx.fetch( "INSERT INTO jobs(id, scope, config_type, config, status, created_at, started_at, updated_at) VALUES " + "(1,'get_spec_scope', 'get_spec', '{ \"type\" : \"getSpec\" }', 'succeeded', '2004-10-19', null, '2004-10-19'), " diff --git a/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java b/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java index 8fe1365079c1..79129e554937 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java @@ -11,9 
+11,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.common.io.Resources; import io.airbyte.commons.io.Archives; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.Configs; import io.airbyte.config.DestinationConnection; import io.airbyte.config.OperatorNormalization.Option; import io.airbyte.config.SourceConnection; @@ -24,14 +27,17 @@ import io.airbyte.config.StandardSyncOperation.OperatorType; import io.airbyte.config.StandardWorkspace; import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.config.persistence.FileSystemConfigPersistence; +import io.airbyte.config.persistence.DatabaseConfigPersistence; import io.airbyte.config.persistence.YamlSeedConfigPersistence; import io.airbyte.config.persistence.split_secrets.MemorySecretPersistence; import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator; import io.airbyte.config.persistence.split_secrets.SecretPersistence; import io.airbyte.db.Database; +import io.airbyte.db.instance.test.TestDatabaseProviders; import io.airbyte.migrate.Migrations; +import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.scheduler.persistence.DefaultJobPersistence; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.server.RunMigration; @@ -49,10 +55,12 @@ import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.testcontainers.containers.PostgreSQLContainer; public 
class RunMigrationTest { @@ -60,9 +68,35 @@ public class RunMigrationTest { private static final String TARGET_VERSION = Migrations.MIGRATIONS.get(Migrations.MIGRATIONS.size() - 1).getVersion(); private static final String DEPRECATED_SOURCE_DEFINITION_NOT_BEING_USED = "d2147be5-fa36-4936-977e-f031affa5895"; private static final String DEPRECATED_SOURCE_DEFINITION_BEING_USED = "4eb22946-2a79-4d20-a3e6-effd234613c3"; + private static final ConnectorSpecification MOCK_CONNECTOR_SPEC = new ConnectorSpecification() + .withConnectionSpecification(Jsons.deserialize("{}")); + + private static PostgreSQLContainer container; + private static Database jobDatabase; + private static Database configDatabase; + private List resourceToBeCleanedUp; private SecretPersistence secretPersistence; + @BeforeAll + public static void prepareContainer() throws IOException { + container = new PostgreSQLContainer<>("postgres:13-alpine") + .withDatabaseName("airbyte") + .withUsername("docker") + .withPassword("docker"); + container.start(); + final TestDatabaseProviders databaseProviders = new TestDatabaseProviders(container); + jobDatabase = databaseProviders.createNewJobsDatabase(); + configDatabase = databaseProviders.createNewConfigsDatabase(); + } + + @AfterAll + public static void closeContainer() { + if (container != null) { + container.close(); + } + } + @BeforeEach public void setup() { resourceToBeCleanedUp = new ArrayList<>(); @@ -84,35 +118,47 @@ public void cleanup() throws IOException { @SuppressWarnings("UnstableApiUsage") @Test - @Disabled - // TODO(#5857): Make migration tests compatible with writing new migrations. 
public void testRunMigration() throws Exception { - try (final StubAirbyteDB stubAirbyteDB = new StubAirbyteDB()) { - final File file = Path - .of(Resources.getResource("migration/03a4c904-c91d-447f-ab59-27a43b52c2fd.gz").toURI()) - .toFile(); + final Path configRoot = Files.createTempDirectory(Path.of("/tmp"), "dummy_data"); + final ConfigPersistence configPersistence = getConfigPersistence(configRoot); - final Path dummyDataSource = Path.of(Resources.getResource("migration/dummy_data").toURI()); + final File file = Path + .of(Resources.getResource("migration/03a4c904-c91d-447f-ab59-27a43b52c2fd.gz").toURI()) + .toFile(); + final JobPersistence jobPersistence = getJobPersistence(file, INITIAL_VERSION); + assertPreMigrationConfigs(configPersistence, jobPersistence); - final Path configRoot = Files.createTempDirectory(Path.of("/tmp"), "dummy_data"); - FileUtils.copyDirectory(dummyDataSource.toFile(), configRoot.toFile()); - resourceToBeCleanedUp.add(configRoot.toFile()); - final JobPersistence jobPersistence = getJobPersistence(stubAirbyteDB.getDatabase(), file, INITIAL_VERSION); - assertPreMigrationConfigs(configRoot, jobPersistence); + runMigration(configPersistence, jobPersistence); - runMigration(jobPersistence, configRoot); + assertDatabaseVersion(jobPersistence, TARGET_VERSION); + assertPostMigrationConfigs(configPersistence); + FileUtils.deleteDirectory(configRoot.toFile()); + } - assertDatabaseVersion(jobPersistence, TARGET_VERSION); - assertPostMigrationConfigs(configRoot); - FileUtils.deleteDirectory(configRoot.toFile()); - } + @SuppressWarnings("UnstableApiUsage") + private ConfigPersistence getConfigPersistence(final Path configRoot) throws Exception { + final Path dummyDataSource = Path.of(Resources.getResource("migration/dummy_data").toURI()); + + FileUtils.copyDirectory(dummyDataSource.toFile(), configRoot.toFile()); + resourceToBeCleanedUp.add(configRoot.toFile()); + final Configs configs = mock(Configs.class); + 
when(configs.getConfigRoot()).thenReturn(configRoot); + // The database provider creates a config database that is ready to use, which contains a mock + // config entry. However, the migrateFileConfigs method will only copy the file configs if the + // database is empty. So we need to purge the database first. + configDatabase.transaction(ctx -> ctx.execute("TRUNCATE TABLE airbyte_configs;")); + return new DatabaseConfigPersistence(configDatabase) + .migrateFileConfigs(configs); } - private void assertPreMigrationConfigs(final Path configRoot, final JobPersistence jobPersistence) throws Exception { + private void assertPreMigrationConfigs(final ConfigPersistence configPersistence, final JobPersistence jobPersistence) throws Exception { assertDatabaseVersion(jobPersistence, INITIAL_VERSION); - final ConfigRepository configRepository = - new ConfigRepository(FileSystemConfigPersistence.createWithValidation(configRoot), new NoOpSecretsHydrator(), Optional.of(secretPersistence), - Optional.of(secretPersistence)); + final ConfigRepository configRepository = new ConfigRepository( + configPersistence, + new NoOpSecretsHydrator(), + Optional.of(secretPersistence), + Optional.of(secretPersistence)); + configRepository.setSpecFetcher(s -> MOCK_CONNECTOR_SPEC); final Map sourceDefinitionsBeforeMigration = configRepository.listStandardSourceDefinitions().stream() .collect(Collectors.toMap(c -> c.getSourceDefinitionId().toString(), c -> c)); assertTrue(sourceDefinitionsBeforeMigration.containsKey(DEPRECATED_SOURCE_DEFINITION_NOT_BEING_USED)); @@ -125,10 +171,13 @@ private void assertDatabaseVersion(final JobPersistence jobPersistence, final St assertEquals(versionFromDb.get(), version); } - private void assertPostMigrationConfigs(final Path importRoot) throws Exception { - final ConfigRepository configRepository = - new ConfigRepository(FileSystemConfigPersistence.createWithValidation(importRoot), new NoOpSecretsHydrator(), Optional.of(secretPersistence), - 
Optional.of(secretPersistence)); + private void assertPostMigrationConfigs(final ConfigPersistence configPersistence) throws Exception { + final ConfigRepository configRepository = new ConfigRepository( + configPersistence, + new NoOpSecretsHydrator(), + Optional.of(secretPersistence), + Optional.of(secretPersistence)); + configRepository.setSpecFetcher(s -> MOCK_CONNECTOR_SPEC); final UUID workspaceId = configRepository.listStandardWorkspaces(true).get(0).getWorkspaceId(); // originally the default workspace started with a hardcoded id. the migration in version 0.29.0 // took that id and randomized it. we want to check that the id is now NOT that hardcoded id and @@ -147,9 +196,9 @@ private void assertSourceDefinitions(final ConfigRepository configRepository) th final Map sourceDefinitions = configRepository.listStandardSourceDefinitions() .stream() .collect(Collectors.toMap(c -> c.getSourceDefinitionId().toString(), c -> c)); - assertTrue(sourceDefinitions.size() >= 59); - // the definition is not present in latest seeds so it should be deleted - assertFalse(sourceDefinitions.containsKey(DEPRECATED_SOURCE_DEFINITION_NOT_BEING_USED)); + assertTrue(sourceDefinitions.size() >= 98); + // the definition is not present in latest seeds but we no longer purge unused deprecated definition + assertTrue(sourceDefinitions.containsKey(DEPRECATED_SOURCE_DEFINITION_NOT_BEING_USED)); // the definition is not present in latest seeds but it was being used as a connection so it should // not be deleted assertTrue(sourceDefinitions.containsKey(DEPRECATED_SOURCE_DEFINITION_BEING_USED)); @@ -294,23 +343,26 @@ private void assertDestinations(final ConfigRepository configRepository, final U } } - private void runMigration(final JobPersistence jobPersistence, final Path configRoot) throws Exception { + private void runMigration(final ConfigPersistence configPersistence, final JobPersistence jobPersistence) throws Exception { + final ConfigRepository configRepository = new 
ConfigRepository( + configPersistence, + new NoOpSecretsHydrator(), + Optional.of(secretPersistence), + Optional.of(secretPersistence)); + configRepository.setSpecFetcher(s -> MOCK_CONNECTOR_SPEC); try (final RunMigration runMigration = new RunMigration( jobPersistence, - new ConfigRepository(FileSystemConfigPersistence.createWithValidation(configRoot), new NoOpSecretsHydrator(), Optional.of(secretPersistence), - Optional.of(secretPersistence)), + configRepository, TARGET_VERSION, YamlSeedConfigPersistence.getDefault(), - mock(SpecFetcher.class) // this test was disabled/broken when this fetcher mock was added. apologies if you have to fix this - // in the future. - )) { + mock(SpecFetcher.class))) { runMigration.run(); } } @SuppressWarnings("SameParameterValue") - private JobPersistence getJobPersistence(final Database database, final File file, final String version) throws IOException { - final DefaultJobPersistence jobPersistence = new DefaultJobPersistence(database); + private JobPersistence getJobPersistence(final File file, final String version) throws IOException { + final DefaultJobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); final Path tempFolder = Files.createTempDirectory(Path.of("/tmp"), "db_init"); resourceToBeCleanedUp.add(tempFolder.toFile()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/migration/StubAirbyteDB.java b/airbyte-server/src/test/java/io/airbyte/server/migration/StubAirbyteDB.java deleted file mode 100644 index c35427637efc..000000000000 --- a/airbyte-server/src/test/java/io/airbyte/server/migration/StubAirbyteDB.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.server.migration; - -import io.airbyte.commons.resources.MoreResources; -import io.airbyte.db.Database; -import io.airbyte.db.instance.jobs.JobsDatabaseInstance; -import java.io.IOException; -import org.testcontainers.containers.PostgreSQLContainer; - -public class StubAirbyteDB implements AutoCloseable { - - private final PostgreSQLContainer container; - private final Database database; - - public Database getDatabase() { - return database; - } - - public StubAirbyteDB() throws IOException { - container = - new PostgreSQLContainer<>("postgres:13-alpine") - .withDatabaseName("airbyte") - .withUsername("docker") - .withPassword("docker"); - container.start(); - - final String jobsDatabaseSchema = MoreResources.readResource("migration/schema.sql"); - database = new JobsDatabaseInstance( - container.getUsername(), - container.getPassword(), - container.getJdbcUrl(), - jobsDatabaseSchema) - .getAndInitialize(); - } - - @Override - public void close() throws Exception { - database.close(); - container.close(); - } - -} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index c7cf3c100ec5..8ad5318df6e8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -9,8 +9,13 @@ import io.airbyte.config.EnvConfigs; import io.airbyte.config.MaxWorkersConfig; import io.airbyte.config.helpers.LogClientSingleton; +import io.airbyte.config.persistence.ConfigPersistence; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.config.persistence.DatabaseConfigPersistence; import io.airbyte.config.persistence.split_secrets.SecretPersistence; import io.airbyte.config.persistence.split_secrets.SecretsHydrator; +import io.airbyte.db.Database; +import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; import io.airbyte.workers.process.DockerProcessFactory; 
import io.airbyte.workers.process.KubeProcessFactory; import io.airbyte.workers.process.ProcessFactory; @@ -34,6 +39,7 @@ import java.net.InetAddress; import java.nio.file.Path; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +56,7 @@ public class WorkerApp { private final WorkflowServiceStubs temporalService; private final MaxWorkersConfig maxWorkers; private final WorkerEnvironment workerEnvironment; - private final String airbyteVersion; + private final ConfigRepository configRepository; public WorkerApp(final Path workspaceRoot, final ProcessFactory processFactory, @@ -58,14 +64,14 @@ public WorkerApp(final Path workspaceRoot, final WorkflowServiceStubs temporalService, final MaxWorkersConfig maxWorkers, final WorkerEnvironment workerEnvironment, - final String airbyteVersion) { + final ConfigRepository configRepository) { this.workspaceRoot = workspaceRoot; this.processFactory = processFactory; this.secretsHydrator = secretsHydrator; this.temporalService = temporalService; this.maxWorkers = maxWorkers; this.workerEnvironment = workerEnvironment; - this.airbyteVersion = airbyteVersion; + this.configRepository = configRepository; } public void start() { @@ -101,8 +107,9 @@ public void start() { syncWorker.registerWorkflowImplementationTypes(SyncWorkflow.WorkflowImpl.class); syncWorker.registerActivitiesImplementations( new SyncWorkflow.ReplicationActivityImpl(processFactory, secretsHydrator, workspaceRoot), - new SyncWorkflow.NormalizationActivityImpl(processFactory, secretsHydrator, workspaceRoot, workerEnvironment, airbyteVersion), - new SyncWorkflow.DbtTransformationActivityImpl(processFactory, secretsHydrator, workspaceRoot, airbyteVersion)); + new SyncWorkflow.NormalizationActivityImpl(processFactory, secretsHydrator, workspaceRoot, workerEnvironment), + new SyncWorkflow.DbtTransformationActivityImpl(processFactory, secretsHydrator, workspaceRoot), + 
new SyncWorkflow.PersistStateActivityImpl(workspaceRoot, configRepository)); factory.start(); } @@ -146,6 +153,16 @@ public static void main(final String[] args) throws IOException, InterruptedExce final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost); + final Database configDatabase = new ConfigsDatabaseInstance( + configs.getConfigDatabaseUser(), + configs.getConfigDatabasePassword(), + configs.getConfigDatabaseUrl()) + .getInitialized(); + final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase).withValidation(); + final Optional secretPersistence = SecretPersistence.getLongLived(configs); + final Optional ephemeralSecretPersistence = SecretPersistence.getEphemeral(configs); + final ConfigRepository configRepository = new ConfigRepository(configPersistence, secretsHydrator, secretPersistence, ephemeralSecretPersistence); + new WorkerApp( workspaceRoot, processFactory, @@ -153,7 +170,7 @@ public static void main(final String[] args) throws IOException, InterruptedExce temporalService, configs.getMaxWorkers(), configs.getWorkerEnvironment(), - configs.getAirbyteVersion()).start(); + configRepository).start(); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java index 496c6d83c758..bd298ecf7411 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java @@ -20,6 +20,8 @@ import io.airbyte.config.StandardSyncOperation.OperatorType; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; +import io.airbyte.config.State; +import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.split_secrets.SecretsHydrator; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import 
io.airbyte.scheduler.models.JobRunConfig; @@ -43,11 +45,14 @@ import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; import io.temporal.workflow.Workflow; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; +import java.io.IOException; import java.nio.file.Path; import java.time.Duration; +import java.util.UUID; import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +64,8 @@ public interface SyncWorkflow { StandardSyncOutput run(JobRunConfig jobRunConfig, IntegrationLauncherConfig sourceLauncherConfig, IntegrationLauncherConfig destinationLauncherConfig, - StandardSyncInput syncInput); + StandardSyncInput syncInput, + UUID connectionId); class WorkflowImpl implements SyncWorkflow { @@ -73,16 +79,28 @@ class WorkflowImpl implements SyncWorkflow { .setRetryOptions(TemporalUtils.NO_RETRY) .build(); + private static final ActivityOptions persistOptions = options.toBuilder() + .setRetryOptions(RetryOptions.newBuilder() + .setMaximumAttempts(10) + .build()) + .build(); + private final ReplicationActivity replicationActivity = Workflow.newActivityStub(ReplicationActivity.class, options); private final NormalizationActivity normalizationActivity = Workflow.newActivityStub(NormalizationActivity.class, options); private final DbtTransformationActivity dbtTransformationActivity = Workflow.newActivityStub(DbtTransformationActivity.class, options); + private final PersistStateActivity persistActivity = Workflow.newActivityStub(PersistStateActivity.class, persistOptions); @Override public StandardSyncOutput run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig sourceLauncherConfig, final IntegrationLauncherConfig destinationLauncherConfig, - final StandardSyncInput syncInput) { + final StandardSyncInput syncInput, + final UUID connectionId) { final 
StandardSyncOutput run = replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput); + // the state is persisted immediately after the replication succeeded, because the + // state is a checkpoint of the raw data that has been copied to the destination; + // normalization & dbt does not depend on it + persistActivity.persist(connectionId, run); if (syncInput.getOperationSequence() != null && !syncInput.getOperationSequence().isEmpty()) { for (final StandardSyncOperation standardSyncOperation : syncInput.getOperationSequence()) { @@ -255,14 +273,12 @@ class NormalizationActivityImpl implements NormalizationActivity { private final Path workspaceRoot; private final AirbyteConfigValidator validator; private final WorkerEnvironment workerEnvironment; - private final String airbyteVersion; public NormalizationActivityImpl(final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot, - final WorkerEnvironment workerEnvironment, - final String airbyteVersion) { - this(processFactory, secretsHydrator, workspaceRoot, new AirbyteConfigValidator(), workerEnvironment, airbyteVersion); + final WorkerEnvironment workerEnvironment) { + this(processFactory, secretsHydrator, workspaceRoot, new AirbyteConfigValidator(), workerEnvironment); } @VisibleForTesting @@ -270,14 +286,12 @@ public NormalizationActivityImpl(final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot, final AirbyteConfigValidator validator, - final WorkerEnvironment workerEnvironment, - final String airbyteVersion) { + final WorkerEnvironment workerEnvironment) { this.processFactory = processFactory; this.secretsHydrator = secretsHydrator; this.workspaceRoot = workspaceRoot; this.validator = validator; this.workerEnvironment = workerEnvironment; - this.airbyteVersion = airbyteVersion; } @Override @@ -335,27 +349,23 @@ class DbtTransformationActivityImpl implements 
DbtTransformationActivity { private final SecretsHydrator secretsHydrator; private final Path workspaceRoot; private final AirbyteConfigValidator validator; - private final String airbyteVersion; public DbtTransformationActivityImpl( final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, - final Path workspaceRoot, - final String airbyteVersion) { - this(processFactory, secretsHydrator, workspaceRoot, new AirbyteConfigValidator(), airbyteVersion); + final Path workspaceRoot) { + this(processFactory, secretsHydrator, workspaceRoot, new AirbyteConfigValidator()); } @VisibleForTesting DbtTransformationActivityImpl(final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot, - final AirbyteConfigValidator validator, - final String airbyteVersion) { + final AirbyteConfigValidator validator) { this.processFactory = processFactory; this.secretsHydrator = secretsHydrator; this.workspaceRoot = workspaceRoot; this.validator = validator; - this.airbyteVersion = airbyteVersion; } @Override @@ -397,4 +407,40 @@ private CheckedSupplier, Exception> getWorkerFact } + @ActivityInterface + interface PersistStateActivity { + + @ActivityMethod + boolean persist(final UUID connectionId, final StandardSyncOutput syncOutput); + + } + + class PersistStateActivityImpl implements PersistStateActivity { + + private static final Logger LOGGER = LoggerFactory.getLogger(PersistStateActivityImpl.class); + private final Path workspaceRoot; + private final ConfigRepository configRepository; + + public PersistStateActivityImpl(final Path workspaceRoot, final ConfigRepository configRepository) { + this.workspaceRoot = workspaceRoot; + this.configRepository = configRepository; + } + + @Override + public boolean persist(final UUID connectionId, final StandardSyncOutput syncOutput) { + final State state = syncOutput.getState(); + if (state != null) { + try { + configRepository.updateConnectionState(connectionId, state); + } catch (final 
IOException e) { + throw new RuntimeException(e); + } + return true; + } else { + return false; + } + } + + } + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index 94510d187799..d3139315a766 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -78,7 +78,7 @@ public TemporalResponse submitDiscoverSchema(final UUID jobId, f () -> getWorkflowStub(DiscoverCatalogWorkflow.class, TemporalJobType.DISCOVER_SCHEMA).run(jobRunConfig, launcherConfig, input)); } - public TemporalResponse submitSync(final long jobId, final int attempt, final JobSyncConfig config) { + public TemporalResponse submitSync(final long jobId, final int attempt, final JobSyncConfig config, final UUID connectionId) { final JobRunConfig jobRunConfig = TemporalUtils.createJobRunConfig(jobId, attempt); final IntegrationLauncherConfig sourceLauncherConfig = new IntegrationLauncherConfig() @@ -107,7 +107,8 @@ public TemporalResponse submitSync(final long jobId, final i jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, - input)); + input, + connectionId)); } private T getWorkflowStub(final Class workflowClass, final TemporalJobType jobType) { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java index 66a2b867d3bb..a8799ef430b5 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java @@ -26,6 +26,8 @@ import io.airbyte.workers.temporal.SyncWorkflow.DbtTransformationActivityImpl; import io.airbyte.workers.temporal.SyncWorkflow.NormalizationActivity; import 
io.airbyte.workers.temporal.SyncWorkflow.NormalizationActivityImpl; +import io.airbyte.workers.temporal.SyncWorkflow.PersistStateActivity; +import io.airbyte.workers.temporal.SyncWorkflow.PersistStateActivityImpl; import io.airbyte.workers.temporal.SyncWorkflow.ReplicationActivity; import io.airbyte.workers.temporal.SyncWorkflow.ReplicationActivityImpl; import io.temporal.api.common.v1.WorkflowExecution; @@ -52,6 +54,7 @@ class SyncWorkflowTest { private ReplicationActivityImpl replicationActivity; private NormalizationActivityImpl normalizationActivity; private DbtTransformationActivityImpl dbtTransformationActivity; + private PersistStateActivityImpl persistStateActivity; // AIRBYTE CONFIGURATION private static final long JOB_ID = 11L; @@ -70,6 +73,7 @@ class SyncWorkflowTest { .withAttemptId((long) ATTEMPT_ID) .withDockerImage(IMAGE_NAME2); + private StandardSync sync; private StandardSyncInput syncInput; private NormalizationInput normalizationInput; private OperatorDbtInput operatorDbtInput; @@ -85,6 +89,7 @@ public void setUp() { client = testEnv.getWorkflowClient(); final ImmutablePair syncPair = TestConfigHelpers.createSyncConfig(); + sync = syncPair.getKey(); syncInput = syncPair.getValue(); replicationSuccessOutput = new StandardSyncOutput().withOutputCatalog(syncInput.getCatalog()); @@ -99,15 +104,16 @@ public void setUp() { replicationActivity = mock(ReplicationActivityImpl.class); normalizationActivity = mock(NormalizationActivityImpl.class); dbtTransformationActivity = mock(DbtTransformationActivityImpl.class); + persistStateActivity = mock(PersistStateActivityImpl.class); } // bundle up all of the temporal worker setup / execution into one method. 
private StandardSyncOutput execute() { - worker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity); + worker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity); testEnv.start(); final SyncWorkflow workflow = client.newWorkflowStub(SyncWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build()); - return workflow.run(JOB_RUN_CONFIG, SOURCE_LAUNCHER_CONFIG, DESTINATION_LAUNCHER_CONFIG, syncInput); + return workflow.run(JOB_RUN_CONFIG, SOURCE_LAUNCHER_CONFIG, DESTINATION_LAUNCHER_CONFIG, syncInput, sync.getConnectionId()); } @Test @@ -122,6 +128,7 @@ void testSuccess() { assertEquals(replicationSuccessOutput, actualOutput); verifyReplication(replicationActivity, syncInput); + verifyPersistState(persistStateActivity, sync, actualOutput); verifyNormalize(normalizationActivity, normalizationInput); verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(), operatorDbtInput); } @@ -137,7 +144,9 @@ void testReplicationFailure() { assertThrows(WorkflowFailedException.class, this::execute); verifyReplication(replicationActivity, syncInput); + verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); + verifyNoInteractions(dbtTransformationActivity); } @Test @@ -156,7 +165,9 @@ void testNormalizationFailure() { assertThrows(WorkflowFailedException.class, this::execute); verifyReplication(replicationActivity, syncInput); + verifyPersistState(persistStateActivity, sync, replicationSuccessOutput); verifyNormalize(normalizationActivity, normalizationInput); + verifyNoInteractions(dbtTransformationActivity); } @Test @@ -173,7 +184,9 @@ void testCancelDuringReplication() { assertThrows(WorkflowFailedException.class, this::execute); verifyReplication(replicationActivity, syncInput); + verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); + 
verifyNoInteractions(dbtTransformationActivity); } @Test @@ -195,7 +208,9 @@ void testCancelDuringNormalization() { assertThrows(WorkflowFailedException.class, this::execute); verifyReplication(replicationActivity, syncInput); + verifyPersistState(persistStateActivity, sync, replicationSuccessOutput); verifyNormalize(normalizationActivity, normalizationInput); + verifyNoInteractions(dbtTransformationActivity); } @SuppressWarnings("ResultOfMethodCallIgnored") @@ -223,6 +238,14 @@ private static void verifyReplication(final ReplicationActivity replicationActiv syncInput); } + private static void verifyPersistState(final PersistStateActivity persistStateActivity, + final StandardSync sync, + final StandardSyncOutput syncOutput) { + verify(persistStateActivity).persist( + sync.getConnectionId(), + syncOutput); + } + private static void verifyNormalize(final NormalizationActivity normalizationActivity, final NormalizationInput normalizationInput) { verify(normalizationActivity).normalize( JOB_RUN_CONFIG, diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalAttemptExecutionTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalAttemptExecutionTest.java index f2d0f3a70ac1..de7e5827a2e8 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalAttemptExecutionTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalAttemptExecutionTest.java @@ -14,20 +14,15 @@ import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.config.Configs; -import io.airbyte.db.Database; -import io.airbyte.db.instance.DatabaseMigrator; -import io.airbyte.db.instance.jobs.JobsDatabaseInstance; -import io.airbyte.db.instance.jobs.JobsDatabaseMigrator; +import io.airbyte.db.instance.test.TestDatabaseProviders; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.Worker; import io.temporal.internal.common.CheckedExceptionWrapper; import java.io.IOException; import 
java.nio.file.Files; import java.nio.file.Path; -import java.sql.SQLException; import java.util.function.Consumer; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -42,9 +37,8 @@ class TemporalAttemptExecutionTest { private static final String SOURCE_USERNAME = "sourceusername"; private static final String SOURCE_PASSWORD = "hunter2"; - private static PostgreSQLContainer container; + private static PostgreSQLContainer container; private static Configs configs; - private static Database database; private Path jobRoot; @@ -54,8 +48,8 @@ class TemporalAttemptExecutionTest { private TemporalAttemptExecution attemptExecution; @BeforeAll - static void setUpAll() throws IOException { - container = new PostgreSQLContainer("postgres:13-alpine") + static void setUpAll() { + container = new PostgreSQLContainer<>("postgres:13-alpine") .withUsername(SOURCE_USERNAME) .withPassword(SOURCE_PASSWORD); container.start(); @@ -63,23 +57,14 @@ static void setUpAll() throws IOException { when(configs.getDatabaseUrl()).thenReturn(container.getJdbcUrl()); when(configs.getDatabaseUser()).thenReturn(SOURCE_USERNAME); when(configs.getDatabasePassword()).thenReturn(SOURCE_PASSWORD); - - // create the initial schema - database = new JobsDatabaseInstance( - configs.getDatabaseUser(), - configs.getDatabasePassword(), - configs.getDatabaseUrl()) - .getAndInitialize(); - - // make sure schema is up-to-date - final DatabaseMigrator jobDbMigrator = new JobsDatabaseMigrator(database, "test"); - jobDbMigrator.createBaseline(); - jobDbMigrator.migrate(); } @SuppressWarnings("unchecked") @BeforeEach void setup() throws IOException { + final TestDatabaseProviders databaseProviders = new TestDatabaseProviders(container); + databaseProviders.createNewJobsDatabase(); + final Path workspaceRoot = Files.createTempDirectory(Path.of("/tmp"), 
"temporal_attempt_execution_test"); jobRoot = workspaceRoot.resolve(JOB_ID).resolve(String.valueOf(ATTEMPT_ID)); @@ -96,13 +81,6 @@ void setup() throws IOException { configs); } - @AfterEach - void tearDown() throws SQLException { - database.query(ctx -> ctx.execute("TRUNCATE TABLE jobs")); - database.query(ctx -> ctx.execute("TRUNCATE TABLE attempts")); - database.query(ctx -> ctx.execute("TRUNCATE TABLE airbyte_metadata")); - } - @AfterAll static void tearDownAll() { container.close(); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java index 213649b422d4..7b91e484ecca 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java @@ -38,6 +38,7 @@ class TemporalClientTest { + private static final UUID CONNECTION_ID = UUID.randomUUID(); private static final UUID JOB_UUID = UUID.randomUUID(); private static final long JOB_ID = 11L; private static final int ATTEMPT_ID = 21; @@ -176,8 +177,8 @@ void testSubmitSync() { .withAttemptId((long) ATTEMPT_ID) .withDockerImage(IMAGE_NAME2); - temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig); - discoverCatalogWorkflow.run(JOB_RUN_CONFIG, LAUNCHER_CONFIG, destinationLauncherConfig, input); + temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID); + discoverCatalogWorkflow.run(JOB_RUN_CONFIG, LAUNCHER_CONFIG, destinationLauncherConfig, input, CONNECTION_ID); verify(workflowClient).newWorkflowStub(SyncWorkflow.class, TemporalUtils.getWorkflowOptions(TemporalJobType.SYNC)); } diff --git a/docker-compose.yaml b/docker-compose.yaml index 60e67d5fe392..0b6c19d75962 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -26,6 +26,12 @@ services: environment: - POSTGRES_USER=${DATABASE_USER} - POSTGRES_PASSWORD=${DATABASE_PASSWORD} + - 
DATABASE_USER=${DATABASE_USER} + - DATABASE_PASSWORD=${DATABASE_PASSWORD} + - DATABASE_URL=${DATABASE_URL} + - CONFIG_DATABASE_USER=${CONFIG_DATABASE_USER:-} + - CONFIG_DATABASE_PASSWORD=${CONFIG_DATABASE_PASSWORD:-} + - CONFIG_DATABASE_URL=${CONFIG_DATABASE_URL:-} volumes: - db:/var/lib/postgresql/data scheduler: diff --git a/docs/contributing-to-airbyte/developing-locally.md b/docs/contributing-to-airbyte/developing-locally.md index aaccabd08b24..bfaa11d6283c 100644 --- a/docs/contributing-to-airbyte/developing-locally.md +++ b/docs/contributing-to-airbyte/developing-locally.md @@ -89,7 +89,7 @@ In `dev` mode, all data will be persisted in `/tmp/dev_root`. To run acceptance \(end-to-end\) tests, you must have the Airbyte running locally. ```bash -SUB_BUILD=PLATFORM ./gradlew build +SUB_BUILD=PLATFORM ./gradlew clean build VERSION=dev docker-compose up SUB_BUILD=PLATFORM ./gradlew :airbyte-tests:acceptanceTests ``` @@ -164,7 +164,7 @@ Sometimes you'll want to reset the data in your local environment. One common ca * Rebuild the project ```bash - SUB_BUILD=PLATFORM ./gradlew build + SUB_BUILD=PLATFORM ./gradlew clean build VERSION=dev docker-compose up -V ```