Skip to content

Commit

Permalink
🎉 Destination connectors: Improved "SecondSync" checks in Standard D…
Browse files Browse the repository at this point in the history
…estination Acceptance tests (#14184)

* [11731] Improved "SecondSync" checks in Standard Destination Acceptance tests
  • Loading branch information
etsybaev authored Jun 27, 2022
1 parent a0f5655 commit 26a35af
Showing 1 changed file with 21 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public abstract class DestinationAcceptanceTest {
private static final String JOB_ID = "0";
private static final int JOB_ATTEMPT = 0;

private static final String DUMMY_CATALOG_NAME = "DummyCatalog";

private static final Logger LOGGER = LoggerFactory.getLogger(DestinationAcceptanceTest.class);

private TestDestinationEnv testEnv;
Expand Down Expand Up @@ -415,11 +417,26 @@ public void testSecondSync() throws Exception {
final AirbyteCatalog catalog =
Jsons.deserialize(MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);

final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
final JsonNode config = getConfig();
runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, false);

// We need to make sure that other streams\tables\files in the same location will not be
// affected\deleted\overridden by our activities during first, second or any future sync.
// So let's create a dummy data that will be checked after all sync. It should remain the same
final AirbyteCatalog dummyCatalog =
Jsons.deserialize(MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile), AirbyteCatalog.class);
dummyCatalog.getStreams().get(0).setName(DUMMY_CATALOG_NAME);
final ConfiguredAirbyteCatalog configuredDummyCatalog = CatalogHelpers.toDefaultConfiguredCatalog(dummyCatalog);
// update messages to set new dummy stream name
firstSyncMessages.stream().filter(message -> message.getRecord() != null)
.forEach(message -> message.getRecord().setStream(DUMMY_CATALOG_NAME));
// sync dummy data
runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredDummyCatalog, false);

// Run second sync
final List<AirbyteMessage> secondSyncMessages = Lists.newArrayList(
new AirbyteMessage()
.withType(Type.RECORD)
Expand All @@ -442,6 +459,10 @@ public void testSecondSync() throws Exception {
runSyncAndVerifyStateOutput(config, secondSyncMessages, configuredCatalog, false);
final String defaultSchema = getDefaultSchema(config);
retrieveRawRecordsAndAssertSameMessages(catalog, secondSyncMessages, defaultSchema);

// verify that other streams in the same location were not affected. If something fails here,
// then this need to be fixed in connectors logic to override only required streams
retrieveRawRecordsAndAssertSameMessages(dummyCatalog, firstSyncMessages, defaultSchema);
}

/**
Expand Down

0 comments on commit 26a35af

Please sign in to comment.