Skip to content

Commit

Permalink
Barrage Ingestion: Fix ClassCastExceptions on Empty Primitive Arrays (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind authored and stanbrub committed May 17, 2024
1 parent 19654e7 commit 0ece8ef
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import io.deephaven.chunk.attributes.Any;
import io.deephaven.chunk.attributes.ChunkPositions;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.IntChunk;
import io.deephaven.chunk.ObjectChunk;
Expand Down Expand Up @@ -83,14 +82,12 @@ public <T, A extends Any> WritableObjectChunk<T, A> contract(
int lenRead = 0;
for (int i = 0; i < itemsInBatch; ++i) {
final int rowLen = perElementLengthDest.get(i + 1) - perElementLengthDest.get(i);
if (rowLen == 0) {
result.set(outOffset + i, CollectionUtil.ZERO_LENGTH_OBJECT_ARRAY);
} else {
final Object[] row = (Object[]) Array.newInstance(componentType, rowLen);
final Object[] row = (Object[]) Array.newInstance(componentType, rowLen);
if (rowLen != 0) {
typedSource.copyToArray(lenRead, row, 0, rowLen);
lenRead += rowLen;
result.set(outOffset + i, row);
}
result.set(outOffset + i, row);
}

// noinspection unchecked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.impl.UnionListWriter;
import org.apache.arrow.vector.complex.writer.BaseWriter;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
Expand Down Expand Up @@ -1297,4 +1300,165 @@ private void testZonedDateTimeColumnWithFactor(
}
}
}

@Test
public void testNullNestedPrimitiveArray() {
final int exportId = nextTicket++;

final FieldType listType = new FieldType(true, Types.MinorType.LIST.getType(), null, Map.of(
"deephaven:type", "double[][]"));
final Field payload = new Field("", new FieldType(false, Types.MinorType.FLOAT8.getType(), null), null);
final Field inner = new Field("", listType, Collections.singletonList(payload));
final Field outer = new Field("data", listType, Collections.singletonList(inner));
final Schema schema = new Schema(Collections.singletonList(outer));

try (final RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {

final ListVector outerVector = (ListVector) root.getVector(0);
final FlightClient.ClientStreamListener stream = flightClient.startPut(
FlightDescriptor.path("export", Integer.toString(exportId)), root, new SyncPutListener());

outerVector.allocateNew();
UnionListWriter listWriter = new UnionListWriter(outerVector);

final int numRows = 1;
listWriter.writeNull();
listWriter.setValueCount(numRows);
root.setRowCount(numRows);

stream.putNext();
stream.completed();
stream.getResult();

final SessionState.ExportObject<Table> result = currentSession.getExport(exportId);
Assert.eq(result.getState(), "result.getState()",
ExportNotification.State.EXPORTED, "ExportNotification.State.EXPORTED");
Assert.eq(result.get().size(), "result.get().size()", numRows);
final ColumnSource<?> data = result.get().getColumnSource("data");

Assert.eqNull(data.get(0), "data.get(0)");
}
}

@Test
public void testEmptyNestedPrimitiveArray() {
final int exportId = nextTicket++;

final FieldType listType = new FieldType(true, Types.MinorType.LIST.getType(), null, Map.of(
"deephaven:type", "double[][]"));
final Field payload = new Field("", new FieldType(false, Types.MinorType.FLOAT8.getType(), null), null);
final Field inner = new Field("", listType, Collections.singletonList(payload));
final Field outer = new Field("data", listType, Collections.singletonList(inner));
final Schema schema = new Schema(Collections.singletonList(outer));

try (final RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {

final ListVector outerVector = (ListVector) root.getVector(0);
final FlightClient.ClientStreamListener stream = flightClient.startPut(
FlightDescriptor.path("export", Integer.toString(exportId)), root, new SyncPutListener());

outerVector.allocateNew();
UnionListWriter listWriter = new UnionListWriter(outerVector);

final int numRows = 1;
listWriter.startList();
listWriter.endList();
listWriter.setValueCount(numRows);

root.setRowCount(numRows);

stream.putNext();
stream.completed();
stream.getResult();

final SessionState.ExportObject<Table> result = currentSession.getExport(exportId);
Assert.eq(result.getState(), "result.getState()",
ExportNotification.State.EXPORTED, "ExportNotification.State.EXPORTED");
Assert.eq(result.get().size(), "result.get().size()", numRows);
final ColumnSource<?> data = result.get().getColumnSource("data");

Assert.eqTrue(data.get(0) instanceof double[][], "data.get(0) instanceof double[][]");
final double[][] arr = (double[][]) data.get(0);
Assert.eq(arr.length, "arr.length", 0);
}
}

@Test
public void testInterestingNestedPrimitiveArray() {
final int exportId = nextTicket++;

final FieldType listType = new FieldType(true, Types.MinorType.LIST.getType(), null, Map.of(
"deephaven:type", "double[][]"));
final Field payload = new Field("", new FieldType(false, Types.MinorType.FLOAT8.getType(), null), null);
final Field inner = new Field("", listType, Collections.singletonList(payload));
final Field outer = new Field("data", listType, Collections.singletonList(inner));
final Schema schema = new Schema(Collections.singletonList(outer));

try (final RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {

final ListVector outerVector = (ListVector) root.getVector(0);
final FlightClient.ClientStreamListener stream = flightClient.startPut(
FlightDescriptor.path("export", Integer.toString(exportId)), root, new SyncPutListener());

outerVector.allocateNew();
UnionListWriter listWriter = new UnionListWriter(outerVector);

final int numRows = 1;
// We want to recreate this structure:
// new double[][] { null, new double[] {}, new double[] { 42.42f, 43.43f } }

listWriter.startList();
BaseWriter.ListWriter innerListWriter = listWriter.list();

// null inner list
innerListWriter.writeNull();

// empty inner list
innerListWriter.startList();
innerListWriter.endList();

// inner list with two values
innerListWriter.startList();
innerListWriter.float8().writeFloat8(42.42);
innerListWriter.float8().writeFloat8(43.43);
innerListWriter.endList();

listWriter.endList();
listWriter.setValueCount(numRows);
root.setRowCount(numRows);

stream.putNext();
stream.completed();
stream.getResult();

final SessionState.ExportObject<Table> result = currentSession.getExport(exportId);
Assert.eq(result.getState(), "result.getState()",
ExportNotification.State.EXPORTED, "ExportNotification.State.EXPORTED");
Assert.eq(result.get().size(), "result.get().size()", numRows);
final ColumnSource<?> data = result.get().getColumnSource("data");

Assert.eqTrue(data.get(0) instanceof double[][], "data.get(0) instanceof double[][]");
final double[][] arr = (double[][]) data.get(0);

final int numInnerItems = 3;
Assert.eq(arr.length, "arr.length", numInnerItems);

for (int ii = 0; ii < numInnerItems; ++ii) {
if (ii == 0) {
Assert.eqNull(arr[0], "arr[0]");
} else {
Assert.neqNull(arr[ii], "arr[ii]");
}
}

Assert.eq(arr[1].length, "arr[1].length", 0);

Assert.eq(arr[2].length, "arr[2].length", 2);
Assert.eq(arr[2][0], "arr[2][0]", 42.42);
Assert.eq(arr[2][1], "arr[2][1]", 43.43);
}
}
}

0 comments on commit 0ece8ef

Please sign in to comment.