Skip to content

Commit

Permalink
Json to Avro converter supports capturing failures (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Jul 22, 2024
1 parent 219b758 commit 2e7d28b
Show file tree
Hide file tree
Showing 9 changed files with 553 additions and 99 deletions.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,22 @@ JsonAvroConverter converter = JsonAvroConverter.builder()

By default, both `_ab_additional_properties` and `_airbyte_additional_properties` are the additional properties field names on the Json object.

### Field Conversion Failure Listener

A listener can be set to react to conversion failures at the field level. It will be called with metadata about the field and failure, and it may do any of the following:

* return a replacement value for the field
* call `pushPostProcessingAction` to register a function that will be applied to the record after conversion (e.g., to add metadata about the failure)
* (re)throw an exception if the failure is unrecoverable

Note that the listener must not edit the record itself; any change to the record should be registered as a post-processing action instead. This avoids race conditions and other issues that might arise from modifying the record while it is being processed.

```java
JsonAvroConverter converter = JsonAvroConverter.builder()
.setFieldConversionFailureListener(listener)
.build();
```

## Build
- The build is upgraded to use Java 14 and Gradle 7.2 to match the build environment of Airbyte.
- Maven staging and publishing is removed because they are incompatible with the new build environment.
Expand Down
4 changes: 2 additions & 2 deletions converter/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ buildscript {
}
}
dependencies {
classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.14.0'
classpath 'com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.0.0'
}
}

Expand All @@ -20,7 +20,7 @@ plugins {
id 'pmd'
}

apply plugin: 'com.commercehub.gradle.plugin.avro'
apply plugin: 'com.github.davidmc24.gradle.plugin.avro'
apply plugin: 'idea'

configurations {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package tech.allegro.schema.json2avro.converter;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;

/**
 * Callback that lets a caller react to field-level conversion failures.
 *
 * <p>This is to support behavior like v2 destinations change capture. Specifically, when a
 * field fails to convert:
 * <ul>
 *   <li>the field, change, and reason are added to {@code _airbyte_metadata.changes[]}</li>
 *   <li>the field is nulled or truncated</li>
 * </ul>
 *
 * <p>At the time of failure, the {@code _airbyte_metadata.changes[]} field might
 * <ul>
 *   <li>exist and be empty</li>
 *   <li>exist and already contain changes</li>
 *   <li>not have been parsed yet ({@code meta == null})</li>
 *   <li>have been parsed, but contain a changes field that has not been parsed
 *       ({@code meta.changes == null})</li>
 * </ul>
 *
 * <p>Therefore, the simplest general feature that will support the desired behavior is:
 * <ul>
 *   <li>the listener may return a new value for the affected field only</li>
 *   <li>the listener may not mutate any other part of the record on failure</li>
 *   <li>the listener may only push post-processing actions for the record (applied after
 *       required fields definitely exist)</li>
 * </ul>
 *
 * <p>Pending actions are held in a plain, unsynchronized list on this instance.
 */
public abstract class FieldConversionFailureListener {

    /** Actions queued by the subclass, in registration order, awaiting the next apply/clear. */
    private final List<Function<GenericData.Record, GenericData.Record>> postProcessingActions = new LinkedList<>();

    /**
     * Queues a function to be run against the record by
     * {@link #applyPostProcessingActions(GenericData.Record)}.
     *
     * @param action function that receives the record and returns the record to pass on
     */
    protected final void pushPostProcessingAction(Function<GenericData.Record, GenericData.Record> action) {
        postProcessingActions.add(action);
    }

    /**
     * Called when a field fails to convert. Implementations may return a replacement value,
     * queue post-processing actions via {@link #pushPostProcessingAction(Function)}, or
     * (re)throw if the failure is unrecoverable.
     *
     * @param avroName     name of the field on the Avro side (presumably the sanitized name; confirm with caller)
     * @param originalName name of the field as it appeared in the source JSON (presumably; confirm with caller)
     * @param schema       Avro schema the value was being converted to
     * @param value        the value that failed to convert
     * @param path         location of the field within the record
     * @param exception    the failure that triggered this callback
     * @return the replacement value for the field, or {@code null}
     */
    @Nullable
    public abstract Object onFieldConversionFailure(@Nonnull String avroName,
                                                    @Nonnull String originalName,
                                                    @Nonnull Schema schema,
                                                    @Nonnull Object value,
                                                    @Nonnull String path,
                                                    @Nonnull Exception exception);

    /**
     * Runs every queued action, in registration order, feeding each action the result of the
     * previous one, and then empties the queue.
     *
     * <p>The queue is emptied even when an action throws, so that actions registered for one
     * record can never be applied to a later record. The exception itself still propagates.
     *
     * @param record the converted record to post-process
     * @return the record returned by the last action, or {@code record} if none were queued
     */
    @Nonnull
    public final GenericData.Record applyPostProcessingActions(@Nonnull GenericData.Record record) {
        try {
            GenericData.Record result = record;
            for (Function<GenericData.Record, GenericData.Record> action : postProcessingActions) {
                result = action.apply(result);
            }
            return result;
        } finally {
            // Always reset per-record state, including on failure of an action.
            postProcessingActions.clear();
        }
    }

    /** Discards all queued actions without running them. */
    public final void clearPostProcessingActions() {
        postProcessingActions.clear();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public Builder setAvroAdditionalPropsFieldName(String avroAdditionalPropsFieldNa
return this;
}

/**
 * Sets the listener to be notified of field-level conversion failures.
 * The listener is forwarded to the underlying record reader builder.
 *
 * @param listener the failure listener to install
 * @return this builder, to allow call chaining
 */
public Builder setFieldConversionFailureListener(FieldConversionFailureListener listener) {
recordReaderBuilder.setFieldConversionFailureListener(listener);
return this;
}

/**
 * Builds the record reader from the accumulated settings and wraps it in a new converter.
 *
 * @return a new {@code JsonAvroConverter}
 */
public JsonAvroConverter build() {
return new JsonAvroConverter(recordReaderBuilder.build());
}
Expand Down
Loading

0 comments on commit 2e7d28b

Please sign in to comment.