From 0080a1c3532edc5caa7d09f28bc2b777c28e2722 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 14 Jun 2022 22:29:48 -0700 Subject: [PATCH] scaffold for catalog diff, needs fixing on type handling and tests add catalog diff with tests use lombok lombok formatting update JsonSchemas#traversal to use simplified path reuse jsonschemas add javadocs tests for lists add --- .../protocol/models/CatalogHelpers.java | 164 ++++++++++++++++-- .../transform_models/AddFieldTransform.java | 33 ++++ .../transform_models/AddStreamTransform.java | 27 +++ .../transform_models/FieldTransform.java | 46 +++++ .../transform_models/FieldTransformType.java | 14 ++ .../RemoveFieldTransform.java | 33 ++++ .../RemoveStreamTransform.java | 23 +++ .../transform_models/StreamTransform.java | 61 +++++++ .../transform_models/StreamTransformType.java | 14 ++ .../UpdateFieldTransform.java | 38 ++++ .../UpdateStreamTransform.java | 29 ++++ .../protocol/models/CatalogHelpersTest.java | 46 ++++- .../src/test/resources/valid_schema.json | 20 +++ .../src/test/resources/valid_schema2.json | 29 ++++ 14 files changed, 556 insertions(+), 21 deletions(-) create mode 100644 airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/AddFieldTransform.java create mode 100644 airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/AddStreamTransform.java create mode 100644 airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/FieldTransform.java create mode 100644 airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/FieldTransformType.java create mode 100644 airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/RemoveFieldTransform.java create mode 100644 airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/RemoveStreamTransform.java create mode 100644 
airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/StreamTransform.java create mode 100644 airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/StreamTransformType.java create mode 100644 airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/UpdateFieldTransform.java create mode 100644 airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/UpdateStreamTransform.java create mode 100644 airbyte-protocol/protocol-models/src/test/resources/valid_schema2.json diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java index 29d540454fd4..f3044709bc90 100644 --- a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java @@ -8,15 +8,25 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import io.airbyte.commons.json.JsonSchemas; +import io.airbyte.commons.json.JsonSchemas.FieldNameOrList; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.util.MoreIterators; +import io.airbyte.commons.util.MoreLists; +import io.airbyte.protocol.models.transform_models.FieldTransform; +import io.airbyte.protocol.models.transform_models.StreamTransform; +import io.airbyte.protocol.models.transform_models.UpdateFieldTransform; +import io.airbyte.protocol.models.transform_models.UpdateStreamTransform; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import 
java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; /** * Helper class for Catalog and Stream related operations. Generally only used in tests. @@ -124,31 +134,151 @@ public static Set getTopLevelFieldNames(final ConfiguredAirbyteStream st } /** - * @param node any json node + * @param jsonSchema - a JSONSchema node * @return a set of all keys for all objects within the node */ @VisibleForTesting - protected static Set getAllFieldNames(final JsonNode node) { - final Set allFieldNames = new HashSet<>(); - - if (node.has("properties")) { - final JsonNode properties = node.get("properties"); - final Iterator fieldNames = properties.fieldNames(); - while (fieldNames.hasNext()) { - final String fieldName = fieldNames.next(); - allFieldNames.add(fieldName); - final JsonNode fieldValue = properties.get(fieldName); - if (fieldValue.isObject()) { - allFieldNames.addAll(getAllFieldNames(fieldValue)); - } + protected static Set getAllFieldNames(final JsonNode jsonSchema) { + return getFullyQualifiedFieldNamesWithTypes(jsonSchema) + .stream() + .map(Pair::getLeft) + // only need field name, not fully qualified name + .map(MoreLists::last) + .flatMap(Optional::stream) + .collect(Collectors.toSet()); + } + + /** + * Extracts all fields and their schemas from a JSONSchema. This method returns values in + * depth-first search preorder. It short circuits at oneOfs--in other words, child fields of a oneOf + * are not returned. + * + * @param jsonSchema - a JSONSchema node + * @return a list of all keys for all objects within the node. ordered in depth-first search + * preorder. + */ + @VisibleForTesting + protected static List, JsonNode>> getFullyQualifiedFieldNamesWithTypes(final JsonNode jsonSchema) { + // if this were ever a performance issue, it could be replaced with a trie. this seems unlikely + // however. 
+ final Set> fieldNamesThatAreOneOfs = new HashSet<>(); + + return JsonSchemas.traverseJsonSchemaWithCollector(jsonSchema, (node, basicPath) -> { + final List fieldName = basicPath.stream().filter(fieldOrList -> !fieldOrList.isList()).map(FieldNameOrList::getFieldName).toList(); + return Pair.of(fieldName, node); + }) + .stream() + // first node is the original object. + .skip(1) + .filter(fieldWithSchema -> filterChildrenOfFoneOneOf(fieldWithSchema.getLeft(), fieldWithSchema.getRight(), fieldNamesThatAreOneOfs)) + .toList(); + } + + /** + * Predicate that checks if a field is a CHILD of a oneOf field. If child of a oneOf, returns false. + * Otherwise, true. This method has side effects. It assumes that it will be run in order on field + * names returned in depth-first search preorder. As it encounters oneOfs it adds them to a + * collection. It then checks if subsequent field names are prefix matches to the fields that are + * oneOfs. + * + * @param fieldName - field to investigate + * @param schema - schema of field + * @param oneOfFieldNameAccumulator - collection of fields that are oneOfs + * @return If child of a oneOf, returns false. Otherwise, true. + */ + private static boolean filterChildrenOfFoneOneOf(final List fieldName, + final JsonNode schema, + final Set> oneOfFieldNameAccumulator) { + if (isOneOfField(schema)) { + oneOfFieldNameAccumulator.add(fieldName); + // return early because we know it is a oneOf and therefore cannot be a child of a oneOf. + return true; + } + + // leverage that nodes are returned in depth-first search preorder. this means the parent field for + // the oneOf will be present in the list BEFORE any of its children. 
+ for (final List oneOfFieldName : oneOfFieldNameAccumulator) { + final String oneOfFieldNameString = String.join(".", oneOfFieldName); + final String fieldNameString = String.join(".", fieldName); + + if (fieldNameString.startsWith(oneOfFieldNameString)) { + return false; } } + return true; + } - return allFieldNames; + private static boolean isOneOfField(final JsonNode schema) { + return !MoreIterators.toSet(schema.fieldNames()).contains("type"); } - private static boolean isObjectWithSubFields(Field field) { + private static boolean isObjectWithSubFields(final Field field) { return field.getType() == JsonSchemaType.OBJECT && field.getSubFields() != null && !field.getSubFields().isEmpty(); } + public static StreamDescriptor extractStreamDescriptor(final AirbyteStream airbyteStream) { + return new StreamDescriptor().withName(airbyteStream.getName()).withNamespace(airbyteStream.getNamespace()); + } + + private static Map streamDescriptorToMap(final AirbyteCatalog catalog) { + return catalog.getStreams() + .stream() + .collect(Collectors.toMap(CatalogHelpers::extractStreamDescriptor, s -> s)); + } + + /** + * Returns difference between two provided catalogs. 
+ * + * @param oldCatalog - old catalog + * @param newCatalog - new catalog + * @return difference between old and new catalogs + */ + public static Set getCatalogDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) { + final Set streamTransforms = new HashSet<>(); + + final Map descriptorToStreamOld = streamDescriptorToMap(oldCatalog); + final Map descriptorToStreamNew = streamDescriptorToMap(newCatalog); + + Sets.difference(descriptorToStreamOld.keySet(), descriptorToStreamNew.keySet()) + .forEach(descriptor -> streamTransforms.add(StreamTransform.createRemoveStreamTransform(descriptor))); + Sets.difference(descriptorToStreamNew.keySet(), descriptorToStreamOld.keySet()) + .forEach(descriptor -> streamTransforms.add(StreamTransform.createAddStreamTransform(descriptor))); + Sets.intersection(descriptorToStreamOld.keySet(), descriptorToStreamNew.keySet()) + .forEach(descriptor -> { + final AirbyteStream streamOld = descriptorToStreamOld.get(descriptor); + final AirbyteStream streamNew = descriptorToStreamNew.get(descriptor); + if (!streamOld.equals(streamNew)) { + streamTransforms.add(StreamTransform.createUpdateStreamTransform(getStreamDiff(descriptor, streamOld, streamNew))); + } + }); + + return streamTransforms; + } + + private static UpdateStreamTransform getStreamDiff(final StreamDescriptor descriptor, + final AirbyteStream streamOld, + final AirbyteStream streamNew) { + final Set fieldTransforms = new HashSet<>(); + final Map, JsonNode> fieldNameToTypeOld = getFullyQualifiedFieldNamesWithTypes(streamOld.getJsonSchema()) + .stream() + .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); + final Map, JsonNode> fieldNameToTypeNew = getFullyQualifiedFieldNamesWithTypes(streamNew.getJsonSchema()) + .stream() + .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); + + Sets.difference(fieldNameToTypeOld.keySet(), fieldNameToTypeNew.keySet()) + .forEach(fieldName -> fieldTransforms.add(FieldTransform.createRemoveFieldTransform(fieldName, 
fieldNameToTypeOld.get(fieldName)))); + Sets.difference(fieldNameToTypeNew.keySet(), fieldNameToTypeOld.keySet()) + .forEach(fieldName -> fieldTransforms.add(FieldTransform.createAddFieldTransform(fieldName, fieldNameToTypeNew.get(fieldName)))); + Sets.intersection(fieldNameToTypeOld.keySet(), fieldNameToTypeNew.keySet()).forEach(fieldName -> { + final JsonNode oldType = fieldNameToTypeOld.get(fieldName); + final JsonNode newType = fieldNameToTypeNew.get(fieldName); + + if (!oldType.equals(newType)) { + fieldTransforms.add(FieldTransform.createUpdateFieldTransform(new UpdateFieldTransform(fieldName, oldType, newType))); + } + }); + return new UpdateStreamTransform(descriptor, fieldTransforms); + } + } diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/AddFieldTransform.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/AddFieldTransform.java new file mode 100644 index 000000000000..86abccf64106 --- /dev/null +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/AddFieldTransform.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.protocol.models.transform_models; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.ArrayList; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * Represents the addition of a field to an {@link io.airbyte.protocol.models.AirbyteStream}. 
+ */ +@AllArgsConstructor +@EqualsAndHashCode +@ToString +public class AddFieldTransform { + + private final List fieldName; + private final JsonNode schema; + + public List getFieldName() { + return new ArrayList<>(fieldName); + } + + public JsonNode getSchema() { + return schema; + } + +} diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/AddStreamTransform.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/AddStreamTransform.java new file mode 100644 index 000000000000..804ad13ced39 --- /dev/null +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/AddStreamTransform.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.protocol.models.transform_models; + +import io.airbyte.protocol.models.StreamDescriptor; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * Represents the addition of an {@link io.airbyte.protocol.models.AirbyteStream} to a + * {@link io.airbyte.protocol.models.AirbyteCatalog}. + */ +@AllArgsConstructor +@EqualsAndHashCode +@ToString +public class AddStreamTransform { + + private final StreamDescriptor streamDescriptor; + + public StreamDescriptor getStreamDescriptor() { + return streamDescriptor; + } + +} diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/FieldTransform.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/FieldTransform.java new file mode 100644 index 000000000000..af5d9b48037d --- /dev/null +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/FieldTransform.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.protocol.models.transform_models; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * Represents the diff between two fields. + */ +@AllArgsConstructor +@EqualsAndHashCode +@ToString +public final class FieldTransform { + + private final FieldTransformType transformType; + private final AddFieldTransform addFieldTransform; + private final RemoveFieldTransform removeFieldTransform; + private final UpdateFieldTransform updateFieldTransform; + + public static FieldTransform createAddFieldTransform(final List fieldName, final JsonNode schema) { + return createAddFieldTransform(new AddFieldTransform(fieldName, schema)); + } + + public static FieldTransform createAddFieldTransform(final AddFieldTransform addFieldTransform) { + return new FieldTransform(FieldTransformType.ADD_FIELD, addFieldTransform, null, null); + } + + public static FieldTransform createRemoveFieldTransform(final List fieldName, final JsonNode schema) { + return createRemoveFieldTransform(new RemoveFieldTransform(fieldName, schema)); + } + + public static FieldTransform createRemoveFieldTransform(final RemoveFieldTransform removeFieldTransform) { + return new FieldTransform(FieldTransformType.REMOVE_FIELD, null, removeFieldTransform, null); + } + + public static FieldTransform createUpdateFieldTransform(final UpdateFieldTransform updateFieldTransform) { + return new FieldTransform(FieldTransformType.UPDATE_FIELD, null, null, updateFieldTransform); + } + +} diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/FieldTransformType.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/FieldTransformType.java new file mode 100644 index 000000000000..10c2227a39f9 --- /dev/null +++ 
b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/FieldTransformType.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.protocol.models.transform_models; + +/** + * Types of transformations possible for a field. + */ +public enum FieldTransformType { + ADD_FIELD, + REMOVE_FIELD, + UPDATE_FIELD +} diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/RemoveFieldTransform.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/RemoveFieldTransform.java new file mode 100644 index 000000000000..a48314c3fa81 --- /dev/null +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/RemoveFieldTransform.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.protocol.models.transform_models; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.ArrayList; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * Represents the removal of a field from an {@link io.airbyte.protocol.models.AirbyteStream}. 
+ */ +@AllArgsConstructor +@EqualsAndHashCode +@ToString +public class RemoveFieldTransform { + + private final List fieldName; + private final JsonNode schema; + + public List getFieldName() { + return new ArrayList<>(fieldName); + } + + public JsonNode getSchema() { + return schema; + } + +} diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/RemoveStreamTransform.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/RemoveStreamTransform.java new file mode 100644 index 000000000000..a5839ab0568c --- /dev/null +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/RemoveStreamTransform.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.protocol.models.transform_models; + +import io.airbyte.protocol.models.StreamDescriptor; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * Represents the removal of an {@link io.airbyte.protocol.models.AirbyteStream} from a + * {@link io.airbyte.protocol.models.AirbyteCatalog}. + */ +@AllArgsConstructor +@EqualsAndHashCode +@ToString +public class RemoveStreamTransform { + + private final StreamDescriptor streamDescriptor; + +} diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/StreamTransform.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/StreamTransform.java new file mode 100644 index 000000000000..bf824323a5d0 --- /dev/null +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/StreamTransform.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.protocol.models.transform_models; + +import io.airbyte.protocol.models.StreamDescriptor; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * Represents the diff between two {@link io.airbyte.protocol.models.AirbyteStream}. + */ +@AllArgsConstructor +@EqualsAndHashCode +@ToString +public final class StreamTransform { + + private final StreamTransformType transformType; + private final AddStreamTransform addStreamTransform; + private final RemoveStreamTransform removeStreamTransform; + private final UpdateStreamTransform updateStreamTransform; + + public static StreamTransform createAddStreamTransform(final StreamDescriptor streamDescriptor) { + return createAddStreamTransform(new AddStreamTransform(streamDescriptor)); + } + + public static StreamTransform createAddStreamTransform(final AddStreamTransform addStreamTransform) { + return new StreamTransform(StreamTransformType.ADD_STREAM, addStreamTransform, null, null); + } + + public static StreamTransform createRemoveStreamTransform(final StreamDescriptor streamDescriptor) { + return createRemoveStreamTransform(new RemoveStreamTransform(streamDescriptor)); + } + + public static StreamTransform createRemoveStreamTransform(final RemoveStreamTransform removeStreamTransform) { + return new StreamTransform(StreamTransformType.REMOVE_STREAM, null, removeStreamTransform, null); + } + + public static StreamTransform createUpdateStreamTransform(final UpdateStreamTransform updateStreamTransform) { + return new StreamTransform(StreamTransformType.UPDATE_STREAM, null, null, updateStreamTransform); + } + + public StreamTransformType getTransformType() { + return transformType; + } + + public AddStreamTransform getAddStreamTransform() { + return addStreamTransform; + } + + public RemoveStreamTransform getRemoveStreamTransform() { + return removeStreamTransform; + } + + public UpdateStreamTransform getUpdateStreamTransform() { + return 
updateStreamTransform; + } + +} diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/StreamTransformType.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/StreamTransformType.java new file mode 100644 index 000000000000..297bff7e87a9 --- /dev/null +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/StreamTransformType.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.protocol.models.transform_models; + +/** + * Types of transformations possible for a stream. + */ +public enum StreamTransformType { + ADD_STREAM, + REMOVE_STREAM, + UPDATE_STREAM +} diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/UpdateFieldTransform.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/UpdateFieldTransform.java new file mode 100644 index 000000000000..7be3c6c0c39f --- /dev/null +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/UpdateFieldTransform.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.protocol.models.transform_models; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.ArrayList; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * Represents the update of a field. 
+ */ +@AllArgsConstructor +@EqualsAndHashCode +@ToString +public class UpdateFieldTransform { + + private final List fieldName; + private final JsonNode oldSchema; + private final JsonNode newSchema; + + public List getFieldName() { + return new ArrayList<>(fieldName); + } + + public JsonNode getOldSchema() { + return oldSchema; + } + + public JsonNode getNewSchema() { + return newSchema; + } + +} diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/UpdateStreamTransform.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/UpdateStreamTransform.java new file mode 100644 index 000000000000..f9f43d3038d9 --- /dev/null +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/transform_models/UpdateStreamTransform.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.protocol.models.transform_models; + +import io.airbyte.protocol.models.StreamDescriptor; +import java.util.HashSet; +import java.util.Set; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * Represents the update of an {@link io.airbyte.protocol.models.AirbyteStream}. 
+ */ +@AllArgsConstructor +@EqualsAndHashCode +@ToString +public class UpdateStreamTransform { + + private final StreamDescriptor streamDescriptor; + private final Set fieldTransforms; + + public Set getFieldTransforms() { + return new HashSet<>(fieldTransforms); + } + +} diff --git a/airbyte-protocol/protocol-models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java b/airbyte-protocol/protocol-models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java index 43cd93aa8d78..29adfe7d7d87 100644 --- a/airbyte-protocol/protocol-models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java +++ b/airbyte-protocol/protocol-models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java @@ -7,17 +7,26 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.protocol.models.transform_models.FieldTransform; +import io.airbyte.protocol.models.transform_models.StreamTransform; +import io.airbyte.protocol.models.transform_models.UpdateFieldTransform; +import io.airbyte.protocol.models.transform_models.UpdateStreamTransform; import java.io.IOException; +import java.util.Comparator; import java.util.List; import java.util.Set; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; class CatalogHelpersTest { + // handy for debugging test only. 
+ private static final Comparator STREAM_TRANSFORM_COMPARATOR = + Comparator.comparing(StreamTransform::getTransformType); + @Test void testFieldToJsonSchema() { final String expected = """ @@ -72,10 +81,39 @@ void testGetTopLevelFieldNames() { void testGetFieldNames() throws IOException { final JsonNode node = Jsons.deserialize(MoreResources.readResource("valid_schema.json")); final Set actualFieldNames = CatalogHelpers.getAllFieldNames(node); - final Set expectedFieldNames = - ImmutableSet.of("date", "CAD", "HKD", "ISK", "PHP", "DKK", "HUF", "文", "somekey", "something", "nestedkey"); + final List expectedFieldNames = + List.of("CAD", "DKK", "HKD", "HUF", "ISK", "PHP", "date", "nestedkey", "somekey", "something", "something2", "文"); + + // sort so that the diff is easier to read. + assertEquals(expectedFieldNames.stream().sorted().toList(), actualFieldNames.stream().sorted().toList()); + } + + @Test + void testGetCatalogDiff() throws IOException { + final JsonNode schema1 = Jsons.deserialize(MoreResources.readResource("valid_schema.json")); + final JsonNode schema2 = Jsons.deserialize(MoreResources.readResource("valid_schema2.json")); + final AirbyteCatalog catalog1 = new AirbyteCatalog().withStreams(List.of( + new AirbyteStream().withName("users").withJsonSchema(schema1), + new AirbyteStream().withName("accounts").withJsonSchema(Jsons.emptyObject()))); + final AirbyteCatalog catalog2 = new AirbyteCatalog().withStreams(List.of( + new AirbyteStream().withName("users").withJsonSchema(schema2), + new AirbyteStream().withName("sales").withJsonSchema(Jsons.emptyObject()))); - assertEquals(expectedFieldNames, actualFieldNames); + final Set actualDiff = CatalogHelpers.getCatalogDiff(catalog1, catalog2); + final List expectedDiff = Stream.of( + StreamTransform.createAddStreamTransform(new StreamDescriptor().withName("sales")), + StreamTransform.createRemoveStreamTransform(new StreamDescriptor().withName("accounts")), + StreamTransform.createUpdateStreamTransform(new 
UpdateStreamTransform(new StreamDescriptor().withName("users"), Set.of( + FieldTransform.createAddFieldTransform(List.of("COD"), schema2.get("properties").get("COD")), + FieldTransform.createRemoveFieldTransform(List.of("something2"), schema1.get("properties").get("something2")), + FieldTransform.createRemoveFieldTransform(List.of("HKD"), schema1.get("properties").get("HKD")), + FieldTransform.createUpdateFieldTransform(new UpdateFieldTransform( + List.of("CAD"), + schema1.get("properties").get("CAD"), + schema2.get("properties").get("CAD"))))))) + .sorted(STREAM_TRANSFORM_COMPARATOR) + .toList(); + assertEquals(expectedDiff, actualDiff.stream().sorted(STREAM_TRANSFORM_COMPARATOR).toList()); } } diff --git a/airbyte-protocol/protocol-models/src/test/resources/valid_schema.json b/airbyte-protocol/protocol-models/src/test/resources/valid_schema.json index 0a87904fafd2..a5b7b656f3e2 100644 --- a/airbyte-protocol/protocol-models/src/test/resources/valid_schema.json +++ b/airbyte-protocol/protocol-models/src/test/resources/valid_schema.json @@ -24,6 +24,26 @@ "patternProperties": { ".+": {} } + }, + "something2": { + "oneOf": [ + { + "type": "object", + "properties": { + "oneOfOne": { + "type": "string" + } + } + }, + { + "type": "object", + "properties": { + "oneOfTwo": { + "type": "string" + } + } + } + ] } } } diff --git a/airbyte-protocol/protocol-models/src/test/resources/valid_schema2.json b/airbyte-protocol/protocol-models/src/test/resources/valid_schema2.json new file mode 100644 index 000000000000..f84e8458be7c --- /dev/null +++ b/airbyte-protocol/protocol-models/src/test/resources/valid_schema2.json @@ -0,0 +1,29 @@ +{ + "type": "object", + "properties": { + "date": { "type": "string", "format": "date-time" }, + "CAD": { "type": ["null", "string"] }, + "COD": { "type": ["null", "string"] }, + "ISK": { "type": ["null", "number"] }, + "PHP": { "type": ["null", "number"] }, + "DKK": { "type": ["null", "number"] }, + "HUF": { "type": ["null", "number"] }, + "文": { 
"type": ["null", "number"] }, + "something": { + "type": ["null", "object"], + "properties": { + "somekey": { + "type": ["null", "object"], + "properties": { + "nestedkey": { + "type": ["null", "number"] + } + } + } + }, + "patternProperties": { + ".+": {} + } + } + } +}