Skip to content

Commit

Permalink
scaffold for catalog diff, needs fixing on type handling and tests
Browse files Browse the repository at this point in the history
add catalog diff with tests

use lombok

lombok formatting

update JsonSchemas#traversal to use simplified path

reuse jsonschemas

add javadocs

tests for lists add
  • Loading branch information
cgardens committed Jun 19, 2022
1 parent becae16 commit 5da035c
Show file tree
Hide file tree
Showing 14 changed files with 556 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,25 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.airbyte.commons.json.JsonSchemas;
import io.airbyte.commons.json.JsonSchemas.FieldNameOrList;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.commons.util.MoreLists;
import io.airbyte.protocol.models.transform_models.FieldTransform;
import io.airbyte.protocol.models.transform_models.StreamTransform;
import io.airbyte.protocol.models.transform_models.UpdateFieldTransform;
import io.airbyte.protocol.models.transform_models.UpdateStreamTransform;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;

/**
* Helper class for Catalog and Stream related operations. Generally only used in tests.
Expand Down Expand Up @@ -124,31 +134,151 @@ public static Set<String> getTopLevelFieldNames(final ConfiguredAirbyteStream st
}

/**
* @param node any json node
* @param jsonSchema - a JSONSchema node
* @return a set of all keys for all objects within the node
*/
@VisibleForTesting
protected static Set<String> getAllFieldNames(final JsonNode node) {
final Set<String> allFieldNames = new HashSet<>();

if (node.has("properties")) {
final JsonNode properties = node.get("properties");
final Iterator<String> fieldNames = properties.fieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
allFieldNames.add(fieldName);
final JsonNode fieldValue = properties.get(fieldName);
if (fieldValue.isObject()) {
allFieldNames.addAll(getAllFieldNames(fieldValue));
}
protected static Set<String> getAllFieldNames(final JsonNode jsonSchema) {
return getFullyQualifiedFieldNamesWithTypes(jsonSchema)
.stream()
.map(Pair::getLeft)
// only need field name, not fully qualified name
.map(MoreLists::last)
.flatMap(Optional::stream)
.collect(Collectors.toSet());
}

/**
* Extracts all fields and their schemas from a JSONSchema. This method returns values in
* depth-first search preorder. It short circuits at oneOfs--in other words, child fields of a oneOf
* are not returned.
*
* @param jsonSchema - a JSONSchema node
* @return a list of all keys for all objects within the node. ordered in depth-first search
* preorder.
*/
@VisibleForTesting
protected static List<Pair<List<String>, JsonNode>> getFullyQualifiedFieldNamesWithTypes(final JsonNode jsonSchema) {
// if this were ever a performance issue, it could be replaced with a trie. this seems unlikely
// however.
final Set<List<String>> fieldNamesThatAreOneOfs = new HashSet<>();

return JsonSchemas.traverseJsonSchemaWithCollector(jsonSchema, (node, basicPath) -> {
final List<String> fieldName = basicPath.stream().filter(fieldOrList -> !fieldOrList.isList()).map(FieldNameOrList::getFieldName).toList();
return Pair.of(fieldName, node);
})
.stream()
// first node is the original object.
.skip(1)
.filter(fieldWithSchema -> filterChildrenOfFoneOneOf(fieldWithSchema.getLeft(), fieldWithSchema.getRight(), fieldNamesThatAreOneOfs))
.toList();
}

/**
* Predicate that checks if a field is a CHILD of a oneOf field. If child of a oneOf, returns false.
* Otherwise, true. This method as side effects. It assumes that it will be run in order on field
* names returned in depth-first search preoorder. As it encounters oneOfs it adds them to a
* collection. It then checks if subsequent field names are prefix matches to the field that are
* oneOfs.
*
* @param fieldName - field to investigate
* @param schema - schema of field
* @param oneOfFieldNameAccumulator - collection of fields that are oneOfs
* @return If child of a oneOf, returns false. Otherwise, true.
*/
private static boolean filterChildrenOfFoneOneOf(final List<String> fieldName,
final JsonNode schema,
final Set<List<String>> oneOfFieldNameAccumulator) {
if (isOneOfField(schema)) {
oneOfFieldNameAccumulator.add(fieldName);
// return early because we know it is a oneOf and therefore cannot be a child of a oneOf.
return true;
}

// leverage that nodes are returned in depth-first search preorder. this means the parent field for
// the oneOf will be present in the list BEFORE any of its children.
for (final List<String> oneOfFieldName : oneOfFieldNameAccumulator) {
final String oneOfFieldNameString = String.join(".", oneOfFieldName);
final String fieldNameString = String.join(".", fieldName);

if (fieldNameString.startsWith(oneOfFieldNameString)) {
return false;
}
}
return true;
}

return allFieldNames;
private static boolean isOneOfField(final JsonNode schema) {
return !MoreIterators.toSet(schema.fieldNames()).contains("type");
}

private static boolean isObjectWithSubFields(Field field) {
private static boolean isObjectWithSubFields(final Field field) {
return field.getType() == JsonSchemaType.OBJECT && field.getSubFields() != null && !field.getSubFields().isEmpty();
}

public static StreamDescriptor extractStreamDescriptor(final AirbyteStream airbyteStream) {
return new StreamDescriptor().withName(airbyteStream.getName()).withNamespace(airbyteStream.getNamespace());
}

private static Map<StreamDescriptor, AirbyteStream> streamDescriptorToMap(final AirbyteCatalog catalog) {
return catalog.getStreams()
.stream()
.collect(Collectors.toMap(CatalogHelpers::extractStreamDescriptor, s -> s));
}

/**
* Returns difference between two provided catalogs.
*
* @param oldCatalog - old catalog
* @param newCatalog - new catalog
* @return difference between old and new catalogs
*/
public static Set<StreamTransform> getCatalogDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) {
final Set<StreamTransform> streamTransforms = new HashSet<>();

final Map<StreamDescriptor, AirbyteStream> descriptorToStreamOld = streamDescriptorToMap(oldCatalog);
final Map<StreamDescriptor, AirbyteStream> descriptorToStreamNew = streamDescriptorToMap(newCatalog);

Sets.difference(descriptorToStreamOld.keySet(), descriptorToStreamNew.keySet())
.forEach(descriptor -> streamTransforms.add(StreamTransform.createRemoveStreamTransform(descriptor)));
Sets.difference(descriptorToStreamNew.keySet(), descriptorToStreamOld.keySet())
.forEach(descriptor -> streamTransforms.add(StreamTransform.createAddStreamTransform(descriptor)));
Sets.intersection(descriptorToStreamOld.keySet(), descriptorToStreamNew.keySet())
.forEach(descriptor -> {
final AirbyteStream streamOld = descriptorToStreamOld.get(descriptor);
final AirbyteStream streamNew = descriptorToStreamNew.get(descriptor);
if (!streamOld.equals(streamNew)) {
streamTransforms.add(StreamTransform.createUpdateStreamTransform(getStreamDiff(descriptor, streamOld, streamNew)));
}
});

return streamTransforms;
}

private static UpdateStreamTransform getStreamDiff(final StreamDescriptor descriptor,
final AirbyteStream streamOld,
final AirbyteStream streamNew) {
final Set<FieldTransform> fieldTransforms = new HashSet<>();
final Map<List<String>, JsonNode> fieldNameToTypeOld = getFullyQualifiedFieldNamesWithTypes(streamOld.getJsonSchema())
.stream()
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
final Map<List<String>, JsonNode> fieldNameToTypeNew = getFullyQualifiedFieldNamesWithTypes(streamNew.getJsonSchema())
.stream()
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));

Sets.difference(fieldNameToTypeOld.keySet(), fieldNameToTypeNew.keySet())
.forEach(fieldName -> fieldTransforms.add(FieldTransform.createRemoveFieldTransform(fieldName, fieldNameToTypeOld.get(fieldName))));
Sets.difference(fieldNameToTypeNew.keySet(), fieldNameToTypeOld.keySet())
.forEach(fieldName -> fieldTransforms.add(FieldTransform.createAddFieldTransform(fieldName, fieldNameToTypeNew.get(fieldName))));
Sets.intersection(fieldNameToTypeOld.keySet(), fieldNameToTypeNew.keySet()).forEach(fieldName -> {
final JsonNode oldType = fieldNameToTypeOld.get(fieldName);
final JsonNode newType = fieldNameToTypeNew.get(fieldName);

if (!oldType.equals(newType)) {
fieldTransforms.add(FieldTransform.createUpdateFieldTransform(new UpdateFieldTransform(fieldName, oldType, newType)));
}
});
return new UpdateStreamTransform(descriptor, fieldTransforms);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.protocol.models.transform_models;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* Represents the addition of a field to an {@link io.airbyte.protocol.models.AirbyteStream}.
*/
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class AddFieldTransform {

private final List<String> fieldName;
private final JsonNode schema;

public List<String> getFieldName() {
return new ArrayList<>(fieldName);
}

public JsonNode getSchema() {
return schema;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.protocol.models.transform_models;

import io.airbyte.protocol.models.StreamDescriptor;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* Represents the addition of an {@link io.airbyte.protocol.models.AirbyteStream} to a
* {@link io.airbyte.protocol.models.AirbyteCatalog}.
*/
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class AddStreamTransform {

private final StreamDescriptor streamDescriptor;

public StreamDescriptor getStreamDescriptor() {
return streamDescriptor;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.protocol.models.transform_models;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* Represents the diff between two fields.
*/
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public final class FieldTransform {

private final FieldTransformType transformType;
private final AddFieldTransform addFieldTransform;
private final RemoveFieldTransform removeFieldTransform;
private final UpdateFieldTransform updateFieldTransform;

public static FieldTransform createAddFieldTransform(final List<String> fieldName, final JsonNode schema) {
return createAddFieldTransform(new AddFieldTransform(fieldName, schema));
}

public static FieldTransform createAddFieldTransform(final AddFieldTransform addFieldTransform) {
return new FieldTransform(FieldTransformType.ADD_FIELD, addFieldTransform, null, null);
}

public static FieldTransform createRemoveFieldTransform(final List<String> fieldName, final JsonNode schema) {
return createRemoveFieldTransform(new RemoveFieldTransform(fieldName, schema));
}

public static FieldTransform createRemoveFieldTransform(final RemoveFieldTransform removeFieldTransform) {
return new FieldTransform(FieldTransformType.REMOVE_FIELD, null, removeFieldTransform, null);
}

public static FieldTransform createUpdateFieldTransform(final UpdateFieldTransform updateFieldTransform) {
return new FieldTransform(FieldTransformType.UPDATE_FIELD, null, null, updateFieldTransform);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.protocol.models.transform_models;

/**
* Types of transformations possible for a field.
*/
public enum FieldTransformType {
ADD_FIELD,
REMOVE_FIELD,
UPDATE_FIELD
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.protocol.models.transform_models;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* Represents the removal of a field to an {@link io.airbyte.protocol.models.AirbyteStream}.
*/
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class RemoveFieldTransform {

private final List<String> fieldName;
private final JsonNode schema;

public List<String> getFieldName() {
return new ArrayList<>(fieldName);
}

public JsonNode getSchema() {
return schema;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.protocol.models.transform_models;

import io.airbyte.protocol.models.StreamDescriptor;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* Represents the removal of an {@link io.airbyte.protocol.models.AirbyteStream} to a
* {@link io.airbyte.protocol.models.AirbyteCatalog}.
*/
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class RemoveStreamTransform {

private final StreamDescriptor streamDescriptor;

}
Loading

0 comments on commit 5da035c

Please sign in to comment.