Add breakingChange to catalogDiff (#17588)
* Add breaking field to FieldTransform on catalogDiff
alovew authored Oct 11, 2022
1 parent 62500af commit ca2605d
Showing 14 changed files with 246 additions and 52 deletions.
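In short, CatalogHelpers.getCatalogDiff now also takes the connection's ConfiguredAirbyteCatalog, and field removals that would break the configured sync (a removed cursor field on an incremental stream, or a removed primary-key field on an append-dedup destination) are flagged via a new breaking property on FieldTransform. A rough caller-side sketch; only getCatalogDiff(old, new, configured) and FieldTransform.breaking() come from this diff, the StreamTransform/UpdateStreamTransform accessors used to walk the result are assumptions:

// Hypothetical consumer of the new surface introduced by this commit.
final Set<StreamTransform> catalogDiff =
    CatalogHelpers.getCatalogDiff(oldCatalog, newCatalog, configuredCatalog);

final boolean hasBreakingChange = catalogDiff.stream()
    .map(StreamTransform::getUpdateStreamTransform)           // accessor name assumed, not shown in this diff
    .filter(Objects::nonNull)
    .flatMap(update -> update.getFieldTransforms().stream())  // accessor name assumed, not shown in this diff
    .anyMatch(FieldTransform::breaking);                      // accessor added in this commit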
3 changes: 3 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
@@ -4204,6 +4204,7 @@ components:
required:
- transformType
- fieldName
- breaking
properties:
transformType:
type: string
Expand All @@ -4213,6 +4214,8 @@ components:
- update_field_schema
fieldName:
$ref: "#/components/schemas/FieldName"
breaking:
type: boolean
addField:
$ref: "#/components/schemas/FieldAdd"
removeField:
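With breaking added to the required list, every FieldTransform returned by the configuration API now carries the flag. A small, hypothetical client-side check (the getter names on the generated API model and the LOGGER in scope are assumptions; only the fluent setter .breaking(...) appears in the converter change at the bottom of this diff):

// Hypothetical consumer of the generated API model; getter names assumed.
if (Boolean.TRUE.equals(fieldTransform.getBreaking())) {
    LOGGER.warn("Removing field {} would break the configured sync", fieldTransform.getFieldName());
}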
@@ -23,6 +23,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -308,7 +309,9 @@ private static Map<StreamDescriptor, AirbyteStream> streamDescriptorToMap(final
* @param newCatalog - new catalog
* @return difference between old and new catalogs
*/
public static Set<StreamTransform> getCatalogDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) {
public static Set<StreamTransform> getCatalogDiff(final AirbyteCatalog oldCatalog,
final AirbyteCatalog newCatalog,
final ConfiguredAirbyteCatalog configuredCatalog) {
final Set<StreamTransform> streamTransforms = new HashSet<>();

final Map<StreamDescriptor, AirbyteStream> descriptorToStreamOld = streamDescriptorToMap(oldCatalog);
@@ -322,16 +325,23 @@ public static Set<StreamTransform> getCatalogDiff(final AirbyteCatalog oldCatalo
.forEach(descriptor -> {
final AirbyteStream streamOld = descriptorToStreamOld.get(descriptor);
final AirbyteStream streamNew = descriptorToStreamNew.get(descriptor);

final Optional<ConfiguredAirbyteStream> stream = configuredCatalog.getStreams().stream()
.filter(s -> Objects.equals(s.getStream().getNamespace(), descriptor.getNamespace())
&& s.getStream().getName().equals(descriptor.getName()))
.findFirst();

if (!streamOld.equals(streamNew)) {
streamTransforms.add(StreamTransform.createUpdateStreamTransform(descriptor, getStreamDiff(streamOld, streamNew)));
streamTransforms.add(StreamTransform.createUpdateStreamTransform(descriptor, getStreamDiff(streamOld, streamNew, stream)));
}
});

return streamTransforms;
}

private static UpdateStreamTransform getStreamDiff(final AirbyteStream streamOld,
final AirbyteStream streamNew) {
final AirbyteStream streamNew,
final Optional<ConfiguredAirbyteStream> configuredStream) {
final Set<FieldTransform> fieldTransforms = new HashSet<>();
final Map<List<String>, JsonNode> fieldNameToTypeOld = getFullyQualifiedFieldNamesWithTypes(streamOld.getJsonSchema())
.stream()
@@ -347,7 +357,10 @@ private static UpdateStreamTransform getStreamDiff(final AirbyteStream streamOld
CatalogHelpers::combineAccumulator);

Sets.difference(fieldNameToTypeOld.keySet(), fieldNameToTypeNew.keySet())
.forEach(fieldName -> fieldTransforms.add(FieldTransform.createRemoveFieldTransform(fieldName, fieldNameToTypeOld.get(fieldName))));
.forEach(fieldName -> {
fieldTransforms.add(FieldTransform.createRemoveFieldTransform(fieldName, fieldNameToTypeOld.get(fieldName),
transformBreaksConnection(configuredStream, fieldName)));
});
Sets.difference(fieldNameToTypeNew.keySet(), fieldNameToTypeOld.keySet())
.forEach(fieldName -> fieldTransforms.add(FieldTransform.createAddFieldTransform(fieldName, fieldNameToTypeNew.get(fieldName))));
Sets.intersection(fieldNameToTypeOld.keySet(), fieldNameToTypeNew.keySet()).forEach(fieldName -> {
@@ -358,6 +371,7 @@ private static UpdateStreamTransform getStreamDiff(final AirbyteStream streamOld
fieldTransforms.add(FieldTransform.createUpdateFieldTransform(fieldName, new UpdateFieldSchemaTransform(oldType, newType)));
}
});

return new UpdateStreamTransform(fieldTransforms);
}

@@ -384,4 +398,23 @@ static void combineAccumulator(final Map<List<String>, JsonNode> accumulatorLeft
});
}

static boolean transformBreaksConnection(final Optional<ConfiguredAirbyteStream> configuredStream, final List<String> fieldName) {
if (configuredStream.isEmpty()) {
return false;
}

final ConfiguredAirbyteStream streamConfig = configuredStream.get();

final SyncMode syncMode = streamConfig.getSyncMode();
if (SyncMode.INCREMENTAL == syncMode && streamConfig.getCursorField().equals(fieldName)) {
return true;
}

final DestinationSyncMode destinationSyncMode = streamConfig.getDestinationSyncMode();
if (DestinationSyncMode.APPEND_DEDUP == destinationSyncMode && streamConfig.getPrimaryKey().contains(fieldName)) {
return true;
}
return false;
}

}
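To make the new rule concrete, a minimal sketch of transformBreaksConnection (package-visible above) with hypothetical stream and field names:

// A stream synced incrementally with cursor "updated_at", deduped on primary key "id".
final ConfiguredAirbyteStream configured = new ConfiguredAirbyteStream()
    .withStream(new AirbyteStream().withName("users"))
    .withSyncMode(SyncMode.INCREMENTAL)
    .withCursorField(List.of("updated_at"))
    .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
    .withPrimaryKey(List.of(List.of("id")));

CatalogHelpers.transformBreaksConnection(Optional.of(configured), List.of("updated_at")); // true: removes the cursor field
CatalogHelpers.transformBreaksConnection(Optional.of(configured), List.of("id"));         // true: removes a primary-key field
CatalogHelpers.transformBreaksConnection(Optional.of(configured), List.of("email"));      // false: ordinary field
CatalogHelpers.transformBreaksConnection(Optional.empty(), List.of("updated_at"));        // false: stream not configured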
@@ -23,25 +23,28 @@ public final class FieldTransform {
private final AddFieldTransform addFieldTransform;
private final RemoveFieldTransform removeFieldTransform;
private final UpdateFieldSchemaTransform updateFieldTransform;
private final boolean breaking;

public static FieldTransform createAddFieldTransform(final List<String> fieldName, final JsonNode schema) {
return createAddFieldTransform(fieldName, new AddFieldTransform(schema));
}

public static FieldTransform createAddFieldTransform(final List<String> fieldName, final AddFieldTransform addFieldTransform) {
return new FieldTransform(FieldTransformType.ADD_FIELD, fieldName, addFieldTransform, null, null);
return new FieldTransform(FieldTransformType.ADD_FIELD, fieldName, addFieldTransform, null, null, false);
}

public static FieldTransform createRemoveFieldTransform(final List<String> fieldName, final JsonNode schema) {
return createRemoveFieldTransform(fieldName, new RemoveFieldTransform(fieldName, schema));
public static FieldTransform createRemoveFieldTransform(final List<String> fieldName, final JsonNode schema, final Boolean breaking) {
return createRemoveFieldTransform(fieldName, new RemoveFieldTransform(fieldName, schema), breaking);
}

public static FieldTransform createRemoveFieldTransform(final List<String> fieldName, final RemoveFieldTransform removeFieldTransform) {
return new FieldTransform(FieldTransformType.REMOVE_FIELD, fieldName, null, removeFieldTransform, null);
public static FieldTransform createRemoveFieldTransform(final List<String> fieldName,
final RemoveFieldTransform removeFieldTransform,
final Boolean breaking) {
return new FieldTransform(FieldTransformType.REMOVE_FIELD, fieldName, null, removeFieldTransform, null, breaking);
}

public static FieldTransform createUpdateFieldTransform(final List<String> fieldName, final UpdateFieldSchemaTransform updateFieldTransform) {
return new FieldTransform(FieldTransformType.UPDATE_FIELD_SCHEMA, fieldName, null, null, updateFieldTransform);
return new FieldTransform(FieldTransformType.UPDATE_FIELD_SCHEMA, fieldName, null, null, updateFieldTransform, false);
}

public FieldTransformType getTransformType() {
@@ -64,4 +67,8 @@ public UpdateFieldSchemaTransform getUpdateFieldTransform() {
return updateFieldTransform;
}

public boolean breaking() {
return breaking;
}

}
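A minimal sketch of the reworked factories (the schema JsonNode and field names are placeholders): add and update transforms are always constructed as non-breaking, while the remove factories now take the flag computed in CatalogHelpers.

final FieldTransform added   = FieldTransform.createAddFieldTransform(List.of("newColumn"), schema);
final FieldTransform removed = FieldTransform.createRemoveFieldTransform(List.of("cursorColumn"), schema, true);
// added.breaking() == false, removed.breaking() == true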
@@ -38,8 +38,11 @@ class CatalogHelpersTest {
private static final String SOME_ARRAY = "someArray";
private static final String PROPERTIES = "properties";
private static final String USERS = "users";
private static final String DATE = "date";
private static final String SALES = "sales";
private static final String COMPANIES_VALID = "companies_schema.json";
private static final String COMPANIES_INVALID = "companies_schema_invalid.json";
private static final String VALID_SCHEMA_JSON = "valid_schema.json";

@Test
void testFieldToJsonSchema() {
@@ -93,10 +96,10 @@ void testGetTopLevelFieldNames() {

@Test
void testGetFieldNames() throws IOException {
final JsonNode node = Jsons.deserialize(MoreResources.readResource("valid_schema.json"));
final JsonNode node = Jsons.deserialize(MoreResources.readResource(VALID_SCHEMA_JSON));
final Set<String> actualFieldNames = CatalogHelpers.getAllFieldNames(node);
final List<String> expectedFieldNames =
List.of(CAD, "DKK", "HKD", "HUF", "ISK", "PHP", "date", "nestedkey", "somekey", "something", "something2", "文", SOME_ARRAY, ITEMS,
List.of("id", CAD, "DKK", "HKD", "HUF", "ISK", "PHP", DATE, "nestedkey", "somekey", "something", "something2", "文", SOME_ARRAY, ITEMS,
"oldName");

// sort so that the diff is easier to read.
@@ -105,23 +108,28 @@ void testGetFieldNames() throws IOException {

@Test
void testGetCatalogDiff() throws IOException {
final JsonNode schema1 = Jsons.deserialize(MoreResources.readResource("valid_schema.json"));
final JsonNode schema1 = Jsons.deserialize(MoreResources.readResource(VALID_SCHEMA_JSON));
final JsonNode schema2 = Jsons.deserialize(MoreResources.readResource("valid_schema2.json"));
final AirbyteCatalog catalog1 = new AirbyteCatalog().withStreams(List.of(
new AirbyteStream().withName(USERS).withJsonSchema(schema1),
new AirbyteStream().withName("accounts").withJsonSchema(Jsons.emptyObject())));
final AirbyteCatalog catalog2 = new AirbyteCatalog().withStreams(List.of(
new AirbyteStream().withName(USERS).withJsonSchema(schema2),
new AirbyteStream().withName("sales").withJsonSchema(Jsons.emptyObject())));
new AirbyteStream().withName(SALES).withJsonSchema(Jsons.emptyObject())));

final Set<StreamTransform> actualDiff = CatalogHelpers.getCatalogDiff(catalog1, catalog2);
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS).withJsonSchema(schema2)).withSyncMode(SyncMode.FULL_REFRESH),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(SALES).withJsonSchema(Jsons.emptyObject()))
.withSyncMode(SyncMode.FULL_REFRESH)));

final Set<StreamTransform> actualDiff = CatalogHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog);
final List<StreamTransform> expectedDiff = Stream.of(
StreamTransform.createAddStreamTransform(new StreamDescriptor().withName("sales")),
StreamTransform.createAddStreamTransform(new StreamDescriptor().withName(SALES)),
StreamTransform.createRemoveStreamTransform(new StreamDescriptor().withName("accounts")),
StreamTransform.createUpdateStreamTransform(new StreamDescriptor().withName(USERS), new UpdateStreamTransform(Set.of(
FieldTransform.createAddFieldTransform(List.of("COD"), schema2.get(PROPERTIES).get("COD")),
FieldTransform.createRemoveFieldTransform(List.of("something2"), schema1.get(PROPERTIES).get("something2")),
FieldTransform.createRemoveFieldTransform(List.of("HKD"), schema1.get(PROPERTIES).get("HKD")),
FieldTransform.createRemoveFieldTransform(List.of("something2"), schema1.get(PROPERTIES).get("something2"), false),
FieldTransform.createRemoveFieldTransform(List.of("HKD"), schema1.get(PROPERTIES).get("HKD"), false),
FieldTransform.createUpdateFieldTransform(List.of(CAD), new UpdateFieldSchemaTransform(
schema1.get(PROPERTIES).get(CAD),
schema2.get(PROPERTIES).get(CAD))),
@@ -132,7 +140,7 @@ void testGetCatalogDiff() throws IOException {
schema1.get(PROPERTIES).get(SOME_ARRAY).get(ITEMS),
schema2.get(PROPERTIES).get(SOME_ARRAY).get(ITEMS))),
FieldTransform.createRemoveFieldTransform(List.of(SOME_ARRAY, ITEMS, "oldName"),
schema1.get(PROPERTIES).get(SOME_ARRAY).get(ITEMS).get(PROPERTIES).get("oldName")),
schema1.get(PROPERTIES).get(SOME_ARRAY).get(ITEMS).get(PROPERTIES).get("oldName"), false),
FieldTransform.createAddFieldTransform(List.of(SOME_ARRAY, ITEMS, "newName"),
schema2.get(PROPERTIES).get(SOME_ARRAY).get(ITEMS).get(PROPERTIES).get("newName"))))))
.sorted(STREAM_TRANSFORM_COMPARATOR)
@@ -195,7 +203,12 @@ void testGetCatalogDiffWithInvalidSchema() throws IOException {
final AirbyteCatalog catalog2 = new AirbyteCatalog().withStreams(List.of(
new AirbyteStream().withName(USERS).withJsonSchema(schema2)));

final Set<StreamTransform> actualDiff = CatalogHelpers.getCatalogDiff(catalog1, catalog2);
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS).withJsonSchema(schema2)).withSyncMode(SyncMode.FULL_REFRESH),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(SALES).withJsonSchema(Jsons.emptyObject()))
.withSyncMode(SyncMode.FULL_REFRESH)));

final Set<StreamTransform> actualDiff = CatalogHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog);

Assertions.assertThat(actualDiff).hasSize(1);
Assertions.assertThat(actualDiff).first()
@@ -212,9 +225,62 @@ void testGetCatalogDiffWithBothInvalidSchema() throws IOException {
final AirbyteCatalog catalog2 = new AirbyteCatalog().withStreams(List.of(
new AirbyteStream().withName(USERS).withJsonSchema(schema2)));

final Set<StreamTransform> actualDiff = CatalogHelpers.getCatalogDiff(catalog1, catalog2);
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS).withJsonSchema(schema2)).withSyncMode(SyncMode.FULL_REFRESH),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(SALES).withJsonSchema(Jsons.emptyObject()))
.withSyncMode(SyncMode.FULL_REFRESH)));

final Set<StreamTransform> actualDiff = CatalogHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog);

Assertions.assertThat(actualDiff).hasSize(0);
}

@Test
void testCatalogDiffWithBreakingChanges() throws IOException {
final JsonNode schema1 = Jsons.deserialize(MoreResources.readResource(VALID_SCHEMA_JSON));
final JsonNode breakingSchema = Jsons.deserialize(MoreResources.readResource("breaking_change_schema.json"));
final AirbyteCatalog catalog1 = new AirbyteCatalog().withStreams(List.of(
new AirbyteStream().withName(USERS).withJsonSchema(schema1)));
final AirbyteCatalog catalog2 = new AirbyteCatalog().withStreams(List.of(
new AirbyteStream().withName(USERS).withJsonSchema(breakingSchema)));

final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS).withJsonSchema(schema1)).withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(List.of(DATE)).withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP).withPrimaryKey(List.of(List.of("id")))));

final Set<StreamTransform> diff = CatalogHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog);

final List<StreamTransform> expectedDiff = Stream.of(
StreamTransform.createUpdateStreamTransform(new StreamDescriptor().withName(USERS), new UpdateStreamTransform(Set.of(
FieldTransform.createRemoveFieldTransform(List.of(DATE), schema1.get(PROPERTIES).get(DATE), true),
FieldTransform.createRemoveFieldTransform(List.of("id"), schema1.get(PROPERTIES).get("id"), true)))))
.toList();

Assertions.assertThat(diff).containsAll(expectedDiff);
}

@Test
void testCatalogDiffWithoutStreamConfig() throws IOException {
final JsonNode schema1 = Jsons.deserialize(MoreResources.readResource(VALID_SCHEMA_JSON));
final JsonNode breakingSchema = Jsons.deserialize(MoreResources.readResource("breaking_change_schema.json"));
final AirbyteCatalog catalog1 = new AirbyteCatalog().withStreams(List.of(
new AirbyteStream().withName(USERS).withJsonSchema(schema1)));
final AirbyteCatalog catalog2 = new AirbyteCatalog().withStreams(List.of(
new AirbyteStream().withName(USERS).withJsonSchema(breakingSchema)));

final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(SALES).withJsonSchema(schema1)).withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(List.of(DATE)).withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP).withPrimaryKey(List.of(List.of("id")))));

final Set<StreamTransform> diff = CatalogHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog);

final List<StreamTransform> expectedDiff = Stream.of(
StreamTransform.createUpdateStreamTransform(new StreamDescriptor().withName(USERS), new UpdateStreamTransform(Set.of(
FieldTransform.createRemoveFieldTransform(List.of(DATE), schema1.get(PROPERTIES).get(DATE), false),
FieldTransform.createRemoveFieldTransform(List.of("id"), schema1.get(PROPERTIES).get("id"), false)))))
.toList();

Assertions.assertThat(diff).containsAll(expectedDiff);
}

}
@@ -0,0 +1,60 @@
{
"type": "object",
"properties": {
"CAD": { "type": ["null", "number"] },
"HKD": { "type": ["null", "number"] },
"ISK": { "type": ["null", "number"] },
"PHP": { "type": ["null", "number"] },
"DKK": { "type": ["null", "number"] },
"HUF": { "type": ["null", "number"] },
"文": { "type": ["null", "number"] },
"something": {
"type": ["null", "object"],
"properties": {
"somekey": {
"type": ["null", "object"],
"properties": {
"nestedkey": {
"type": ["null", "number"]
}
}
}
},
"patternProperties": {
".+": {}
}
},
"something2": {
"oneOf": [
{
"type": "object",
"properties": {
"oneOfOne": {
"type": "string"
}
}
},
{
"type": "object",
"properties": {
"oneOfTwo": {
"type": "string"
}
}
}
]
},
"someArray": {
"type": ["array", "null"],
"items": {
"type": ["object", "null"],
"properties": {
"oldName": {
"type": ["string", "null"],
"maxLength": 100
}
}
}
}
}
}
@@ -1,6 +1,7 @@
{
"type": "object",
"properties": {
"id": { "type": "number" },
"date": { "type": "string", "format": "date-time" },
"CAD": { "type": ["null", "number"] },
"HKD": { "type": ["null", "number"] },
@@ -1,6 +1,7 @@
{
"type": "object",
"properties": {
"id": { "type": "number" },
"date": { "type": "string", "format": "date-time" },
"CAD": { "type": ["null", "string"] },
"COD": { "type": ["null", "string"] },
@@ -44,6 +44,7 @@ public static FieldTransform fieldTransformToApi(final io.airbyte.protocol.model
return new FieldTransform()
.transformType(Enums.convertTo(transform.getTransformType(), FieldTransform.TransformTypeEnum.class))
.fieldName(transform.getFieldName())
.breaking(transform.breaking())
.addField(addFieldToApi(transform).orElse(null))
.removeField(removeFieldToApi(transform).orElse(null))
.updateFieldSchema(updateFieldToApi(transform).orElse(null));