Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bmoric/add non cdc partial reset acceptance test #14764

Merged
merged 26 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fd1d874
set per stream feature flag to true for testing
lmossman Jul 8, 2022
c6431de
add a second table to cdc acceptance tests
lmossman Jul 9, 2022
d3d615e
add partial reset test
lmossman Jul 9, 2022
832d944
format
lmossman Jul 9, 2022
36c0e28
add partial reset cdc tests
lmossman Jul 11, 2022
676bfe8
test incremental after partial reset
lmossman Jul 11, 2022
904a7ba
remove dev image from acceptance test
lmossman Jul 12, 2022
42007eb
fix flag and add comment
lmossman Jul 12, 2022
624b747
Revert "set per stream feature flag to true for testing"
lmossman Jul 14, 2022
b04742b
set USE_STREAM_CAPABLE_STATE flag to true in acceptance test script
lmossman Jul 14, 2022
16340d4
call new update endpoint
lmossman Jul 14, 2022
e67ec0e
use methods in test harness instead
lmossman Jul 14, 2022
c71b0ba
remove comment
lmossman Jul 14, 2022
28c6757
add env var to worker container
lmossman Jul 14, 2022
ce315b5
format
lmossman Jul 14, 2022
f5cf0f7
fix state check in basic acceptance test
lmossman Jul 15, 2022
dbd4974
use test info for test name logging
lmossman Jul 15, 2022
5937694
Re-add acceptance test
benmoriceau Jul 15, 2022
16a2196
Re-add acceptance test
benmoriceau Jul 15, 2022
a375fbf
Format
benmoriceau Jul 15, 2022
9f83989
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/add…
benmoriceau Jul 15, 2022
70a3415
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/add…
benmoriceau Jul 18, 2022
9f3639e
Fix acceptance test
benmoriceau Jul 18, 2022
f95ee64
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/add…
benmoriceau Jul 18, 2022
778a361
Add log
benmoriceau Jul 18, 2022
edb4e20
remove unwanted changes
benmoriceau Jul 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.airbyte.api.client.model.generated.DestinationDefinitionUpdate;
import io.airbyte.api.client.model.generated.DestinationIdRequestBody;
import io.airbyte.api.client.model.generated.DestinationRead;
import io.airbyte.api.client.model.generated.DestinationSyncMode;
import io.airbyte.api.client.model.generated.JobConfigType;
import io.airbyte.api.client.model.generated.JobIdRequestBody;
import io.airbyte.api.client.model.generated.JobListRequestBody;
Expand All @@ -50,6 +51,9 @@
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.client.model.generated.SourceIdRequestBody;
import io.airbyte.api.client.model.generated.SourceRead;
import io.airbyte.api.client.model.generated.SyncMode;
import io.airbyte.api.client.model.generated.WebBackendConnectionUpdate;
import io.airbyte.api.client.model.generated.WebBackendOperationCreateOrUpdate;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.util.MoreProperties;
Expand Down Expand Up @@ -717,7 +721,7 @@ private void deleteOperation(final UUID destinationId) throws ApiException {
apiClient.getOperationApi().deleteOperation(new OperationIdRequestBody().operationId(destinationId));
}

public JobRead getMostRecentSyncJobId(UUID connectionId) throws Exception {
public JobRead getMostRecentSyncJobId(final UUID connectionId) throws Exception {
return apiClient.getJobsApi()
.listJobsFor(new JobListRequestBody().configId(connectionId.toString()).configTypes(List.of(JobConfigType.SYNC)))
.getJobs()
Expand Down Expand Up @@ -799,4 +803,50 @@ public enum Type {
DESTINATION
}

/**
 * Asserts that the destination database is "empty" after a reset: it still contains a table for
 * every stream of the source (plus the Airbyte-generated raw/SCD tables), but none of those
 * tables hold any records.
 *
 * @param withScdTable whether normalization SCD tables are expected in the destination table set
 * @throws Exception if either database cannot be queried
 */
public void assertDestinationDbEmpty(final boolean withScdTable) throws Exception {
  final Database source = getSourceDatabase();
  final Set<SchemaTableNamePair> sourceTables = listAllTables(source);
  final Set<SchemaTableNamePair> sourceTablesWithRawTablesAdded = addAirbyteGeneratedTables(withScdTable, sourceTables);
  final Database destination = getDestinationDatabase();
  final Set<SchemaTableNamePair> destinationTables = listAllTables(destination);
  assertEquals(sourceTablesWithRawTablesAdded, destinationTables,
      String.format("streams did not match.\n source stream names: %s\n destination stream names: %s\n", sourceTables, destinationTables));

  for (final SchemaTableNamePair pair : sourceTables) {
    // These records are read from the destination; the previous name `sourceRecords` was misleading.
    final List<JsonNode> destinationRecords = retrieveRawDestinationRecords(pair);
    assertTrue(destinationRecords.isEmpty(), "expected no records in destination for stream " + pair);
  }
}

/**
 * Configures every stream of the given catalog for incremental/append syncing with the supplied
 * cursor field. Mutates the catalog in place.
 *
 * @param airbyteCatalog catalog whose stream configurations are updated
 * @param cursorField cursor field applied to each stream
 */
public void setIncrementalAppendSyncMode(final AirbyteCatalog airbyteCatalog, final List<String> cursorField) {
  airbyteCatalog.getStreams()
      .forEach(streamAndConfig -> streamAndConfig.getConfig()
          .syncMode(SyncMode.INCREMENTAL)
          .destinationSyncMode(DestinationSyncMode.APPEND)
          .cursorField(cursorField));
}

/**
 * Builds a web-backend connection update request mirroring the given connection, with the
 * supplied catalog forced into incremental/append mode (cursor {@code COLUMN_ID}), a refreshed
 * catalog flag set, and resets not skipped. Mutates {@code catalog} via
 * {@link #setIncrementalAppendSyncMode}.
 *
 * @param connection connection whose settings are carried over into the update
 * @param catalog catalog to sync; updated in place to incremental/append
 * @param operation operation to re-register on the connection
 * @return the populated update request
 */
public WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection, final AirbyteCatalog catalog, final OperationRead operation) {
  setIncrementalAppendSyncMode(catalog, List.of(COLUMN_ID));

  // Re-create the existing operation on the updated connection.
  final WebBackendOperationCreateOrUpdate operationUpdate = new WebBackendOperationCreateOrUpdate()
      .name(operation.getName())
      .operationId(operation.getOperationId())
      .workspaceId(operation.getWorkspaceId())
      .operatorConfiguration(operation.getOperatorConfiguration());

  return new WebBackendConnectionUpdate()
      .connectionId(connection.getConnectionId())
      .name(connection.getName())
      .operationIds(connection.getOperationIds())
      .operations(List.of(operationUpdate))
      .namespaceDefinition(connection.getNamespaceDefinition())
      .namespaceFormat(connection.getNamespaceFormat())
      .syncCatalog(catalog)
      .schedule(connection.getSchedule())
      .sourceCatalogId(connection.getSourceCatalogId())
      .status(connection.getStatus())
      .prefix(connection.getPrefix())
      .withRefreshedCatalog(true)
      .skipReset(false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.waitForSuccessfulJob;
import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.waitWhileJobHasStatus;
import static java.lang.Thread.sleep;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand All @@ -26,7 +30,45 @@
import io.airbyte.api.client.generated.WebBackendApi;
import io.airbyte.api.client.invoker.generated.ApiClient;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.*;
import io.airbyte.api.client.model.generated.AirbyteCatalog;
import io.airbyte.api.client.model.generated.AirbyteStream;
import io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration;
import io.airbyte.api.client.model.generated.AirbyteStreamConfiguration;
import io.airbyte.api.client.model.generated.AttemptInfoRead;
import io.airbyte.api.client.model.generated.AttemptStatus;
import io.airbyte.api.client.model.generated.CheckConnectionRead;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.ConnectionRead;
import io.airbyte.api.client.model.generated.ConnectionSchedule;
import io.airbyte.api.client.model.generated.ConnectionState;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.api.client.model.generated.DataType;
import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody;
import io.airbyte.api.client.model.generated.DestinationDefinitionIdWithWorkspaceId;
import io.airbyte.api.client.model.generated.DestinationDefinitionRead;
import io.airbyte.api.client.model.generated.DestinationDefinitionSpecificationRead;
import io.airbyte.api.client.model.generated.DestinationIdRequestBody;
import io.airbyte.api.client.model.generated.DestinationRead;
import io.airbyte.api.client.model.generated.DestinationSyncMode;
import io.airbyte.api.client.model.generated.JobConfigType;
import io.airbyte.api.client.model.generated.JobIdRequestBody;
import io.airbyte.api.client.model.generated.JobInfoRead;
import io.airbyte.api.client.model.generated.JobListRequestBody;
import io.airbyte.api.client.model.generated.JobRead;
import io.airbyte.api.client.model.generated.JobStatus;
import io.airbyte.api.client.model.generated.JobWithAttemptsRead;
import io.airbyte.api.client.model.generated.OperationRead;
import io.airbyte.api.client.model.generated.SourceDefinitionIdRequestBody;
import io.airbyte.api.client.model.generated.SourceDefinitionIdWithWorkspaceId;
import io.airbyte.api.client.model.generated.SourceDefinitionRead;
import io.airbyte.api.client.model.generated.SourceDefinitionSpecificationRead;
import io.airbyte.api.client.model.generated.SourceIdRequestBody;
import io.airbyte.api.client.model.generated.SourceRead;
import io.airbyte.api.client.model.generated.StreamDescriptor;
import io.airbyte.api.client.model.generated.StreamState;
import io.airbyte.api.client.model.generated.SyncMode;
import io.airbyte.api.client.model.generated.WebBackendConnectionUpdate;
import io.airbyte.api.client.model.generated.WebBackendOperationCreateOrUpdate;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.test.utils.AirbyteAcceptanceTestHarness;
Expand All @@ -37,11 +79,26 @@
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.time.Duration;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1105,6 +1162,179 @@ public void testIncrementalSyncMultipleStreams() throws Exception {

}

/**
 * End-to-end check that a full-refresh/overwrite sync across multiple schemas succeeds and that a
 * subsequent connection reset leaves the destination tables present but empty.
 */
@Test
public void testMultipleSchemasAndTablesSyncAndReset() throws Exception {
  // Create additional tables in a second schema so the sync spans more than one schema.
  PostgreSQLContainerHelper.runSqlScript(MountableFile.forClasspathResource("postgres_second_schema_multiple_tables.sql"), sourcePsql);

  final String connectionName = "test-connection";
  final UUID sourceId = testHarness.createPostgresSource().getSourceId();
  final UUID destinationId = testHarness.createDestination().getDestinationId();
  final UUID operationId = testHarness.createOperation().getOperationId();
  final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);

  // Every stream is synced full-refresh/overwrite so the destination mirrors the source exactly.
  final SyncMode syncMode = SyncMode.FULL_REFRESH;
  final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
  catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
  final UUID connectionId =
      testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();
  final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
  waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
  testHarness.assertSourceAndDestinationDbInSync(false);
  // Resetting should wipe the records while keeping the table structure (no SCD tables expected).
  final JobInfoRead connectionResetRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId));
  waitForSuccessfulJob(apiClient.getJobsApi(), connectionResetRead.getJob());
  testHarness.assertDestinationDbEmpty(false);
}

/**
 * Exercises partial (per-stream) reset behavior when the connection's schema is modified in three
 * ways in sequence: a stream is removed, a stream is (re-)added with different data, and a
 * stream's columns are changed. After each modification the catalog is refreshed, the connection
 * is updated via the web-backend endpoint, and the automatically-triggered job is awaited before
 * the stream state is checked.
 */
@Test
public void testPartialResetResetAllWhenSchemaIsModified(final TestInfo testInfo) throws Exception {
  LOGGER.info("Running: " + testInfo.getDisplayName());

  // Seed an extra table in the source so the connection starts with two streams.
  final String additionalTable = "additional_table";
  final Database sourceDb = testHarness.getSourceDatabase();
  sourceDb.query(ctx -> {
    ctx.createTableIfNotExists(additionalTable)
        .columns(DSL.field("id", SQLDataType.INTEGER), DSL.field("field", SQLDataType.VARCHAR)).execute();
    ctx.truncate(additionalTable).execute();
    ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field")).values(1, "1").execute();
    ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field")).values(2, "2").execute();
    return null;
  });
  UUID sourceId = testHarness.createPostgresSource().getSourceId();
  final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
  final UUID destinationId = testHarness.createDestination().getDestinationId();
  final OperationRead operation = testHarness.createOperation();
  final UUID operationId = operation.getOperationId();
  // Randomized name so repeated runs do not collide on connection names.
  final String name = "test_reset_when_schema_is_modified_" + UUID.randomUUID();

  // Incremental/append so per-stream state is tracked (a prerequisite for partial resets).
  testHarness.setIncrementalAppendSyncMode(catalog, List.of(COLUMN_ID));

  final ConnectionRead connection =
      testHarness.createConnection(name, sourceId, destinationId, List.of(operationId), catalog, null);

  // Run initial sync and verify both streams have state afterwards.
  final JobInfoRead syncRead =
      apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connection.getConnectionId()));
  waitForSuccessfulJob(apiClient.getJobsApi(), syncRead.getJob());

  testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
  assertStreamStateContainsStream(connection.getConnectionId(), List.of(
      new StreamDescriptor().name("id_and_name").namespace("public"),
      new StreamDescriptor().name(additionalTable).namespace("public")));

  LOGGER.info("Initial sync ran, now running an update with a stream being removed.");

  // --- Case 1: remove a stream from the source schema. ---
  sourceDb.query(ctx -> ctx.dropTableIfExists(additionalTable).execute());

  // Update the connection with the refreshed catalog; this triggers a job automatically.
  AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchemaWithoutCache(sourceId);
  WebBackendConnectionUpdate update = testHarness.getUpdateInput(connection, refreshedCatalog, operation);
  webBackendApi.webBackendUpdateConnectionNew(update);

  // Wait until the job triggered by the UpdateConnection is finished.
  JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId());
  waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate);

  // NOTE(review): source/destination parity is deliberately not asserted here — removing a
  // stream from the catalog does not delete its previously-synced data from the destination.
  // Only the per-stream state is checked: the removed stream's state should be gone.
  assertStreamStateContainsStream(connection.getConnectionId(), List.of(
      new StreamDescriptor().name("id_and_name").namespace("public")));

  LOGGER.info("Remove done, now running an update with a stream being added.");

  // --- Case 2: re-add the stream with different values than the initial import, to ensure the
  // stream is properly reset rather than appended to stale data. ---
  sourceDb.query(ctx -> {
    ctx.createTableIfNotExists(additionalTable)
        .columns(DSL.field("id", SQLDataType.INTEGER), DSL.field("field", SQLDataType.VARCHAR)).execute();
    ctx.truncate(additionalTable).execute();
    ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field")).values(3, "3").execute();
    ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field")).values(4, "4").execute();
    return null;
  });

  // A fresh source is created so discovery is not served from a cached schema.
  sourceId = testHarness.createPostgresSource().getSourceId();
  refreshedCatalog = testHarness.discoverSourceSchema(sourceId);
  update = testHarness.getUpdateInput(connection, refreshedCatalog, operation);
  webBackendApi.webBackendUpdateConnectionNew(update);

  syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId());
  waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate);

  // After re-adding, both parity and the presence of state for both streams are verified.
  testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
  assertStreamStateContainsStream(connection.getConnectionId(), List.of(
      new StreamDescriptor().name("id_and_name").namespace("public"),
      new StreamDescriptor().name(additionalTable).namespace("public")));

  LOGGER.info("Addition done, now running an update with a stream being updated.");

  // --- Case 3: change the stream's schema (add a column) with new data. ---
  sourceDb.query(ctx -> {
    ctx.dropTableIfExists(additionalTable).execute();
    ctx.createTableIfNotExists(additionalTable)
        .columns(DSL.field("id", SQLDataType.INTEGER), DSL.field("field", SQLDataType.VARCHAR), DSL.field("another_field", SQLDataType.VARCHAR))
        .execute();
    ctx.truncate(additionalTable).execute();
    ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field"), DSL.field("another_field")).values(3, "3", "three")
        .execute();
    ctx.insertInto(DSL.table(additionalTable)).columns(DSL.field("id"), DSL.field("field"), DSL.field("another_field")).values(4, "4", "four")
        .execute();
    return null;
  });

  sourceId = testHarness.createPostgresSource().getSourceId();
  refreshedCatalog = testHarness.discoverSourceSchema(sourceId);
  update = testHarness.getUpdateInput(connection, refreshedCatalog, operation);
  webBackendApi.webBackendUpdateConnectionNew(update);

  syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId());
  waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate);

  // After the schema change, parity and per-stream state for both streams are verified again.
  testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
  assertStreamStateContainsStream(connection.getConnectionId(), List.of(
      new StreamDescriptor().name("id_and_name").namespace("public"),
      new StreamDescriptor().name(additionalTable).namespace("public")));

}

/**
 * Asserts that the connection's per-stream state covers exactly the expected stream descriptors
 * (order-insensitive).
 *
 * @param connectionId connection whose state is fetched
 * @param expectedStreamDescriptors descriptors the state must match exactly
 * @throws ApiException if the state cannot be fetched
 */
private void assertStreamStateContainsStream(final UUID connectionId, final List<StreamDescriptor> expectedStreamDescriptors) throws ApiException {
  final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
  final List<StreamDescriptor> streamDescriptors = state.getStreamState().stream().map(StreamState::getStreamDescriptor).toList();

  // Set equality replaces the previous mutual containsAll check: identical semantics (order- and
  // duplicate-insensitive), but assertEquals reports both sides on failure instead of just "false".
  Assertions.assertEquals(Set.copyOf(expectedStreamDescriptors), Set.copyOf(streamDescriptors),
      "stream states did not match for connection " + connectionId);
}

/**
 * Fetches the most recent sync job for the given connection.
 *
 * @param connectionId connection whose sync jobs are listed
 * @return the first (most recent) sync job returned by the API
 * @throws Exception if the API call fails or no sync job exists
 */
private JobRead getMostRecentSyncJobId(final UUID connectionId) throws Exception {
  final JobListRequestBody request = new JobListRequestBody()
      .configId(connectionId.toString())
      .configTypes(List.of(JobConfigType.SYNC));
  final List<JobWithAttemptsRead> jobs = apiClient.getJobsApi().listJobsFor(request).getJobs();
  return jobs.stream()
      .findFirst()
      .map(JobWithAttemptsRead::getJob)
      .orElseThrow();
}

/**
 * Blocks until a job newer than the current most-recent sync job appears for the connection,
 * polling every 10 seconds. If the current most-recent job has not succeeded, it is returned
 * immediately (it is considered the job being waited for).
 *
 * NOTE(review): there is no upper bound on the wait; if no new job is ever started this loops
 * indefinitely — acceptable here only because the surrounding tests have their own timeouts.
 *
 * @param connectionId connection to watch
 * @return the newly started (or still-running) job
 * @throws Exception if fetching jobs fails or the sleep is interrupted
 */
private JobRead waitUntilTheNextJobIsStarted(final UUID connectionId) throws Exception {
  final JobRead previousJob = getMostRecentSyncJobId(connectionId);
  if (previousJob.getStatus() != JobStatus.SUCCEEDED) {
    return previousJob;
  }

  JobRead latestJob = getMostRecentSyncJobId(connectionId);
  while (latestJob.getId().equals(previousJob.getId())) {
    Thread.sleep(Duration.ofSeconds(10).toMillis());
    latestJob = getMostRecentSyncJobId(connectionId);
  }
  return latestJob;
}

// This test is disabled because it takes a couple of minutes to run, as it is testing timeouts.
// It should be re-enabled when the @SlowIntegrationTest can be applied to it.
// See relevant issue: https://github.com/airbytehq/airbyte/issues/8397
Expand Down
Loading