Skip to content

Commit

Permalink
test for behavior when a column is removed in an incremental sync (#2…
Browse files Browse the repository at this point in the history
…0033)

* test for behavior when a column is removed in an incremental sync

* fixes in dat test for dropping a column

* only run drop-one-column test for the exchange rates dataset

* re-enable tests that were disabled during development

* remove unused import

* update test to new method for checking spec capabilities

* use config directly instead of parameterized test

Co-authored-by: Michael Siega <michael@airbyte.io>
Co-authored-by: Michael Siega <109092231+mfsiega-airbyte@users.noreply.github.com>
  • Loading branch information
3 people authored and jbfbell committed Jan 13, 2023
1 parent 3673aab commit de8a177
Showing 1 changed file with 68 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -650,6 +649,74 @@ public void testIncrementalSync() throws Exception {
defaultSchema);
}

/**
 * Verifies that normalization copes with a column being dropped from the stream schema between
 * two incremental (append-dedup) syncs.
 *
 * <p>The exchange rate dataset is synced once with its full schema. The {@code HKD} property is
 * then removed from the configured catalog's JSON schema, and the dataset plus one extra record
 * (which has no {@code HKD} value) is synced again. After the second sync, none of the normalized
 * records are expected to contain {@code HKD}.
 *
 * <p>The test returns without asserting anything when {@code normalizationFromDefinition()} is
 * false, or when the first stream of the catalog is not {@code exchange_rate}.
 *
 * @throws Exception if reading the fixtures or running a sync fails
 */
@Test
public void testIncrementalSyncWithNormalizationDropOneColumn() throws Exception {
  if (!normalizationFromDefinition()) {
    return;
  }

  final AirbyteCatalog catalog = Jsons.deserialize(
      MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getCatalogFileVersion(ProtocolVersion.V0)),
      AirbyteCatalog.class);

  if (!catalog.getStreams().get(0).getName().equals("exchange_rate")) {
    // This test is only implemented for the exchange rate catalog.
    return;
  }

  final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
  configuredCatalog.getStreams().forEach(s -> {
    s.withSyncMode(SyncMode.INCREMENTAL);
    s.withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP);
    s.withCursorField(Collections.emptyList());
    // use composite primary key of various types (string, float)
    s.withPrimaryKey(
        List.of(List.of("id"), List.of("currency"), List.of("date"), List.of("NZD"), List.of("USD")));
  });

  // First sync: full schema, unmodified fixture.
  final List<AirbyteMessage> firstSyncMessages = readExchangeRateMessagesForDropColumnTest();

  final JsonNode config = getConfig();
  runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, true);

  final String defaultSchema = getDefaultSchema(config);
  final List<AirbyteRecordMessage> recordsAfterFirstSync = retrieveNormalizedRecords(catalog, defaultSchema);
  assertSameMessages(firstSyncMessages, recordsAfterFirstSync, true);

  // remove one field
  final JsonNode jsonSchema = configuredCatalog.getStreams().get(0).getStream().getJsonSchema();
  ((ObjectNode) jsonSchema.findValue("properties")).remove("HKD");

  // insert more messages
  // NOTE: we re-read the messages because `assertSameMessages` above pruned the emittedAt timestamps.
  final List<AirbyteMessage> secondSyncMessages = readExchangeRateMessagesForDropColumnTest();
  secondSyncMessages.add(Jsons.deserialize(
      "{\"type\": \"RECORD\", \"record\": {\"stream\": \"exchange_rate\", \"emitted_at\": 1602637989500, \"data\": { \"id\": 2, \"currency\": \"EUR\", \"date\": \"2020-09-02T00:00:00Z\", \"NZD\": 1.14, \"USD\": 10.16}}}\n",
      AirbyteMessage.class));

  runSyncAndVerifyStateOutput(config, secondSyncMessages, configuredCatalog, true);

  // assert the removed field is missing on the new messages
  final List<AirbyteRecordMessage> recordsAfterSecondSync = retrieveNormalizedRecords(catalog, defaultSchema);

  // We expect all of the messages to be missing the removed column after normalization, so strip it
  // from the record messages we sent before comparing. Non-record messages carry no data to strip.
  secondSyncMessages.stream()
      .filter(message -> message.getRecord() != null)
      .forEach(message -> ((ObjectNode) message.getRecord().getData()).remove("HKD"));
  assertSameMessages(secondSyncMessages, recordsAfterSecondSync, true);
}

/**
 * Reads the V0 exchange rate message fixture and deserializes each line into an
 * {@link AirbyteMessage}. Every call re-reads the resource and returns freshly deserialized
 * messages.
 */
private static List<AirbyteMessage> readExchangeRateMessagesForDropColumnTest() throws Exception {
  return MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getMessageFileVersion(ProtocolVersion.V0))
      .lines()
      .map(line -> Jsons.deserialize(line, AirbyteMessage.class))
      .collect(Collectors.toList());
}

/**
* Verify that the integration successfully writes records, both raw and normalized.
* Tests a wide variety of messages and schemas (aspirationally, anyway).
Expand Down Expand Up @@ -1468,7 +1535,6 @@ public String toString() {
* your_containers_id" (ex. docker container attach 18cc929f44c8) to see the container's output
*/
@Test
@Disabled
public void testStressPerformance() throws Exception {
final int streamsSize = 5; // number of generated streams
final int messagesNumber = 300; // number of msg to be written to each generated stream
Expand Down

0 comments on commit de8a177

Please sign in to comment.