From fd1d8749dadc1a8885de978c0476605109377d48 Mon Sep 17 00:00:00 2001 From: lmossman Date: Fri, 8 Jul 2022 15:08:16 -0700 Subject: [PATCH 01/23] set per stream feature flag to true for testing --- .../io/airbyte/commons/features/EnvVariableFeatureFlags.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index 9991fd35c503..dec17b6efcca 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -31,7 +31,7 @@ public boolean forceSecretMigration() { @Override public boolean useStreamCapableState() { - return getEnvOrDefault(USE_STREAM_CAPABLE_STATE, false, Boolean::parseBoolean); + return getEnvOrDefault(USE_STREAM_CAPABLE_STATE, true, Boolean::parseBoolean); } // TODO: refactor in order to use the same method than the ones in EnvConfigs.java From c6431de7718da1b2ffd155b1de491d12db36f460 Mon Sep 17 00:00:00 2001 From: lmossman Date: Fri, 8 Jul 2022 17:13:49 -0700 Subject: [PATCH 02/23] add a second table to cdc acceptance tests --- .../test/acceptance/CdcAcceptanceTests.java | 73 ++++++++++++------- .../resources/postgres_init_cdc.sql | 30 +++++++- 2 files changed, 74 insertions(+), 29 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index f21f5693d754..17fdd5cf4c4f 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -69,6 +69,9 @@ record DestinationCdcRecordMatcher(JsonNode sourceRecord, Instant minUpdatedAt, private static final String SCHEMA_NAME = "public"; private static final String CDC_UPDATED_AT_COLUMN = "_ab_cdc_updated_at"; private static final String CDC_DELETED_AT_COLUMN = "_ab_cdc_deleted_at"; + private static final String ID_AND_NAME_TABLE = "id_and_name"; + private static final String COLOR_PALETTE_TABLE = "color_palette"; + private static final String COLUMN_COLOR = "color"; // version of the postgres destination connector that was built with the // old Airbyte protocol that does not contain any per-stream logic/fields @@ -126,12 +129,12 @@ public void testIncrementalCdcSync() throws Exception { LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); final Database source = testHarness.getSourceDatabase(); - List sourceRecords = testHarness.retrieveSourceRecords(source, STREAM_NAME); - List expectedDestRecordMatchers = new ArrayList<>(sourceRecords - .stream() - .map(sourceRecord -> new DestinationCdcRecordMatcher(sourceRecord, Instant.EPOCH, Optional.empty())) - .toList()); - assertDestinationMatches(expectedDestRecordMatchers); + + List expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE); + assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); + + List expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE); + assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); final Instant beforeFirstUpdate = Instant.now(); @@ -143,24 +146,35 @@ public void 
testIncrementalCdcSync() throws Exception { // since this is a CDC connection, the destination should contain a record with this // new value and an updated_at time corresponding to this update query source.query(ctx -> ctx.execute("UPDATE id_and_name SET name='yennefer' WHERE id=2")); - - expectedDestRecordMatchers.add(new DestinationCdcRecordMatcher( + expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher( Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "geralt").build()), beforeFirstUpdate, Optional.empty())); - - expectedDestRecordMatchers.add(new DestinationCdcRecordMatcher( + expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher( Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 2).put(COLUMN_NAME, "yennefer").build()), beforeFirstUpdate, Optional.empty())); + // do the same for the other table + source.query(ctx -> ctx.execute("INSERT INTO color_palette(id, color) VALUES(4, 'yellow')")); + source.query(ctx -> ctx.execute("UPDATE color_palette SET color='purple' WHERE id=2")); + expectedColorPaletteRecords.add(new DestinationCdcRecordMatcher( + Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 4).put(COLUMN_COLOR, "yellow").build()), + beforeFirstUpdate, + Optional.empty())); + expectedColorPaletteRecords.add(new DestinationCdcRecordMatcher( + Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 2).put(COLUMN_COLOR, "purple").build()), + beforeFirstUpdate, + Optional.empty())); + LOGGER.info("Starting incremental cdc sync 2"); final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); LOGGER.info("state after sync 2: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - assertDestinationMatches(expectedDestRecordMatchers); + assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); + assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); // reset back to no data. @@ -170,7 +184,8 @@ public void testIncrementalCdcSync() throws Exception { LOGGER.info("state after reset: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - assertDestinationMatches(Collections.emptyList()); + assertDestinationMatches(ID_AND_NAME_TABLE, Collections.emptyList()); + assertDestinationMatches(COLOR_PALETTE_TABLE, Collections.emptyList()); // sync one more time. verify it is the equivalent of a full refresh. 
LOGGER.info("Starting incremental cdc sync 3");
@@ -179,13 +194,11 @@ public void testIncrementalCdcSync() throws Exception {
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob());
LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
- sourceRecords = testHarness.retrieveSourceRecords(source, STREAM_NAME);
- expectedDestRecordMatchers = sourceRecords
- .stream()
- .map(sourceRecord -> new DestinationCdcRecordMatcher(sourceRecord, Instant.EPOCH, Optional.empty()))
- .toList();
+ expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE);
+ assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);
- assertDestinationMatches(expectedDestRecordMatchers);
+ expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE);
+ assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords);
}
// tests that incremental syncs still work properly even when using a destination connector that was
@@ -224,12 +237,8 @@ public void testDeleteRecordCdcSync() throws Exception {
LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
final Database source = testHarness.getSourceDatabase();
- List<JsonNode> sourceRecords = testHarness.retrieveSourceRecords(source, STREAM_NAME);
- List<DestinationCdcRecordMatcher> expectedDestRecordMatchers = new ArrayList<>(sourceRecords
- .stream()
- .map(sourceRecord -> new DestinationCdcRecordMatcher(sourceRecord, Instant.EPOCH, Optional.empty()))
- .toList());
- assertDestinationMatches(expectedDestRecordMatchers);
+ List<DestinationCdcRecordMatcher> expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE);
+ assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);
final Instant beforeDelete = Instant.now();
@@ -240,7 +249,7 @@ public void testDeleteRecordCdcSync() throws Exception {
Map<String, Object> deletedRecordMap = new HashMap<>();
deletedRecordMap.put(COLUMN_ID, 1);
deletedRecordMap.put(COLUMN_NAME, null);
- expectedDestRecordMatchers.add(new DestinationCdcRecordMatcher(
+ expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher(
Jsons.jsonNode(deletedRecordMap),
beforeDelete,
Optional.of(beforeDelete)));
@@ -251,7 +260,15 @@ public void testDeleteRecordCdcSync() throws Exception {
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
LOGGER.info("state after sync 2: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
- assertDestinationMatches(expectedDestRecordMatchers);
+ assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);
+ }
+
+ private List<DestinationCdcRecordMatcher> getCdcRecordMatchersFromSource(Database source, String tableName) throws SQLException {
+ List<JsonNode> sourceRecords = testHarness.retrieveSourceRecords(source, tableName);
+ return new ArrayList<>(sourceRecords
+ .stream()
+ .map(sourceRecord -> new DestinationCdcRecordMatcher(sourceRecord, Instant.EPOCH, Optional.empty()))
+ .toList());
+ }
private UUID createCdcConnection() throws ApiException {
@@ -298,8 +315,8 @@ private SourceRead createCdcSource() throws ApiException {
Jsons.jsonNode(sourceDbConfigMap));
}
- private void assertDestinationMatches(List<DestinationCdcRecordMatcher> expectedDestRecordMatchers) throws Exception {
- final List<JsonNode> destRecords = testHarness.retrieveRawDestinationRecords(new SchemaTableNamePair(SCHEMA_NAME, STREAM_NAME));
+ private void assertDestinationMatches(String streamName, List<DestinationCdcRecordMatcher> expectedDestRecordMatchers) throws
Exception { + final List destRecords = testHarness.retrieveRawDestinationRecords(new SchemaTableNamePair(SCHEMA_NAME, streamName)); if (destRecords.size() != expectedDestRecordMatchers.size()) { final String errorMessage = String.format( "The number of destination records %d does not match the expected number %d", diff --git a/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql b/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql index 9434b4135eb4..a760c0cff425 100644 --- a/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql +++ b/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql @@ -32,15 +32,43 @@ INSERT 'john' ); +CREATE + TABLE + color_palette( + id INTEGER PRIMARY KEY, + color VARCHAR(200) + ); + +INSERT + INTO + color_palette( + id, + color + ) + VALUES( + 1, + 'red' + ), + ( + 2, + 'blue' + ), + ( + 3, + 'green' + ); + CREATE ROLE airbyte_role REPLICATION LOGIN; ALTER TABLE id_and_name REPLICA IDENTITY DEFAULT; +ALTER TABLE + color_palette REPLICA IDENTITY DEFAULT; CREATE PUBLICATION airbyte_publication FOR TABLE - id_and_name; + id_and_name, color_palette; SELECT pg_create_logical_replication_slot( From d3d615e2a7dcc44fe97a2ea5169ccc20d2a7f09b Mon Sep 17 00:00:00 2001 From: lmossman Date: Fri, 8 Jul 2022 18:42:08 -0700 Subject: [PATCH 03/23] add partial reset test --- .../test/acceptance/CdcAcceptanceTests.java | 146 +++++++++++++++++- .../resources/postgres_init_cdc.sql | 4 +- 2 files changed, 147 insertions(+), 3 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 17fdd5cf4c4f..7a57d7c3a1c7 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -6,28 +6,42 @@ import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.COLUMN_ID; import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.COLUMN_NAME; -import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.STREAM_NAME; import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.waitForSuccessfulJob; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.api.client.AirbyteApiClient; +import io.airbyte.api.client.generated.WebBackendApi; import io.airbyte.api.client.invoker.generated.ApiClient; import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.AirbyteCatalog; import io.airbyte.api.client.model.generated.AirbyteStream; import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; +import io.airbyte.api.client.model.generated.ConnectionRead; +import io.airbyte.api.client.model.generated.ConnectionState; +import io.airbyte.api.client.model.generated.ConnectionStateType; import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody; import io.airbyte.api.client.model.generated.DestinationDefinitionRead; import io.airbyte.api.client.model.generated.DestinationSyncMode; +import io.airbyte.api.client.model.generated.JobConfigType; import io.airbyte.api.client.model.generated.JobInfoRead; +import 
io.airbyte.api.client.model.generated.JobListRequestBody; +import io.airbyte.api.client.model.generated.JobRead; +import io.airbyte.api.client.model.generated.JobStatus; +import io.airbyte.api.client.model.generated.JobWithAttemptsRead; +import io.airbyte.api.client.model.generated.OperationRead; import io.airbyte.api.client.model.generated.SourceDefinitionIdRequestBody; import io.airbyte.api.client.model.generated.SourceDefinitionRead; import io.airbyte.api.client.model.generated.SourceRead; +import io.airbyte.api.client.model.generated.StreamDescriptor; +import io.airbyte.api.client.model.generated.StreamState; import io.airbyte.api.client.model.generated.SyncMode; +import io.airbyte.api.client.model.generated.WebBackendConnectionUpdate; +import io.airbyte.api.client.model.generated.WebBackendOperationCreateOrUpdate; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Database; import io.airbyte.test.utils.AirbyteAcceptanceTestHarness; @@ -35,16 +49,21 @@ import java.io.IOException; import java.net.URISyntaxException; import java.sql.SQLException; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.UUID; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -78,7 +97,9 @@ record DestinationCdcRecordMatcher(JsonNode sourceRecord, Instant minUpdatedAt, private static final String POSTGRES_DESTINATION_LEGACY_CONNECTOR_VERSION = "0.3.19"; private static AirbyteApiClient apiClient; + private static WebBackendApi webBackendApi; private static UUID workspaceId; + private static OperationRead operationRead; private AirbyteAcceptanceTestHarness testHarness; @@ -89,6 +110,11 @@ public static void init() throws URISyntaxException, IOException, InterruptedExc .setHost("localhost") .setPort(8001) .setBasePath("/api")); + webBackendApi = new WebBackendApi( + new ApiClient().setScheme("http") + .setHost("localhost") + .setPort(8001) + .setBasePath("/api")); // work in whatever default workspace is present. workspaceId = apiClient.getWorkspaceApi().listWorkspaces().getWorkspaces().get(0).getWorkspaceId(); LOGGER.info("workspaceId = " + workspaceId); @@ -136,6 +162,11 @@ public void testIncrementalCdcSync() throws Exception { List expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE); assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); + List expectedStreams = List.of( + new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE), + new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE)); + assertGlobalStateContainsStreams(connectionId, expectedStreams); + final Instant beforeFirstUpdate = Instant.now(); LOGGER.info("Inserting and updating source db records"); @@ -175,6 +206,7 @@ public void testIncrementalCdcSync() throws Exception { assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); + assertGlobalStateContainsStreams(connectionId, expectedStreams); // reset back to no data. 
@@ -186,6 +218,7 @@ public void testIncrementalCdcSync() throws Exception {
assertDestinationMatches(ID_AND_NAME_TABLE, Collections.emptyList());
assertDestinationMatches(COLOR_PALETTE_TABLE, Collections.emptyList());
+ assertNoState(connectionId);
// sync one more time. verify it is the equivalent of a full refresh.
LOGGER.info("Starting incremental cdc sync 3");
@@ -199,6 +232,8 @@ public void testIncrementalCdcSync() throws Exception {
expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE);
assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords);
+
+ assertGlobalStateContainsStreams(connectionId, expectedStreams);
}
// tests that incremental syncs still work properly even when using a destination connector that was
@@ -263,6 +298,49 @@ public void testDeleteRecordCdcSync() throws Exception {
assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);
}
+ @Test
+ public void testSchemaUpdateResultsInPartialReset() throws Exception {
+ LOGGER.info("Starting partial reset cdc test");
+
+ final UUID connectionId = createCdcConnection();
+ LOGGER.info("Starting partial reset cdc sync 1");
+
+ final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
+ .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+ waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
+
+ final Database source = testHarness.getSourceDatabase();
+
+ List<DestinationCdcRecordMatcher> expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE);
+ assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);
+
+ List<DestinationCdcRecordMatcher> expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE);
+ assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords);
+
+ StreamDescriptor idAndNameStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE);
+ StreamDescriptor colorPaletteStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE);
+ assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor));
+
+ LOGGER.info("Removing color palette table");
+ source.query(ctx -> ctx.dropTable(COLOR_PALETTE_TABLE).execute());
+
+ LOGGER.info("Refreshing schema and updating connection");
+ final ConnectionRead connectionRead = apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+ UUID sourceId = createCdcSource().getSourceId();
+ AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchema(sourceId);
+ LOGGER.info("Refreshed catalog: {}", refreshedCatalog);
+ WebBackendConnectionUpdate update = getUpdateInput(connectionRead, refreshedCatalog, operationRead);
+ webBackendApi.webBackendUpdateConnection(update);
+
+ LOGGER.info("Waiting for sync job after update to complete");
+ JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId);
+ waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate);
+
+ // We do not check that the source and the dest are in sync here because removing a stream doesn't
+ // delete its data in the destination
+ assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor));
+ }
+
private List<DestinationCdcRecordMatcher> getCdcRecordMatchersFromSource(Database source, String tableName) throws SQLException {
List<JsonNode> sourceRecords = testHarness.retrieveSourceRecords(source, tableName);
return new ArrayList<>(sourceRecords
@@ -276,7 +354,8 @@ private UUID createCdcConnection() throws
ApiException { final UUID sourceId = sourceRead.getSourceId(); final UUID destinationId = testHarness.createDestination().getDestinationId(); - final UUID operationId = testHarness.createOperation().getOperationId(); + operationRead = testHarness.createOperation(); + final UUID operationId = operationRead.getOperationId(); final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); final AirbyteStream stream = catalog.getStreams().get(0).getStream(); LOGGER.info("stream: {}", stream); @@ -364,4 +443,67 @@ private void assertDestinationMatches(String streamName, List expectedStreams) throws ApiException { + final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)); + LOGGER.info("state: {}", state); + assertEquals(ConnectionStateType.GLOBAL, state.getStateType()); + final List stateStreams = state.getGlobalState().getStreamStates().stream().map(StreamState::getStreamDescriptor).toList(); + + Assertions.assertTrue(stateStreams.containsAll(expectedStreams) && expectedStreams.containsAll(stateStreams), + String.format("Expected state to have streams %s, but it actually had streams %s", expectedStreams, stateStreams)); + } + + private void assertNoState(final UUID connectionId) throws ApiException { + final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)); + assertEquals(ConnectionStateType.NOT_SET, state.getStateType()); + assertNull(state.getState()); + assertNull(state.getStreamState()); + assertNull(state.getGlobalState()); + } + + // TODO: consolidate the below methods with those added in + // https://github.com/airbytehq/airbyte/pull/14406 + + private WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection, final AirbyteCatalog catalog, final OperationRead operation) { + return new WebBackendConnectionUpdate() + .connectionId(connection.getConnectionId()) + .name(connection.getName()) + .operationIds(connection.getOperationIds()) + .operations(List.of(new WebBackendOperationCreateOrUpdate() + .name(operation.getName()) + .operationId(operation.getOperationId()) + .workspaceId(operation.getWorkspaceId()) + .operatorConfiguration(operation.getOperatorConfiguration()))) + .namespaceDefinition(connection.getNamespaceDefinition()) + .namespaceFormat(connection.getNamespaceFormat()) + .syncCatalog(catalog) + .schedule(connection.getSchedule()) + .sourceCatalogId(connection.getSourceCatalogId()) + .status(connection.getStatus()) + .prefix(connection.getPrefix()); + } + + private JobRead getMostRecentSyncJobId(final UUID connectionId) throws Exception { + return apiClient.getJobsApi() + .listJobsFor(new JobListRequestBody().configId(connectionId.toString()).configTypes(List.of(JobConfigType.SYNC))) + .getJobs() + .stream() + .max(Comparator.comparingLong(job -> job.getJob().getCreatedAt())) + .map(JobWithAttemptsRead::getJob).orElseThrow(); + } + + private JobRead waitUntilTheNextJobIsStarted(final UUID connectionId) throws Exception { + final JobRead lastJob = getMostRecentSyncJobId(connectionId); + if (lastJob.getStatus() != JobStatus.SUCCEEDED) { + return lastJob; + } + + JobRead mostRecentSyncJob = getMostRecentSyncJobId(connectionId); + while (mostRecentSyncJob.getId().equals(lastJob.getId())) { + Thread.sleep(Duration.ofSeconds(2).toMillis()); + mostRecentSyncJob = getMostRecentSyncJobId(connectionId); + } + return mostRecentSyncJob; + } + } diff --git a/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql 
b/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql index a760c0cff425..ce7c2da4a538 100644 --- a/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql +++ b/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql @@ -63,12 +63,14 @@ CREATE ALTER TABLE id_and_name REPLICA IDENTITY DEFAULT; + ALTER TABLE color_palette REPLICA IDENTITY DEFAULT; CREATE PUBLICATION airbyte_publication FOR TABLE - id_and_name, color_palette; + id_and_name, + color_palette; SELECT pg_create_logical_replication_slot( From 832d9446c0b748543c6c9b70a56470315aa47c7d Mon Sep 17 00:00:00 2001 From: lmossman Date: Fri, 8 Jul 2022 18:42:26 -0700 Subject: [PATCH 04/23] format --- .../java/io/airbyte/test/acceptance/CdcAcceptanceTests.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 7a57d7c3a1c7..d4b51308b106 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -60,8 +60,6 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; -import org.jooq.impl.DSL; -import org.jooq.impl.SQLDataType; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; From 36c0e2842a710a7a1988e1ac990d263758e4f9ca Mon Sep 17 00:00:00 2001 From: lmossman Date: Mon, 11 Jul 2022 14:51:27 -0700 Subject: [PATCH 05/23] add partial reset cdc tests --- .../test/acceptance/CdcAcceptanceTests.java | 110 +++++++++++++++++- 1 file changed, 108 insertions(+), 2 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index d4b51308b106..cfea5f719833 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -20,6 +20,7 @@ import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.AirbyteCatalog; import io.airbyte.api.client.model.generated.AirbyteStream; +import io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration; import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; import io.airbyte.api.client.model.generated.ConnectionRead; import io.airbyte.api.client.model.generated.ConnectionState; @@ -59,7 +60,10 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.UUID; +import org.jooq.Record; +import org.jooq.Result; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -297,11 +301,11 @@ public void testDeleteRecordCdcSync() throws Exception { } @Test - public void testSchemaUpdateResultsInPartialReset() throws Exception { + public void testPartialResetFromSchemaUpdate() throws Exception { LOGGER.info("Starting partial reset cdc test"); final UUID connectionId = createCdcConnection(); - LOGGER.info("Starting partial reset cdc sync 1"); + LOGGER.info("Starting sync 1"); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new 
ConnectionIdRequestBody().connectionId(connectionId)); @@ -339,6 +343,82 @@ public void testSchemaUpdateResultsInPartialReset() throws Exception { assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor)); } + @Test + public void testPartialResetFromStreamSelection() throws Exception { + LOGGER.info("Starting partial reset cdc test"); + + // TODO: remove this logic to set source to dev once postgres source has been released with the CDC + // changes + LOGGER.info("Setting source connector to dev to test out partial CDC resets..."); + final UUID sourceDefinitionId = testHarness.getPostgresSourceDefinitionId(); + testHarness.updateSourceDefinitionVersion(sourceDefinitionId, "dev"); + + final UUID connectionId = createCdcConnection(); + LOGGER.info("Starting sync 1"); + + final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() + .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob()); + + final Database source = testHarness.getSourceDatabase(); + + List expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE); + assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); + + List expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE); + assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); + + StreamDescriptor idAndNameStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE); + StreamDescriptor colorPaletteStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE); + assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor)); + + LOGGER.info("Removing color palette stream from configured catalog"); + final ConnectionRead connectionRead = apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + final UUID sourceId = connectionRead.getSourceId(); + AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); + final List streams = catalog.getStreams(); + // filter out color_palette stream + final List updatedStreams = streams + .stream() + .filter(stream -> !stream.getStream().getName().equals(COLOR_PALETTE_TABLE)) + .toList(); + catalog.setStreams(updatedStreams); + LOGGER.info("Updated catalog: {}", catalog); + WebBackendConnectionUpdate update = getUpdateInput(connectionRead, catalog, operationRead); + webBackendApi.webBackendUpdateConnection(update); + + LOGGER.info("Waiting for sync job after update to start"); + JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); + LOGGER.info("Waiting for sync job after update to complete"); + waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); + + // We do not check that the source and the dest are in sync here because removing a stream doesn't + // delete its data in the destination + assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor)); + + LOGGER.info("Adding color palette stream back to configured catalog"); + catalog = testHarness.discoverSourceSchema(sourceId); + LOGGER.info("Updated catalog: {}", catalog); + update = getUpdateInput(connectionRead, catalog, operationRead); + webBackendApi.webBackendUpdateConnection(update); + + LOGGER.info("Waiting for sync job after update to start"); + syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); + LOGGER.info("Checking that id_and_name 
table is unaffected by the partial reset"); + assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); + LOGGER.info("Checking that color_palette table was cleared in the destination due to the reset triggered by the update"); + assertDestinationMatches(COLOR_PALETTE_TABLE, List.of()); + LOGGER.info("Waiting for sync job after update to complete"); + waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); + + // Verify that color palette table records exist in destination again after sync. + // If we see 0 records for this table in the destination, that means the CDC partial reset logic is + // not working properly, and it continued from the replication log cursor for this stream despite + // this stream's state being reset + assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); + assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor)); + } + private List getCdcRecordMatchersFromSource(Database source, String tableName) throws SQLException { List sourceRecords = testHarness.retrieveSourceRecords(source, tableName); return new ArrayList<>(sourceRecords @@ -463,6 +543,13 @@ private void assertNoState(final UUID connectionId) throws ApiException { // https://github.com/airbytehq/airbyte/pull/14406 private WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection, final AirbyteCatalog catalog, final OperationRead operation) { + final SyncMode syncMode = SyncMode.INCREMENTAL; + final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND; + catalog.getStreams().forEach(s -> s.getConfig() + .syncMode(syncMode) + .cursorField(List.of(COLUMN_ID)) + .destinationSyncMode(destinationSyncMode)); + return new WebBackendConnectionUpdate() .connectionId(connection.getConnectionId()) .name(connection.getName()) @@ -504,4 +591,23 @@ private JobRead waitUntilTheNextJobIsStarted(final UUID connectionId) throws Exc return mostRecentSyncJob; } + // can be helpful for debugging + private void printDbs() throws SQLException { + final Database sourceDb = testHarness.getSourceDatabase(); + Set pairs = testHarness.listAllTables(sourceDb); + LOGGER.info("Printing source tables"); + for (final SchemaTableNamePair pair : pairs) { + final Result result = sourceDb.query(context -> context.fetch(String.format("SELECT * FROM %s.%s", pair.schemaName, pair.tableName))); + LOGGER.info("{}.{} contents:\n{}", pair.schemaName, pair.tableName, result); + } + + final Database destDb = testHarness.getDestinationDatabase(); + pairs = testHarness.listAllTables(destDb); + LOGGER.info("Printing destination tables"); + for (final SchemaTableNamePair pair : pairs) { + final Result result = destDb.query(context -> context.fetch(String.format("SELECT * FROM %s.%s", pair.schemaName, pair.tableName))); + LOGGER.info("{}.{} contents:\n{}", pair.schemaName, pair.tableName, result); + } + } + } From 676bfe891f4a28325f31087383d8c15ebf1909dd Mon Sep 17 00:00:00 2001 From: lmossman Date: Mon, 11 Jul 2022 16:20:21 -0700 Subject: [PATCH 06/23] test incremental after partial reset --- .../test/acceptance/CdcAcceptanceTests.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index cfea5f719833..04de6248b8b5 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ 
b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -417,6 +417,30 @@ public void testPartialResetFromStreamSelection() throws Exception { // this stream's state being reset assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor)); + + // Verify that incremental still works properly after partial reset + LOGGER.info("Adding new records to tables"); + final Instant beforeInsert = Instant.now(); + source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')")); + expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher( + Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "geralt").build()), + beforeInsert, + Optional.empty())); + + source.query(ctx -> ctx.execute("INSERT INTO color_palette(id, color) VALUES(4, 'yellow')")); + expectedColorPaletteRecords.add(new DestinationCdcRecordMatcher( + Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 4).put(COLUMN_COLOR, "yellow").build()), + beforeInsert, + Optional.empty())); + + LOGGER.info("Starting sync after insert"); + final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi() + .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); + + assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); + assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); + assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor)); } private List getCdcRecordMatchersFromSource(Database source, String tableName) throws SQLException { From 904a7ba0559a7a2a5c6e4bf8e4da221cbc5aad9b Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 12 Jul 2022 11:00:22 -0700 Subject: [PATCH 07/23] remove dev image from acceptance test --- .../java/io/airbyte/test/acceptance/CdcAcceptanceTests.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 04de6248b8b5..9fd67568d0f2 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -347,12 +347,6 @@ public void testPartialResetFromSchemaUpdate() throws Exception { public void testPartialResetFromStreamSelection() throws Exception { LOGGER.info("Starting partial reset cdc test"); - // TODO: remove this logic to set source to dev once postgres source has been released with the CDC - // changes - LOGGER.info("Setting source connector to dev to test out partial CDC resets..."); - final UUID sourceDefinitionId = testHarness.getPostgresSourceDefinitionId(); - testHarness.updateSourceDefinitionVersion(sourceDefinitionId, "dev"); - final UUID connectionId = createCdcConnection(); LOGGER.info("Starting sync 1"); From 42007eb548e7e4b28821a4aceef7ef755a79e8ea Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 12 Jul 2022 13:56:05 -0700 Subject: [PATCH 08/23] fix flag and add comment --- .../test/acceptance/CdcAcceptanceTests.java | 15 +++++++++++++++ .../process/AirbyteIntegrationLauncherTest.java | 2 +- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git 
a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 9fd67568d0f2..de3f44166032 100644
--- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java
+++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java
@@ -69,9 +69,24 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * These tests test the CDC source behavior in Airbyte, ensuring that the behavior of syncs when in
+ * CDC mode is as expected
+ * <p>
+ * Some of the tests in this class are specifically testing partial reset behavior when in CDC mode,
+ * support for which was recently added to the postgres connector.
+ * <p>
+ * These tests are disabled in Kube, similar to the BasicAcceptanceTests, because they aren't + * testing any behavior that is specific to or dependent on this being run on kube vs docker. + * Therefore, since operations tend to take longer to perform on kube, there is little value in + * re-running these tests on kube when we already run them on docker. + */ +@DisabledIfEnvironmentVariable(named = "KUBE", + matches = "true") public class CdcAcceptanceTests { record DestinationCdcRecordMatcher(JsonNode sourceRecord, Instant minUpdatedAt, Optional minDeletedAt) {} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index fceedd214535..4b5b7b4a59d6 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -51,7 +51,7 @@ class AirbyteIntegrationLauncherTest { WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE, WorkerEnvConstants.WORKER_JOB_ID, JOB_ID, WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT), - EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(false)); + EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState())); private WorkerConfigs workerConfigs; @Mock From 624b747723513e269a6a83279aa03e7142f0b523 Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 13:42:11 -0700 Subject: [PATCH 09/23] Revert "set per stream feature flag to true for testing" This reverts commit 164d7da05990268b09e315eb88ff297d3a9f52f4. --- .../io/airbyte/commons/features/EnvVariableFeatureFlags.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index dec17b6efcca..9991fd35c503 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -31,7 +31,7 @@ public boolean forceSecretMigration() { @Override public boolean useStreamCapableState() { - return getEnvOrDefault(USE_STREAM_CAPABLE_STATE, true, Boolean::parseBoolean); + return getEnvOrDefault(USE_STREAM_CAPABLE_STATE, false, Boolean::parseBoolean); } // TODO: refactor in order to use the same method than the ones in EnvConfigs.java From b04742b130d7c13782091d799f34f2012243cbb0 Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 13:43:59 -0700 Subject: [PATCH 10/23] set USE_STREAM_CAPABLE_STATE flag to true in acceptance test script --- tools/bin/acceptance_test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/bin/acceptance_test.sh b/tools/bin/acceptance_test.sh index 978e941861d6..41ccdd74aaa1 100755 --- a/tools/bin/acceptance_test.sh +++ b/tools/bin/acceptance_test.sh @@ -9,7 +9,7 @@ assert_root echo "Starting app..." # Detach so we can run subsequent commands -VERSION=dev TRACKING_STRATEGY=logging docker-compose up -d +VERSION=dev TRACKING_STRATEGY=logging USE_STREAM_CAPABLE_STATE=true docker-compose up -d # Sometimes source/dest containers using airbyte volumes survive shutdown, which need to be killed in order to shut down properly. 
shutdown_cmd="docker-compose down -v || docker kill \$(docker ps -a -f volume=airbyte_workspace -f volume=airbyte_data -f volume=airbyte_db -q) && docker-compose down -v" From 16340d4bd690fd85f858ff2da514ea7fc1040145 Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 13:58:37 -0700 Subject: [PATCH 11/23] call new update endpoint --- .../java/io/airbyte/test/acceptance/CdcAcceptanceTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index de3f44166032..bd526ab8042e 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -347,7 +347,7 @@ public void testPartialResetFromSchemaUpdate() throws Exception { AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchema(sourceId); LOGGER.info("Refreshed catalog: {}", refreshedCatalog); WebBackendConnectionUpdate update = getUpdateInput(connectionRead, refreshedCatalog, operationRead); - webBackendApi.webBackendUpdateConnection(update); + webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to complete"); JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); @@ -394,7 +394,7 @@ public void testPartialResetFromStreamSelection() throws Exception { catalog.setStreams(updatedStreams); LOGGER.info("Updated catalog: {}", catalog); WebBackendConnectionUpdate update = getUpdateInput(connectionRead, catalog, operationRead); - webBackendApi.webBackendUpdateConnection(update); + webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to start"); JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); @@ -409,7 +409,7 @@ public void testPartialResetFromStreamSelection() throws Exception { catalog = testHarness.discoverSourceSchema(sourceId); LOGGER.info("Updated catalog: {}", catalog); update = getUpdateInput(connectionRead, catalog, operationRead); - webBackendApi.webBackendUpdateConnection(update); + webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to start"); syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); From e67ec0e7ab208b96233732299eb4df70192506dc Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 14:41:44 -0700 Subject: [PATCH 12/23] use methods in test harness instead --- .../test/acceptance/CdcAcceptanceTests.java | 29 ++----------------- 1 file changed, 3 insertions(+), 26 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index bd526ab8042e..7ee5723224ec 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -350,7 +350,7 @@ public void testPartialResetFromSchemaUpdate() throws Exception { webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to complete"); - JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); + JobRead syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId); 
waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); // We do not check that the source and the dest are in sync here because removing a stream doesn't @@ -397,7 +397,7 @@ public void testPartialResetFromStreamSelection() throws Exception { webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to start"); - JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); + JobRead syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId); LOGGER.info("Waiting for sync job after update to complete"); waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); @@ -412,7 +412,7 @@ public void testPartialResetFromStreamSelection() throws Exception { webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to start"); - syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); + syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId); LOGGER.info("Checking that id_and_name table is unaffected by the partial reset"); assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); LOGGER.info("Checking that color_palette table was cleared in the destination due to the reset triggered by the update"); @@ -601,29 +601,6 @@ private WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connectio .prefix(connection.getPrefix()); } - private JobRead getMostRecentSyncJobId(final UUID connectionId) throws Exception { - return apiClient.getJobsApi() - .listJobsFor(new JobListRequestBody().configId(connectionId.toString()).configTypes(List.of(JobConfigType.SYNC))) - .getJobs() - .stream() - .max(Comparator.comparingLong(job -> job.getJob().getCreatedAt())) - .map(JobWithAttemptsRead::getJob).orElseThrow(); - } - - private JobRead waitUntilTheNextJobIsStarted(final UUID connectionId) throws Exception { - final JobRead lastJob = getMostRecentSyncJobId(connectionId); - if (lastJob.getStatus() != JobStatus.SUCCEEDED) { - return lastJob; - } - - JobRead mostRecentSyncJob = getMostRecentSyncJobId(connectionId); - while (mostRecentSyncJob.getId().equals(lastJob.getId())) { - Thread.sleep(Duration.ofSeconds(2).toMillis()); - mostRecentSyncJob = getMostRecentSyncJobId(connectionId); - } - return mostRecentSyncJob; - } - // can be helpful for debugging private void printDbs() throws SQLException { final Database sourceDb = testHarness.getSourceDatabase(); From c71b0bae7588e997f702260a45c4de4490683d41 Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 15:17:37 -0700 Subject: [PATCH 13/23] remove comment --- .../java/io/airbyte/test/acceptance/CdcAcceptanceTests.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 7ee5723224ec..2616d9b02309 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -572,9 +572,6 @@ private void assertNoState(final UUID connectionId) throws ApiException { assertNull(state.getGlobalState()); } - // TODO: consolidate the below methods with those added in - // https://github.com/airbytehq/airbyte/pull/14406 - private WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection, final AirbyteCatalog catalog, final OperationRead operation) { final SyncMode syncMode = 
SyncMode.INCREMENTAL; final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND; From 28c67570f9a5989c255827efb1e7865f77d43309 Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 15:17:56 -0700 Subject: [PATCH 14/23] add env var to worker container --- docker-compose.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker-compose.yaml b/docker-compose.yaml index eeaa49fb2bb3..fd0bd479350d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -94,6 +94,7 @@ services: - ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS} - ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS} - WORKFLOW_FAILURE_RESTART_DELAY_SECONDS=${WORKFLOW_FAILURE_RESTART_DELAY_SECONDS} + - USE_STREAM_CAPABLE_STATE=${USE_STREAM_CAPABLE_STATE} volumes: - /var/run/docker.sock:/var/run/docker.sock - workspace:${WORKSPACE_ROOT} From ce315b5a405493448e30ccf6eae85d1873f9b9df Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 15:24:30 -0700 Subject: [PATCH 15/23] format --- .../java/io/airbyte/test/acceptance/CdcAcceptanceTests.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 2616d9b02309..97f817c2b1b7 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -28,12 +28,8 @@ import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody; import io.airbyte.api.client.model.generated.DestinationDefinitionRead; import io.airbyte.api.client.model.generated.DestinationSyncMode; -import io.airbyte.api.client.model.generated.JobConfigType; import io.airbyte.api.client.model.generated.JobInfoRead; -import io.airbyte.api.client.model.generated.JobListRequestBody; import io.airbyte.api.client.model.generated.JobRead; -import io.airbyte.api.client.model.generated.JobStatus; -import io.airbyte.api.client.model.generated.JobWithAttemptsRead; import io.airbyte.api.client.model.generated.OperationRead; import io.airbyte.api.client.model.generated.SourceDefinitionIdRequestBody; import io.airbyte.api.client.model.generated.SourceDefinitionRead; @@ -50,11 +46,9 @@ import java.io.IOException; import java.net.URISyntaxException; import java.sql.SQLException; -import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; From f5cf0f74de31a66784f96430fdfdeacf8af52f75 Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 18:21:37 -0700 Subject: [PATCH 16/23] fix state check in basic acceptance test --- .../io/airbyte/test/acceptance/BasicAcceptanceTests.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index 313a1c15920e..cc27832dfa91 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -797,7 +797,7 @@ public void 
testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws // sync one more time. verify it is the equivalent of a full refresh. final String expectedState = - "{\"cdc\":false,\"streams\":[{\"cursor\":\"6\",\"stream_name\":\"id_and_name\",\"cursor_field\":[\"id\"],\"stream_namespace\":\"public\"}]}"; + "{\"cursor\":\"6\",\"stream_name\":\"id_and_name\",\"cursor_field\":[\"id\"],\"stream_namespace\":\"public\"}"; LOGGER.info("Starting {} sync 3", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead3 = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -806,7 +806,11 @@ public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws LOGGER.info("state after sync 3: {}", state); testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); - assertEquals(Jsons.deserialize(expectedState), state.getState()); + assertNotNull(state.getStreamState()); + assertEquals(1, state.getStreamState().size()); + final StreamState idAndNameState = state.getStreamState().get(0); + assertEquals(new StreamDescriptor().namespace("public").name(STREAM_NAME), idAndNameState.getStreamDescriptor()); + assertEquals(Jsons.deserialize(expectedState), idAndNameState.getStreamState()); } @Test From dbd497459400e41d9471f3617953cfff5f04c560 Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 18:25:25 -0700 Subject: [PATCH 17/23] use test info for test name logging --- .../test/acceptance/CdcAcceptanceTests.java | 41 ++++++++++--------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 97f817c2b1b7..0859826166be 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -63,6 +63,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,11 +155,11 @@ public void end() { } @Test - public void testIncrementalCdcSync() throws Exception { - LOGGER.info("Starting incremental cdc sync test"); + public void testIncrementalCdcSync(TestInfo testInfo) throws Exception { + LOGGER.info("Starting {}", testInfo.getDisplayName()); final UUID connectionId = createCdcConnection(); - LOGGER.info("Starting incremental cdc sync 1"); + LOGGER.info("Starting {} sync 1", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -209,7 +210,7 @@ public void testIncrementalCdcSync() throws Exception { beforeFirstUpdate, Optional.empty())); - LOGGER.info("Starting incremental cdc sync 2"); + LOGGER.info("Starting {} sync 2", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); @@ -221,7 +222,7 @@ public void testIncrementalCdcSync() throws Exception { // reset back to no data. 
- LOGGER.info("Starting incremental cdc reset"); + LOGGER.info("Starting {} reset", testInfo.getDisplayName()); final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), jobInfoRead.getJob()); @@ -232,7 +233,7 @@ public void testIncrementalCdcSync() throws Exception { assertNoState(connectionId); // sync one more time. verify it is the equivalent of a full refresh. - LOGGER.info("Starting incremental cdc sync 3"); + LOGGER.info("Starting {} sync 3", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead3 = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob()); @@ -250,8 +251,8 @@ public void testIncrementalCdcSync() throws Exception { // tests that incremental syncs still work properly even when using a destination connector that was // built on the old protocol that did not have any per-stream state fields @Test - public void testIncrementalCdcSyncWithLegacyDestinationConnector() throws Exception { - LOGGER.info("Starting testIncrementalCdcSyncWithLegacyDestinationConnector()"); + public void testIncrementalCdcSyncWithLegacyDestinationConnector(TestInfo testInfo) throws Exception { + LOGGER.info("Starting {}", testInfo.getDisplayName()); final UUID postgresDestDefId = testHarness.getPostgresDestinationDefinitionId(); // Fetch the current/most recent source definition version final DestinationDefinitionRead destinationDefinitionRead = apiClient.getDestinationDefinitionApi().getDestinationDefinition( @@ -262,7 +263,7 @@ public void testIncrementalCdcSyncWithLegacyDestinationConnector() throws Except LOGGER.info("Setting postgres destination definition to version {}", POSTGRES_DESTINATION_LEGACY_CONNECTOR_VERSION); testHarness.updateDestinationDefinitionVersion(postgresDestDefId, POSTGRES_DESTINATION_LEGACY_CONNECTOR_VERSION); - testIncrementalCdcSync(); + testIncrementalCdcSync(testInfo); } finally { // set postgres destination definition back to latest version for other tests LOGGER.info("Setting postgres destination definition back to version {}", destinationDefinitionRead.getDockerImageTag()); @@ -271,11 +272,11 @@ public void testIncrementalCdcSyncWithLegacyDestinationConnector() throws Except } @Test - public void testDeleteRecordCdcSync() throws Exception { - LOGGER.info("Starting delete record cdc sync test"); + public void testDeleteRecordCdcSync(TestInfo testInfo) throws Exception { + LOGGER.info("Starting {}", testInfo.getDisplayName()); final UUID connectionId = createCdcConnection(); - LOGGER.info("Starting delete record cdc sync 1"); + LOGGER.info("Starting {} sync 1", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -300,7 +301,7 @@ public void testDeleteRecordCdcSync() throws Exception { beforeDelete, Optional.of(beforeDelete))); - LOGGER.info("Starting delete record cdc sync 2"); + LOGGER.info("Starting {} sync 2", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); @@ -310,11 +311,11 @@ public void testDeleteRecordCdcSync() throws Exception { } @Test - public void testPartialResetFromSchemaUpdate() throws 
Exception { - LOGGER.info("Starting partial reset cdc test"); + public void testPartialResetFromSchemaUpdate(TestInfo testInfo) throws Exception { + LOGGER.info("Starting {}", testInfo.getDisplayName()); final UUID connectionId = createCdcConnection(); - LOGGER.info("Starting sync 1"); + LOGGER.info("Starting {} sync 1", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -353,11 +354,11 @@ public void testPartialResetFromSchemaUpdate() throws Exception { } @Test - public void testPartialResetFromStreamSelection() throws Exception { - LOGGER.info("Starting partial reset cdc test"); + public void testPartialResetFromStreamSelection(TestInfo testInfo) throws Exception { + LOGGER.info("Starting {}", testInfo.getDisplayName()); final UUID connectionId = createCdcConnection(); - LOGGER.info("Starting sync 1"); + LOGGER.info("Starting {} sync 1", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -436,7 +437,7 @@ public void testPartialResetFromStreamSelection() throws Exception { beforeInsert, Optional.empty())); - LOGGER.info("Starting sync after insert"); + LOGGER.info("Starting {} sync after insert", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); From 593769458c9e21329be05675db18f78e0c5d7b24 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Fri, 15 Jul 2022 13:11:25 -0700 Subject: [PATCH 18/23] Re-add acceptance test --- .../utils/AirbyteAcceptanceTestHarness.java | 13 +- .../test/acceptance/BasicAcceptanceTests.java | 234 +++++++++++++++++- 2 files changed, 241 insertions(+), 6 deletions(-) diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java index 5b11fd48112a..db6c65743402 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java @@ -30,6 +30,7 @@ import io.airbyte.api.client.model.generated.DestinationDefinitionUpdate; import io.airbyte.api.client.model.generated.DestinationIdRequestBody; import io.airbyte.api.client.model.generated.DestinationRead; +import io.airbyte.api.client.model.generated.DestinationSyncMode; import io.airbyte.api.client.model.generated.JobConfigType; import io.airbyte.api.client.model.generated.JobIdRequestBody; import io.airbyte.api.client.model.generated.JobListRequestBody; @@ -50,6 +51,7 @@ import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.api.client.model.generated.SourceIdRequestBody; import io.airbyte.api.client.model.generated.SourceRead; +import io.airbyte.api.client.model.generated.SyncMode; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.util.MoreProperties; @@ -717,7 +719,7 @@ private void deleteOperation(final UUID destinationId) throws ApiException { apiClient.getOperationApi().deleteOperation(new OperationIdRequestBody().operationId(destinationId)); } - public JobRead getMostRecentSyncJobId(UUID connectionId) throws 
Exception { + public JobRead getMostRecentSyncJobId(final UUID connectionId) throws Exception { return apiClient.getJobsApi() .listJobsFor(new JobListRequestBody().configId(connectionId.toString()).configTypes(List.of(JobConfigType.SYNC))) .getJobs() @@ -799,4 +801,13 @@ public enum Type { DESTINATION } + public void setIncrementalAppendDedupSyncModeWithPrimaryKey(final AirbyteCatalog airbyteCatalog, final List cursorField) { + airbyteCatalog.getStreams().forEach(stream -> { + stream.getConfig().setSyncMode(SyncMode.INCREMENTAL); + stream.getConfig().setDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP); + stream.getConfig().setCursorField(cursorField); + stream.getConfig().setPrimaryKey(List.of(cursorField)); + }); + } + } diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index cc27832dfa91..f9d4b35d2e50 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -15,7 +15,11 @@ import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.waitForSuccessfulJob; import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.waitWhileJobHasStatus; import static java.lang.Thread.sleep; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -26,7 +30,46 @@ import io.airbyte.api.client.generated.WebBackendApi; import io.airbyte.api.client.invoker.generated.ApiClient; import io.airbyte.api.client.invoker.generated.ApiException; -import io.airbyte.api.client.model.generated.*; +import io.airbyte.api.client.model.generated.AirbyteCatalog; +import io.airbyte.api.client.model.generated.AirbyteStream; +import io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration; +import io.airbyte.api.client.model.generated.AirbyteStreamConfiguration; +import io.airbyte.api.client.model.generated.AttemptInfoRead; +import io.airbyte.api.client.model.generated.AttemptStatus; +import io.airbyte.api.client.model.generated.CheckConnectionRead; +import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; +import io.airbyte.api.client.model.generated.ConnectionRead; +import io.airbyte.api.client.model.generated.ConnectionSchedule; +import io.airbyte.api.client.model.generated.ConnectionState; +import io.airbyte.api.client.model.generated.ConnectionStatus; +import io.airbyte.api.client.model.generated.DataType; +import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody; +import io.airbyte.api.client.model.generated.DestinationDefinitionIdWithWorkspaceId; +import io.airbyte.api.client.model.generated.DestinationDefinitionRead; +import io.airbyte.api.client.model.generated.DestinationDefinitionSpecificationRead; +import io.airbyte.api.client.model.generated.DestinationIdRequestBody; +import io.airbyte.api.client.model.generated.DestinationRead; +import io.airbyte.api.client.model.generated.DestinationSyncMode; +import io.airbyte.api.client.model.generated.JobConfigType; +import 
io.airbyte.api.client.model.generated.JobIdRequestBody; +import io.airbyte.api.client.model.generated.JobInfoRead; +import io.airbyte.api.client.model.generated.JobListRequestBody; +import io.airbyte.api.client.model.generated.JobRead; +import io.airbyte.api.client.model.generated.JobStatus; +import io.airbyte.api.client.model.generated.JobWithAttemptsRead; +import io.airbyte.api.client.model.generated.OperationRead; +import io.airbyte.api.client.model.generated.SourceDefinitionIdRequestBody; +import io.airbyte.api.client.model.generated.SourceDefinitionIdWithWorkspaceId; +import io.airbyte.api.client.model.generated.SourceDefinitionRead; +import io.airbyte.api.client.model.generated.SourceDefinitionSpecificationRead; +import io.airbyte.api.client.model.generated.SourceIdRequestBody; +import io.airbyte.api.client.model.generated.SourceRead; +import io.airbyte.api.client.model.generated.StreamDescriptor; +import io.airbyte.api.client.model.generated.StreamState; +import io.airbyte.api.client.model.generated.SyncMode; +import io.airbyte.api.client.model.generated.WebBackendConnectionRead; +import io.airbyte.api.client.model.generated.WebBackendConnectionUpdate; +import io.airbyte.api.client.model.generated.WebBackendOperationCreateOrUpdate; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Database; import io.airbyte.test.utils.AirbyteAcceptanceTestHarness; @@ -37,11 +80,26 @@ import java.net.URISyntaxException; import java.sql.SQLException; import java.time.Duration; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; import org.jooq.DSLContext; import org.jooq.impl.DSL; import org.jooq.impl.SQLDataType; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1024,7 +1082,7 @@ public void testIncrementalSyncMultipleStreams() throws Exception { final UUID operationId = testHarness.createOperation().getOperationId(); final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); - for (AirbyteStreamAndConfiguration streamAndConfig : catalog.getStreams()) { + for (final AirbyteStreamAndConfiguration streamAndConfig : catalog.getStreams()) { final AirbyteStream stream = streamAndConfig.getStream(); assertEquals(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), stream.getSupportedSyncModes()); // instead of assertFalse to avoid NPE from unboxed. 
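Before the next hunk: the test it introduces drives the setIncrementalAppendDedupSyncModeWithPrimaryKey helper added to the harness above, end to end. The helper's intended call pattern is roughly the sketch below; it borrows names from this patch (testHarness, sourceId, destinationId, operationId, COLUMN_ID), elides source and destination setup, and the connection name is made up for illustration.

    // Sketch: switch every discovered stream to INCREMENTAL / APPEND_DEDUP,
    // with the "id" column doubling as cursor and single-column primary key.
    final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
    testHarness.setIncrementalAppendDedupSyncModeWithPrimaryKey(catalog, List.of(COLUMN_ID));
    final UUID connectionId = testHarness
        .createConnection("example-incremental-dedup", sourceId, destinationId, List.of(operationId), catalog, null)
        .getConnectionId();

Note that the primary key ends up as List.of(cursorField), i.e. a list of column paths, which is why a single cursor column still needs the extra wrapping.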
@@ -1105,6 +1163,172 @@ public void testIncrementalSyncMultipleStreams() throws Exception {
 
   }
 
+  @Test
+  public void testPartialResetResetAllWhenSchemaIsModified() throws Exception {
+    // Add Table
+    final String additionalTable = "additional_table";
+    final Database sourceDb = testHarness.getSourceDatabase();
+    sourceDb.query(ctx -> {
+      ctx.createTableIfNotExists(additionalTable)
+          .columns(DSL.field("id", SQLDataType.INTEGER), DSL.field("field", SQLDataType.VARCHAR)).execute();
+      ctx.truncate(additionalTable).execute();
+      ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field")).values(1, "1").execute();
+      ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field")).values(2, "2").execute();
+      return null;
+    });
+    UUID sourceId = testHarness.createPostgresSource().getSourceId();
+    final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
+    testHarness.setIncrementalAppendDedupSyncModeWithPrimaryKey(catalog, List.of(COLUMN_ID));
+    final UUID destinationId = testHarness.createDestination().getDestinationId();
+    final OperationRead operation = testHarness.createOperation();
+    final UUID operationId = operation.getOperationId();
+    final String name = "test_reset_when_schema_is_modified_" + UUID.randomUUID();
+
+    final ConnectionRead connection =
+        testHarness.createConnection(name, sourceId, destinationId, List.of(operationId), catalog, null);
+
+    // Run initial sync
+    final JobInfoRead syncRead =
+        apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connection.getConnectionId()));
+    waitForSuccessfulJob(apiClient.getJobsApi(), syncRead.getJob());
+
+    testHarness.assertSourceAndDestinationDbInSync(WITH_SCD_TABLE);
+    assertStreamStateContainsStream(connection.getConnectionId(), List.of(
+        new StreamDescriptor().name("id_and_name").namespace("public"),
+        new StreamDescriptor().name(additionalTable).namespace("public")));
+
+    /**
+     * Remove stream
+     */
+    sourceDb.query(ctx -> {
+      ctx.dropTableIfExists(additionalTable).execute();
+      return null;
+    });
+    // Update with refreshed catalog
+    sourceId = testHarness.createPostgresSource().getSourceId();
+    AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchema(sourceId);
+    testHarness.setIncrementalAppendDedupSyncModeWithPrimaryKey(refreshedCatalog, List.of(COLUMN_ID));
+    WebBackendConnectionUpdate update = getUpdateInput(connection, refreshedCatalog, operation);
+    webBackendApi.webBackendUpdateConnection(update);
+
+    // Wait until the sync from the UpdateConnection is finished
+    JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId());
+    waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate);
+
+    // We do not check that the source and the dest are in sync here because removing a stream doesn't
+    // remove that
+    assertStreamStateContainsStream(connection.getConnectionId(), List.of(
+        new StreamDescriptor().name("id_and_name").namespace("public")));
+
+    /**
+     * Add a stream -- the values in the table are different from the initial import to ensure that it
+     * is properly reset.
+ */ + sourceDb.query(ctx -> { + ctx.createTableIfNotExists(additionalTable) + .columns(DSL.field("id", SQLDataType.INTEGER), DSL.field("field", SQLDataType.VARCHAR)).execute(); + ctx.truncate(additionalTable).execute(); + ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field")).values(3, "3").execute(); + ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field")).values(4, "4").execute(); + return null; + }); + + sourceId = testHarness.createPostgresSource().getSourceId(); + refreshedCatalog = testHarness.discoverSourceSchema(sourceId); + testHarness.setIncrementalAppendDedupSyncModeWithPrimaryKey(refreshedCatalog, List.of(COLUMN_ID)); + update = getUpdateInput(connection, refreshedCatalog, operation); + webBackendApi.webBackendUpdateConnection(update); + + syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId()); + waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); + + // We do not check that the source and the dest are in sync here because removing a stream doesn't + // remove that + testHarness.assertSourceAndDestinationDbInSync(WITH_SCD_TABLE); + assertStreamStateContainsStream(connection.getConnectionId(), List.of( + new StreamDescriptor().name("id_and_name").namespace("public"), + new StreamDescriptor().name(additionalTable).namespace("public"))); + + // Update + sourceDb.query(ctx -> { + ctx.dropTableIfExists(additionalTable).execute(); + ctx.createTableIfNotExists(additionalTable) + .columns(DSL.field("id", SQLDataType.INTEGER), DSL.field("field", SQLDataType.VARCHAR), DSL.field("another_field", SQLDataType.VARCHAR)) + .execute(); + ctx.truncate(additionalTable).execute(); + ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field"), DSL.field("another_field")).values(3, "3", "three") + .execute(); + ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field"), DSL.field("another_field")).values(4, "4", "four") + .execute(); + return null; + }); + + sourceId = testHarness.createPostgresSource().getSourceId(); + refreshedCatalog = testHarness.discoverSourceSchema(sourceId); + testHarness.setIncrementalAppendDedupSyncModeWithPrimaryKey(refreshedCatalog, List.of(COLUMN_ID)); + update = getUpdateInput(connection, refreshedCatalog, operation); + webBackendApi.webBackendUpdateConnection(update); + + syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId()); + waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); + + // We do not check that the source and the dest are in sync here because removing a stream doesn't + // remove that + testHarness.assertSourceAndDestinationDbInSync(WITH_SCD_TABLE); + assertStreamStateContainsStream(connection.getConnectionId(), List.of( + new StreamDescriptor().name("id_and_name").namespace("public"), + new StreamDescriptor().name(additionalTable).namespace("public"))); + + } + + private void assertStreamStateContainsStream(final UUID connectionId, final List expectedStreamDescriptors) throws ApiException { + final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)); + final List streamDescriptors = state.getStreamState().stream().map(StreamState::getStreamDescriptor).toList(); + + Assertions.assertTrue(streamDescriptors.containsAll(expectedStreamDescriptors) && expectedStreamDescriptors.containsAll(streamDescriptors)); + } + + private WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection, 
final AirbyteCatalog catalog, final OperationRead operation) {
+    return new WebBackendConnectionUpdate()
+        .connectionId(connection.getConnectionId())
+        .name(connection.getName())
+        .operationIds(connection.getOperationIds())
+        .operations(List.of(new WebBackendOperationCreateOrUpdate()
+            .name(operation.getName())
+            .operationId(operation.getOperationId())
+            .workspaceId(operation.getWorkspaceId())
+            .operatorConfiguration(operation.getOperatorConfiguration())))
+        .namespaceDefinition(connection.getNamespaceDefinition())
+        .namespaceFormat(connection.getNamespaceFormat())
+        .syncCatalog(catalog)
+        .schedule(connection.getSchedule())
+        .sourceCatalogId(connection.getSourceCatalogId())
+        .status(connection.getStatus())
+        .prefix(connection.getPrefix())
+        .withRefreshedCatalog(true);
+  }
+
+  private JobRead getMostRecentSyncJobId(final UUID connectionId) throws Exception {
+    return apiClient.getJobsApi()
+        .listJobsFor(new JobListRequestBody().configId(connectionId.toString()).configTypes(List.of(JobConfigType.SYNC)))
+        .getJobs()
+        .stream().findFirst().map(JobWithAttemptsRead::getJob).orElseThrow();
+  }
+
+  private JobRead waitUntilTheNextJobIsStarted(final UUID connectionId) throws Exception {
+    final JobRead lastJob = getMostRecentSyncJobId(connectionId);
+    if (lastJob.getStatus() != JobStatus.SUCCEEDED) {
+      return lastJob;
+    }
+
+    JobRead mostRecentSyncJob = getMostRecentSyncJobId(connectionId);
+    while (mostRecentSyncJob.getId().equals(lastJob.getId())) {
+      Thread.sleep(Duration.ofSeconds(10).toMillis());
+      mostRecentSyncJob = getMostRecentSyncJobId(connectionId);
+    }
+    return mostRecentSyncJob;
+  }
+
   // This test is disabled because it takes a couple of minutes to run, as it is testing timeouts.
   // It should be re-enabled when the @SlowIntegrationTest can be applied to it.
   // See relevant issue: https://github.com/airbytehq/airbyte/issues/8397

From 16a219606cec589511448f822154495720df45b3 Mon Sep 17 00:00:00 2001
From: Benoit Moriceau
Date: Fri, 15 Jul 2022 15:14:40 -0700
Subject: [PATCH 19/23] Re-add acceptance test

---
 .../WebBackendConnectionsHandler.java         |  5 +-
 .../utils/AirbyteAcceptanceTestHarness.java   | 48 ++++++++--
 .../test/acceptance/BasicAcceptanceTests.java | 88 +++++++++-------
 .../test/acceptance/CdcAcceptanceTests.java   | 90 +++++++------------
 4 files changed, 133 insertions(+), 98 deletions(-)

diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java
index df5a0b4eedab..683bb8d3c52a 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java
@@ -389,7 +389,7 @@ public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendCo
     final ConfiguredAirbyteCatalog existingConfiguredCatalog = configRepository.getConfiguredCatalogForConnection(connectionId);
     ConnectionRead connectionRead;
-    connectionRead = connectionsHandler.updateConnection(connectionUpdate);
+    final Boolean skipReset = webBackendConnectionUpdate.getSkipReset() != null ?
webBackendConnectionUpdate.getSkipReset() : false; if (!skipReset) { @@ -405,6 +405,7 @@ public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendCo List streamsToReset = allStreamToReset.stream().map(ProtocolConverters::streamDescriptorToProtocol).toList(); + connectionRead = connectionsHandler.updateConnection(connectionUpdate); if (!streamsToReset.isEmpty()) { final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(connectionId); final ConnectionStateType stateType = getStateType(connectionIdRequestBody); @@ -420,6 +421,8 @@ public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendCo verifyManualOperationResult(manualOperationResult); connectionRead = connectionsHandler.getConnection(connectionUpdate.getConnectionId()); } + } else { + connectionRead = connectionsHandler.updateConnection(connectionUpdate); } return buildWebBackendConnectionRead(connectionRead); } diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java index db6c65743402..37f346e3d1be 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java @@ -52,6 +52,8 @@ import io.airbyte.api.client.model.generated.SourceIdRequestBody; import io.airbyte.api.client.model.generated.SourceRead; import io.airbyte.api.client.model.generated.SyncMode; +import io.airbyte.api.client.model.generated.WebBackendConnectionUpdate; +import io.airbyte.api.client.model.generated.WebBackendOperationCreateOrUpdate; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.util.MoreProperties; @@ -801,13 +803,49 @@ public enum Type { DESTINATION } - public void setIncrementalAppendDedupSyncModeWithPrimaryKey(final AirbyteCatalog airbyteCatalog, final List cursorField) { + public void assertDestinationDbEmpty(final boolean withScdTable) throws Exception { + final Database source = getSourceDatabase(); + final Set sourceTables = listAllTables(source); + final Set sourceTablesWithRawTablesAdded = addAirbyteGeneratedTables(withScdTable, sourceTables); + final Database destination = getDestinationDatabase(); + final Set destinationTables = listAllTables(destination); + assertEquals(sourceTablesWithRawTablesAdded, destinationTables, + String.format("streams did not match.\n source stream names: %s\n destination stream names: %s\n", sourceTables, destinationTables)); + + for (final SchemaTableNamePair pair : sourceTables) { + final List sourceRecords = retrieveRawDestinationRecords(pair); + assertTrue(sourceRecords.isEmpty()); + } + } + + public void setIncrementalAppendSyncMode(final AirbyteCatalog airbyteCatalog, final List cursorField) { airbyteCatalog.getStreams().forEach(stream -> { - stream.getConfig().setSyncMode(SyncMode.INCREMENTAL); - stream.getConfig().setDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP); - stream.getConfig().setCursorField(cursorField); - stream.getConfig().setPrimaryKey(List.of(cursorField)); + stream.getConfig().syncMode(SyncMode.INCREMENTAL) + .destinationSyncMode(DestinationSyncMode.APPEND) + .cursorField(cursorField); }); } + public WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection, final AirbyteCatalog catalog, final OperationRead operation) { + setIncrementalAppendSyncMode(catalog, 
List.of(COLUMN_ID)); + + return new WebBackendConnectionUpdate() + .connectionId(connection.getConnectionId()) + .name(connection.getName()) + .operationIds(connection.getOperationIds()) + .operations(List.of(new WebBackendOperationCreateOrUpdate() + .name(operation.getName()) + .operationId(operation.getOperationId()) + .workspaceId(operation.getWorkspaceId()) + .operatorConfiguration(operation.getOperatorConfiguration()))) + .namespaceDefinition(connection.getNamespaceDefinition()) + .namespaceFormat(connection.getNamespaceFormat()) + .syncCatalog(catalog) + .schedule(connection.getSchedule()) + .sourceCatalogId(connection.getSourceCatalogId()) + .status(connection.getStatus()) + .prefix(connection.getPrefix()) + .withRefreshedCatalog(true); + } + } diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index f9d4b35d2e50..4c62d0065478 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -89,6 +89,8 @@ import java.util.Set; import java.util.UUID; import org.jooq.DSLContext; +import org.jooq.Record; +import org.jooq.Result; import org.jooq.impl.DSL; import org.jooq.impl.SQLDataType; import org.junit.jupiter.api.AfterAll; @@ -1163,6 +1165,30 @@ public void testIncrementalSyncMultipleStreams() throws Exception { } + @Test + public void testMultipleSchemasAndTablesSyncAndReset() throws Exception { + // create tables in another schema + PostgreSQLContainerHelper.runSqlScript(MountableFile.forClasspathResource("postgres_second_schema_multiple_tables.sql"), sourcePsql); + + final String connectionName = "test-connection"; + final UUID sourceId = testHarness.createPostgresSource().getSourceId(); + final UUID destinationId = testHarness.createDestination().getDestinationId(); + final UUID operationId = testHarness.createOperation().getOperationId(); + final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); + + final SyncMode syncMode = SyncMode.FULL_REFRESH; + final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; + catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); + final UUID connectionId = + testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); + testHarness.assertSourceAndDestinationDbInSync(false); + final JobInfoRead connectionResetRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionResetRead.getJob()); + testHarness.assertDestinationDbEmpty(false); + } + @Test public void testPartialResetResetAllWhenSchemaIsModified() throws Exception { // Add Table @@ -1178,12 +1204,13 @@ public void testPartialResetResetAllWhenSchemaIsModified() throws Exception { }); UUID sourceId = testHarness.createPostgresSource().getSourceId(); final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); - testHarness.setIncrementalAppendDedupSyncModeWithPrimaryKey(catalog, List.of(COLUMN_ID)); final UUID 
destinationId = testHarness.createDestination().getDestinationId(); final OperationRead operation = testHarness.createOperation(); final UUID operationId = operation.getOperationId(); final String name = "test_reset_when_schema_is_modified_" + UUID.randomUUID(); + testHarness.setIncrementalAppendSyncMode(catalog, List.of(COLUMN_ID)); + final ConnectionRead connection = testHarness.createConnection(name, sourceId, destinationId, List.of(operationId), catalog, null); @@ -1192,7 +1219,7 @@ public void testPartialResetResetAllWhenSchemaIsModified() throws Exception { apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connection.getConnectionId())); waitForSuccessfulJob(apiClient.getJobsApi(), syncRead.getJob()); - testHarness.assertSourceAndDestinationDbInSync(WITH_SCD_TABLE); + testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); assertStreamStateContainsStream(connection.getConnectionId(), List.of( new StreamDescriptor().name("id_and_name").namespace("public"), new StreamDescriptor().name(additionalTable).namespace("public"))); @@ -1200,15 +1227,11 @@ public void testPartialResetResetAllWhenSchemaIsModified() throws Exception { /** * Remove stream */ - sourceDb.query(ctx -> { - ctx.dropTableIfExists(additionalTable).execute(); - return null; - }); + sourceDb.query(ctx -> ctx.dropTableIfExists(additionalTable).execute()); + // Update with refreshed catalog - sourceId = testHarness.createPostgresSource().getSourceId(); - AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchema(sourceId); - testHarness.setIncrementalAppendDedupSyncModeWithPrimaryKey(refreshedCatalog, List.of(COLUMN_ID)); - WebBackendConnectionUpdate update = getUpdateInput(connection, refreshedCatalog, operation); + AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchemaWithoutCache(sourceId); + WebBackendConnectionUpdate update = testHarness.getUpdateInput(connection, refreshedCatalog, operation); webBackendApi.webBackendUpdateConnection(update); // Wait until the sync from the UpdateConnection is finished @@ -1235,8 +1258,7 @@ public void testPartialResetResetAllWhenSchemaIsModified() throws Exception { sourceId = testHarness.createPostgresSource().getSourceId(); refreshedCatalog = testHarness.discoverSourceSchema(sourceId); - testHarness.setIncrementalAppendDedupSyncModeWithPrimaryKey(refreshedCatalog, List.of(COLUMN_ID)); - update = getUpdateInput(connection, refreshedCatalog, operation); + update = testHarness.getUpdateInput(connection, refreshedCatalog, operation); webBackendApi.webBackendUpdateConnection(update); syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId()); @@ -1265,8 +1287,7 @@ public void testPartialResetResetAllWhenSchemaIsModified() throws Exception { sourceId = testHarness.createPostgresSource().getSourceId(); refreshedCatalog = testHarness.discoverSourceSchema(sourceId); - testHarness.setIncrementalAppendDedupSyncModeWithPrimaryKey(refreshedCatalog, List.of(COLUMN_ID)); - update = getUpdateInput(connection, refreshedCatalog, operation); + update = testHarness.getUpdateInput(connection, refreshedCatalog, operation); webBackendApi.webBackendUpdateConnection(update); syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId()); @@ -1281,6 +1302,25 @@ public void testPartialResetResetAllWhenSchemaIsModified() throws Exception { } + // can be helpful for debugging + private void printDbs() throws SQLException { + final Database sourceDb = testHarness.getSourceDatabase(); + Set pairs = 
testHarness.listAllTables(sourceDb); + LOGGER.info("Printing source tables"); + for (final SchemaTableNamePair pair : pairs) { + final Result result = sourceDb.query(context -> context.fetch(String.format("SELECT * FROM %s.%s", pair.schemaName, pair.tableName))); + LOGGER.info("{}.{} contents:\n{}", pair.schemaName, pair.tableName, result); + } + + final Database destDb = testHarness.getDestinationDatabase(); + pairs = testHarness.listAllTables(destDb); + LOGGER.info("Printing destination tables"); + for (final SchemaTableNamePair pair : pairs) { + final Result result = destDb.query(context -> context.fetch(String.format("SELECT * FROM %s.%s", pair.schemaName, pair.tableName))); + LOGGER.info("{}.{} contents:\n{}", pair.schemaName, pair.tableName, result); + } + } + private void assertStreamStateContainsStream(final UUID connectionId, final List expectedStreamDescriptors) throws ApiException { final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)); final List streamDescriptors = state.getStreamState().stream().map(StreamState::getStreamDescriptor).toList(); @@ -1288,26 +1328,6 @@ private void assertStreamStateContainsStream(final UUID connectionId, final List Assertions.assertTrue(streamDescriptors.containsAll(expectedStreamDescriptors) && expectedStreamDescriptors.containsAll(streamDescriptors)); } - private WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection, final AirbyteCatalog catalog, final OperationRead operation) { - return new WebBackendConnectionUpdate() - .connectionId(connection.getConnectionId()) - .name(connection.getName()) - .operationIds(connection.getOperationIds()) - .operations(List.of(new WebBackendOperationCreateOrUpdate() - .name(operation.getName()) - .operationId(operation.getOperationId()) - .workspaceId(operation.getWorkspaceId()) - .operatorConfiguration(operation.getOperatorConfiguration()))) - .namespaceDefinition(connection.getNamespaceDefinition()) - .namespaceFormat(connection.getNamespaceFormat()) - .syncCatalog(catalog) - .schedule(connection.getSchedule()) - .sourceCatalogId(connection.getSourceCatalogId()) - .status(connection.getStatus()) - .prefix(connection.getPrefix()) - .withRefreshedCatalog(true); - } - private JobRead getMostRecentSyncJobId(final UUID connectionId) throws Exception { return apiClient.getJobsApi() .listJobsFor(new JobListRequestBody().configId(connectionId.toString()).configTypes(List.of(JobConfigType.SYNC))) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 0859826166be..1bed094cd254 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -38,7 +38,6 @@ import io.airbyte.api.client.model.generated.StreamState; import io.airbyte.api.client.model.generated.SyncMode; import io.airbyte.api.client.model.generated.WebBackendConnectionUpdate; -import io.airbyte.api.client.model.generated.WebBackendOperationCreateOrUpdate; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Database; import io.airbyte.test.utils.AirbyteAcceptanceTestHarness; @@ -155,7 +154,7 @@ public void end() { } @Test - public void testIncrementalCdcSync(TestInfo testInfo) throws Exception { + public void testIncrementalCdcSync(final TestInfo testInfo) throws Exception { 
LOGGER.info("Starting {}", testInfo.getDisplayName()); final UUID connectionId = createCdcConnection(); @@ -174,7 +173,7 @@ public void testIncrementalCdcSync(TestInfo testInfo) throws Exception { List expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE); assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); - List expectedStreams = List.of( + final List expectedStreams = List.of( new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE), new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE)); assertGlobalStateContainsStreams(connectionId, expectedStreams); @@ -251,7 +250,7 @@ public void testIncrementalCdcSync(TestInfo testInfo) throws Exception { // tests that incremental syncs still work properly even when using a destination connector that was // built on the old protocol that did not have any per-stream state fields @Test - public void testIncrementalCdcSyncWithLegacyDestinationConnector(TestInfo testInfo) throws Exception { + public void testIncrementalCdcSyncWithLegacyDestinationConnector(final TestInfo testInfo) throws Exception { LOGGER.info("Starting {}", testInfo.getDisplayName()); final UUID postgresDestDefId = testHarness.getPostgresDestinationDefinitionId(); // Fetch the current/most recent source definition version @@ -272,7 +271,7 @@ public void testIncrementalCdcSyncWithLegacyDestinationConnector(TestInfo testIn } @Test - public void testDeleteRecordCdcSync(TestInfo testInfo) throws Exception { + public void testDeleteRecordCdcSync(final TestInfo testInfo) throws Exception { LOGGER.info("Starting {}", testInfo.getDisplayName()); final UUID connectionId = createCdcConnection(); @@ -284,7 +283,7 @@ public void testDeleteRecordCdcSync(TestInfo testInfo) throws Exception { LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); final Database source = testHarness.getSourceDatabase(); - List expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE); + final List expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE); assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); final Instant beforeDelete = Instant.now(); @@ -293,7 +292,7 @@ public void testDeleteRecordCdcSync(TestInfo testInfo) throws Exception { // delete a record source.query(ctx -> ctx.execute("DELETE FROM id_and_name WHERE id=1")); - Map deletedRecordMap = new HashMap<>(); + final Map deletedRecordMap = new HashMap<>(); deletedRecordMap.put(COLUMN_ID, 1); deletedRecordMap.put(COLUMN_NAME, null); expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher( @@ -311,7 +310,7 @@ public void testDeleteRecordCdcSync(TestInfo testInfo) throws Exception { } @Test - public void testPartialResetFromSchemaUpdate(TestInfo testInfo) throws Exception { + public void testPartialResetFromSchemaUpdate(final TestInfo testInfo) throws Exception { LOGGER.info("Starting {}", testInfo.getDisplayName()); final UUID connectionId = createCdcConnection(); @@ -323,14 +322,14 @@ public void testPartialResetFromSchemaUpdate(TestInfo testInfo) throws Exception final Database source = testHarness.getSourceDatabase(); - List expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE); + final List expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE); assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); - List 
expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE); + final List expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE); assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); - StreamDescriptor idAndNameStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE); - StreamDescriptor colorPaletteStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE); + final StreamDescriptor idAndNameStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE); + final StreamDescriptor colorPaletteStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE); assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor)); LOGGER.info("Removing color palette table"); @@ -338,14 +337,14 @@ public void testPartialResetFromSchemaUpdate(TestInfo testInfo) throws Exception LOGGER.info("Refreshing schema and updating connection"); final ConnectionRead connectionRead = apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - UUID sourceId = createCdcSource().getSourceId(); - AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchema(sourceId); + final UUID sourceId = createCdcSource().getSourceId(); + final AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchema(sourceId); LOGGER.info("Refreshed catalog: {}", refreshedCatalog); - WebBackendConnectionUpdate update = getUpdateInput(connectionRead, refreshedCatalog, operationRead); + final WebBackendConnectionUpdate update = testHarness.getUpdateInput(connectionRead, refreshedCatalog, operationRead); webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to complete"); - JobRead syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId); + final JobRead syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId); waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); // We do not check that the source and the dest are in sync here because removing a stream doesn't @@ -354,7 +353,7 @@ public void testPartialResetFromSchemaUpdate(TestInfo testInfo) throws Exception } @Test - public void testPartialResetFromStreamSelection(TestInfo testInfo) throws Exception { + public void testPartialResetFromStreamSelection(final TestInfo testInfo) throws Exception { LOGGER.info("Starting {}", testInfo.getDisplayName()); final UUID connectionId = createCdcConnection(); @@ -366,14 +365,14 @@ public void testPartialResetFromStreamSelection(TestInfo testInfo) throws Except final Database source = testHarness.getSourceDatabase(); - List expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE); + final List expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE); assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); - List expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE); + final List expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE); assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); - StreamDescriptor idAndNameStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE); - StreamDescriptor colorPaletteStreamDescriptor = new 
StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE); + final StreamDescriptor idAndNameStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE); + final StreamDescriptor colorPaletteStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE); assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor)); LOGGER.info("Removing color palette stream from configured catalog"); @@ -388,7 +387,7 @@ public void testPartialResetFromStreamSelection(TestInfo testInfo) throws Except .toList(); catalog.setStreams(updatedStreams); LOGGER.info("Updated catalog: {}", catalog); - WebBackendConnectionUpdate update = getUpdateInput(connectionRead, catalog, operationRead); + WebBackendConnectionUpdate update = testHarness.getUpdateInput(connectionRead, catalog, operationRead); webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to start"); @@ -403,7 +402,7 @@ public void testPartialResetFromStreamSelection(TestInfo testInfo) throws Except LOGGER.info("Adding color palette stream back to configured catalog"); catalog = testHarness.discoverSourceSchema(sourceId); LOGGER.info("Updated catalog: {}", catalog); - update = getUpdateInput(connectionRead, catalog, operationRead); + update = testHarness.getUpdateInput(connectionRead, catalog, operationRead); webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to start"); @@ -447,8 +446,8 @@ public void testPartialResetFromStreamSelection(TestInfo testInfo) throws Except assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor)); } - private List getCdcRecordMatchersFromSource(Database source, String tableName) throws SQLException { - List sourceRecords = testHarness.retrieveSourceRecords(source, tableName); + private List getCdcRecordMatchersFromSource(final Database source, final String tableName) throws SQLException { + final List sourceRecords = testHarness.retrieveSourceRecords(source, tableName); return new ArrayList<>(sourceRecords .stream() .map(sourceRecord -> new DestinationCdcRecordMatcher(sourceRecord, Instant.EPOCH, Optional.empty())) @@ -500,7 +499,8 @@ private SourceRead createCdcSource() throws ApiException { Jsons.jsonNode(sourceDbConfigMap)); } - private void assertDestinationMatches(String streamName, List expectedDestRecordMatchers) throws Exception { + private void assertDestinationMatches(final String streamName, final List expectedDestRecordMatchers) + throws Exception { final List destRecords = testHarness.retrieveRawDestinationRecords(new SchemaTableNamePair(SCHEMA_NAME, streamName)); if (destRecords.size() != expectedDestRecordMatchers.size()) { final String errorMessage = String.format( @@ -512,22 +512,22 @@ private void assertDestinationMatches(String streamName, List matchingDestRecords = destRecords.stream().filter(destRecord -> { - Map sourceRecordMap = Jsons.object(recordMatcher.sourceRecord, Map.class); - Map destRecordMap = Jsons.object(destRecord, Map.class); + final Map sourceRecordMap = Jsons.object(recordMatcher.sourceRecord, Map.class); + final Map destRecordMap = Jsons.object(destRecord, Map.class); - boolean sourceRecordValuesMatch = sourceRecordMap.keySet() + final boolean sourceRecordValuesMatch = sourceRecordMap.keySet() .stream() .allMatch(column -> Objects.equals(sourceRecordMap.get(column), destRecordMap.get(column))); final Object 
cdcUpdatedAtValue = destRecordMap.get(CDC_UPDATED_AT_COLUMN); // use epoch millis to guarantee the two values are at the same precision - boolean cdcUpdatedAtMatches = cdcUpdatedAtValue != null + final boolean cdcUpdatedAtMatches = cdcUpdatedAtValue != null && Instant.parse(String.valueOf(cdcUpdatedAtValue)).toEpochMilli() >= recordMatcher.minUpdatedAt.toEpochMilli(); final Object cdcDeletedAtValue = destRecordMap.get(CDC_DELETED_AT_COLUMN); - boolean cdcDeletedAtMatches; + final boolean cdcDeletedAtMatches; if (recordMatcher.minDeletedAt.isPresent()) { cdcDeletedAtMatches = cdcDeletedAtValue != null && Instant.parse(String.valueOf(cdcDeletedAtValue)).toEpochMilli() >= recordMatcher.minDeletedAt.get().toEpochMilli(); @@ -567,32 +567,6 @@ private void assertNoState(final UUID connectionId) throws ApiException { assertNull(state.getGlobalState()); } - private WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection, final AirbyteCatalog catalog, final OperationRead operation) { - final SyncMode syncMode = SyncMode.INCREMENTAL; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND; - catalog.getStreams().forEach(s -> s.getConfig() - .syncMode(syncMode) - .cursorField(List.of(COLUMN_ID)) - .destinationSyncMode(destinationSyncMode)); - - return new WebBackendConnectionUpdate() - .connectionId(connection.getConnectionId()) - .name(connection.getName()) - .operationIds(connection.getOperationIds()) - .operations(List.of(new WebBackendOperationCreateOrUpdate() - .name(operation.getName()) - .operationId(operation.getOperationId()) - .workspaceId(operation.getWorkspaceId()) - .operatorConfiguration(operation.getOperatorConfiguration()))) - .namespaceDefinition(connection.getNamespaceDefinition()) - .namespaceFormat(connection.getNamespaceFormat()) - .syncCatalog(catalog) - .schedule(connection.getSchedule()) - .sourceCatalogId(connection.getSourceCatalogId()) - .status(connection.getStatus()) - .prefix(connection.getPrefix()); - } - // can be helpful for debugging private void printDbs() throws SQLException { final Database sourceDb = testHarness.getSourceDatabase(); From a375fbfab35edf29a76476c75acdc71878916440 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Fri, 15 Jul 2022 15:17:31 -0700 Subject: [PATCH 20/23] Format --- .../io/airbyte/server/handlers/WebBackendConnectionsHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 683bb8d3c52a..0c3aa9af6e12 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -390,7 +390,6 @@ public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendCo configRepository.getConfiguredCatalogForConnection(connectionId); ConnectionRead connectionRead; - final Boolean skipReset = webBackendConnectionUpdate.getSkipReset() != null ? 
webBackendConnectionUpdate.getSkipReset() : false; if (!skipReset) { final io.airbyte.protocol.models.AirbyteCatalog existingCatalog = CatalogHelpers.configuredCatalogToCatalog(existingConfiguredCatalog); From 9f3639edf1b5909c286442c391ee4eb5cba97204 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Mon, 18 Jul 2022 09:17:28 -0700 Subject: [PATCH 21/23] Fix acceptance test --- .../WebBackendConnectionsHandler.java | 8 ++-- .../utils/AirbyteAcceptanceTestHarness.java | 3 +- .../test/acceptance/BasicAcceptanceTests.java | 32 +++------------- .../test/acceptance/CdcAcceptanceTests.java | 38 ++++++++----------- 4 files changed, 27 insertions(+), 54 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 412c128662e1..df5a0b4eedab 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -52,6 +52,7 @@ import io.airbyte.commons.lang.MoreBooleans; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.scheduler.client.EventRunner; import io.airbyte.server.converters.ProtocolConverters; @@ -388,10 +389,12 @@ public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendCo final ConfiguredAirbyteCatalog existingConfiguredCatalog = configRepository.getConfiguredCatalogForConnection(connectionId); ConnectionRead connectionRead; + connectionRead = connectionsHandler.updateConnection(connectionUpdate); final Boolean skipReset = webBackendConnectionUpdate.getSkipReset() != null ? 
webBackendConnectionUpdate.getSkipReset() : false; if (!skipReset) { - final AirbyteCatalog apiExistingCatalog = CatalogConverter.toApi(existingConfiguredCatalog); + final io.airbyte.protocol.models.AirbyteCatalog existingCatalog = CatalogHelpers.configuredCatalogToCatalog(existingConfiguredCatalog); + final AirbyteCatalog apiExistingCatalog = CatalogConverter.toApi(existingCatalog); final AirbyteCatalog newAirbyteCatalog = webBackendConnectionUpdate.getSyncCatalog(); final CatalogDiff catalogDiff = connectionsHandler.getDiff(apiExistingCatalog, newAirbyteCatalog); final List apiStreamsToReset = getStreamsToReset(catalogDiff); @@ -402,7 +405,6 @@ public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendCo List streamsToReset = allStreamToReset.stream().map(ProtocolConverters::streamDescriptorToProtocol).toList(); - connectionRead = connectionsHandler.updateConnection(connectionUpdate); if (!streamsToReset.isEmpty()) { final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(connectionId); final ConnectionStateType stateType = getStateType(connectionIdRequestBody); @@ -418,8 +420,6 @@ public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendCo verifyManualOperationResult(manualOperationResult); connectionRead = connectionsHandler.getConnection(connectionUpdate.getConnectionId()); } - } else { - connectionRead = connectionsHandler.updateConnection(connectionUpdate); } return buildWebBackendConnectionRead(connectionRead); } diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java index 37f346e3d1be..e10d8db3612b 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java @@ -845,7 +845,8 @@ public WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection .sourceCatalogId(connection.getSourceCatalogId()) .status(connection.getStatus()) .prefix(connection.getPrefix()) - .withRefreshedCatalog(true); + .withRefreshedCatalog(true) + .skipReset(false); } } diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index dd5c8dcc047f..8ebd05e4d572 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -67,7 +67,6 @@ import io.airbyte.api.client.model.generated.StreamDescriptor; import io.airbyte.api.client.model.generated.StreamState; import io.airbyte.api.client.model.generated.SyncMode; -import io.airbyte.api.client.model.generated.WebBackendConnectionRead; import io.airbyte.api.client.model.generated.WebBackendConnectionUpdate; import io.airbyte.api.client.model.generated.WebBackendOperationCreateOrUpdate; import io.airbyte.commons.json.Jsons; @@ -89,8 +88,6 @@ import java.util.Set; import java.util.UUID; import org.jooq.DSLContext; -import org.jooq.Record; -import org.jooq.Result; import org.jooq.impl.DSL; import org.jooq.impl.SQLDataType; import org.junit.jupiter.api.AfterAll; @@ -1232,7 +1229,7 @@ public void testPartialResetResetAllWhenSchemaIsModified() throws Exception { // Update with refreshed catalog AirbyteCatalog refreshedCatalog = 
testHarness.discoverSourceSchemaWithoutCache(sourceId); WebBackendConnectionUpdate update = testHarness.getUpdateInput(connection, refreshedCatalog, operation); - webBackendApi.webBackendUpdateConnection(update); + webBackendApi.webBackendUpdateConnectionNew(update); // Wait until the sync from the UpdateConnection is finished JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId()); @@ -1259,14 +1256,14 @@ public void testPartialResetResetAllWhenSchemaIsModified() throws Exception { sourceId = testHarness.createPostgresSource().getSourceId(); refreshedCatalog = testHarness.discoverSourceSchema(sourceId); update = testHarness.getUpdateInput(connection, refreshedCatalog, operation); - webBackendApi.webBackendUpdateConnection(update); + webBackendApi.webBackendUpdateConnectionNew(update); syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId()); waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); // We do not check that the source and the dest are in sync here because removing a stream doesn't // remove that - testHarness.assertSourceAndDestinationDbInSync(WITH_SCD_TABLE); + testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); assertStreamStateContainsStream(connection.getConnectionId(), List.of( new StreamDescriptor().name("id_and_name").namespace("public"), new StreamDescriptor().name(additionalTable).namespace("public"))); @@ -1288,39 +1285,20 @@ public void testPartialResetResetAllWhenSchemaIsModified() throws Exception { sourceId = testHarness.createPostgresSource().getSourceId(); refreshedCatalog = testHarness.discoverSourceSchema(sourceId); update = testHarness.getUpdateInput(connection, refreshedCatalog, operation); - webBackendApi.webBackendUpdateConnection(update); + webBackendApi.webBackendUpdateConnectionNew(update); syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId()); waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); // We do not check that the source and the dest are in sync here because removing a stream doesn't // remove that - testHarness.assertSourceAndDestinationDbInSync(WITH_SCD_TABLE); + testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); assertStreamStateContainsStream(connection.getConnectionId(), List.of( new StreamDescriptor().name("id_and_name").namespace("public"), new StreamDescriptor().name(additionalTable).namespace("public"))); } - // can be helpful for debugging - private void printDbs() throws SQLException { - final Database sourceDb = testHarness.getSourceDatabase(); - Set pairs = testHarness.listAllTables(sourceDb); - LOGGER.info("Printing source tables"); - for (final SchemaTableNamePair pair : pairs) { - final Result result = sourceDb.query(context -> context.fetch(String.format("SELECT * FROM %s.%s", pair.schemaName, pair.tableName))); - LOGGER.info("{}.{} contents:\n{}", pair.schemaName, pair.tableName, result); - } - - final Database destDb = testHarness.getDestinationDatabase(); - pairs = testHarness.listAllTables(destDb); - LOGGER.info("Printing destination tables"); - for (final SchemaTableNamePair pair : pairs) { - final Result result = destDb.query(context -> context.fetch(String.format("SELECT * FROM %s.%s", pair.schemaName, pair.tableName))); - LOGGER.info("{}.{} contents:\n{}", pair.schemaName, pair.tableName, result); - } - } - private void assertStreamStateContainsStream(final UUID connectionId, final List expectedStreamDescriptors) throws ApiException { final ConnectionState state = 
apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)); final List streamDescriptors = state.getStreamState().stream().map(StreamState::getStreamDescriptor).toList(); diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index c82b0ac99d79..21218ef7b111 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -53,10 +53,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.UUID; -import org.jooq.Record; -import org.jooq.Result; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -499,7 +496,8 @@ private SourceRead createCdcSource() throws ApiException { Jsons.jsonNode(sourceDbConfigMap)); } - private void assertDestinationMatches(final String streamName, final List expectedDestRecordMatchers) throws Exception { + private void assertDestinationMatches(final String streamName, final List expectedDestRecordMatchers) + throws Exception { final List destRecords = testHarness.retrieveRawDestinationRecords(new SchemaTableNamePair(SCHEMA_NAME, streamName)); if (destRecords.size() != expectedDestRecordMatchers.size()) { final String errorMessage = String.format( @@ -566,23 +564,19 @@ private void assertNoState(final UUID connectionId) throws ApiException { assertNull(state.getGlobalState()); } - // can be helpful for debugging - private void printDbs() throws SQLException { - final Database sourceDb = testHarness.getSourceDatabase(); - Set pairs = testHarness.listAllTables(sourceDb); - LOGGER.info("Printing source tables"); - for (final SchemaTableNamePair pair : pairs) { - final Result result = sourceDb.query(context -> context.fetch(String.format("SELECT * FROM %s.%s", pair.schemaName, pair.tableName))); - LOGGER.info("{}.{} contents:\n{}", pair.schemaName, pair.tableName, result); - } - - final Database destDb = testHarness.getDestinationDatabase(); - pairs = testHarness.listAllTables(destDb); - LOGGER.info("Printing destination tables"); - for (final SchemaTableNamePair pair : pairs) { - final Result result = destDb.query(context -> context.fetch(String.format("SELECT * FROM %s.%s", pair.schemaName, pair.tableName))); - LOGGER.info("{}.{} contents:\n{}", pair.schemaName, pair.tableName, result); - } - } + /* + * // can be helpful for debugging private void printDbs() throws SQLException { final Database + * sourceDb = testHarness.getSourceDatabase(); Set pairs = + * testHarness.listAllTables(sourceDb); LOGGER.info("Printing source tables"); for (final + * SchemaTableNamePair pair : pairs) { final Result result = sourceDb.query(context -> + * context.fetch(String.format("SELECT * FROM %s.%s", pair.schemaName, pair.tableName))); + * LOGGER.info("{}.{} contents:\n{}", pair.schemaName, pair.tableName, result); } + * + * final Database destDb = testHarness.getDestinationDatabase(); pairs = + * testHarness.listAllTables(destDb); LOGGER.info("Printing destination tables"); for (final + * SchemaTableNamePair pair : pairs) { final Result result = destDb.query(context -> + * context.fetch(String.format("SELECT * FROM %s.%s", pair.schemaName, pair.tableName))); + * LOGGER.info("{}.{} contents:\n{}", pair.schemaName, pair.tableName, result); } } + */ 
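One detail of the CdcAcceptanceTests matcher logic shown earlier deserves a note: _ab_cdc_updated_at and _ab_cdc_deleted_at are compared against their lower bounds at epoch-millisecond precision, because the value read back from the destination may carry a different number of fractional-second digits than the Instant captured in the test. Reduced to a standalone sketch (the timestamps here are hypothetical sample values):

    // Normalize both sides to epoch millis so differing fractional-second
    // precision cannot make a correct record look like a mismatch.
    final Instant minUpdatedAt = Instant.parse("2022-07-15T20:00:00Z");   // captured with Instant.now() before the update
    final Object cdcUpdatedAtValue = "2022-07-15T20:00:01.123456Z";       // as read back from _ab_cdc_updated_at
    final boolean cdcUpdatedAtMatches = cdcUpdatedAtValue != null
        && Instant.parse(String.valueOf(cdcUpdatedAtValue)).toEpochMilli() >= minUpdatedAt.toEpochMilli();

A stricter comparison, full Instant equality for instance, would risk flaking on destinations that truncate or pad fractional seconds differently.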
From 778a3612d3287d9b2e5aa25b1bdfe109100fb8ae Mon Sep 17 00:00:00 2001
From: Benoit Moriceau
Date: Mon, 18 Jul 2022 09:57:46 -0700
Subject: [PATCH 22/23] Add log

---
 .../airbyte/test/acceptance/BasicAcceptanceTests.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java
index 8ebd05e4d572..6883bec7f507 100644
--- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java
+++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java
@@ -1187,7 +1187,9 @@ public void testMultipleSchemasAndTablesSyncAndReset() throws Exception {
   }
 
   @Test
-  public void testPartialResetResetAllWhenSchemaIsModified() throws Exception {
+  public void testPartialResetResetAllWhenSchemaIsModified(final TestInfo testInfo) throws Exception {
+    LOGGER.info("Running: " + testInfo.getDisplayName());
+
     // Add Table
     final String additionalTable = "additional_table";
     final Database sourceDb = testHarness.getSourceDatabase();
@@ -1221,6 +1223,8 @@ public void testPartialResetResetAllWhenSchemaIsModified() throws Exception {
         new StreamDescriptor().name("id_and_name").namespace("public"),
         new StreamDescriptor().name(additionalTable).namespace("public")));
 
+    LOGGER.info("Initial sync ran, now running an update with a stream being removed.");
+
     /**
      * Remove stream
      */
@@ -1240,6 +1244,8 @@ public void testPartialResetResetAllWhenSchemaIsModified() throws Exception {
     assertStreamStateContainsStream(connection.getConnectionId(), List.of(
         new StreamDescriptor().name("id_and_name").namespace("public")));
 
+    LOGGER.info("Remove done, now running an update with a stream being added.");
+
     /**
      * Add a stream -- the values in the table are different from the initial import to ensure that it
      * is properly reset.
@@ -1268,6 +1274,8 @@ public void testPartialResetResetAllWhenSchemaIsModified() throws Exception {
         new StreamDescriptor().name("id_and_name").namespace("public"),
         new StreamDescriptor().name(additionalTable).namespace("public")));
 
+    LOGGER.info("Addition done, now running an update with a stream being updated.");
+
     // Update
     sourceDb.query(ctx -> {
       ctx.dropTableIfExists(additionalTable).execute();
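Patch 22 threads a TestInfo parameter into the test method so the log shows which test is running. JUnit 5 also injects TestInfo into lifecycle methods, so the same logging could be centralized in a single @BeforeEach hook; the sketch below uses a hypothetical test class, not code from this patch.

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggingExampleTest {

  private static final Logger LOGGER = LoggerFactory.getLogger(LoggingExampleTest.class);

  // JUnit 5 resolves TestInfo for lifecycle methods as well as test methods, so every
  // test in the class gets its display name logged without a per-method parameter.
  @BeforeEach
  void logTestName(final TestInfo testInfo) {
    LOGGER.info("Running: " + testInfo.getDisplayName());
  }

  @Test
  void exampleTest() {
    // test body goes here
  }
}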
From edb4e20cf6158ab6d0f26135f3b04ffe4764ec96 Mon Sep 17 00:00:00 2001
From: Benoit Moriceau
Date: Mon, 18 Jul 2022 09:59:52 -0700
Subject: [PATCH 23/23] remove unwanted changes

---
 .../airbyte/server/handlers/WebBackendConnectionsHandler.java | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java
index df5a0b4eedab..413a4df9b7c1 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java
@@ -52,7 +52,6 @@
 import io.airbyte.commons.lang.MoreBooleans;
 import io.airbyte.config.persistence.ConfigNotFoundException;
 import io.airbyte.config.persistence.ConfigRepository;
-import io.airbyte.protocol.models.CatalogHelpers;
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
 import io.airbyte.scheduler.client.EventRunner;
 import io.airbyte.server.converters.ProtocolConverters;
@@ -393,8 +392,7 @@ public WebBackendConnectionRead webBackendUpdateConnectionNew(final WebBackendCo
     final Boolean skipReset = webBackendConnectionUpdate.getSkipReset() != null ? webBackendConnectionUpdate.getSkipReset() : false;
     if (!skipReset) {
-      final io.airbyte.protocol.models.AirbyteCatalog existingCatalog = CatalogHelpers.configuredCatalogToCatalog(existingConfiguredCatalog);
-      final AirbyteCatalog apiExistingCatalog = CatalogConverter.toApi(existingCatalog);
+      final AirbyteCatalog apiExistingCatalog = CatalogConverter.toApi(existingConfiguredCatalog);
       final AirbyteCatalog newAirbyteCatalog = webBackendConnectionUpdate.getSyncCatalog();
       final CatalogDiff catalogDiff = connectionsHandler.getDiff(apiExistingCatalog, newAirbyteCatalog);
       final List<StreamDescriptor> apiStreamsToReset = getStreamsToReset(catalogDiff);
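The final commit collapses a two-step conversion (configured catalog -> protocol catalog -> API catalog) into a single CatalogConverter.toApi call on the configured catalog. One plausible motivation, an assumption here rather than anything stated in the commit, is that the intermediate protocol catalog discards per-stream configuration. The toy types below are illustrative stand-ins for the real Airbyte models, showing how a direct overload can preserve information the two-step path loses.

// Hypothetical miniature of the two conversion paths; not Airbyte's real types.
record ProtocolStream(String name) {}

record ConfiguredStream(ProtocolStream stream, String syncMode) {}

class CatalogConverterSketch {

  // Converting the bare protocol stream keeps only schema-level information.
  static String toApi(final ProtocolStream stream) {
    return stream.name() + " (sync mode defaulted)";
  }

  // Converting the configured stream directly keeps the user's sync-mode selection.
  static String toApi(final ConfiguredStream stream) {
    return stream.stream().name() + " (sync mode: " + stream.syncMode() + ")";
  }

  public static void main(final String[] args) {
    final ConfiguredStream configured = new ConfiguredStream(new ProtocolStream("id_and_name"), "incremental");
    System.out.println(toApi(configured.stream())); // two-step path: configuration lost
    System.out.println(toApi(configured)); // direct path: configuration kept
  }
}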