diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java index 5b11fd48112a..e10d8db3612b 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java @@ -30,6 +30,7 @@ import io.airbyte.api.client.model.generated.DestinationDefinitionUpdate; import io.airbyte.api.client.model.generated.DestinationIdRequestBody; import io.airbyte.api.client.model.generated.DestinationRead; +import io.airbyte.api.client.model.generated.DestinationSyncMode; import io.airbyte.api.client.model.generated.JobConfigType; import io.airbyte.api.client.model.generated.JobIdRequestBody; import io.airbyte.api.client.model.generated.JobListRequestBody; @@ -50,6 +51,9 @@ import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.api.client.model.generated.SourceIdRequestBody; import io.airbyte.api.client.model.generated.SourceRead; +import io.airbyte.api.client.model.generated.SyncMode; +import io.airbyte.api.client.model.generated.WebBackendConnectionUpdate; +import io.airbyte.api.client.model.generated.WebBackendOperationCreateOrUpdate; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.util.MoreProperties; @@ -717,7 +721,7 @@ private void deleteOperation(final UUID destinationId) throws ApiException { apiClient.getOperationApi().deleteOperation(new OperationIdRequestBody().operationId(destinationId)); } - public JobRead getMostRecentSyncJobId(UUID connectionId) throws Exception { + public JobRead getMostRecentSyncJobId(final UUID connectionId) throws Exception { return apiClient.getJobsApi() .listJobsFor(new JobListRequestBody().configId(connectionId.toString()).configTypes(List.of(JobConfigType.SYNC))) .getJobs() @@ -799,4 +803,50 @@ public enum Type { DESTINATION } + public void assertDestinationDbEmpty(final boolean withScdTable) throws Exception { + final Database source = getSourceDatabase(); + final Set sourceTables = listAllTables(source); + final Set sourceTablesWithRawTablesAdded = addAirbyteGeneratedTables(withScdTable, sourceTables); + final Database destination = getDestinationDatabase(); + final Set destinationTables = listAllTables(destination); + assertEquals(sourceTablesWithRawTablesAdded, destinationTables, + String.format("streams did not match.\n source stream names: %s\n destination stream names: %s\n", sourceTables, destinationTables)); + + for (final SchemaTableNamePair pair : sourceTables) { + final List sourceRecords = retrieveRawDestinationRecords(pair); + assertTrue(sourceRecords.isEmpty()); + } + } + + public void setIncrementalAppendSyncMode(final AirbyteCatalog airbyteCatalog, final List cursorField) { + airbyteCatalog.getStreams().forEach(stream -> { + stream.getConfig().syncMode(SyncMode.INCREMENTAL) + .destinationSyncMode(DestinationSyncMode.APPEND) + .cursorField(cursorField); + }); + } + + public WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection, final AirbyteCatalog catalog, final OperationRead operation) { + setIncrementalAppendSyncMode(catalog, List.of(COLUMN_ID)); + + return new WebBackendConnectionUpdate() + .connectionId(connection.getConnectionId()) + .name(connection.getName()) + .operationIds(connection.getOperationIds()) + .operations(List.of(new WebBackendOperationCreateOrUpdate() + .name(operation.getName()) + .operationId(operation.getOperationId()) + .workspaceId(operation.getWorkspaceId()) + .operatorConfiguration(operation.getOperatorConfiguration()))) + .namespaceDefinition(connection.getNamespaceDefinition()) + .namespaceFormat(connection.getNamespaceFormat()) + .syncCatalog(catalog) + .schedule(connection.getSchedule()) + .sourceCatalogId(connection.getSourceCatalogId()) + .status(connection.getStatus()) + .prefix(connection.getPrefix()) + .withRefreshedCatalog(true) + .skipReset(false); + } + } diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index 1a24b63d72a0..6883bec7f507 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -15,7 +15,11 @@ import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.waitForSuccessfulJob; import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.waitWhileJobHasStatus; import static java.lang.Thread.sleep; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -26,7 +30,45 @@ import io.airbyte.api.client.generated.WebBackendApi; import io.airbyte.api.client.invoker.generated.ApiClient; import io.airbyte.api.client.invoker.generated.ApiException; -import io.airbyte.api.client.model.generated.*; +import io.airbyte.api.client.model.generated.AirbyteCatalog; +import io.airbyte.api.client.model.generated.AirbyteStream; +import io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration; +import io.airbyte.api.client.model.generated.AirbyteStreamConfiguration; +import io.airbyte.api.client.model.generated.AttemptInfoRead; +import io.airbyte.api.client.model.generated.AttemptStatus; +import io.airbyte.api.client.model.generated.CheckConnectionRead; +import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; +import io.airbyte.api.client.model.generated.ConnectionRead; +import io.airbyte.api.client.model.generated.ConnectionSchedule; +import io.airbyte.api.client.model.generated.ConnectionState; +import io.airbyte.api.client.model.generated.ConnectionStatus; +import io.airbyte.api.client.model.generated.DataType; +import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody; +import io.airbyte.api.client.model.generated.DestinationDefinitionIdWithWorkspaceId; +import io.airbyte.api.client.model.generated.DestinationDefinitionRead; +import io.airbyte.api.client.model.generated.DestinationDefinitionSpecificationRead; +import io.airbyte.api.client.model.generated.DestinationIdRequestBody; +import io.airbyte.api.client.model.generated.DestinationRead; +import io.airbyte.api.client.model.generated.DestinationSyncMode; +import io.airbyte.api.client.model.generated.JobConfigType; +import io.airbyte.api.client.model.generated.JobIdRequestBody; +import io.airbyte.api.client.model.generated.JobInfoRead; +import io.airbyte.api.client.model.generated.JobListRequestBody; +import io.airbyte.api.client.model.generated.JobRead; +import io.airbyte.api.client.model.generated.JobStatus; +import io.airbyte.api.client.model.generated.JobWithAttemptsRead; +import io.airbyte.api.client.model.generated.OperationRead; +import io.airbyte.api.client.model.generated.SourceDefinitionIdRequestBody; +import io.airbyte.api.client.model.generated.SourceDefinitionIdWithWorkspaceId; +import io.airbyte.api.client.model.generated.SourceDefinitionRead; +import io.airbyte.api.client.model.generated.SourceDefinitionSpecificationRead; +import io.airbyte.api.client.model.generated.SourceIdRequestBody; +import io.airbyte.api.client.model.generated.SourceRead; +import io.airbyte.api.client.model.generated.StreamDescriptor; +import io.airbyte.api.client.model.generated.StreamState; +import io.airbyte.api.client.model.generated.SyncMode; +import io.airbyte.api.client.model.generated.WebBackendConnectionUpdate; +import io.airbyte.api.client.model.generated.WebBackendOperationCreateOrUpdate; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Database; import io.airbyte.test.utils.AirbyteAcceptanceTestHarness; @@ -37,11 +79,26 @@ import java.net.URISyntaxException; import java.sql.SQLException; import java.time.Duration; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; import org.jooq.DSLContext; import org.jooq.impl.DSL; import org.jooq.impl.SQLDataType; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1105,6 +1162,179 @@ public void testIncrementalSyncMultipleStreams() throws Exception { } + @Test + public void testMultipleSchemasAndTablesSyncAndReset() throws Exception { + // create tables in another schema + PostgreSQLContainerHelper.runSqlScript(MountableFile.forClasspathResource("postgres_second_schema_multiple_tables.sql"), sourcePsql); + + final String connectionName = "test-connection"; + final UUID sourceId = testHarness.createPostgresSource().getSourceId(); + final UUID destinationId = testHarness.createDestination().getDestinationId(); + final UUID operationId = testHarness.createOperation().getOperationId(); + final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); + + final SyncMode syncMode = SyncMode.FULL_REFRESH; + final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; + catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); + final UUID connectionId = + testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); + testHarness.assertSourceAndDestinationDbInSync(false); + final JobInfoRead connectionResetRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionResetRead.getJob()); + testHarness.assertDestinationDbEmpty(false); + } + + @Test + public void testPartialResetResetAllWhenSchemaIsModified(final TestInfo testInfo) throws Exception { + LOGGER.info("Running: " + testInfo.getDisplayName()); + + // Add Table + final String additionalTable = "additional_table"; + final Database sourceDb = testHarness.getSourceDatabase(); + sourceDb.query(ctx -> { + ctx.createTableIfNotExists(additionalTable) + .columns(DSL.field("id", SQLDataType.INTEGER), DSL.field("field", SQLDataType.VARCHAR)).execute(); + ctx.truncate(additionalTable).execute(); + ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field")).values(1, "1").execute(); + ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field")).values(2, "2").execute(); + return null; + }); + UUID sourceId = testHarness.createPostgresSource().getSourceId(); + final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); + final UUID destinationId = testHarness.createDestination().getDestinationId(); + final OperationRead operation = testHarness.createOperation(); + final UUID operationId = operation.getOperationId(); + final String name = "test_reset_when_schema_is_modified_" + UUID.randomUUID(); + + testHarness.setIncrementalAppendSyncMode(catalog, List.of(COLUMN_ID)); + + final ConnectionRead connection = + testHarness.createConnection(name, sourceId, destinationId, List.of(operationId), catalog, null); + + // Run initial sync + final JobInfoRead syncRead = + apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connection.getConnectionId())); + waitForSuccessfulJob(apiClient.getJobsApi(), syncRead.getJob()); + + testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); + assertStreamStateContainsStream(connection.getConnectionId(), List.of( + new StreamDescriptor().name("id_and_name").namespace("public"), + new StreamDescriptor().name(additionalTable).namespace("public"))); + + LOGGER.info("Initial sync ran, now running an update with a stream being removed."); + + /** + * Remove stream + */ + sourceDb.query(ctx -> ctx.dropTableIfExists(additionalTable).execute()); + + // Update with refreshed catalog + AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchemaWithoutCache(sourceId); + WebBackendConnectionUpdate update = testHarness.getUpdateInput(connection, refreshedCatalog, operation); + webBackendApi.webBackendUpdateConnectionNew(update); + + // Wait until the sync from the UpdateConnection is finished + JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId()); + waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); + + // We do not check that the source and the dest are in sync here because removing a stream doesn't + // remove that + assertStreamStateContainsStream(connection.getConnectionId(), List.of( + new StreamDescriptor().name("id_and_name").namespace("public"))); + + LOGGER.info("Remove done, now running an update with a stream being added."); + + /** + * Add a stream -- the value of in the table are different than the initial import to ensure that it + * is properly reset. + */ + sourceDb.query(ctx -> { + ctx.createTableIfNotExists(additionalTable) + .columns(DSL.field("id", SQLDataType.INTEGER), DSL.field("field", SQLDataType.VARCHAR)).execute(); + ctx.truncate(additionalTable).execute(); + ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field")).values(3, "3").execute(); + ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field")).values(4, "4").execute(); + return null; + }); + + sourceId = testHarness.createPostgresSource().getSourceId(); + refreshedCatalog = testHarness.discoverSourceSchema(sourceId); + update = testHarness.getUpdateInput(connection, refreshedCatalog, operation); + webBackendApi.webBackendUpdateConnectionNew(update); + + syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId()); + waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); + + // We do not check that the source and the dest are in sync here because removing a stream doesn't + // remove that + testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); + assertStreamStateContainsStream(connection.getConnectionId(), List.of( + new StreamDescriptor().name("id_and_name").namespace("public"), + new StreamDescriptor().name(additionalTable).namespace("public"))); + + LOGGER.info("Addition done, now running an update with a stream being updated."); + + // Update + sourceDb.query(ctx -> { + ctx.dropTableIfExists(additionalTable).execute(); + ctx.createTableIfNotExists(additionalTable) + .columns(DSL.field("id", SQLDataType.INTEGER), DSL.field("field", SQLDataType.VARCHAR), DSL.field("another_field", SQLDataType.VARCHAR)) + .execute(); + ctx.truncate(additionalTable).execute(); + ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field"), DSL.field("another_field")).values(3, "3", "three") + .execute(); + ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field"), DSL.field("another_field")).values(4, "4", "four") + .execute(); + return null; + }); + + sourceId = testHarness.createPostgresSource().getSourceId(); + refreshedCatalog = testHarness.discoverSourceSchema(sourceId); + update = testHarness.getUpdateInput(connection, refreshedCatalog, operation); + webBackendApi.webBackendUpdateConnectionNew(update); + + syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId()); + waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); + + // We do not check that the source and the dest are in sync here because removing a stream doesn't + // remove that + testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); + assertStreamStateContainsStream(connection.getConnectionId(), List.of( + new StreamDescriptor().name("id_and_name").namespace("public"), + new StreamDescriptor().name(additionalTable).namespace("public"))); + + } + + private void assertStreamStateContainsStream(final UUID connectionId, final List expectedStreamDescriptors) throws ApiException { + final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)); + final List streamDescriptors = state.getStreamState().stream().map(StreamState::getStreamDescriptor).toList(); + + Assertions.assertTrue(streamDescriptors.containsAll(expectedStreamDescriptors) && expectedStreamDescriptors.containsAll(streamDescriptors)); + } + + private JobRead getMostRecentSyncJobId(final UUID connectionId) throws Exception { + return apiClient.getJobsApi() + .listJobsFor(new JobListRequestBody().configId(connectionId.toString()).configTypes(List.of(JobConfigType.SYNC))) + .getJobs() + .stream().findFirst().map(JobWithAttemptsRead::getJob).orElseThrow(); + } + + private JobRead waitUntilTheNextJobIsStarted(final UUID connectionId) throws Exception { + final JobRead lastJob = getMostRecentSyncJobId(connectionId); + if (lastJob.getStatus() != JobStatus.SUCCEEDED) { + return lastJob; + } + + JobRead mostRecentSyncJob = getMostRecentSyncJobId(connectionId); + while (mostRecentSyncJob.getId().equals(lastJob.getId())) { + Thread.sleep(Duration.ofSeconds(10).toMillis()); + mostRecentSyncJob = getMostRecentSyncJobId(connectionId); + } + return mostRecentSyncJob; + } + // This test is disabled because it takes a couple of minutes to run, as it is testing timeouts. // It should be re-enabled when the @SlowIntegrationTest can be applied to it. // See relevant issue: https://github.com/airbytehq/airbyte/issues/8397 diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 6087b74c3d7a..820dfabc6bbf 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -38,7 +38,6 @@ import io.airbyte.api.client.model.generated.StreamState; import io.airbyte.api.client.model.generated.SyncMode; import io.airbyte.api.client.model.generated.WebBackendConnectionUpdate; -import io.airbyte.api.client.model.generated.WebBackendOperationCreateOrUpdate; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Database; import io.airbyte.test.utils.AirbyteAcceptanceTestHarness; @@ -341,7 +340,7 @@ public void testPartialResetFromSchemaUpdate(final TestInfo testInfo) throws Exc final UUID sourceId = createCdcSource().getSourceId(); final AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchema(sourceId); LOGGER.info("Refreshed catalog: {}", refreshedCatalog); - final WebBackendConnectionUpdate update = getUpdateInput(connectionRead, refreshedCatalog, operationRead); + final WebBackendConnectionUpdate update = testHarness.getUpdateInput(connectionRead, refreshedCatalog, operationRead); webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to complete"); @@ -388,7 +387,7 @@ public void testPartialResetFromStreamSelection(final TestInfo testInfo) throws .toList(); catalog.setStreams(updatedStreams); LOGGER.info("Updated catalog: {}", catalog); - WebBackendConnectionUpdate update = getUpdateInput(connectionRead, catalog, operationRead); + WebBackendConnectionUpdate update = testHarness.getUpdateInput(connectionRead, catalog, operationRead); webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to start"); @@ -403,7 +402,7 @@ public void testPartialResetFromStreamSelection(final TestInfo testInfo) throws LOGGER.info("Adding color palette stream back to configured catalog"); catalog = testHarness.discoverSourceSchema(sourceId); LOGGER.info("Updated catalog: {}", catalog); - update = getUpdateInput(connectionRead, catalog, operationRead); + update = testHarness.getUpdateInput(connectionRead, catalog, operationRead); webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to start"); @@ -568,32 +567,6 @@ private void assertNoState(final UUID connectionId) throws ApiException { assertNull(state.getGlobalState()); } - private WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection, final AirbyteCatalog catalog, final OperationRead operation) { - final SyncMode syncMode = SyncMode.INCREMENTAL; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND; - catalog.getStreams().forEach(s -> s.getConfig() - .syncMode(syncMode) - .cursorField(List.of(COLUMN_ID)) - .destinationSyncMode(destinationSyncMode)); - - return new WebBackendConnectionUpdate() - .connectionId(connection.getConnectionId()) - .name(connection.getName()) - .operationIds(connection.getOperationIds()) - .operations(List.of(new WebBackendOperationCreateOrUpdate() - .name(operation.getName()) - .operationId(operation.getOperationId()) - .workspaceId(operation.getWorkspaceId()) - .operatorConfiguration(operation.getOperatorConfiguration()))) - .namespaceDefinition(connection.getNamespaceDefinition()) - .namespaceFormat(connection.getNamespaceFormat()) - .syncCatalog(catalog) - .schedule(connection.getSchedule()) - .sourceCatalogId(connection.getSourceCatalogId()) - .status(connection.getStatus()) - .prefix(connection.getPrefix()); - } - // can be helpful for debugging @SuppressWarnings("PMD") private void printDbs() throws SQLException {