Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JSON configuration and Jackson Streaming Object Processor #5225

Merged
merged 56 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
1320749
JSON configuration and Processor
devinrsmith Apr 11, 2024
bb1d02e
spotless
devinrsmith Apr 11, 2024
5c7dde9
review response
devinrsmith Apr 12, 2024
44f4bc9
spotless
devinrsmith Apr 15, 2024
c567c21
Boxed builders
devinrsmith Apr 15, 2024
60fad28
Make universe implementation / documentation detail
devinrsmith Apr 15, 2024
2358a24
f
devinrsmith Apr 16, 2024
50ec4cb
Rename to Value
devinrsmith Apr 16, 2024
5dc5aa8
unmodifiable set
devinrsmith Apr 16, 2024
26ff817
f
devinrsmith Apr 16, 2024
14eadba
f
devinrsmith Apr 16, 2024
5c65852
f
devinrsmith Apr 17, 2024
979bdfa
ValueInnerRepeaterProcessor
devinrsmith Apr 17, 2024
fd3f6ed
ValueProcessor knows types
devinrsmith Apr 17, 2024
a16b30f
f
devinrsmith Apr 18, 2024
f564bc4
f
devinrsmith May 13, 2024
f4c674e
f
devinrsmith May 21, 2024
6f45aa7
some review responses
devinrsmith May 22, 2024
6d4ebef
f
devinrsmith May 23, 2024
32841a5
f
devinrsmith May 24, 2024
7a1a2d3
Merge remote-tracking branch 'upstream/main' into json-config-review
devinrsmith May 24, 2024
2d73895
exceptions
devinrsmith May 24, 2024
c6f098b
tests
devinrsmith May 24, 2024
cf81769
Rename tests
devinrsmith May 28, 2024
8d1e377
f
devinrsmith May 28, 2024
5828f1a
stuff
devinrsmith May 28, 2024
5022c2d
Tests and stuff
devinrsmith May 28, 2024
88db86d
Typed object improvements
devinrsmith May 28, 2024
f4b60c2
Array maths
devinrsmith May 29, 2024
52e5932
deeply nested array test
devinrsmith May 29, 2024
dbb66de
array stuff
devinrsmith May 29, 2024
0c0760e
kv mixin
devinrsmith May 29, 2024
c9c20e0
mixin consolidation
devinrsmith May 29, 2024
f979b56
discriminated object
devinrsmith May 29, 2024
2338fa0
outputSize
devinrsmith May 29, 2024
34dda0c
cleanup
devinrsmith May 29, 2024
9fccd9d
Instant array types
devinrsmith May 29, 2024
c4cae88
Merge remote-tracking branch 'upstream/main' into json-config-review
devinrsmith May 29, 2024
a389e88
Introduce ObjectChunkDeepEquals
devinrsmith May 30, 2024
91c2990
Move LongRepeaterImpl to LongMixin
devinrsmith May 30, 2024
1526eb4
Remove some exceptions
devinrsmith May 30, 2024
30ef952
math util
devinrsmith May 30, 2024
c0ab02a
Update python
devinrsmith May 30, 2024
08e44de
Review responses
devinrsmith May 30, 2024
aeb4628
Merge remote-tracking branch 'upstream/main' into json-config-review
devinrsmith Jun 11, 2024
6a6f383
Add python json tests
devinrsmith Jun 12, 2024
c7fc14b
Fix test
devinrsmith Jun 12, 2024
10b7794
Undo ChunkEquals interface changes in light of #5605
devinrsmith Jun 12, 2024
c26304e
Fix on_null, on_missing, add tests
devinrsmith Jun 13, 2024
fb9d759
object_processor_spec, jackson provider python construction test
devinrsmith Jun 13, 2024
258a8b8
Add JacksonProvider documentation
devinrsmith Jun 13, 2024
1683d9e
Review points
devinrsmith Jun 13, 2024
c53262c
Python _val
devinrsmith Jun 13, 2024
3cf7e9f
rename ObjectKvValue to ObjectEntriesValue
devinrsmith Jun 13, 2024
7e81e41
Review responses
devinrsmith Jun 14, 2024
b319512
Add high-level example documentation is python json module
devinrsmith Jun 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion buildSrc/src/main/groovy/Classpaths.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class Classpaths {

static final String JACKSON_GROUP = 'com.fasterxml.jackson'
static final String JACKSON_NAME = 'jackson-bom'
static final String JACKSON_VERSION = '2.14.1'
static final String JACKSON_VERSION = '2.17.0'

static final String SSLCONTEXT_GROUP = 'io.github.hakky54'
static final String SSLCONTEXT_VERSION = '8.1.1'
Expand Down
2 changes: 2 additions & 0 deletions engine/processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ dependencies {
api project(':qst-type')
api project(':engine-chunk')

Classpaths.inheritImmutables(project)

Classpaths.inheritJUnitPlatform(project)
Classpaths.inheritAssertJ(project)
testImplementation 'org.junit.jupiter:junit-jupiter'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.processor;

import io.deephaven.annotations.BuildableStyle;
import io.deephaven.qst.type.Type;
import org.immutables.value.Value.Check;
import org.immutables.value.Value.Immutable;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Immutable
@BuildableStyle
public abstract class NamedObjectProcessor<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It throws me off that that this "has an" ObjectProcessor rather than "is an" ObjectProcessor.


public static <T> Builder<T> builder() {
return ImmutableNamedObjectProcessor.builder();
}

public static <T> NamedObjectProcessor<T> of(ObjectProcessor<? super T> processor, String... names) {
return NamedObjectProcessor.<T>builder().processor(processor).addNames(names).build();
}

public static <T> NamedObjectProcessor<T> of(ObjectProcessor<? super T> processor, Iterable<String> names) {
return NamedObjectProcessor.<T>builder().processor(processor).addAllNames(names).build();
}

public static <T> NamedObjectProcessor<T> prefix(ObjectProcessor<T> processor, String prefix) {
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
final int size = processor.size();
if (size == 1) {
return of(processor, prefix);
}
return of(processor, IntStream.range(0, size).mapToObj(ix -> prefix + "_" + ix).collect(Collectors.toList()));
}

/**
* The name for each output of {@link #processor()}.
*/
public abstract List<String> names();

/**
* The object processor.
*/
public abstract ObjectProcessor<? super T> processor();

public interface Builder<T> {
Builder<T> processor(ObjectProcessor<? super T> processor);

Builder<T> addNames(String element);

Builder<T> addNames(String... elements);

Builder<T> addAllNames(Iterable<String> elements);

NamedObjectProcessor<T> build();
}

public interface Provider extends ObjectProcessor.Provider {

/**
* The name for each output of the processors. Equivalent to the named processors'
* {@link NamedObjectProcessor#names()}.
*
* @return the names
*/
List<String> names();

/**
* Creates a named object processor that can process the {@code inputType}. This will successfully create a
* named processor when {@code inputType} is one of, or extends from one of, {@link #inputTypes()}. Otherwise,
* an {@link IllegalArgumentException} will be thrown. Equivalent to
* {@code NamedObjectProcessor.of(processor(inputType), names())}.
*
* @param inputType the input type
* @return the object processor
* @param <T> the input type
*/
default <T> NamedObjectProcessor<? super T> named(Type<T> inputType) {
return NamedObjectProcessor.of(processor(inputType), names());
}
}

@Check
final void checkSizes() {
if (names().size() != processor().size()) {
throw new IllegalArgumentException(
String.format("Unmatched sizes; columnNames().size()=%d, processor().size()=%d",
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
names().size(), processor().size()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import io.deephaven.qst.type.ShortType;
import io.deephaven.qst.type.Type;

import java.time.Instant;
import java.util.List;
import java.util.Set;

/**
* An interface for processing data from one or more input objects into output chunks on a 1-to-1 input record to output
Expand Down Expand Up @@ -141,6 +141,15 @@ static ChunkType chunkType(Type<?> type) {
return ObjectProcessorTypes.of(type);
}

/**
* The number of outputs. Equivalent to {@code outputTypes().size()}.
*
* @return the number of outputs
*/
default int size() {
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
return outputTypes().size();
}

/**
* The logical output types {@code this} instance processes. The size and types correspond to the expected size and
* {@link io.deephaven.chunk.ChunkType chunk types} for {@link #processAll(ObjectChunk, List)} as specified by
Expand Down Expand Up @@ -168,4 +177,43 @@ static ChunkType chunkType(Type<?> type) {
* at least {@code in.size()}
*/
void processAll(ObjectChunk<? extends T, ?> in, List<WritableChunk<?>> out);

/**
* An abstraction over {@link ObjectProcessor} that provides the same logical object processor for different input
* types.
*/
interface Provider {

/**
* The base input types for {@link #processor(Type)}.
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
*
* @return the input types
*/
Set<Type<?>> inputTypes();

/**
* The output types for the processors. Equivalent to the processors' {@link ObjectProcessor#outputTypes()}.
*
* @return the output types
*/
List<Type<?>> outputTypes();

/**
* The number of output types for the processors. Equivalent to the processors' {@link ObjectProcessor#size()}.
*
* @return the number of output types
*/
int size();

/**
* Creates an object processor that can process the {@code inputType}. This will successfully create a processor
* when {@code inputType} is one of, or extends from one of, {@link #inputTypes()}. Otherwise, an
* {@link IllegalArgumentException} will be thrown.
*
* @param inputType the input type
* @return the object processor
* @param <T> the input type
*/
<T> ObjectProcessor<? super T> processor(Type<T> inputType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ int rowLimit() {
return rowLimit;
}

@Override
public int size() {
return delegate.size();
}

@Override
public List<Type<?>> outputTypes() {
return delegate.outputTypes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ static <T> ObjectProcessor<T> create(ObjectProcessor<T> delegate) {
ObjectProcessorStrict(ObjectProcessor<T> delegate) {
this.delegate = Objects.requireNonNull(delegate);
this.outputTypes = List.copyOf(delegate.outputTypes());
if (delegate.size() != outputTypes.size()) {
throw new IllegalArgumentException(
String.format("Inconsistent size. delegate.size()=%d, delegate.outputTypes().size()=%d",
delegate.size(), outputTypes.size()));
}
}

@Override
public int size() {
return delegate.size();
}

@Override
Expand All @@ -40,12 +50,13 @@ public List<Type<?>> outputTypes() {

@Override
public void processAll(ObjectChunk<? extends T, ?> in, List<WritableChunk<?>> out) {
final int numColumns = delegate.outputTypes().size();
final int numColumns = delegate.size();
if (numColumns != out.size()) {
throw new IllegalArgumentException(String.format(
"Improper number of out chunks. Expected delegate.outputTypes().size() == out.size(). delegate.outputTypes().size()=%d, out.size()=%d",
numColumns, out.size()));
}
final List<Type<?>> delegateOutputTypes = delegate.outputTypes();
final int[] originalSizes = new int[numColumns];
for (int chunkIx = 0; chunkIx < numColumns; ++chunkIx) {
final WritableChunk<?> chunk = out.get(chunkIx);
Expand All @@ -54,7 +65,7 @@ public void processAll(ObjectChunk<? extends T, ?> in, List<WritableChunk<?>> ou
"out chunk does not have enough remaining capacity. chunkIx=%d, in.size()=%d, chunk.size()=%d, chunk.capacity()=%d",
chunkIx, in.size(), chunk.size(), chunk.capacity()));
}
final Type<?> type = delegate.outputTypes().get(chunkIx);
final Type<?> type = delegateOutputTypes.get(chunkIx);
final ChunkType expectedChunkType = ObjectProcessor.chunkType(type);
final ChunkType actualChunkType = chunk.getChunkType();
if (expectedChunkType != actualChunkType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ public void testBadDelegateOutputTypes() {
ObjectProcessor<Object> strict = ObjectProcessor.strict(new ObjectProcessor<>() {
private final List<Type<?>> outputTypes = new ArrayList<>(List.of(Type.intType()));

@Override
public int size() {
return 1;
}

@Override
public List<Type<?>> outputTypes() {
try {
Expand Down Expand Up @@ -220,4 +225,29 @@ public void processAll(ObjectChunk<?, ?> in, List<WritableChunk<?>> out) {
}
}
}

@Test
public void testBadDelegateSize() {
try {
ObjectProcessor.strict(new ObjectProcessor<>() {
@Override
public int size() {
return 2;
}

@Override
public List<Type<?>> outputTypes() {
return List.of(Type.intType());
}

@Override
public void processAll(ObjectChunk<?, ?> in, List<WritableChunk<?>> out) {
// ignore
}
});
failBecauseExceptionWasNotThrown(IllegalAccessException.class);
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("Inconsistent size. delegate.size()=2, delegate.outputTypes().size()=1");
}
}
}
23 changes: 23 additions & 0 deletions extensions/bson-jackson/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
plugins {
id 'java-library'
id 'io.deephaven.project.register'
}

dependencies {
api project(':extensions-json-jackson')
api project(':engine-processor')
api 'de.undercouch:bson4jackson:2.15.1'

Classpaths.inheritImmutables(project)
compileOnly 'com.google.code.findbugs:jsr305:3.0.2'

Classpaths.inheritJacksonPlatform(project, 'testImplementation')
Classpaths.inheritJUnitPlatform(project)
Classpaths.inheritAssertJ(project)
testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation 'com.fasterxml.jackson.core:jackson-databind'
}

test {
useJUnitPlatform()
}
1 change: 1 addition & 0 deletions extensions/bson-jackson/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.deephaven.project.ProjectType=JAVA_PUBLIC
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.bson.jackson;

import com.fasterxml.jackson.core.ObjectCodec;
import de.undercouch.bson4jackson.BsonFactory;

import java.lang.reflect.InvocationTargetException;

final class JacksonBsonConfiguration {
private static final BsonFactory DEFAULT_FACTORY;

static {
// We'll attach an ObjectMapper if it's on the classpath, this allows parsing of AnyOptions
ObjectCodec objectCodec = null;
try {
final Class<?> clazz = Class.forName("com.fasterxml.jackson.databind.ObjectMapper");
objectCodec = (ObjectCodec) clazz.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException
| InvocationTargetException e) {
// ignore
}
DEFAULT_FACTORY = new BsonFactory(objectCodec);
}

static BsonFactory defaultFactory() {
return DEFAULT_FACTORY;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.bson.jackson;

import de.undercouch.bson4jackson.BsonFactory;
import io.deephaven.json.ValueOptions;
import io.deephaven.json.jackson.JacksonProvider;

public final class JacksonBsonProvider {

/**
* Creates a jackson BSON provider using a default factory.
*
* @param options the object options
* @return the jackson BSON provider
* @see #of(ValueOptions, BsonFactory)
*/
public static JacksonProvider of(ValueOptions options) {
return of(options, JacksonBsonConfiguration.defaultFactory());
}

/**
* Creates a jackson BSON provider using the provided {@code factory}.
*
* @param options the object options
* @param factory the jackson BSON factory
* @return the jackson BSON provider
*/
public static JacksonProvider of(ValueOptions options, BsonFactory factory) {
return JacksonProvider.of(options, factory);
}
}
Loading
Loading