Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

catalog diff utility #13786

Merged
merged 1 commit into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,25 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.airbyte.commons.json.JsonSchemas;
import io.airbyte.commons.json.JsonSchemas.FieldNameOrList;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.commons.util.MoreLists;
import io.airbyte.protocol.models.transform_models.FieldTransform;
import io.airbyte.protocol.models.transform_models.StreamTransform;
import io.airbyte.protocol.models.transform_models.UpdateFieldTransform;
import io.airbyte.protocol.models.transform_models.UpdateStreamTransform;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;

/**
* Helper class for Catalog and Stream related operations. Generally only used in tests.
Expand Down Expand Up @@ -124,31 +134,151 @@ public static Set<String> getTopLevelFieldNames(final ConfiguredAirbyteStream st
}

/**
* @param node any json node
* @param jsonSchema - a JSONSchema node
* @return a set of all keys for all objects within the node
*/
@VisibleForTesting
protected static Set<String> getAllFieldNames(final JsonNode node) {
final Set<String> allFieldNames = new HashSet<>();

if (node.has("properties")) {
final JsonNode properties = node.get("properties");
final Iterator<String> fieldNames = properties.fieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
allFieldNames.add(fieldName);
final JsonNode fieldValue = properties.get(fieldName);
if (fieldValue.isObject()) {
allFieldNames.addAll(getAllFieldNames(fieldValue));
}
protected static Set<String> getAllFieldNames(final JsonNode jsonSchema) {
  // Only the unqualified name (the last path segment) of each field is of interest here, so the
  // fully qualified paths are reduced to their final element while being gathered into a set.
  final Set<String> unqualifiedNames = new HashSet<>();
  for (final Pair<List<String>, JsonNode> fieldWithSchema : getFullyQualifiedFieldNamesWithTypes(jsonSchema)) {
    // MoreLists.last yields an Optional; paths for which it is empty contribute nothing.
    MoreLists.last(fieldWithSchema.getLeft()).ifPresent(unqualifiedNames::add);
  }
  return unqualifiedNames;
}

/**
* Extracts all fields and their schemas from a JSONSchema. This method returns values in
* depth-first search preorder. It short circuits at oneOfs--in other words, child fields of a oneOf
* are not returned.
*
* Each returned pair holds the field's path (list segments reported by the traversal are dropped,
* only field-name segments are kept) and the schema node found at that path. Because list segments
* are dropped, two distinct nodes may end up with the same path.
*
* @param jsonSchema - a JSONSchema node
* @return a list of (fully qualified field name, schema) pairs for the fields within the node,
* ordered in depth-first search preorder. The root node itself is not included.
*/
@VisibleForTesting
protected static List<Pair<List<String>, JsonNode>> getFullyQualifiedFieldNamesWithTypes(final JsonNode jsonSchema) {
// accumulates the paths of every oneOf field seen so far; mutated by the filter predicate below.
// if this were ever a performance issue, it could be replaced with a trie. this seems unlikely
// however.
final Set<List<String>> fieldNamesThatAreOneOfs = new HashSet<>();

return JsonSchemas.traverseJsonSchemaWithCollector(jsonSchema, (node, basicPath) -> {
// keep only the field-name segments of the path; list markers are discarded.
final List<String> fieldName = basicPath.stream().filter(fieldOrList -> !fieldOrList.isList()).map(FieldNameOrList::getFieldName).toList();
return Pair.of(fieldName, node);
})
.stream()
// first node is the original object.
.skip(1)
// stateful predicate: relies on this stream being sequential and in traversal order, since it
// records oneOf parents before their children are inspected.
.filter(fieldWithSchema -> filterChildrenOfFoneOneOf(fieldWithSchema.getLeft(), fieldWithSchema.getRight(), fieldNamesThatAreOneOfs))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is an issue here but I think we should avoid using mutable variable in a lambda because of concurrency issues (for example with a parallel stream, it might fail).

.toList();
}

/**
 * Predicate that checks if a field is a CHILD of a oneOf field. If child of a oneOf, returns false.
 * Otherwise, true. This method has side effects. It assumes that it will be run in order on field
 * names returned in depth-first search preorder. As it encounters oneOfs it adds them to a
 * collection. It then checks if subsequent field names have one of the recorded oneOf paths as a
 * prefix.
 *
 * The prefix check is done segment by segment on the path lists rather than on a joined string, so
 * that a sibling whose name merely starts with the same characters (e.g. {@code address2} next to
 * a oneOf named {@code address}) or a name that itself contains a "." is not mistaken for a child.
 * A non-oneOf node whose path is identical to a recorded oneOf path is still treated as belonging
 * to that oneOf and is filtered out.
 *
 * @param fieldName - field to investigate
 * @param schema - schema of field
 * @param oneOfFieldNameAccumulator - collection of fields that are oneOfs; mutated by this method
 * @return If child of a oneOf, returns false. Otherwise, true.
 */
private static boolean filterChildrenOfFoneOneOf(final List<String> fieldName,
                                                 final JsonNode schema,
                                                 final Set<List<String>> oneOfFieldNameAccumulator) {
  if (isOneOfField(schema)) {
    oneOfFieldNameAccumulator.add(fieldName);
    // return early because we know it is a oneOf and therefore cannot be a child of a oneOf.
    return true;
  }

  // leverage that nodes are returned in depth-first search preorder. this means the parent field for
  // the oneOf will be present in the accumulator BEFORE any of its children are inspected.
  for (final List<String> oneOfFieldName : oneOfFieldNameAccumulator) {
    final int prefixLength = oneOfFieldName.size();

    // compare whole path segments: the field is under the oneOf only if its leading segments are
    // exactly the oneOf's path.
    if (fieldName.size() >= prefixLength && fieldName.subList(0, prefixLength).equals(oneOfFieldName)) {
      return false;
    }
  }
  return true;
}

return allFieldNames;
/**
 * Heuristic used to detect a oneOf field: a schema node is considered a oneOf when it has no
 * "type" key among its field names.
 *
 * NOTE(review): any other schema node without a "type" key would be classified the same way;
 * confirm whether that is intended.
 *
 * @param schema - schema node to inspect
 * @return true if the node has no "type" key, false otherwise
 */
private static boolean isOneOfField(final JsonNode schema) {
return !MoreIterators.toSet(schema.fieldNames()).contains("type");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will also be true for anyOf and allOf is it some field we want to filter as well? Or is it something not supported in the catalog.

}

private static boolean isObjectWithSubFields(Field field) {
private static boolean isObjectWithSubFields(final Field field) {
  // only OBJECT-typed fields can carry sub fields.
  if (field.getType() != JsonSchemaType.OBJECT) {
    return false;
  }
  // a missing sub-field collection counts the same as an empty one.
  if (field.getSubFields() == null) {
    return false;
  }
  return !field.getSubFields().isEmpty();
}

/**
 * Builds the descriptor (name plus namespace) that identifies the given stream.
 *
 * @param airbyteStream - stream to describe
 * @return descriptor carrying the stream's name and namespace
 */
public static StreamDescriptor extractStreamDescriptor(final AirbyteStream airbyteStream) {
  final StreamDescriptor named = new StreamDescriptor().withName(airbyteStream.getName());
  return named.withNamespace(airbyteStream.getNamespace());
}

/**
 * Indexes the streams of a catalog by their descriptor (name plus namespace).
 *
 * @param catalog - catalog whose streams should be indexed
 * @return map from stream descriptor to the stream it describes
 */
private static Map<StreamDescriptor, AirbyteStream> streamDescriptorToMap(final AirbyteCatalog catalog) {
  return catalog.getStreams().stream().collect(
      Collectors.toMap(
          stream -> extractStreamDescriptor(stream),
          stream -> stream));
}

/**
 * Returns difference between two provided catalogs.
 *
 * Streams are matched between the two catalogs by their stream descriptor (name and namespace).
 * A stream present only in the old catalog yields a remove transform, a stream present only in
 * the new catalog yields an add transform, and a stream present in both yields an update
 * transform when the two stream objects are not equal.
 *
 * @param oldCatalog - old catalog
 * @param newCatalog - new catalog
 * @return difference between old and new catalogs; empty if no differences were found
 */
public static Set<StreamTransform> getCatalogDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it something that we will want to mock? I think that the tests of the calling methods will be easier to test with a mock.

final Set<StreamTransform> streamTransforms = new HashSet<>();

// index both catalogs by stream descriptor so that streams can be matched up.
final Map<StreamDescriptor, AirbyteStream> descriptorToStreamOld = streamDescriptorToMap(oldCatalog);
final Map<StreamDescriptor, AirbyteStream> descriptorToStreamNew = streamDescriptorToMap(newCatalog);

// in old but not in new: stream was removed.
Sets.difference(descriptorToStreamOld.keySet(), descriptorToStreamNew.keySet())
.forEach(descriptor -> streamTransforms.add(StreamTransform.createRemoveStreamTransform(descriptor)));
// in new but not in old: stream was added.
Sets.difference(descriptorToStreamNew.keySet(), descriptorToStreamOld.keySet())
.forEach(descriptor -> streamTransforms.add(StreamTransform.createAddStreamTransform(descriptor)));
// in both: report an update only if the stream objects differ.
Sets.intersection(descriptorToStreamOld.keySet(), descriptorToStreamNew.keySet())
.forEach(descriptor -> {
final AirbyteStream streamOld = descriptorToStreamOld.get(descriptor);
final AirbyteStream streamNew = descriptorToStreamNew.get(descriptor);
if (!streamOld.equals(streamNew)) {
// the field-level diff is computed from the two streams' json schemas.
streamTransforms.add(StreamTransform.createUpdateStreamTransform(getStreamDiff(descriptor, streamOld, streamNew)));
}
});

return streamTransforms;
}

/**
 * Computes the field-level difference between two versions of the same stream by comparing the
 * fields (and their schemas) extracted from each version's json schema.
 *
 * Field paths are not guaranteed to be unique: list segments are dropped when the paths are built,
 * so more than one schema node can be reported under the same path. When that happens the first
 * node in traversal order is kept instead of letting {@link Collectors#toMap} throw an
 * {@link IllegalStateException} on the duplicate key.
 *
 * @param descriptor - descriptor of the stream being compared
 * @param streamOld - old version of the stream
 * @param streamNew - new version of the stream
 * @return update transform holding the added, removed and updated fields
 */
private static UpdateStreamTransform getStreamDiff(final StreamDescriptor descriptor,
                                                   final AirbyteStream streamOld,
                                                   final AirbyteStream streamNew) {
  final Set<FieldTransform> fieldTransforms = new HashSet<>();
  final Map<List<String>, JsonNode> fieldNameToTypeOld = getFullyQualifiedFieldNamesWithTypes(streamOld.getJsonSchema())
      .stream()
      // keep the first schema seen for a path; later nodes sharing the path must not abort the diff.
      .collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (first, duplicate) -> first));
  final Map<List<String>, JsonNode> fieldNameToTypeNew = getFullyQualifiedFieldNamesWithTypes(streamNew.getJsonSchema())
      .stream()
      .collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (first, duplicate) -> first));

  // in old but not in new: field was removed.
  Sets.difference(fieldNameToTypeOld.keySet(), fieldNameToTypeNew.keySet())
      .forEach(fieldName -> fieldTransforms.add(FieldTransform.createRemoveFieldTransform(fieldName, fieldNameToTypeOld.get(fieldName))));
  // in new but not in old: field was added.
  Sets.difference(fieldNameToTypeNew.keySet(), fieldNameToTypeOld.keySet())
      .forEach(fieldName -> fieldTransforms.add(FieldTransform.createAddFieldTransform(fieldName, fieldNameToTypeNew.get(fieldName))));
  // in both: field was updated if its schema changed.
  Sets.intersection(fieldNameToTypeOld.keySet(), fieldNameToTypeNew.keySet()).forEach(fieldName -> {
    final JsonNode oldType = fieldNameToTypeOld.get(fieldName);
    final JsonNode newType = fieldNameToTypeNew.get(fieldName);

    if (!oldType.equals(newType)) {
      fieldTransforms.add(FieldTransform.createUpdateFieldTransform(new UpdateFieldTransform(fieldName, oldType, newType)));
    }
  });
  return new UpdateStreamTransform(descriptor, fieldTransforms);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.protocol.models.transform_models;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
 * Represents the addition of a field to an {@link io.airbyte.protocol.models.AirbyteStream}.
 * Holds the path of the added field and the schema of that field.
 */
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class AddFieldTransform {
cgardens marked this conversation as resolved.
Show resolved Hide resolved
cgardens marked this conversation as resolved.
Show resolved Hide resolved

// path segments of the added field.
private final List<String> fieldName;
// schema node of the added field.
private final JsonNode schema;

/**
 * @return a fresh copy of the field's path, so callers cannot modify the list held by this object
 */
public List<String> getFieldName() {
return new ArrayList<>(fieldName);
}

/**
 * @return the schema of the added field
 */
public JsonNode getSchema() {
return schema;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.protocol.models.transform_models;

import io.airbyte.protocol.models.StreamDescriptor;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
 * Represents the addition of an {@link io.airbyte.protocol.models.AirbyteStream} to a
 * {@link io.airbyte.protocol.models.AirbyteCatalog}. The added stream is identified by its
 * descriptor.
 */
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class AddStreamTransform {
cgardens marked this conversation as resolved.
Show resolved Hide resolved

// descriptor identifying the stream that was added.
private final StreamDescriptor streamDescriptor;

/**
 * @return the descriptor of the added stream
 */
public StreamDescriptor getStreamDescriptor() {
return streamDescriptor;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.protocol.models.transform_models;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
 * Represents the diff between two fields. Exactly one of the add / remove / update payloads is
 * populated when an instance is created through the static factories; {@link #getTransformType()}
 * tells which one. The other two payloads are null.
 */
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public final class FieldTransform {

  private final FieldTransformType transformType;
  private final AddFieldTransform addFieldTransform;
  private final RemoveFieldTransform removeFieldTransform;
  private final UpdateFieldTransform updateFieldTransform;

  /**
   * Creates a transform describing the addition of a field.
   *
   * @param fieldName - path of the added field
   * @param schema - schema of the added field
   * @return transform of type {@link FieldTransformType#ADD_FIELD}
   */
  public static FieldTransform createAddFieldTransform(final List<String> fieldName, final JsonNode schema) {
    return createAddFieldTransform(new AddFieldTransform(fieldName, schema));
  }

  /**
   * Creates a transform describing the addition of a field.
   *
   * @param addFieldTransform - details of the added field
   * @return transform of type {@link FieldTransformType#ADD_FIELD}
   */
  public static FieldTransform createAddFieldTransform(final AddFieldTransform addFieldTransform) {
    return new FieldTransform(FieldTransformType.ADD_FIELD, addFieldTransform, null, null);
  }

  /**
   * Creates a transform describing the removal of a field.
   *
   * @param fieldName - path of the removed field
   * @param schema - schema of the removed field
   * @return transform of type {@link FieldTransformType#REMOVE_FIELD}
   */
  public static FieldTransform createRemoveFieldTransform(final List<String> fieldName, final JsonNode schema) {
    return createRemoveFieldTransform(new RemoveFieldTransform(fieldName, schema));
  }

  /**
   * Creates a transform describing the removal of a field.
   *
   * @param removeFieldTransform - details of the removed field
   * @return transform of type {@link FieldTransformType#REMOVE_FIELD}
   */
  public static FieldTransform createRemoveFieldTransform(final RemoveFieldTransform removeFieldTransform) {
    return new FieldTransform(FieldTransformType.REMOVE_FIELD, null, removeFieldTransform, null);
  }

  /**
   * Creates a transform describing a change to an existing field.
   *
   * @param updateFieldTransform - details of the updated field
   * @return transform of type {@link FieldTransformType#UPDATE_FIELD}
   */
  public static FieldTransform createUpdateFieldTransform(final UpdateFieldTransform updateFieldTransform) {
    return new FieldTransform(FieldTransformType.UPDATE_FIELD, null, null, updateFieldTransform);
  }

  /**
   * @return the kind of change this transform describes
   */
  public FieldTransformType getTransformType() {
    return transformType;
  }

  /**
   * @return details of the added field, or null if this is not an add transform
   */
  public AddFieldTransform getAddFieldTransform() {
    return addFieldTransform;
  }

  /**
   * @return details of the removed field, or null if this is not a remove transform
   */
  public RemoveFieldTransform getRemoveFieldTransform() {
    return removeFieldTransform;
  }

  /**
   * @return details of the updated field, or null if this is not an update transform
   */
  public UpdateFieldTransform getUpdateFieldTransform() {
    return updateFieldTransform;
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.protocol.models.transform_models;

/**
 * Enumerates the kinds of change that can be reported for a single field.
 */
public enum FieldTransformType {

  /** A field was added. */
  ADD_FIELD,

  /** A field was removed. */
  REMOVE_FIELD,

  /** A field that exists on both sides changed. */
  UPDATE_FIELD;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.protocol.models.transform_models;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
 * Represents the removal of a field from an {@link io.airbyte.protocol.models.AirbyteStream}.
 * Holds the path of the removed field and the schema that field had.
 */
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class RemoveFieldTransform {

  // path segments of the removed field.
  private final List<String> fieldName;
  // schema node of the removed field.
  private final JsonNode schema;

  /**
   * @return a fresh copy of the field's path, so callers cannot modify the list held by this object
   */
  public List<String> getFieldName() {
    final List<String> fieldNameCopy = new ArrayList<>(this.fieldName);
    return fieldNameCopy;
  }

  /**
   * @return the schema of the removed field
   */
  public JsonNode getSchema() {
    return this.schema;
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.protocol.models.transform_models;

import io.airbyte.protocol.models.StreamDescriptor;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
 * Represents the removal of an {@link io.airbyte.protocol.models.AirbyteStream} from a
 * {@link io.airbyte.protocol.models.AirbyteCatalog}. The removed stream is identified by its
 * descriptor.
 */
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class RemoveStreamTransform {

  // descriptor identifying the stream that was removed.
  private final StreamDescriptor streamDescriptor;

  /**
   * @return the descriptor of the removed stream
   */
  public StreamDescriptor getStreamDescriptor() {
    return streamDescriptor;
  }

}
Loading