Skip to content

Commit

Permalink
Arrow Encoding for LocalDate and LocalTime (#5446)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind authored and stanbrub committed May 3, 2024
1 parent c98f997 commit c33218a
Show file tree
Hide file tree
Showing 14 changed files with 685 additions and 113 deletions.
1 change: 1 addition & 0 deletions extensions/barrage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies {
implementation project(':Base')
implementation project(':Util')
implementation project(':Configuration')
implementation project(':util-function')
implementation depCommonsLang3

api project(':engine-table')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@
// @formatter:off
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import com.google.common.io.LittleEndianDataOutputStream;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.function.ToByteFunction;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.WritableByteChunk;
Expand All @@ -27,6 +30,8 @@
import java.io.OutputStream;
import java.util.Iterator;
import java.util.PrimitiveIterator;
import java.util.function.Function;
import java.util.function.IntFunction;

import static io.deephaven.util.QueryConstants.*;

Expand All @@ -35,12 +40,19 @@ public class ByteChunkInputStreamGenerator extends BaseChunkInputStreamGenerator

public static ByteChunkInputStreamGenerator convertBoxed(
final ObjectChunk<Byte, Values> inChunk, final long rowOffset) {
// This code path is utilized for arrays and vectors of DateTimes, which cannot be reinterpreted.
return convertWithTransform(inChunk, rowOffset, TypeUtils::unbox);
}

public static <T> ByteChunkInputStreamGenerator convertWithTransform(
final ObjectChunk<T, Values> inChunk, final long rowOffset, final ToByteFunction<T> transform) {
// This code path is utilized for arrays and vectors of DateTimes, LocalDate, and LocalTime, which cannot be
// reinterpreted.
WritableByteChunk<Values> outChunk = WritableByteChunk.makeWritableChunk(inChunk.size());
for (int i = 0; i < inChunk.size(); ++i) {
final Byte value = inChunk.get(i);
outChunk.set(i, TypeUtils.unbox(value));
T value = inChunk.get(i);
outChunk.set(i, transform.applyAsByte(value));
}
// inChunk is a transfer of ownership to us, but we've converted what we need, so we must close it now
if (inChunk instanceof PoolableChunk) {
((PoolableChunk) inChunk).close();
}
Expand Down Expand Up @@ -163,7 +175,7 @@ public interface ByteConversion {
ByteConversion IDENTITY = (byte a) -> a;
}

static WritableChunk<Values> extractChunkFromInputStream(
static WritableByteChunk<Values> extractChunkFromInputStream(
final int elementSize,
final StreamReaderOptions options,
final Iterator<FieldNodeInfo> fieldNodeIter,
Expand All @@ -177,7 +189,41 @@ static WritableChunk<Values> extractChunkFromInputStream(
totalRows);
}

static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
/**
 * Reads a byte-backed Arrow buffer from the stream and maps each primitive value through
 * {@code transform} to produce an object chunk.
 *
 * @param elementSize the per-element byte width of the wire encoding
 * @param options stream reader options that control wire-format interpretation
 * @param transform maps each decoded byte (boxed) to the destination object type
 * @param fieldNodeIter iterator over the Arrow field nodes for this column
 * @param bufferInfoIter iterator over the Arrow buffer lengths for this column
 * @param is the stream positioned at this column's payload
 * @param outChunk optional destination chunk to reuse; a new chunk is allocated when null
 * @param outOffset the offset within {@code outChunk} to begin writing; must be 0 when allocating
 * @param totalRows lower bound on the destination chunk's capacity
 * @return the populated object chunk (either {@code outChunk} or a newly allocated chunk)
 * @throws IOException if the stream cannot be read
 */
static <T> WritableObjectChunk<T, Values> extractChunkFromInputStreamWithTransform(
        final int elementSize,
        final StreamReaderOptions options,
        final Function<Byte, T> transform,
        final Iterator<FieldNodeInfo> fieldNodeIter,
        final PrimitiveIterator.OfLong bufferInfoIter,
        final DataInput is,
        final WritableChunk<Values> outChunk,
        final int outOffset,
        final int totalRows) throws IOException {

    // Decode the raw primitive values first; the transform is applied element-by-element below.
    try (final WritableByteChunk<Values> rawChunk = extractChunkFromInputStream(
            elementSize, options, fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

        if (outChunk == null) {
            // if we're not given an output chunk then we better be writing at the front of the new one
            Assert.eqZero(outOffset, "outOffset");
        }

        final int numRead = rawChunk.size();
        final WritableObjectChunk<T, Values> result = castOrCreateChunk(
                outChunk,
                Math.max(totalRows, numRead),
                WritableObjectChunk::makeWritableChunk,
                WritableChunk::asWritableObjectChunk);

        for (int idx = 0; idx < numRead; ++idx) {
            result.set(outOffset + idx, transform.apply(rawChunk.get(idx)));
        }

        return result;
    }
}

static WritableByteChunk<Values> extractChunkFromInputStreamWithConversion(
final int elementSize,
final StreamReaderOptions options,
final ByteConversion conversion,
Expand All @@ -192,14 +238,11 @@ static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
final long validityBuffer = bufferInfoIter.nextLong();
final long payloadBuffer = bufferInfoIter.nextLong();

final WritableByteChunk<Values> chunk;
if (outChunk != null) {
chunk = outChunk.asWritableByteChunk();
} else {
final int numRows = Math.max(totalRows, nodeInfo.numElements);
chunk = WritableByteChunk.makeWritableChunk(numRows);
chunk.setSize(numRows);
}
final WritableByteChunk<Values> chunk = castOrCreateChunk(
outChunk,
Math.max(totalRows, nodeInfo.numElements),
WritableByteChunk::makeWritableChunk,
WritableChunk::asWritableByteChunk);

if (nodeInfo.numElements == 0) {
return chunk;
Expand Down Expand Up @@ -244,6 +287,19 @@ static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
return chunk;
}

/**
 * Returns the caller-supplied chunk downcast to the desired type, or — when none was
 * supplied — a freshly allocated chunk of the requested capacity with its size set.
 *
 * @param outChunk an existing destination chunk, or null to allocate a new one
 * @param numRows capacity (and size) for a newly allocated chunk
 * @param chunkFactory allocates a new chunk of the desired type
 * @param castFunction downcasts the supplied chunk to the desired type
 * @return a writable chunk of the desired type
 */
private static <T extends WritableChunk<Values>> T castOrCreateChunk(
        final WritableChunk<Values> outChunk,
        final int numRows,
        final IntFunction<T> chunkFactory,
        final Function<WritableChunk<Values>, T> castFunction) {
    if (outChunk == null) {
        // No destination provided: allocate and pre-size a fresh chunk.
        final T allocated = chunkFactory.apply(numRows);
        allocated.setSize(numRows);
        return allocated;
    }
    return castFunction.apply(outChunk);
}

private static void useDeephavenNulls(
final ByteConversion conversion,
final DataInput is,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
//
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import com.google.common.io.LittleEndianDataOutputStream;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.function.ToCharFunction;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.CharChunk;
import io.deephaven.chunk.WritableCharChunk;
Expand All @@ -23,6 +26,8 @@
import java.io.OutputStream;
import java.util.Iterator;
import java.util.PrimitiveIterator;
import java.util.function.Function;
import java.util.function.IntFunction;

import static io.deephaven.util.QueryConstants.*;

Expand All @@ -31,12 +36,19 @@ public class CharChunkInputStreamGenerator extends BaseChunkInputStreamGenerator

public static CharChunkInputStreamGenerator convertBoxed(
final ObjectChunk<Character, Values> inChunk, final long rowOffset) {
// This code path is utilized for arrays and vectors of DateTimes, which cannot be reinterpreted.
return convertWithTransform(inChunk, rowOffset, TypeUtils::unbox);
}

public static <T> CharChunkInputStreamGenerator convertWithTransform(
final ObjectChunk<T, Values> inChunk, final long rowOffset, final ToCharFunction<T> transform) {
// This code path is utilized for arrays and vectors of DateTimes, LocalDate, and LocalTime, which cannot be
// reinterpreted.
WritableCharChunk<Values> outChunk = WritableCharChunk.makeWritableChunk(inChunk.size());
for (int i = 0; i < inChunk.size(); ++i) {
final Character value = inChunk.get(i);
outChunk.set(i, TypeUtils.unbox(value));
T value = inChunk.get(i);
outChunk.set(i, transform.applyAsChar(value));
}
// inChunk is a transfer of ownership to us, but we've converted what we need, so we must close it now
if (inChunk instanceof PoolableChunk) {
((PoolableChunk) inChunk).close();
}
Expand Down Expand Up @@ -159,7 +171,7 @@ public interface CharConversion {
CharConversion IDENTITY = (char a) -> a;
}

static WritableChunk<Values> extractChunkFromInputStream(
static WritableCharChunk<Values> extractChunkFromInputStream(
final int elementSize,
final StreamReaderOptions options,
final Iterator<FieldNodeInfo> fieldNodeIter,
Expand All @@ -173,7 +185,41 @@ static WritableChunk<Values> extractChunkFromInputStream(
totalRows);
}

static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
/**
 * Reads a char-backed Arrow buffer from the stream and maps each primitive value through
 * {@code transform} to produce an object chunk.
 *
 * @param elementSize the per-element byte width of the wire encoding
 * @param options stream reader options that control wire-format interpretation
 * @param transform maps each decoded char (boxed) to the destination object type
 * @param fieldNodeIter iterator over the Arrow field nodes for this column
 * @param bufferInfoIter iterator over the Arrow buffer lengths for this column
 * @param is the stream positioned at this column's payload
 * @param outChunk optional destination chunk to reuse; a new chunk is allocated when null
 * @param outOffset the offset within {@code outChunk} to begin writing; must be 0 when allocating
 * @param totalRows lower bound on the destination chunk's capacity
 * @return the populated object chunk (either {@code outChunk} or a newly allocated chunk)
 * @throws IOException if the stream cannot be read
 */
static <T> WritableObjectChunk<T, Values> extractChunkFromInputStreamWithTransform(
        final int elementSize,
        final StreamReaderOptions options,
        final Function<Character, T> transform,
        final Iterator<FieldNodeInfo> fieldNodeIter,
        final PrimitiveIterator.OfLong bufferInfoIter,
        final DataInput is,
        final WritableChunk<Values> outChunk,
        final int outOffset,
        final int totalRows) throws IOException {

    // Decode the raw primitive values first; the transform is applied element-by-element below.
    try (final WritableCharChunk<Values> rawChunk = extractChunkFromInputStream(
            elementSize, options, fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

        if (outChunk == null) {
            // if we're not given an output chunk then we better be writing at the front of the new one
            Assert.eqZero(outOffset, "outOffset");
        }

        final int numRead = rawChunk.size();
        final WritableObjectChunk<T, Values> result = castOrCreateChunk(
                outChunk,
                Math.max(totalRows, numRead),
                WritableObjectChunk::makeWritableChunk,
                WritableChunk::asWritableObjectChunk);

        for (int idx = 0; idx < numRead; ++idx) {
            result.set(outOffset + idx, transform.apply(rawChunk.get(idx)));
        }

        return result;
    }
}

static WritableCharChunk<Values> extractChunkFromInputStreamWithConversion(
final int elementSize,
final StreamReaderOptions options,
final CharConversion conversion,
Expand All @@ -188,14 +234,11 @@ static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
final long validityBuffer = bufferInfoIter.nextLong();
final long payloadBuffer = bufferInfoIter.nextLong();

final WritableCharChunk<Values> chunk;
if (outChunk != null) {
chunk = outChunk.asWritableCharChunk();
} else {
final int numRows = Math.max(totalRows, nodeInfo.numElements);
chunk = WritableCharChunk.makeWritableChunk(numRows);
chunk.setSize(numRows);
}
final WritableCharChunk<Values> chunk = castOrCreateChunk(
outChunk,
Math.max(totalRows, nodeInfo.numElements),
WritableCharChunk::makeWritableChunk,
WritableChunk::asWritableCharChunk);

if (nodeInfo.numElements == 0) {
return chunk;
Expand Down Expand Up @@ -240,6 +283,19 @@ static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
return chunk;
}

/**
 * Returns the caller-supplied chunk downcast to the desired type, or — when none was
 * supplied — a freshly allocated chunk of the requested capacity with its size set.
 *
 * @param outChunk an existing destination chunk, or null to allocate a new one
 * @param numRows capacity (and size) for a newly allocated chunk
 * @param chunkFactory allocates a new chunk of the desired type
 * @param castFunction downcasts the supplied chunk to the desired type
 * @return a writable chunk of the desired type
 */
private static <T extends WritableChunk<Values>> T castOrCreateChunk(
        final WritableChunk<Values> outChunk,
        final int numRows,
        final IntFunction<T> chunkFactory,
        final Function<WritableChunk<Values>, T> castFunction) {
    if (outChunk == null) {
        // No destination provided: allocate and pre-size a fresh chunk.
        final T allocated = chunkFactory.apply(numRows);
        allocated.setSize(numRows);
        return allocated;
    }
    return castFunction.apply(outChunk);
}

private static void useDeephavenNulls(
final CharConversion conversion,
final DataInput is,
Expand Down
Loading

0 comments on commit c33218a

Please sign in to comment.