Barrage Ingestion: Support TimeUnit Conversion on Instant and ZonedDateTime Columns (#5488)

Also fixes:
- Deephaven null handling on columns affected by a conversion factor
- always look up the conversion factor, even when deephaven:type is provided
nbauernfeind authored and stanbrub committed May 17, 2024
1 parent 61a11f4 commit b3d1d61
Showing 4 changed files with 220 additions and 27 deletions.
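The core of the change: Arrow Timestamp and Duration columns may arrive in seconds, milliseconds, or microseconds, so ingestion now records a per-column conversion factor and multiplies wire values up to Deephaven's epoch-nanosecond longs while leaving the null sentinel untouched. A minimal sketch of that idea, with illustrative names only; the real logic lives in the BarrageUtil and chunk input-stream generator changes below:

    // Map an Arrow TimeUnit to the multiplier that converts wire values to nanoseconds.
    static long nanosFactor(org.apache.arrow.vector.types.TimeUnit unit) {
        switch (unit) {
            case NANOSECOND:  return 1L;
            case MICROSECOND: return 1_000L;
            case MILLISECOND: return 1_000_000L;
            case SECOND:      return 1_000_000_000L;
            default: throw new IllegalArgumentException("unsupported unit: " + unit);
        }
    }

    // Apply the factor without disturbing Deephaven's null sentinel (NULL_LONG == Long.MIN_VALUE).
    static long toEpochNanos(long wireValue, long factor) {
        return wireValue == io.deephaven.util.QueryConstants.NULL_LONG ? wireValue : wireValue * factor;
    }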
@@ -235,7 +235,7 @@ static WritableChunk<Values> extractChunkFromInputStream(
}
return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithConversion(
Long.BYTES, options,
(long v) -> (v * factor),
(long v) -> v == QueryConstants.NULL_LONG ? QueryConstants.NULL_LONG : (v * factor),
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
case Float:
return FloatChunkInputStreamGenerator.extractChunkFromInputStream(
@@ -289,13 +289,25 @@ static WritableChunk<Values> extractChunkFromInputStream(
}
if (type == Instant.class) {
return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
Long.BYTES, options, io -> DateTimeUtils.epochNanosToInstant(io.readLong()),
Long.BYTES, options, io -> {
final long value = io.readLong();
if (value == QueryConstants.NULL_LONG) {
return null;
}
return DateTimeUtils.epochNanosToInstant(value * factor);
},
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == ZonedDateTime.class) {
return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
Long.BYTES, options,
io -> DateTimeUtils.epochNanosToZonedDateTime(io.readLong(), DateTimeUtils.timeZone()),
Long.BYTES, options, io -> {
final long value = io.readLong();
if (value == QueryConstants.NULL_LONG) {
return null;
}
return DateTimeUtils.epochNanosToZonedDateTime(
value * factor, DateTimeUtils.timeZone());
},
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == Byte.class) {
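Why the NULL_LONG guards above matter: Deephaven encodes a null long as Long.MIN_VALUE, and multiplying that sentinel by any of the even factors used here (1_000, 1_000_000, 1_000_000_000) overflows and wraps to exactly 0, so an unguarded conversion would silently turn null cells into the 1970-01-01 epoch instead of null. A quick illustration, assuming only the JDK and Deephaven's QueryConstants:

    long nullSentinel = io.deephaven.util.QueryConstants.NULL_LONG; // Long.MIN_VALUE
    System.out.println(nullSentinel * 1_000_000L);                  // prints 0, i.e. the epoch, not null
    // hence the explicit check: v == NULL_LONG ? NULL_LONG : v * factor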
@@ -311,26 +311,32 @@ public static void putMetadata(final Map<String, String> metadata, final String
metadata.put(ATTR_DH_PREFIX + key, value);
}

private static boolean maybeConvertForTimeUnit(final TimeUnit unit, final ConvertedArrowSchema result,
final int i) {
private static boolean maybeConvertForTimeUnit(
final TimeUnit unit,
final ConvertedArrowSchema result,
final int columnOffset) {
switch (unit) {
case NANOSECOND:
return true;
case MICROSECOND:
setConversionFactor(result, i, 1000);
setConversionFactor(result, columnOffset, 1000);
return true;
case MILLISECOND:
setConversionFactor(result, i, 1000 * 1000);
setConversionFactor(result, columnOffset, 1000 * 1000);
return true;
case SECOND:
setConversionFactor(result, i, 1000 * 1000 * 1000);
setConversionFactor(result, columnOffset, 1000 * 1000 * 1000);
return true;
default:
return false;
}
}

private static Class<?> getDefaultType(final ArrowType arrowType, final ConvertedArrowSchema result, final int i) {
private static Class<?> getDefaultType(
final ArrowType arrowType,
final ConvertedArrowSchema result,
final int columnOffset,
final Class<?> explicitType) {
final String exMsg = "Schema did not include `" + ATTR_DH_PREFIX + ATTR_TYPE_TAG + "` metadata for field ";
switch (arrowType.getTypeID()) {
case Int:
@@ -365,7 +371,7 @@ private static Class<?> getDefaultType(final ArrowType arrowType, final Converte
case Duration:
final ArrowType.Duration durationType = (ArrowType.Duration) arrowType;
final TimeUnit durationUnit = durationType.getUnit();
if (maybeConvertForTimeUnit(durationUnit, result, i)) {
if (maybeConvertForTimeUnit(durationUnit, result, columnOffset)) {
return long.class;
}
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
@@ -374,10 +380,12 @@ private static Class<?> getDefaultType(final ArrowType arrowType, final Converte
final ArrowType.Timestamp timestampType = (ArrowType.Timestamp) arrowType;
final String tz = timestampType.getTimezone();
final TimeUnit timestampUnit = timestampType.getUnit();
if (tz == null || "UTC".equals(tz)) {
if (maybeConvertForTimeUnit(timestampUnit, result, i)) {
return Instant.class;
}
boolean conversionSuccess = maybeConvertForTimeUnit(timestampUnit, result, columnOffset);
if ((tz == null || "UTC".equals(tz)) && conversionSuccess) {
return Instant.class;
}
if (explicitType != null) {
return explicitType;
}
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
" of timestampType(Timezone=" + tz +
@@ -397,6 +405,9 @@ private static Class<?> getDefaultType(final ArrowType arrowType, final Converte
case Utf8:
return java.lang.String.class;
default:
if (explicitType != null) {
return explicitType;
}
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
" of type " + arrowType.getTypeID().toString());
}
@@ -431,12 +442,15 @@ public Class<?>[] computeWireComponentTypes() {
}
}

private static void setConversionFactor(final ConvertedArrowSchema result, final int i, final int factor) {
private static void setConversionFactor(
final ConvertedArrowSchema result,
final int columnOffset,
final int factor) {
if (result.conversionFactors == null) {
result.conversionFactors = new int[result.nCols];
Arrays.fill(result.conversionFactors, 1);
}
result.conversionFactors[i] = factor;
result.conversionFactors[columnOffset] = factor;
}

public static TableDefinition convertTableDefinition(final ExportedTableCreationResponse response) {
@@ -514,13 +528,14 @@ private static ConvertedArrowSchema convertArrowSchema(
}
});

// this has side effects such as setting the conversion factor; must call even if dest type is well known
Class<?> defaultType = getDefaultType(getArrowType.apply(i), result, i, type.getValue());

if (type.getValue() == null) {
Class<?> defaultType = getDefaultType(getArrowType.apply(i), result, i);
type.setValue(defaultType);
} else if (type.getValue() == boolean.class || type.getValue() == Boolean.class) {
// check existing barrage clients that might be sending int8 instead of bool
// TODO (deephaven-core#3403) widen this check for better assurances
Class<?> defaultType = getDefaultType(getArrowType.apply(i), result, i);
Assert.eq(Boolean.class, "deephaven column type", defaultType, "arrow inferred type");
// force to boxed boolean to allow nullability in the column sources
type.setValue(Boolean.class);
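The convertArrowSchema change above is the fix for the second bullet in the commit message: getDefaultType is now always invoked, because setting the conversion factor is a side effect of that call. A schema like the one below (mirroring testZonedDateTimeColumnWithFactor further down) names deephaven:type explicitly and uses a non-nanosecond unit; before this change the explicit type short-circuited the lookup, the factor stayed at its default of 1, and millisecond wire values were ingested as if they were already nanoseconds:

    // Field construction as used by the new test; Collections and the Arrow pojo types are imported there.
    final FieldType fieldType = new FieldType(
            false, new ArrowType.Timestamp(org.apache.arrow.vector.types.TimeUnit.MILLISECOND, null), null,
            Collections.singletonMap("deephaven:type", "java.time.ZonedDateTime"));
    final Field field = new Field("Time", fieldType, null);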
@@ -260,12 +260,13 @@ public void onCompleted() {
}
localResultTable.dropReference();

// let's finally export the table to our listener
localExportBuilder.submit(() -> {
GrpcUtil.safelyComplete(observer);
session.removeOnCloseCallback(this);
return localResultTable;
});
// let's finally export the table to our destination export
localExportBuilder
.onSuccess(() -> GrpcUtil.safelyComplete(observer))
.submit(() -> {
session.removeOnCloseCallback(this);
return localResultTable;
});
}

@Override
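The session-export change above reorders completion: the gRPC observer is now completed from an onSuccess callback rather than unconditionally inside the submit body, presumably so the caller is only signalled once the export has actually succeeded rather than for an export that subsequently fails.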
@@ -39,11 +39,10 @@
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.extensions.barrage.util.BarrageChunkAppendingMarshaller;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.LogBuffer;
import io.deephaven.io.logger.LogBufferGlobal;
import io.deephaven.io.logger.Logger;
import io.deephaven.plugin.Registration;
import io.deephaven.proto.backplane.grpc.ExportNotification;
import io.deephaven.proto.backplane.grpc.SortTableRequest;
import io.deephaven.proto.backplane.grpc.WrappedAuthenticationRequest;
import io.deephaven.proto.backplane.script.grpc.BindTableToVariableRequest;
@@ -63,6 +62,7 @@
import io.deephaven.server.table.TableModule;
import io.deephaven.server.test.TestAuthModule.FakeBearer;
import io.deephaven.server.util.Scheduler;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.SafeCloseable;
import io.deephaven.auth.AuthContext;
import io.grpc.*;
@@ -74,14 +74,22 @@
import org.apache.arrow.flight.grpc.CredentialCallOption;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.DateMilliVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.TimeNanoVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TimeStampMilliVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TimeStampSecVector;
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableInt;
@@ -98,8 +106,10 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
@@ -1132,4 +1142,159 @@ public void testColumnsAsListFeature() throws Exception {
}
}
}

@Test
public void testLongColumnWithFactor() {
testLongColumnWithFactor(org.apache.arrow.vector.types.TimeUnit.SECOND, 1_000_000_000L);
testLongColumnWithFactor(org.apache.arrow.vector.types.TimeUnit.MILLISECOND, 1_000_000L);
testLongColumnWithFactor(org.apache.arrow.vector.types.TimeUnit.MICROSECOND, 1_000L);
testLongColumnWithFactor(org.apache.arrow.vector.types.TimeUnit.NANOSECOND, 1L);
}

private void testLongColumnWithFactor(org.apache.arrow.vector.types.TimeUnit timeUnit, long factor) {
final int exportId = nextTicket++;
final Field field = Field.notNullable("Duration", new ArrowType.Duration(timeUnit));
try (final RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final BigIntVector vector = new BigIntVector(field, allocator);
final VectorSchemaRoot root = new VectorSchemaRoot(List.of(field), List.of(vector))) {
final FlightClient.ClientStreamListener stream = flightClient.startPut(
FlightDescriptor.path("export", Integer.toString(exportId)), root, new SyncPutListener());

final int numRows = 12;
vector.allocateNew(numRows);
for (int ii = 0; ii < numRows; ++ii) {
vector.set(ii, ii % 3 == 0 ? QueryConstants.NULL_LONG : ii);
}
vector.setValueCount(numRows);

root.setRowCount(numRows);
stream.putNext();
stream.completed();
stream.getResult();

final SessionState.ExportObject<Table> result = currentSession.getExport(exportId);
Assert.eq(result.getState(), "result.getState()",
ExportNotification.State.EXPORTED, "ExportNotification.State.EXPORTED");
Assert.eq(result.get().size(), "result.get().size()", numRows);
final ColumnSource<Long> duration = result.get().getColumnSource("Duration");

for (int ii = 0; ii < numRows; ++ii) {
if (ii % 3 == 0) {
Assert.eq(duration.getLong(ii), "duration.getLong(ii)", QueryConstants.NULL_LONG,
"QueryConstants.NULL_LONG");
} else {
Assert.eq(duration.getLong(ii), "duration.getLong(ii)", ii * factor, "ii * factor");
}
}
}
}

@Test
public void testInstantColumnWithFactor() {
testInstantColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.SECOND, 1_000_000_000L, TimeStampSecVector::new);
testInstantColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.MILLISECOND, 1_000_000L, TimeStampMilliVector::new);
testInstantColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.MICROSECOND, 1_000L, TimeStampMicroVector::new);
testInstantColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.NANOSECOND, 1L, TimeStampNanoVector::new);
}

private interface TimeVectorFactory {
TimeStampVector create(Field field, BufferAllocator allocator);
}

private void testInstantColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit timeUnit, long factor, TimeVectorFactory factory) {
final int exportId = nextTicket++;
final Field field = Field.notNullable("Time", new ArrowType.Timestamp(timeUnit, null));
try (final RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final TimeStampVector vector = factory.create(field, allocator);
final VectorSchemaRoot root = new VectorSchemaRoot(List.of(field), List.of(vector))) {
final FlightClient.ClientStreamListener stream = flightClient.startPut(
FlightDescriptor.path("export", Integer.toString(exportId)), root, new SyncPutListener());

final int numRows = 12;
vector.allocateNew(numRows);
for (int ii = 0; ii < numRows; ++ii) {
vector.set(ii, ii % 3 == 0 ? QueryConstants.NULL_LONG : ii);
}
vector.setValueCount(numRows);

root.setRowCount(numRows);
stream.putNext();
stream.completed();
stream.getResult();

final SessionState.ExportObject<Table> result = currentSession.getExport(exportId);
Assert.eq(result.getState(), "result.getState()",
ExportNotification.State.EXPORTED, "ExportNotification.State.EXPORTED");
Assert.eq(result.get().size(), "result.get().size()", numRows);
final ColumnSource<Instant> time = result.get().getColumnSource("Time");

for (int ii = 0; ii < numRows; ++ii) {
if (ii % 3 == 0) {
Assert.eqNull(time.get(ii), "time.get(ii)");
} else {
final long value = time.get(ii).getEpochSecond() * 1_000_000_000 + time.get(ii).getNano();
Assert.eq(value, "value", ii * factor, "ii * factor");
}
}
}
}

@Test
public void testZonedDateTimeColumnWithFactor() {
testZonedDateTimeColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.SECOND, 1_000_000_000L, TimeStampSecVector::new);
testZonedDateTimeColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.MILLISECOND, 1_000_000L, TimeStampMilliVector::new);
testZonedDateTimeColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.MICROSECOND, 1_000L, TimeStampMicroVector::new);
testZonedDateTimeColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.NANOSECOND, 1L, TimeStampNanoVector::new);
}

private void testZonedDateTimeColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit timeUnit, long factor, TimeVectorFactory factory) {
final int exportId = nextTicket++;
final FieldType type = new FieldType(
false, new ArrowType.Timestamp(timeUnit, null), null,
Collections.singletonMap("deephaven:type", "java.time.ZonedDateTime"));
final Field field = new Field("Time", type, null);
try (final RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final TimeStampVector vector = factory.create(field, allocator);
final VectorSchemaRoot root = new VectorSchemaRoot(List.of(field), List.of(vector))) {
final FlightClient.ClientStreamListener stream = flightClient.startPut(
FlightDescriptor.path("export", Integer.toString(exportId)), root, new SyncPutListener());

final int numRows = 12;
vector.allocateNew(numRows);
for (int ii = 0; ii < numRows; ++ii) {
vector.set(ii, ii % 3 == 0 ? QueryConstants.NULL_LONG : ii);
}
vector.setValueCount(numRows);

root.setRowCount(numRows);
stream.putNext();
stream.completed();
stream.getResult();

final SessionState.ExportObject<Table> result = currentSession.getExport(exportId);
Assert.eq(result.getState(), "result.getState()",
ExportNotification.State.EXPORTED, "ExportNotification.State.EXPORTED");
Assert.eq(result.get().size(), "result.get().size()", numRows);
final ColumnSource<ZonedDateTime> time = result.get().getColumnSource("Time");

for (int ii = 0; ii < numRows; ++ii) {
if (ii % 3 == 0) {
Assert.eqNull(time.get(ii), "time.get(ii)");
} else {
final long value = time.get(ii).toEpochSecond() * 1_000_000_000 + time.get(ii).getNano();
Assert.eq(value, "value", ii * factor, "ii * factor");
}
}
}
}
}
