Skip to content

Commit

Permalink
Json to Avro converter supports capturing failures
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Jul 22, 2024
1 parent 219b758 commit e93e688
Show file tree
Hide file tree
Showing 9 changed files with 553 additions and 99 deletions.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,22 @@ JsonAvroConverter converter = JsonAvroConverter.builder()

By default, both `_ab_additional_properties` and `_airbyte_additional_properties` are recognized as additional-properties field names on the JSON object.

### Field Conversion Failure Listener

A listener can be set to react to conversion failures at the field level. It will be called with metadata about the field and failure, and it may do any of the following:

* return a replacement value for the field
* register a post-processing action for the record, e.g. to append details about the failure to the record's metadata
* (re)throw an exception if the failure is unrecoverable

Note that the listener must not modify the record itself from within the callback. This avoids race conditions and other issues that might arise from modifying the record while it is still being processed.

```java
JsonAvroConverter converter = JsonAvroConverter.builder()
.setFieldConversionFailureListener(listener)
.build();
```

## Build
- The build is upgraded to use Java 14 and Gradle 7.2 to match the build environment of Airbyte.
- Maven staging and publishing is removed because they are incompatible with the new build environment.
Expand Down
4 changes: 2 additions & 2 deletions converter/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ buildscript {
}
}
dependencies {
classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.14.0'
classpath 'com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.0.0'
}
}

Expand All @@ -20,7 +20,7 @@ plugins {
id 'pmd'
}

apply plugin: 'com.commercehub.gradle.plugin.avro'
apply plugin: 'com.github.davidmc24.gradle.plugin.avro'
apply plugin: 'idea'

configurations {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package tech.allegro.schema.json2avro.converter;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;

/**
 * Callback invoked when a single field fails to convert, with support for
 * deferring changes to the record until it can safely be modified.
 *
 * <p>This is to support behavior like v2 destinations change capture.
 * Specifically, when a field fails to convert:
 * <ul>
 *   <li>the field, change, and reason are added to {@code _airbyte_metadata.changes[]}</li>
 *   <li>the field is nulled or truncated</li>
 * </ul>
 *
 * <p>At the time of failure, the {@code _airbyte_metadata.changes[]} field might
 * <ul>
 *   <li>exist and be empty</li>
 *   <li>exist and already contain changes</li>
 *   <li>not have been parsed yet ({@code meta == null})</li>
 *   <li>have been parsed, but contain a changes field that has not been parsed
 *       ({@code meta.changes == null})</li>
 * </ul>
 *
 * <p>Therefore, the simplest general feature that will support the desired behavior is:
 * <ul>
 *   <li>the listener may return a new value for the affected field only</li>
 *   <li>the listener may not mutate any other part of the record on failure</li>
 *   <li>the listener may only push post-processing actions for the record
 *       (run after required fields definitely exist)</li>
 * </ul>
 *
 * <p>The queue of pending actions is a plain list held on the instance with no
 * synchronization, so pending actions belong to whichever record is currently
 * being converted with this listener.
 */
public abstract class FieldConversionFailureListener {

    /** Actions queued by {@link #pushPostProcessingAction} since the queue was last cleared, in insertion order. */
    private final List<Function<GenericData.Record, GenericData.Record>> postProcessingActions = new LinkedList<>();

    /**
     * Queues an action to run when {@link #applyPostProcessingActions} is next called.
     * Intended to be called by subclasses from within {@link #onFieldConversionFailure}.
     *
     * @param action function that receives the record and returns the record to pass
     *               to the next action (or to the caller, if it is the last one)
     */
    protected final void pushPostProcessingAction(Function<GenericData.Record, GenericData.Record> action) {
        postProcessingActions.add(action);
    }

    /**
     * Called when a field fails to convert. Implementations may return a replacement
     * value, queue post-processing actions via {@link #pushPostProcessingAction}, or
     * throw if the failure is unrecoverable.
     *
     * @param avroName     name of the field in the Avro schema (presumably the sanitized name; confirm against caller)
     * @param originalName name of the field in the source JSON (presumably; confirm against caller)
     * @param schema       Avro schema the value was being converted to
     * @param value        the value that failed to convert
     * @param path         path of the field within the record (format is defined by the caller)
     * @param exception    the failure that triggered this callback
     * @return the value to use for the field in place of the failed one; may be {@code null}
     */
    @Nullable
    public abstract Object onFieldConversionFailure(@Nonnull String avroName,
                                                    @Nonnull String originalName,
                                                    @Nonnull Schema schema,
                                                    @Nonnull Object value,
                                                    @Nonnull String path,
                                                    @Nonnull Exception exception);

    /**
     * Runs every queued action against the record, in the order they were pushed,
     * feeding each action's result into the next, then empties the queue.
     *
     * <p>The queue is emptied even if an action throws, so that actions queued for
     * one record can never be applied to a later record.
     *
     * @param record the converted record to post-process
     * @return the record returned by the last action, or {@code record} itself if
     *         no actions were queued
     */
    @Nonnull
    public final GenericData.Record applyPostProcessingActions(@Nonnull GenericData.Record record) {
        GenericData.Record result = record;
        try {
            for (Function<GenericData.Record, GenericData.Record> action : postProcessingActions) {
                result = action.apply(result);
            }
            return result;
        } finally {
            // Always reset: a failing action must not leave stale actions behind.
            postProcessingActions.clear();
        }
    }

    /**
     * Discards all queued actions without running them.
     */
    public final void clearPostProcessingActions() {
        postProcessingActions.clear();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public Builder setAvroAdditionalPropsFieldName(String avroAdditionalPropsFieldNa
return this;
}

/**
 * Sets the listener to be notified of field-level conversion failures.
 * The listener is forwarded to the underlying record reader builder.
 *
 * @param listener the field conversion failure listener to register
 * @return this builder, to allow call chaining
 */
public Builder setFieldConversionFailureListener(FieldConversionFailureListener listener) {
recordReaderBuilder.setFieldConversionFailureListener(listener);
return this;
}

/**
 * Builds the record reader from the settings collected so far and wraps it
 * in a new {@link JsonAvroConverter}.
 *
 * @return a new converter backed by the configured record reader
 */
public JsonAvroConverter build() {
return new JsonAvroConverter(recordReaderBuilder.build());
}
Expand Down
Loading

0 comments on commit e93e688

Please sign in to comment.