Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Destination connectors: Improved "SecondSync" checks in Standard Destination Acceptance tests #14184

Merged
merged 3 commits into from
Jun 27, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public abstract class DestinationAcceptanceTest {
private static final String JOB_ID = "0";
private static final int JOB_ATTEMPT = 0;

private static final String DUMMY_CATALOG_NAME = "DummyCatalog";

private static final Logger LOGGER = LoggerFactory.getLogger(DestinationAcceptanceTest.class);

private TestDestinationEnv testEnv;
Expand Down Expand Up @@ -415,11 +417,26 @@ public void testSecondSync() throws Exception {
final AirbyteCatalog catalog =
Jsons.deserialize(MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);

final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
final JsonNode config = getConfig();
runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, false);

// We need to make sure that other streams\tables\files in the same location will not be
// affected\deleted\overridden by our activities during first, second or any future sync.
// So let's create a dummy data that will be checked after all sync. It should remain the same
final AirbyteCatalog dummyCatalog =
Jsons.deserialize(MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile), AirbyteCatalog.class);
dummyCatalog.getStreams().get(0).setName(DUMMY_CATALOG_NAME);
final ConfiguredAirbyteCatalog configuredDummyCatalog = CatalogHelpers.toDefaultConfiguredCatalog(dummyCatalog);
// update messages to set new dummy stream name
firstSyncMessages.stream().filter(message -> message.getRecord() != null)
.forEach(message -> message.getRecord().setStream(DUMMY_CATALOG_NAME));
// sync dummy data
runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredDummyCatalog, false);

// Run second sync
final List<AirbyteMessage> secondSyncMessages = Lists.newArrayList(
new AirbyteMessage()
.withType(Type.RECORD)
Expand All @@ -442,6 +459,10 @@ public void testSecondSync() throws Exception {
runSyncAndVerifyStateOutput(config, secondSyncMessages, configuredCatalog, false);
final String defaultSchema = getDefaultSchema(config);
retrieveRawRecordsAndAssertSameMessages(catalog, secondSyncMessages, defaultSchema);

// verify that other streams in the same location were not affected. If something fails here,
// then this need to be fixed in connectors logic to override only required streams
retrieveRawRecordsAndAssertSameMessages(dummyCatalog, firstSyncMessages, defaultSchema);
}

/**
Expand Down