diff --git a/README.md b/README.md
index fdcb473..79e726f 100644
--- a/README.md
+++ b/README.md
@@ -234,6 +234,22 @@ JsonAvroConverter converter = JsonAvroConverter.builder()
 
 By default, both `_ab_additional_properties` and `_airbyte_additional_properties` are the additional properties field names on the Json object.
 
+### Field Conversion Failure Listener
+
+A listener can be set to react to conversion failures at the field level. It will be called with metadata about the field and the failure, and it may do any of the following:
+
+* return a replacement value for the field
+* call `pushPostProcessingAction` to register a function to apply to the record (e.g. to add metadata about the failure)
+* (re)throw an exception if the failure is unrecoverable
+
+Note that it may not edit the record itself. This is to avoid race conditions and other issues that might arise from modifying the record while it is being processed.
+
+```java
+JsonAvroConverter converter = JsonAvroConverter.builder()
+    .setFieldConversionFailureListener(listener)
+    .build();
+```
+
 ## Build
 - The build is upgraded to use Java 14 and Gradle 7.2 to match the build environment of Airbyte.
 - Maven staging and publishing is removed because they are incompatible with the new build environment.
diff --git a/converter/build.gradle b/converter/build.gradle
index 274458e..434ae68 100644
--- a/converter/build.gradle
+++ b/converter/build.gradle
@@ -6,7 +6,7 @@ buildscript {
     }
   }
   dependencies {
-    classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.14.0'
+    classpath 'com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.0.0'
   }
 }
 
@@ -20,7 +20,7 @@ plugins {
   id 'pmd'
 }
 
-apply plugin: 'com.commercehub.gradle.plugin.avro'
+apply plugin: 'com.github.davidmc24.gradle.plugin.avro'
 apply plugin: 'idea'
 
 configurations {
diff --git a/converter/src/main/java/tech/allegro/schema/json2avro/converter/FieldConversionFailureListener.java b/converter/src/main/java/tech/allegro/schema/json2avro/converter/FieldConversionFailureListener.java
new file mode 100644
index 0000000..85182b4
--- /dev/null
+++ b/converter/src/main/java/tech/allegro/schema/json2avro/converter/FieldConversionFailureListener.java
@@ -0,0 +1,59 @@
+package tech.allegro.schema.json2avro.converter;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Function;
+
+public abstract class FieldConversionFailureListener {
+    /**
+     * This is to support behavior like v2 destinations change capture.
+     *
+     * Specifically, when a field fails to convert:
+     * * the field, change, and reason are added to `_airbyte_metadata.changes[]`
+     * * the field is nulled or truncated
+     *
+     * At the time of failure, the _airbyte_metadata.changes[] field might
+     * * exist and be empty
+     * * exist and already contain changes
+     * * not have been parsed yet (meta == null)
+     * * have been parsed, but contain a changes field that has not been parsed (meta.changes == null)
+     *
+     * Therefore, the simplest general feature that will support the desired behavior is
+     * * listener may return a new value for the affected field only
+     * * listener may not mutate any other part of the record on failure
+     * * listener may only push post-processing actions for the record (after required fields definitely exist)
+     */
+
+    private final List<Function<GenericData.Record, GenericData.Record>> postProcessingActions = new LinkedList<>();
+
+    protected final void pushPostProcessingAction(Function<GenericData.Record, GenericData.Record> action) {
+        postProcessingActions.add(action);
+    }
+
+    @Nullable
+    public abstract Object onFieldConversionFailure(@Nonnull String avroName,
+                                                    @Nonnull String originalName,
+                                                    @Nonnull Schema schema,
+                                                    @Nonnull Object value,
+                                                    @Nonnull String path,
+                                                    @Nonnull Exception exception);
+
+    @Nonnull
+    public final GenericData.Record applyPostProcessingActions(@Nonnull GenericData.Record record) {
+        for (Function<GenericData.Record, GenericData.Record> action : postProcessingActions) {
+            record = action.apply(record);
+        }
+        postProcessingActions.clear();
+        return record;
+    }
+
+    public final void clearPostProcessingActions() {
+        postProcessingActions.clear();
+    }
+}
diff --git a/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonAvroConverter.java b/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonAvroConverter.java
index fa0867b..fbc6e5b 100644
--- a/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonAvroConverter.java
+++ b/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonAvroConverter.java
@@ -57,6 +57,11 @@ public Builder
setAvroAdditionalPropsFieldName(String avroAdditionalPropsFieldNa return this; } + public Builder setFieldConversionFailureListener(FieldConversionFailureListener listener) { + recordReaderBuilder.setFieldConversionFailureListener(listener); + return this; + } + public JsonAvroConverter build() { return new JsonAvroConverter(recordReaderBuilder.build()); } diff --git a/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonGenericRecordReader.java b/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonGenericRecordReader.java index 1e970ea..27d3e0e 100644 --- a/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonGenericRecordReader.java +++ b/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonGenericRecordReader.java @@ -40,6 +40,7 @@ public class JsonGenericRecordReader { private final ObjectMapper mapper; private final UnknownFieldListener unknownFieldListener; + private final FieldConversionFailureListener fieldConversionFailureListener; private final Function nameTransformer; // fields from the input json object that carry additional properties; // properties inside these fields will be added to the output extra props field @@ -51,6 +52,7 @@ public class JsonGenericRecordReader { public static final class Builder { private ObjectMapper mapper = new ObjectMapper(); private UnknownFieldListener unknownFieldListener; + private FieldConversionFailureListener fieldConversionFailureListener; private Function nameTransformer = Function.identity(); private Set jsonExtraPropsFields = DEFAULT_JSON_FIELD_NAMES; private String avroExtraPropsField = DEFAULT_AVRO_FIELD_NAME; @@ -68,6 +70,11 @@ public Builder setUnknownFieldListener(UnknownFieldListener unknownFieldListener return this; } + public Builder setFieldConversionFailureListener(FieldConversionFailureListener fieldConversionFailureListener) { + this.fieldConversionFailureListener = fieldConversionFailureListener; + return this; + } + public Builder 
setNameTransformer(Function nameTransformer) { this.nameTransformer = nameTransformer; return this; @@ -84,7 +91,8 @@ public Builder setAvroAdditionalPropsFieldName(String avroAdditionalPropsFieldNa } public JsonGenericRecordReader build() { - return new JsonGenericRecordReader(mapper, unknownFieldListener, nameTransformer, jsonExtraPropsFields, avroExtraPropsField); + return new JsonGenericRecordReader(mapper, unknownFieldListener, fieldConversionFailureListener, + nameTransformer, jsonExtraPropsFields, avroExtraPropsField); } } @@ -103,11 +111,13 @@ public static Builder builder() { */ private JsonGenericRecordReader(ObjectMapper mapper, UnknownFieldListener unknownFieldListener, + FieldConversionFailureListener fieldConversionFailureListener, Function nameTransformer, Set jsonExtraPropsFieldNames, String avroExtraPropsFieldName) { this.mapper = mapper; this.unknownFieldListener = unknownFieldListener; + this.fieldConversionFailureListener = fieldConversionFailureListener; this.nameTransformer = nameTransformer; this.jsonExtraPropsFieldNames = jsonExtraPropsFieldNames; this.avroExtraPropsFieldName = avroExtraPropsFieldName; @@ -117,7 +127,14 @@ private JsonGenericRecordReader(ObjectMapper mapper, @SuppressWarnings("unchecked") public GenericData.Record read(byte[] data, Schema schema) { try { - return read(mapper.readValue(data, Map.class), schema); + if (fieldConversionFailureListener != null) { + fieldConversionFailureListener.clearPostProcessingActions(); + } + GenericData.Record result = read(mapper.readValue(data, Map.class), schema); + if (fieldConversionFailureListener != null) { + return fieldConversionFailureListener.applyPostProcessingActions(result); + } + return result; } catch (IOException ex) { throw new AvroConversionException("Failed to parse json to map format.", ex); } @@ -151,7 +168,7 @@ private GenericData.Record readRecord(Map json, Schema schema, D if (jsonExtraPropsFieldNames.contains(fieldName)) { 
additionalProps.putAll(AdditionalPropertyField.getObjectValues((Map) value)); } else if (field != null) { - record.set(fieldName, read(field, field.schema(), value, path, false)); + record.set(fieldName, read(field, key, field.schema(), value, path, false)); } else if (allowAdditionalProps) { additionalProps.put(fieldName, AdditionalPropertyField.getValue(value)); } else if (unknownFieldListener != null) { @@ -162,97 +179,112 @@ private GenericData.Record readRecord(Map json, Schema schema, D if (allowAdditionalProps && additionalProps.size() > 0) { record.set( avroExtraPropsFieldName, - read(avroExtraPropsField, AdditionalPropertyField.FIELD_SCHEMA, additionalProps, path, false)); + read(avroExtraPropsField, avroExtraPropsFieldName, AdditionalPropertyField.FIELD_SCHEMA, additionalProps, path, false)); } return record.build(); } - private Object read(Schema.Field field, Schema schema, Object value, Deque path, boolean silently) { - return read(field, schema, value, path, silently, false); + private Object read(Schema.Field field, String originalName, Schema schema, Object value, Deque path, boolean silently) { + return read(field, originalName, schema, value, path, silently, false); } /** * @param enforceString if this parameter is true and the schema type is string, any field value will be converted to string. */ @SuppressWarnings("unchecked") - private Object read(Schema.Field field, Schema schema, Object value, Deque path, boolean silently, boolean enforceString) { - String fieldName = nameTransformer.apply(field.name()); + private Object read(Schema.Field field, String originalName, Schema schema, Object value, Deque path, boolean silently, boolean enforceString) { + String fieldName = nameTransformer.apply(field.name()); // Always redundant? 
boolean pushed = !fieldName.equals(path.peekLast()); if (pushed) { path.addLast(fieldName); } Object result; LogicalType logicalType = schema.getLogicalType(); - switch (schema.getType()) { - case RECORD: - result = onValidType(value, Map.class, path, silently, map -> readRecord(map, schema, path)); - break; - case ARRAY: - result = onValidType(value, List.class, path, silently, list -> readArray(field, schema, list, path)); - break; - case MAP: - result = onValidType(value, Map.class, path, silently, map -> readMap(field, schema, map, path)); - break; - case UNION: - result = readUnion(field, schema, value, path, enforceString); - break; - case INT: - // Only "date" logical type is expected here, because the Avro schema is converted from a Json schema, - // and this logical types corresponds to the Json "date" format. - if (logicalType != null && logicalType.equals(LogicalTypes.date())) { - result = onValidType(value, String.class, path, silently, DateTimeUtils::getEpochDay); - } else { - result = value instanceof String valueString? // implicit cast to String - onValidStringNumber(valueString, path, silently, Integer::parseInt) : - onValidNumber(value, path, silently, Number::intValue); - } - break; - case LONG: - // Only "time-micros" and "timestamp-micros" logical types are expected here, because - // the Avro schema is converted from a Json schema, and the two logical types corresponds - // to the Json "time" and "date-time" formats. 
- if (logicalType != null && logicalType.equals(LogicalTypes.timestampMicros())) { - result = onValidType(value, String.class, path, silently, DateTimeUtils::getEpochMicros); - } else if (logicalType != null && logicalType.equals(LogicalTypes.timeMicros())) { - result = onValidType(value, String.class, path, silently, DateTimeUtils::getMicroSeconds); - } else { + try { + switch (schema.getType()) { + case RECORD: + result = onValidType(value, Map.class, path, silently, map -> readRecord(map, schema, path)); + break; + case ARRAY: + result = onValidType(value, List.class, path, silently, list -> readArray(field, originalName, schema, list, path)); + break; + case MAP: + result = onValidType(value, Map.class, path, silently, map -> readMap(field, originalName, schema, map, path)); + break; + case UNION: + result = readUnion(field, originalName, schema, value, path, enforceString); + break; + case INT: + // Only "date" logical type is expected here, because the Avro schema is converted from a Json schema, + // and this logical types corresponds to the Json "date" format. + if (logicalType != null && logicalType.equals(LogicalTypes.date())) { + result = onValidType(value, String.class, path, silently, DateTimeUtils::getEpochDay); + } else { + result = value instanceof String valueString ? // implicit cast to String + onValidStringNumber(valueString, path, silently, Integer::parseInt) : + onValidNumber(value, path, silently, Number::intValue); + } + break; + case LONG: + // Only "time-micros" and "timestamp-micros" logical types are expected here, because + // the Avro schema is converted from a Json schema, and the two logical types corresponds + // to the Json "time" and "date-time" formats. 
+ if (logicalType != null && logicalType.equals(LogicalTypes.timestampMicros())) { + result = onValidType(value, String.class, path, silently, DateTimeUtils::getEpochMicros); + } else if (logicalType != null && logicalType.equals(LogicalTypes.timeMicros())) { + result = onValidType(value, String.class, path, silently, DateTimeUtils::getMicroSeconds); + } else { + result = value instanceof String stringValue ? // implicit cast to String + onValidStringNumber(stringValue, path, silently, Long::parseLong) : + onValidNumber(value, path, silently, Number::longValue); + } + break; + case FLOAT: result = value instanceof String stringValue ? // implicit cast to String - onValidStringNumber(stringValue, path, silently, Long::parseLong) : - onValidNumber(value, path, silently, Number::longValue); - } - break; - case FLOAT: - result = value instanceof String stringValue ? // implicit cast to String - onValidStringNumber(stringValue, path, silently, Float::parseFloat) : - onValidNumber(value, path, silently, Number::floatValue); - break; - case DOUBLE: - result = value instanceof String stringValue ? // implicit cast to String - onValidStringNumber(stringValue, path, silently, Double::parseDouble) : - onValidNumber(value, path, silently, Number::doubleValue); - break; - case BOOLEAN: - result = onValidType(value, Boolean.class, path, silently, bool -> bool); - break; - case ENUM: - result = onValidType(value, String.class, path, silently, string -> ensureEnum(schema, string, path)); - break; - case STRING: - if (enforceString) { - result = value == null ? INCOMPATIBLE : AdditionalPropertyField.getValue(value); - } else { - result = onValidType(value, String.class, path, silently, string -> string); - } - break; - case BYTES: - result = onValidType(value, String.class, path, silently, this::bytesForString); - break; - case NULL: - result = value == null ? 
value : INCOMPATIBLE; - break; - default: - throw new AvroTypeException("Unsupported type: " + field.schema().getType()); + onValidStringNumber(stringValue, path, silently, Float::parseFloat) : + onValidNumber(value, path, silently, Number::floatValue); + break; + case DOUBLE: + result = value instanceof String stringValue ? // implicit cast to String + onValidStringNumber(stringValue, path, silently, Double::parseDouble) : + onValidNumber(value, path, silently, Number::doubleValue); + break; + case BOOLEAN: + result = onValidType(value, Boolean.class, path, silently, bool -> bool); + break; + case ENUM: + result = onValidType(value, String.class, path, silently, string -> ensureEnum(schema, string, path)); + break; + case STRING: + if (enforceString) { + result = value == null ? INCOMPATIBLE : AdditionalPropertyField.getValue(value); + } else { + result = onValidType(value, String.class, path, silently, string -> string); + } + break; + case BYTES: + result = onValidType(value, String.class, path, silently, this::bytesForString); + break; + case NULL: + result = value == null ? value : INCOMPATIBLE; + break; + default: + throw new AvroTypeException("Unsupported type: " + field.schema().getType()); + } + } catch (Exception exception) { + if (fieldConversionFailureListener != null) { + result = fieldConversionFailureListener.onFieldConversionFailure( + fieldName, + originalName, + schema, + value, + PathsPrinter.print(path), + exception + ); + } else { + throw exception; + } } if (pushed) { @@ -261,7 +293,7 @@ private Object read(Schema.Field field, Schema schema, Object value, Deque readArray(Schema.Field field, Schema schema, List items, Deque path) { + private List readArray(Schema.Field field, String originalName, Schema schema, List items, Deque path) { // When all array elements are supposed to be null or string, we enforce array values to be string. // This is to properly handle Json arrays that do not follow the schema. 
     Set<Type> nonNullElementTypes;
@@ -276,21 +308,21 @@
     }
     boolean enforceString = nonNullElementTypes.size() == 1 && nonNullElementTypes.contains(Type.STRING);
     return items.stream()
-        .map(item -> read(field, schema.getElementType(), item, path, false, enforceString))
+        .map(item -> read(field, originalName, schema.getElementType(), item, path, false, enforceString))
         .collect(toList());
   }
 
-  private Map<String, Object> readMap(Schema.Field field, Schema schema, Map<String, Object> map, Deque<String> path) {
+  private Map<String, Object> readMap(Schema.Field field, String originalName, Schema schema, Map<String, Object> map, Deque<String> path) {
     Map<String, Object> result = new HashMap<>(map.size());
-    map.forEach((k, v) -> result.put(k, read(field, schema.getValueType(), v, path, false)));
+    map.forEach((k, v) -> result.put(k, read(field, originalName, schema.getValueType(), v, path, false)));
     return result;
   }
 
-  private Object readUnion(Schema.Field field, Schema schema, Object value, Deque<String> path, boolean enforceString) {
+  private Object readUnion(Schema.Field field, String originalName, Schema schema, Object value, Deque<String> path, boolean enforceString) {
     List<Schema> types = schema.getTypes();
     for (Schema type : types) {
       try {
-        Object nestedValue = read(field, type, value, path, true, enforceString);
+        Object nestedValue = read(field, originalName, type, value, path, true, enforceString);
         if (nestedValue == INCOMPATIBLE) {
           continue;
         } else {
diff --git a/converter/src/test/java/tech/allegro/schema/json2avro/converter/FieldConversionFailureListenerTest.java b/converter/src/test/java/tech/allegro/schema/json2avro/converter/FieldConversionFailureListenerTest.java
new file mode 100644
index 0000000..7d579ff
--- /dev/null
+++ b/converter/src/test/java/tech/allegro/schema/json2avro/converter/FieldConversionFailureListenerTest.java
@@ -0,0 +1,86 @@
+package tech.allegro.schema.json2avro.converter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
+
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static tech.allegro.schema.json2avro.converter.TestUtils.readResource;
+import static tech.allegro.schema.json2avro.converter.TestUtils.toList;
+
+public class FieldConversionFailureListenerTest {
+    public static class FieldConversionFailureListenerTestProvider implements ArgumentsProvider {
+        @Override
+        public Stream<? extends Arguments> provideArguments(final ExtensionContext context) throws Exception {
+            final JsonNode desc = JsonHelper.deserialize(readResource("field_conversion_failure_listener.json"));
+            return toList(desc.get("testCases").elements()).stream().map(testCase -> Arguments.of(
+                testCase.get("name").asText(),
+                desc.get("avroSchema"),
+                testCase.get("records"),
+                testCase.get("expectedOutput")));
+        }
+    }
+
+    @ParameterizedTest
+    @ArgumentsSource(FieldConversionFailureListenerTestProvider.class)
+    public void testFieldConversionFailureListener(String testCaseName,
+                                                   JsonNode avroSchemaJson,
+                                                   JsonNode recordsJson,
+                                                   JsonNode expectedOutputJson) {
+        final Schema avroSchema = new Schema.Parser().parse(JsonHelper.serialize(avroSchemaJson));
+        final Schema metaSchema = avroSchema.getField("META").schema();
+        final Schema changesSchema = metaSchema.getField("CHANGES").schema().getElementType().getTypes().get(0);
+        final List<String> expectedRecords = toList(expectedOutputJson.elements()).stream().map(JsonNode::toString).toList();
+
+        final JsonAvroConverter converter = JsonAvroConverter.builder()
+            .setNameTransformer(String::toUpperCase)
+            .setFieldConversionFailureListener(new FieldConversionFailureListener() {
+                @Override
+                public Object onFieldConversionFailure(String avroName,
+                                                       String originalName,
+                                                       Schema schema,
+                                                       Object value,
+                                                       String path,
+                                                       Exception exception) {
+                    pushPostProcessingAction(record -> {
+                        GenericData.Record change = new GenericRecordBuilder(changesSchema)
+                            .set("FIELD", originalName)
+                            .set("CHANGE", "NULLED")
+                            .set("REASON", exception.getMessage())
+                            .build();
+                        GenericData.Record meta = (GenericData.Record) record.get("META");
+                        Object changesObj = meta.get("CHANGES");
+                        @SuppressWarnings("unchecked")
+                        List<GenericData.Record> changes = (List<GenericData.Record>) changesObj;
+                        changes.add(change);
+                        return record;
+                    });
+                    return null;
+                }
+            })
+            .build();
+
+        // Run twice to guard against state leaking across runs
+        for (int run : List.of(1, 2)) {
+            int i = 0;
+            for (JsonNode recordJson : recordsJson) {
+                final GenericData.Record record = converter.convertToGenericDataRecord(
+                    JsonHelper.serialize(recordJson).getBytes(),
+                    avroSchema);
+                String recordReformatted = JsonHelper.serialize(JsonHelper.deserialize(record.toString()));
+                assertEquals(expectedRecords.get(i), recordReformatted, String.format("Test run %d for %s failed", run, testCaseName));
+                i++;
+            }
+        }
+    }
+}
diff --git a/converter/src/test/java/tech/allegro/schema/json2avro/converter/JsonAvroConverterTest.java b/converter/src/test/java/tech/allegro/schema/json2avro/converter/JsonAvroConverterTest.java
index 87a9fff..1ace49e 100644
--- a/converter/src/test/java/tech/allegro/schema/json2avro/converter/JsonAvroConverterTest.java
+++ b/converter/src/test/java/tech/allegro/schema/json2avro/converter/JsonAvroConverterTest.java
@@ -1,6 +1,8 @@ package tech.allegro.schema.json2avro.converter;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static tech.allegro.schema.json2avro.converter.TestUtils.readResource;
+import static tech.allegro.schema.json2avro.converter.TestUtils.toList;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import
com.fasterxml.jackson.databind.JsonNode; @@ -18,8 +20,12 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; + import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.junit.Test; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -31,20 +37,6 @@ public class JsonAvroConverterTest { private static final ObjectMapper MAPPER = new ObjectMapper(); private static final ObjectWriter WRITER = MAPPER.writer(); - @SuppressWarnings("UnstableApiUsage") - public static String readResource(final String name) throws IOException { - final URL resource = Resources.getResource(name); - return Resources.toString(resource, StandardCharsets.UTF_8); - } - - private static List toList(final Iterator iterator) { - final List list = new ArrayList<>(); - while (iterator.hasNext()) { - list.add(iterator.next()); - } - return list; - } - private static Set toSet(final Iterator iterator) { final Set set = new HashSet<>(); while (iterator.hasNext()) { @@ -108,9 +100,9 @@ public void testJsonToAvroConverter(String testCaseName, .setJsonAdditionalPropsFieldNames(jsonExtraPropsFieldNames) .setAvroAdditionalPropsFieldName(avroExtraPropsFieldName) .build(); + final Schema schema = new Schema.Parser().parse(JsonHelper.serialize(avroSchema)); final GenericData.Record actualAvroObject = converter.convertToGenericDataRecord(WRITER.writeValueAsBytes(jsonObject), schema); assertEquals(avroObject, JsonHelper.deserialize(actualAvroObject.toString()), String.format("Test for %s failed", testCaseName)); } - } diff --git a/converter/src/test/java/tech/allegro/schema/json2avro/converter/TestUtils.java b/converter/src/test/java/tech/allegro/schema/json2avro/converter/TestUtils.java new file mode 100644 index 0000000..f56d7f4 
--- /dev/null +++ b/converter/src/test/java/tech/allegro/schema/json2avro/converter/TestUtils.java @@ -0,0 +1,26 @@ +package tech.allegro.schema.json2avro.converter; + +import com.google.common.io.Resources; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class TestUtils { + @SuppressWarnings("UnstableApiUsage") + public static String readResource(final String name) throws IOException { + final URL resource = Resources.getResource(name); + return Resources.toString(resource, StandardCharsets.UTF_8); + } + + public static List toList(final Iterator iterator) { + final List list = new ArrayList<>(); + while (iterator.hasNext()) { + list.add(iterator.next()); + } + return list; + } +} diff --git a/converter/src/test/resources/field_conversion_failure_listener.json b/converter/src/test/resources/field_conversion_failure_listener.json new file mode 100644 index 0000000..6648ad6 --- /dev/null +++ b/converter/src/test/resources/field_conversion_failure_listener.json @@ -0,0 +1,238 @@ +{ + "avroSchema": { + "type": "record", + "name": "message", + "fields": [ + { + "name": "META", + "type": { + "type": "record", + "name": "meta", + "fields": [ + { + "name": "CHANGES", + "type": { + "type": "array", + "items": [ + { + "type": "record", + "name": "change", + "fields": [ + { + "name": "FIELD", + "type": "string" + }, + { + "name": "CHANGE", + "type": "string" + }, + { + "name": "REASON", + "type": "string" + } + ] + } + ] + } + } + ] + } + }, + { + "name": "DATA", + "type": { + "type": "record", + "name": "data", + "fields": [ + { + "name": "NAME", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "ID", + "type": [ + "null", + "int" + ], + "default": null + } + ] + } + } + ] + }, + "testCases": [ + { + "name": "no bad records, some existing changes", + "records": [ + { + "meta": { + "changes": [ + { + "field": 
"name", + "change": "TRUNCATED", + "reason": "name was ridiculously long" + } + ] + }, + "data": { + "name": "Bob", + "id": 1 + } + }, + { + "meta": { + "changes": [] + }, + "data": { + "name": "Alice", + "id": 2 + } + } + ], + "expectedOutput": [ + { + "META": { + "CHANGES": [ + { + "FIELD": "name", + "CHANGE": "TRUNCATED", + "REASON": "name was ridiculously long" + } + ] + }, + "DATA": { + "NAME": "Bob", + "ID": 1 + } + }, + { + "META": { + "CHANGES": [] + }, + "DATA": { + "NAME": "Alice", + "ID": 2 + } + } + ] + }, + { + "name": "bad record with empty changes", + "records": [ + { + "meta": { + "changes": [] + }, + "data": { + "name": [ + "B", + "o", + "b" + ], + "id": 1 + } + }, + { + "meta": { + "changes": [] + }, + "data": { + "name": "Alice", + "id": 2 + } + } + ], + "expectedOutput": [ + { + "META": { + "CHANGES": [ + { + "FIELD": "name", + "CHANGE": "NULLED", + "REASON": "Could not evaluate union, field NAME is expected to be one of these: NULL, STRING. If this is a complex type, check if offending field (path: DATA.NAME) adheres to schema: [B, o, b]" + } + ] + }, + "DATA": { + "NAME": null, + "ID": 1 + } + }, + { + "META": { + "CHANGES": [] + }, + "DATA": { + "NAME": "Alice", + "ID": 2 + } + } + ] + }, + { + "name": "bad record with existing changes", + "records": [ + { + "meta": { + "changes": [ + { + "field": "id", + "change": "CONVERTED", + "reason": "was string" + } + ] + }, + "data": { + "name": 808, + "id": 2 + } + }, + { + "meta": { + "changes": [] + }, + "data": { + "name": "Alice", + "id": 2 + } + } + ], + "expectedOutput": [ + { + "META": { + "CHANGES": [ + { + "FIELD": "id", + "CHANGE": "CONVERTED", + "REASON": "was string" + }, + { + "FIELD": "name", + "CHANGE": "NULLED", + "REASON": "Could not evaluate union, field NAME is expected to be one of these: NULL, STRING. 
If this is a complex type, check if offending field (path: DATA.NAME) adheres to schema: 808" + } + ] + }, + "DATA": { + "NAME": null, + "ID": 2 + } + }, + { + "META": { + "CHANGES": [] + }, + "DATA": { + "NAME": "Alice", + "ID": 2 + } + } + ] + } + ] +} \ No newline at end of file