Skip to content

Commit

Permalink
Add acceptance tests for per-stream state updates (#14263)
Browse files Browse the repository at this point in the history
* Add acceptance tests for per-stream state updates

* PR feedback

* Formatting

* More PR feedback

* PR feedback

* Remove unused constant
  • Loading branch information
jdpgrailsdev authored Jun 30, 2022
1 parent cfea528 commit 6dadd1b
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.airbyte.api.client.model.generated.SourceCreate;
import io.airbyte.api.client.model.generated.SourceDefinitionCreate;
import io.airbyte.api.client.model.generated.SourceDefinitionRead;
import io.airbyte.api.client.model.generated.SourceDefinitionUpdate;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.client.model.generated.SourceIdRequestBody;
import io.airbyte.api.client.model.generated.SourceRead;
Expand Down Expand Up @@ -113,6 +114,8 @@ public class AirbyteAcceptanceTestHarness {
private static final String SOURCE_E2E_TEST_CONNECTOR_VERSION = "0.1.1";
private static final String DESTINATION_E2E_TEST_CONNECTOR_VERSION = "0.1.1";

public static final String POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION = "0.4.26";

private static final String OUTPUT_NAMESPACE_PREFIX = "output_namespace_";
private static final String OUTPUT_NAMESPACE = OUTPUT_NAMESPACE_PREFIX + "${SOURCE_NAMESPACE}";
private static final String OUTPUT_STREAM_PREFIX = "output_table_";
Expand Down Expand Up @@ -284,7 +287,8 @@ private void assignEnvVars() {
isGke = System.getenv().containsKey("IS_GKE");
isMac = System.getProperty("os.name").startsWith("Mac");
useExternalDeployment =
System.getenv("USE_EXTERNAL_DEPLOYMENT") != null && System.getenv("USE_EXTERNAL_DEPLOYMENT").equalsIgnoreCase("true");
System.getenv("USE_EXTERNAL_DEPLOYMENT") != null &&
System.getenv("USE_EXTERNAL_DEPLOYMENT").equalsIgnoreCase("true");
}

private WorkflowClient getWorkflowClient() {
Expand Down Expand Up @@ -636,6 +640,11 @@ public UUID getPostgresSourceDefinitionId() throws ApiException {
.getSourceDefinitionId();
}

public void updateSourceDefinitionVersion(final UUID sourceDefinitionId, final String dockerImageTag) throws ApiException {
apiClient.getSourceDefinitionApi().updateSourceDefinition(new SourceDefinitionUpdate()
.sourceDefinitionId(sourceDefinitionId).dockerImageTag(dockerImageTag));
}

private void clearSourceDbData() throws SQLException {
final Database database = getSourceDatabase();
final Set<SchemaTableNamePair> pairs = listAllTables(database);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public class BasicAcceptanceTests {

private static final Logger LOGGER = LoggerFactory.getLogger(BasicAcceptanceTests.class);

private static final Boolean WITH_SCD_TABLE = true;

private static final Boolean WITHOUT_SCD_TABLE = false;

private static AirbyteAcceptanceTestHarness testHarness;
private static AirbyteApiClient apiClient;
private static UUID workspaceId;
Expand Down Expand Up @@ -324,7 +328,7 @@ public void testScheduledSync() throws Exception {
sleep(Duration.ofSeconds(30).toMillis());
}

testHarness.assertSourceAndDestinationDbInSync(false);
testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
}

@Test
Expand Down Expand Up @@ -369,7 +373,7 @@ public void testMultipleSchemasSameTablesSync() throws Exception {

final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
testHarness.assertSourceAndDestinationDbInSync(false);
testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
}

@Test
Expand All @@ -395,7 +399,7 @@ public void testIncrementalDedupeSync() throws Exception {
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());

testHarness.assertSourceAndDestinationDbInSync(true);
testHarness.assertSourceAndDestinationDbInSync(WITH_SCD_TABLE);

// add new records and run again.
final Database source = testHarness.getSourceDatabase();
Expand Down Expand Up @@ -448,7 +452,7 @@ public void testIncrementalSync() throws Exception {
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertSourceAndDestinationDbInSync(false);
testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);

// add new records and run again.
final Database source = testHarness.getSourceDatabase();
Expand Down Expand Up @@ -489,7 +493,7 @@ public void testIncrementalSync() throws Exception {
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob());
LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertSourceAndDestinationDbInSync(false);
testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);

}

Expand Down Expand Up @@ -566,12 +570,10 @@ public void testUpdateConnectionWhenWorkflowUnreachable() throws Exception {
final UUID destinationId = testHarness.createDestination().getDestinationId();
final UUID operationId = testHarness.createOperation().getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.INCREMENTAL;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP;
catalog.getStreams().forEach(s -> s.getConfig()
.syncMode(syncMode)
.syncMode(SyncMode.INCREMENTAL)
.cursorField(List.of(COLUMN_ID))
.destinationSyncMode(destinationSyncMode)
.destinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
.primaryKey(List.of(List.of(COLUMN_NAME))));

LOGGER.info("Testing connection update when temporal is in a terminal state");
Expand Down Expand Up @@ -611,12 +613,10 @@ public void testManualSyncRepairsWorkflowWhenWorkflowUnreachable() throws Except
final UUID destinationId = testHarness.createDestination().getDestinationId();
final UUID operationId = testHarness.createOperation().getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.INCREMENTAL;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP;
catalog.getStreams().forEach(s -> s.getConfig()
.syncMode(syncMode)
.syncMode(SyncMode.INCREMENTAL)
.cursorField(List.of(COLUMN_ID))
.destinationSyncMode(destinationSyncMode)
.destinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
.primaryKey(List.of(List.of(COLUMN_NAME))));

LOGGER.info("Testing manual sync when temporal is in a terminal state");
Expand Down Expand Up @@ -653,12 +653,10 @@ public void testResetConnectionRepairsWorkflowWhenWorkflowUnreachable() throws E
final UUID destinationId = testHarness.createDestination().getDestinationId();
final UUID operationId = testHarness.createOperation().getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.INCREMENTAL;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP;
catalog.getStreams().forEach(s -> s.getConfig()
.syncMode(syncMode)
.syncMode(SyncMode.INCREMENTAL)
.cursorField(List.of(COLUMN_ID))
.destinationSyncMode(destinationSyncMode)
.destinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
.primaryKey(List.of(List.of(COLUMN_NAME))));

LOGGER.info("Testing reset connection when temporal is in a terminal state");
Expand Down Expand Up @@ -712,7 +710,150 @@ public void testResetCancelsRunningSync() throws Exception {
assertEquals(JobStatus.CANCELLED, connectionSyncReadAfterReset.getStatus());
}

// This test is disabled because it takes a couple minutes to run, as it is testing timeouts.
@Test
public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws Exception {
LOGGER.info("Starting {}", testInfo.getDisplayName());
final String connectionName = "test-connection";
final SourceRead source = testHarness.createPostgresSource();
final UUID sourceId = source.getSourceId();
final UUID sourceDefinitionId = source.getSourceDefinitionId();
final UUID destinationId = testHarness.createDestination().getDestinationId();
final UUID operationId = testHarness.createOperation().getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);

// Fetch the current/most recent source definition version
final SourceDefinitionRead sourceDefinitionRead =
apiClient.getSourceDefinitionApi().getSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceDefinitionId));
final String currentSourceDefintionVersion = sourceDefinitionRead.getDockerImageTag();

// Set the source to a version that does not support per-stream state
LOGGER.info("Setting source connector to pre-per-stream state version {}...",
AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION);
testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION);

catalog.getStreams().forEach(s -> s.getConfig()
.syncMode(SyncMode.INCREMENTAL)
.cursorField(List.of(COLUMN_ID))
.destinationSyncMode(DestinationSyncMode.APPEND));
final UUID connectionId =
testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();
LOGGER.info("Beginning {} sync 1", testInfo.getDisplayName());

final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);

// Set source to a version that supports per-stream state
testHarness.updateSourceDefinitionVersion(sourceDefinitionId, currentSourceDefintionVersion);
LOGGER.info("Upgraded source connector per-stream state supported version {}.", currentSourceDefintionVersion);

// add new records and run again.
final Database sourceDatabase = testHarness.getSourceDatabase();
// get contents of source before mutating records.
final List<JsonNode> expectedRecords = testHarness.retrieveSourceRecords(sourceDatabase, STREAM_NAME);
expectedRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "geralt").build()));
// add a new record
sourceDatabase.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')"));
// mutate a record that was already synced with out updating its cursor value. if we are actually
// full refreshing, this record will appear in the output and cause the test to fail. if we are,
// correctly, doing incremental, we will not find this value in the destination.
sourceDatabase.query(ctx -> ctx.execute("UPDATE id_and_name SET name='yennefer' WHERE id=2"));

LOGGER.info("Starting {} sync 2", testInfo.getDisplayName());
final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
LOGGER.info("state after sync 2: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertRawDestinationContains(expectedRecords, new SchemaTableNamePair("public", STREAM_NAME));

// reset back to no data.
LOGGER.info("Starting {} reset", testInfo.getDisplayName());
final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitWhileJobHasStatus(apiClient.getJobsApi(), jobInfoRead.getJob(),
Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING, JobStatus.INCOMPLETE, JobStatus.FAILED));

LOGGER.info("state after reset: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertRawDestinationContains(Collections.emptyList(), new SchemaTableNamePair("public",
STREAM_NAME));

// sync one more time. verify it is the equivalent of a full refresh.
final String expectedState =
"{\"cdc\":false,\"streams\":[{\"cursor\":\"6\",\"stream_name\":\"id_and_name\",\"cursor_field\":[\"id\"],\"stream_namespace\":\"public\"}]}";
LOGGER.info("Starting {} sync 3", testInfo.getDisplayName());
final JobInfoRead connectionSyncRead3 =
apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob());
final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
LOGGER.info("state after sync 3: {}", state);

testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
assertEquals(Jsons.deserialize(expectedState), state.getState());
}

@Test
public void testSyncAfterUpgradeToPerStreamStateWithNoNewData(final TestInfo testInfo) throws Exception {
LOGGER.info("Starting {}", testInfo.getDisplayName());
final String connectionName = "test-connection";
final SourceRead source = testHarness.createPostgresSource();
final UUID sourceId = source.getSourceId();
final UUID sourceDefinitionId = source.getSourceDefinitionId();
final UUID destinationId = testHarness.createDestination().getDestinationId();
final UUID operationId = testHarness.createOperation().getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);

// Fetch the current/most recent source definition version
final SourceDefinitionRead sourceDefinitionRead =
apiClient.getSourceDefinitionApi().getSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceDefinitionId));
final String currentSourceDefintionVersion = sourceDefinitionRead.getDockerImageTag();

// Set the source to a version that does not support per-stream state
LOGGER.info("Setting source connector to pre-per-stream state version {}...",
AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION);
testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION);

catalog.getStreams().forEach(s -> s.getConfig()
.syncMode(SyncMode.INCREMENTAL)
.cursorField(List.of(COLUMN_ID))
.destinationSyncMode(DestinationSyncMode.APPEND));
final UUID connectionId =
testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();
LOGGER.info("Beginning {} sync 1", testInfo.getDisplayName());

final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);

// Set source to a version that supports per-stream state
testHarness.updateSourceDefinitionVersion(sourceDefinitionId, currentSourceDefintionVersion);
LOGGER.info("Upgraded source connector per-stream state supported version {}.", currentSourceDefintionVersion);

// sync one more time. verify that nothing has been synced
LOGGER.info("Starting {} sync 2", testInfo.getDisplayName());
final JobInfoRead connectionSyncRead2 =
apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
LOGGER.info("state after sync 2: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

final JobInfoRead syncJob = apiClient.getJobsApi().getJobInfo(new JobIdRequestBody().id(connectionSyncRead2.getJob().getId()));
final Optional<AttemptInfoRead> result = syncJob.getAttempts().stream()
.sorted((a, b) -> Long.compare(b.getAttempt().getEndedAt(), a.getAttempt().getEndedAt()))
.findFirst();

assertTrue(result.isPresent());
assertEquals(0, result.get().getAttempt().getRecordsSynced());
assertEquals(0, result.get().getAttempt().getTotalStats().getRecordsEmitted());
testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
}

// This test is disabled because it takes a couple of minutes to run, as it is testing timeouts.
// It should be re-enabled when the @SlowIntegrationTest can be applied to it.
// See relevant issue: https://github.com/airbytehq/airbyte/issues/8397
@Disabled
Expand Down

0 comments on commit 6dadd1b

Please sign in to comment.