Skip to content

Commit

Permalink
JSON configuration and Processor
Browse files Browse the repository at this point in the history
This PR adds a declarative JSON configuration object that allows users to specify the schema of a JSON message. It is meant to have good out-of-the-box defaults, while still allowing power users to modify some of the finer parsing details (should this int field be parseable from a string? should null values be allowed? what if a field is missing? etc). The JSON configuration layer is not tied to any specific implementation; it is introspectable, and could have alternative implementations with other parsing backends. (I could imagine a DHE use-case where they do code-generation based on the JSON configuration, somewhat like the DHE avro ObjectProcessor code generator.)

Out of the box, there's an ObjectProcessor implementation based on the Jackson streaming APIs; that is, the data flows from byte[]s (or InputStream, relevant for very large files) to the output WritableChunks without the need for the intermediate Jackson databind layer (TreeNode). This saves a large layer of allocation that our current kafka json_spec layer relies upon. The ObjectProcessor layer means that this can be used in other places that expose ObjectProcessor layers and want 1-to-1 record-to-row (currently, Kafka).

Part of #5222
  • Loading branch information
devinrsmith committed Apr 11, 2024
1 parent b1f657b commit 1320749
Show file tree
Hide file tree
Showing 127 changed files with 11,931 additions and 26 deletions.
2 changes: 1 addition & 1 deletion buildSrc/src/main/groovy/Classpaths.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class Classpaths {

static final String JACKSON_GROUP = 'com.fasterxml.jackson'
static final String JACKSON_NAME = 'jackson-bom'
static final String JACKSON_VERSION = '2.14.1'
static final String JACKSON_VERSION = '2.17.0'

static final String SSLCONTEXT_GROUP = 'io.github.hakky54'
static final String SSLCONTEXT_VERSION = '8.1.1'
Expand Down
2 changes: 2 additions & 0 deletions engine/processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ dependencies {
api project(':qst-type')
api project(':engine-chunk')

Classpaths.inheritImmutables(project)

Classpaths.inheritJUnitPlatform(project)
Classpaths.inheritAssertJ(project)
testImplementation 'org.junit.jupiter:junit-jupiter'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.processor;

import io.deephaven.annotations.BuildableStyle;
import io.deephaven.qst.type.Type;
import org.immutables.value.Value.Check;
import org.immutables.value.Value.Immutable;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * Pairs an {@link ObjectProcessor} with one name per output.
 *
 * <p>
 * The number of {@link #names()} must equal the {@link ObjectProcessor#size() size} of {@link #processor()}; the
 * {@code checkSizes()} check throws an {@link IllegalArgumentException} otherwise.
 *
 * @param <T> the object type
 */
@Immutable
@BuildableStyle
public abstract class NamedObjectProcessor<T> {

    /**
     * Creates a new builder.
     *
     * @return the builder
     * @param <T> the object type
     */
    public static <T> Builder<T> builder() {
        return ImmutableNamedObjectProcessor.builder();
    }

    /**
     * Creates a named object processor from {@code processor} and {@code names}.
     *
     * @param processor the object processor
     * @param names the name for each output of {@code processor}
     * @return the named object processor
     * @param <T> the object type
     */
    public static <T> NamedObjectProcessor<T> of(ObjectProcessor<? super T> processor, String... names) {
        return NamedObjectProcessor.<T>builder().processor(processor).addNames(names).build();
    }

    /**
     * Creates a named object processor from {@code processor} and {@code names}.
     *
     * @param processor the object processor
     * @param names the name for each output of {@code processor}
     * @return the named object processor
     * @param <T> the object type
     */
    public static <T> NamedObjectProcessor<T> of(ObjectProcessor<? super T> processor, Iterable<String> names) {
        return NamedObjectProcessor.<T>builder().processor(processor).addAllNames(names).build();
    }

    /**
     * Creates a named object processor whose names are derived from {@code prefix}. When {@code processor} has exactly
     * one output, the single name is {@code prefix} itself; otherwise, the names are {@code prefix + "_" + ix} for each
     * output index {@code ix}, starting from {@code 0}.
     *
     * @param processor the object processor
     * @param prefix the name prefix
     * @return the named object processor
     * @param <T> the object type
     */
    public static <T> NamedObjectProcessor<T> prefix(ObjectProcessor<T> processor, String prefix) {
        final int size = processor.size();
        if (size == 1) {
            // A single output does not need an index suffix
            return of(processor, prefix);
        }
        final List<String> names = IntStream.range(0, size)
                .mapToObj(ix -> prefix + "_" + ix)
                .collect(Collectors.toList());
        return of(processor, names);
    }

    /**
     * The name for each output of {@link #processor()}.
     */
    public abstract List<String> names();

    /**
     * The object processor.
     */
    public abstract ObjectProcessor<? super T> processor();

    /**
     * The builder for {@link NamedObjectProcessor}.
     *
     * @param <T> the object type
     */
    public interface Builder<T> {
        Builder<T> processor(ObjectProcessor<? super T> processor);

        Builder<T> addNames(String element);

        Builder<T> addNames(String... elements);

        Builder<T> addAllNames(Iterable<String> elements);

        NamedObjectProcessor<T> build();
    }

    /**
     * An {@link ObjectProcessor.Provider} that additionally supplies a name for each output.
     */
    public interface Provider extends ObjectProcessor.Provider {

        /**
         * The name for each output of the processors. Equivalent to the named processors'
         * {@link NamedObjectProcessor#names()}.
         *
         * @return the names
         */
        List<String> names();

        /**
         * Creates a named object processor that can process the {@code inputType}. This will successfully create a
         * named processor when {@code inputType} is one of, or extends from one of, {@link #inputTypes()}. Otherwise,
         * an {@link IllegalArgumentException} will be thrown. Equivalent to
         * {@code NamedObjectProcessor.of(processor(inputType), names())}.
         *
         * @param inputType the input type
         * @return the object processor
         * @param <T> the input type
         */
        default <T> NamedObjectProcessor<? super T> named(Type<T> inputType) {
            return NamedObjectProcessor.of(processor(inputType), names());
        }
    }

    /**
     * Verifies that there is exactly one name per processor output.
     *
     * @throws IllegalArgumentException if {@code names().size() != processor().size()}
     */
    @Check
    final void checkSizes() {
        final int numNames = names().size();
        final int numOutputs = processor().size();
        if (numNames != numOutputs) {
            // The message refers to names(), the accessor actually declared on this class
            throw new IllegalArgumentException(
                    String.format("Unmatched sizes; names().size()=%d, processor().size()=%d",
                            numNames, numOutputs));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import io.deephaven.qst.type.ShortType;
import io.deephaven.qst.type.Type;

import java.time.Instant;
import java.util.List;
import java.util.Set;

/**
* An interface for processing data from one or more input objects into output chunks on a 1-to-1 input record to output
Expand Down Expand Up @@ -141,6 +141,15 @@ static ChunkType chunkType(Type<?> type) {
return ObjectProcessorTypes.of(type);
}

/**
* The number of outputs. Equivalent to {@code outputTypes().size()}.
*
* <p>
* This default implementation delegates to {@link #outputTypes()}. Implementations that override this method should
* return the same value as {@code outputTypes().size()}.
*
* @return the number of outputs
*/
default int size() {
return outputTypes().size();
}

/**
* The logical output types {@code this} instance processes. The size and types correspond to the expected size and
* {@link io.deephaven.chunk.ChunkType chunk types} for {@link #processAll(ObjectChunk, List)} as specified by
Expand Down Expand Up @@ -168,4 +177,43 @@ static ChunkType chunkType(Type<?> type) {
* at least {@code in.size()}
*/
void processAll(ObjectChunk<? extends T, ?> in, List<WritableChunk<?>> out);

/**
* An abstraction over {@link ObjectProcessor} that provides the same logical object processor for different input
* types.
*
* <p>
* The {@link #outputTypes()} and {@link #size()} describe every processor created via {@link #processor(Type)}.
*/
interface Provider {

/**
* The base input types for {@link #processor(Type)}.
*
* @return the input types
*/
Set<Type<?>> inputTypes();

/**
* The output types for the processors. Equivalent to the processors' {@link ObjectProcessor#outputTypes()}.
*
* @return the output types
*/
List<Type<?>> outputTypes();

/**
* The number of output types for the processors. Equivalent to the processors' {@link ObjectProcessor#size()}.
*
* @return the number of output types
*/
int size();

/**
* Creates an object processor that can process the {@code inputType}. This will successfully create a processor
* when {@code inputType} is one of, or extends from one of, {@link #inputTypes()}. Otherwise, an
* {@link IllegalArgumentException} will be thrown.
*
* @param inputType the input type
* @param <T> the input type
* @return the object processor
* @throws IllegalArgumentException if {@code inputType} is not one of, and does not extend from one of,
*         {@link #inputTypes()}
*/
<T> ObjectProcessor<? super T> processor(Type<T> inputType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ int rowLimit() {
return rowLimit;
}

@Override
public int size() {
// Forwarded to the wrapped delegate unchanged.
return delegate.size();
}

@Override
public List<Type<?>> outputTypes() {
return delegate.outputTypes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ static <T> ObjectProcessor<T> create(ObjectProcessor<T> delegate) {
ObjectProcessorStrict(ObjectProcessor<T> delegate) {
this.delegate = Objects.requireNonNull(delegate);
this.outputTypes = List.copyOf(delegate.outputTypes());
if (delegate.size() != outputTypes.size()) {
throw new IllegalArgumentException(
String.format("Inconsistent size. delegate.size()=%d, delegate.outputTypes().size()=%d",
delegate.size(), outputTypes.size()));
}
}

@Override
public int size() {
// Forwarded to the delegate; the constructor has already verified that this agrees with the size of the
// output types snapshot taken at construction.
return delegate.size();
}

@Override
Expand All @@ -40,12 +50,13 @@ public List<Type<?>> outputTypes() {

@Override
public void processAll(ObjectChunk<? extends T, ?> in, List<WritableChunk<?>> out) {
final int numColumns = delegate.outputTypes().size();
final int numColumns = delegate.size();
if (numColumns != out.size()) {
throw new IllegalArgumentException(String.format(
"Improper number of out chunks. Expected delegate.outputTypes().size() == out.size(). delegate.outputTypes().size()=%d, out.size()=%d",
numColumns, out.size()));
}
final List<Type<?>> delegateOutputTypes = delegate.outputTypes();
final int[] originalSizes = new int[numColumns];
for (int chunkIx = 0; chunkIx < numColumns; ++chunkIx) {
final WritableChunk<?> chunk = out.get(chunkIx);
Expand All @@ -54,7 +65,7 @@ public void processAll(ObjectChunk<? extends T, ?> in, List<WritableChunk<?>> ou
"out chunk does not have enough remaining capacity. chunkIx=%d, in.size()=%d, chunk.size()=%d, chunk.capacity()=%d",
chunkIx, in.size(), chunk.size(), chunk.capacity()));
}
final Type<?> type = delegate.outputTypes().get(chunkIx);
final Type<?> type = delegateOutputTypes.get(chunkIx);
final ChunkType expectedChunkType = ObjectProcessor.chunkType(type);
final ChunkType actualChunkType = chunk.getChunkType();
if (expectedChunkType != actualChunkType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ public void testBadDelegateOutputTypes() {
ObjectProcessor<Object> strict = ObjectProcessor.strict(new ObjectProcessor<>() {
private final List<Type<?>> outputTypes = new ArrayList<>(List.of(Type.intType()));

@Override
public int size() {
// Matches the single entry in the outputTypes list declared above.
return 1;
}

@Override
public List<Type<?>> outputTypes() {
try {
Expand Down Expand Up @@ -220,4 +225,29 @@ public void processAll(ObjectChunk<?, ?> in, List<WritableChunk<?>> out) {
}
}
}

/**
 * Verifies that {@code ObjectProcessor.strict} rejects a delegate whose {@code size()} disagrees with the size of
 * its {@code outputTypes()}.
 */
@Test
public void testBadDelegateSize() {
    try {
        ObjectProcessor.strict(new ObjectProcessor<>() {
            @Override
            public int size() {
                // Deliberately inconsistent with outputTypes(), which has a single entry
                return 2;
            }

            @Override
            public List<Type<?>> outputTypes() {
                return List.of(Type.intType());
            }

            @Override
            public void processAll(ObjectChunk<?, ?> in, List<WritableChunk<?>> out) {
                // ignore
            }
        });
        // The expected exception is IllegalArgumentException, matching the catch clause below
        failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
    } catch (IllegalArgumentException e) {
        assertThat(e).hasMessageContaining("Inconsistent size. delegate.size()=2, delegate.outputTypes().size()=1");
    }
}
}
23 changes: 23 additions & 0 deletions extensions/bson-jackson/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Build script for the bson-jackson extension.
plugins {
id 'java-library'
id 'io.deephaven.project.register'
}

dependencies {
// Exposed on this module's API
api project(':extensions-json-jackson')
api project(':engine-processor')
api 'de.undercouch:bson4jackson:2.15.1'

Classpaths.inheritImmutables(project)
// Needed at compile time only
compileOnly 'com.google.code.findbugs:jsr305:3.0.2'

// Test-only dependencies; jackson-databind is declared for tests and is not an api/implementation dependency here
Classpaths.inheritJacksonPlatform(project, 'testImplementation')
Classpaths.inheritJUnitPlatform(project)
Classpaths.inheritAssertJ(project)
testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation 'com.fasterxml.jackson.core:jackson-databind'
}

test {
// Run tests on the JUnit Platform
useJUnitPlatform()
}
1 change: 1 addition & 0 deletions extensions/bson-jackson/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.deephaven.project.ProjectType=JAVA_PUBLIC
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.bson.jackson;

import com.fasterxml.jackson.core.ObjectCodec;
import de.undercouch.bson4jackson.BsonFactory;

import java.lang.reflect.InvocationTargetException;

/**
 * Holds the default {@link BsonFactory} used by this package.
 *
 * <p>
 * The default factory is created once, during class initialization. An {@code ObjectMapper} is attached as the
 * factory's codec if one can be loaded and instantiated reflectively; otherwise the factory is created with a
 * {@code null} codec.
 */
final class JacksonBsonConfiguration {
    private static final String OBJECT_MAPPER_CLASS = "com.fasterxml.jackson.databind.ObjectMapper";

    private static final BsonFactory DEFAULT_FACTORY = new BsonFactory(findObjectCodec());

    private JacksonBsonConfiguration() {
        // static-only holder; not instantiable
    }

    /**
     * The shared default BSON factory.
     *
     * @return the default factory
     */
    static BsonFactory defaultFactory() {
        return DEFAULT_FACTORY;
    }

    /**
     * Attempts to reflectively create an {@code ObjectMapper}. We'll attach an ObjectMapper if it's on the classpath,
     * this allows parsing of AnyOptions.
     *
     * @return a new {@code ObjectMapper} as an {@link ObjectCodec}, or {@code null} if it could not be loaded or
     *         instantiated
     */
    private static ObjectCodec findObjectCodec() {
        try {
            final Class<?> clazz = Class.forName(OBJECT_MAPPER_CLASS);
            return (ObjectCodec) clazz.getDeclaredConstructor().newInstance();
        } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException
                | InvocationTargetException ignored) {
            // Intentionally best-effort: the ObjectMapper is optional, so fall back to a factory without a codec
            return null;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.bson.jackson;

import de.undercouch.bson4jackson.BsonFactory;
import io.deephaven.json.ValueOptions;
import io.deephaven.json.jackson.JacksonProvider;

/**
 * Static factories for creating a {@link JacksonProvider} that is backed by a {@link BsonFactory}.
 */
public final class JacksonBsonProvider {

    /**
     * Creates a jackson BSON provider for {@code options}, backed by the default BSON factory.
     *
     * @param options the object options
     * @return the jackson BSON provider
     * @see #of(ValueOptions, BsonFactory)
     */
    public static JacksonProvider of(ValueOptions options) {
        final BsonFactory defaultFactory = JacksonBsonConfiguration.defaultFactory();
        return of(options, defaultFactory);
    }

    /**
     * Creates a jackson BSON provider for {@code options}, backed by the caller-supplied {@code factory}.
     *
     * @param options the object options
     * @param factory the jackson BSON factory
     * @return the jackson BSON provider
     */
    public static JacksonProvider of(ValueOptions options, BsonFactory factory) {
        final JacksonProvider provider = JacksonProvider.of(options, factory);
        return provider;
    }
}
Loading

0 comments on commit 1320749

Please sign in to comment.