Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Barrage Ingestion: Support TimeUnit Conversion on Instant and ZonedDateTime Columns #5488

Merged
merged 3 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ static WritableChunk<Values> extractChunkFromInputStream(
}
return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithConversion(
Long.BYTES, options,
(long v) -> (v * factor),
(long v) -> v == QueryConstants.NULL_LONG ? QueryConstants.NULL_LONG : (v * factor),
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
case Float:
return FloatChunkInputStreamGenerator.extractChunkFromInputStream(
Expand Down Expand Up @@ -289,13 +289,25 @@ static WritableChunk<Values> extractChunkFromInputStream(
}
if (type == Instant.class) {
return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
Long.BYTES, options, io -> DateTimeUtils.epochNanosToInstant(io.readLong()),
Long.BYTES, options, io -> {
final long value = io.readLong();
if (value == QueryConstants.NULL_LONG) {
return null;
}
return DateTimeUtils.epochNanosToInstant(value * factor);
},
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == ZonedDateTime.class) {
return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion(
Long.BYTES, options,
io -> DateTimeUtils.epochNanosToZonedDateTime(io.readLong(), DateTimeUtils.timeZone()),
Long.BYTES, options, io -> {
final long value = io.readLong();
if (value == QueryConstants.NULL_LONG) {
return null;
}
return DateTimeUtils.epochNanosToZonedDateTime(
value * factor, DateTimeUtils.timeZone());
},
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
if (type == Byte.class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,26 +311,32 @@ public static void putMetadata(final Map<String, String> metadata, final String
metadata.put(ATTR_DH_PREFIX + key, value);
}

private static boolean maybeConvertForTimeUnit(final TimeUnit unit, final ConvertedArrowSchema result,
final int i) {
private static boolean maybeConvertForTimeUnit(
final TimeUnit unit,
final ConvertedArrowSchema result,
final int columnOffset) {
switch (unit) {
case NANOSECOND:
return true;
case MICROSECOND:
setConversionFactor(result, i, 1000);
setConversionFactor(result, columnOffset, 1000);
return true;
case MILLISECOND:
setConversionFactor(result, i, 1000 * 1000);
setConversionFactor(result, columnOffset, 1000 * 1000);
return true;
case SECOND:
setConversionFactor(result, i, 1000 * 1000 * 1000);
setConversionFactor(result, columnOffset, 1000 * 1000 * 1000);
return true;
default:
return false;
}
}

private static Class<?> getDefaultType(final ArrowType arrowType, final ConvertedArrowSchema result, final int i) {
private static Class<?> getDefaultType(
final ArrowType arrowType,
final ConvertedArrowSchema result,
final int columnOffset,
final Class<?> explicitType) {
final String exMsg = "Schema did not include `" + ATTR_DH_PREFIX + ATTR_TYPE_TAG + "` metadata for field ";
switch (arrowType.getTypeID()) {
case Int:
Expand Down Expand Up @@ -365,7 +371,7 @@ private static Class<?> getDefaultType(final ArrowType arrowType, final Converte
case Duration:
final ArrowType.Duration durationType = (ArrowType.Duration) arrowType;
final TimeUnit durationUnit = durationType.getUnit();
if (maybeConvertForTimeUnit(durationUnit, result, i)) {
if (maybeConvertForTimeUnit(durationUnit, result, columnOffset)) {
return long.class;
}
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
Expand All @@ -374,10 +380,12 @@ private static Class<?> getDefaultType(final ArrowType arrowType, final Converte
final ArrowType.Timestamp timestampType = (ArrowType.Timestamp) arrowType;
final String tz = timestampType.getTimezone();
final TimeUnit timestampUnit = timestampType.getUnit();
if (tz == null || "UTC".equals(tz)) {
if (maybeConvertForTimeUnit(timestampUnit, result, i)) {
return Instant.class;
}
boolean conversionSuccess = maybeConvertForTimeUnit(timestampUnit, result, columnOffset);
if ((tz == null || "UTC".equals(tz)) && conversionSuccess) {
return Instant.class;
}
if (explicitType != null) {
return explicitType;
}
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
" of timestampType(Timezone=" + tz +
Expand All @@ -397,6 +405,9 @@ private static Class<?> getDefaultType(final ArrowType arrowType, final Converte
case Utf8:
return java.lang.String.class;
default:
if (explicitType != null) {
return explicitType;
}
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg +
" of type " + arrowType.getTypeID().toString());
}
Expand Down Expand Up @@ -431,12 +442,15 @@ public Class<?>[] computeWireComponentTypes() {
}
}

private static void setConversionFactor(final ConvertedArrowSchema result, final int i, final int factor) {
private static void setConversionFactor(
final ConvertedArrowSchema result,
final int columnOffset,
final int factor) {
if (result.conversionFactors == null) {
result.conversionFactors = new int[result.nCols];
Arrays.fill(result.conversionFactors, 1);
}
result.conversionFactors[i] = factor;
result.conversionFactors[columnOffset] = factor;
}

public static TableDefinition convertTableDefinition(final ExportedTableCreationResponse response) {
Expand Down Expand Up @@ -514,13 +528,14 @@ private static ConvertedArrowSchema convertArrowSchema(
}
});

// this has side effects such as setting the conversion factor; must call even if dest type is well known
Class<?> defaultType = getDefaultType(getArrowType.apply(i), result, i, type.getValue());

if (type.getValue() == null) {
Class<?> defaultType = getDefaultType(getArrowType.apply(i), result, i);
type.setValue(defaultType);
} else if (type.getValue() == boolean.class || type.getValue() == Boolean.class) {
// check existing barrage clients that might be sending int8 instead of bool
// TODO (deephaven-core#3403) widen this check for better assurances
Class<?> defaultType = getDefaultType(getArrowType.apply(i), result, i);
Assert.eq(Boolean.class, "deephaven column type", defaultType, "arrow inferred type");
// force to boxed boolean to allow nullability in the column sources
type.setValue(Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,13 @@ public void onCompleted() {
}
localResultTable.dropReference();

// let's finally export the table to our listener
localExportBuilder.submit(() -> {
GrpcUtil.safelyComplete(observer);
session.removeOnCloseCallback(this);
return localResultTable;
});
// let's finally export the table to our destination export
localExportBuilder
.onSuccess(() -> GrpcUtil.safelyComplete(observer))
.submit(() -> {
session.removeOnCloseCallback(this);
return localResultTable;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.extensions.barrage.util.BarrageChunkAppendingMarshaller;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.LogBuffer;
import io.deephaven.io.logger.LogBufferGlobal;
import io.deephaven.io.logger.Logger;
import io.deephaven.plugin.Registration;
import io.deephaven.proto.backplane.grpc.ExportNotification;
import io.deephaven.proto.backplane.grpc.SortTableRequest;
import io.deephaven.proto.backplane.grpc.WrappedAuthenticationRequest;
import io.deephaven.proto.backplane.script.grpc.BindTableToVariableRequest;
Expand All @@ -63,6 +62,7 @@
import io.deephaven.server.table.TableModule;
import io.deephaven.server.test.TestAuthModule.FakeBearer;
import io.deephaven.server.util.Scheduler;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.SafeCloseable;
import io.deephaven.auth.AuthContext;
import io.grpc.*;
Expand All @@ -74,14 +74,22 @@
import org.apache.arrow.flight.grpc.CredentialCallOption;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.DateMilliVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.TimeNanoVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TimeStampMilliVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TimeStampSecVector;
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableInt;
Expand All @@ -98,8 +106,10 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
Expand Down Expand Up @@ -1132,4 +1142,159 @@ public void testColumnsAsListFeature() throws Exception {
}
}
}

@Test
public void testLongColumnWithFactor() {
testLongColumnWithFactor(org.apache.arrow.vector.types.TimeUnit.SECOND, 1_000_000_000L);
testLongColumnWithFactor(org.apache.arrow.vector.types.TimeUnit.MILLISECOND, 1_000_000L);
testLongColumnWithFactor(org.apache.arrow.vector.types.TimeUnit.MICROSECOND, 1_000L);
testLongColumnWithFactor(org.apache.arrow.vector.types.TimeUnit.NANOSECOND, 1L);
}

private void testLongColumnWithFactor(org.apache.arrow.vector.types.TimeUnit timeUnit, long factor) {
final int exportId = nextTicket++;
final Field field = Field.notNullable("Duration", new ArrowType.Duration(timeUnit));
try (final RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final BigIntVector vector = new BigIntVector(field, allocator);
final VectorSchemaRoot root = new VectorSchemaRoot(List.of(field), List.of(vector))) {
final FlightClient.ClientStreamListener stream = flightClient.startPut(
FlightDescriptor.path("export", Integer.toString(exportId)), root, new SyncPutListener());

final int numRows = 12;
vector.allocateNew(numRows);
for (int ii = 0; ii < numRows; ++ii) {
vector.set(ii, ii % 3 == 0 ? QueryConstants.NULL_LONG : ii);
}
vector.setValueCount(numRows);

root.setRowCount(numRows);
stream.putNext();
stream.completed();
stream.getResult();

final SessionState.ExportObject<Table> result = currentSession.getExport(exportId);
Assert.eq(result.getState(), "result.getState()",
ExportNotification.State.EXPORTED, "ExportNotification.State.EXPORTED");
Assert.eq(result.get().size(), "result.get().size()", numRows);
final ColumnSource<Long> duration = result.get().getColumnSource("Duration");

for (int ii = 0; ii < numRows; ++ii) {
if (ii % 3 == 0) {
Assert.eq(duration.getLong(ii), "duration.getLong(ii)", QueryConstants.NULL_LONG,
"QueryConstants.NULL_LONG");
} else {
Assert.eq(duration.getLong(ii), "duration.getLong(ii)", ii * factor, "ii * factor");
}
}
}
}

@Test
public void testInstantColumnWithFactor() {
testInstantColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.SECOND, 1_000_000_000L, TimeStampSecVector::new);
testInstantColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.MILLISECOND, 1_000_000L, TimeStampMilliVector::new);
testInstantColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.MICROSECOND, 1_000L, TimeStampMicroVector::new);
testInstantColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.NANOSECOND, 1L, TimeStampNanoVector::new);
}

private interface TimeVectorFactory {
TimeStampVector create(Field field, BufferAllocator allocator);
}

private void testInstantColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit timeUnit, long factor, TimeVectorFactory factory) {
final int exportId = nextTicket++;
final Field field = Field.notNullable("Time", new ArrowType.Timestamp(timeUnit, null));
try (final RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final TimeStampVector vector = factory.create(field, allocator);
final VectorSchemaRoot root = new VectorSchemaRoot(List.of(field), List.of(vector))) {
final FlightClient.ClientStreamListener stream = flightClient.startPut(
FlightDescriptor.path("export", Integer.toString(exportId)), root, new SyncPutListener());

final int numRows = 12;
vector.allocateNew(numRows);
for (int ii = 0; ii < numRows; ++ii) {
vector.set(ii, ii % 3 == 0 ? QueryConstants.NULL_LONG : ii);
}
vector.setValueCount(numRows);

root.setRowCount(numRows);
stream.putNext();
stream.completed();
stream.getResult();

final SessionState.ExportObject<Table> result = currentSession.getExport(exportId);
Assert.eq(result.getState(), "result.getState()",
ExportNotification.State.EXPORTED, "ExportNotification.State.EXPORTED");
Assert.eq(result.get().size(), "result.get().size()", numRows);
final ColumnSource<Instant> time = result.get().getColumnSource("Time");

for (int ii = 0; ii < numRows; ++ii) {
if (ii % 3 == 0) {
Assert.eqNull(time.get(ii), "time.get(ii)");
} else {
final long value = time.get(ii).getEpochSecond() * 1_000_000_000 + time.get(ii).getNano();
Assert.eq(value, "value", ii * factor, "ii * factor");
}
}
}
}

@Test
public void testZonedDateTimeColumnWithFactor() {
testZonedDateTimeColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.SECOND, 1_000_000_000L, TimeStampSecVector::new);
testZonedDateTimeColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.MILLISECOND, 1_000_000L, TimeStampMilliVector::new);
testZonedDateTimeColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.MICROSECOND, 1_000L, TimeStampMicroVector::new);
testZonedDateTimeColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit.NANOSECOND, 1L, TimeStampNanoVector::new);
}

private void testZonedDateTimeColumnWithFactor(
org.apache.arrow.vector.types.TimeUnit timeUnit, long factor, TimeVectorFactory factory) {
final int exportId = nextTicket++;
final FieldType type = new FieldType(
false, new ArrowType.Timestamp(timeUnit, null), null,
Collections.singletonMap("deephaven:type", "java.time.ZonedDateTime"));
final Field field = new Field("Time", type, null);
try (final RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final TimeStampVector vector = factory.create(field, allocator);
final VectorSchemaRoot root = new VectorSchemaRoot(List.of(field), List.of(vector))) {
final FlightClient.ClientStreamListener stream = flightClient.startPut(
FlightDescriptor.path("export", Integer.toString(exportId)), root, new SyncPutListener());

final int numRows = 12;
vector.allocateNew(numRows);
for (int ii = 0; ii < numRows; ++ii) {
vector.set(ii, ii % 3 == 0 ? QueryConstants.NULL_LONG : ii);
}
vector.setValueCount(numRows);

root.setRowCount(numRows);
stream.putNext();
stream.completed();
stream.getResult();

final SessionState.ExportObject<Table> result = currentSession.getExport(exportId);
Assert.eq(result.getState(), "result.getState()",
ExportNotification.State.EXPORTED, "ExportNotification.State.EXPORTED");
Assert.eq(result.get().size(), "result.get().size()", numRows);
final ColumnSource<ZonedDateTime> time = result.get().getColumnSource("Time");

for (int ii = 0; ii < numRows; ++ii) {
if (ii % 3 == 0) {
Assert.eqNull(time.get(ii), "time.get(ii)");
} else {
final long value = time.get(ii).toEpochSecond() * 1_000_000_000 + time.get(ii).getNano();
Assert.eq(value, "value", ii * factor, "ii * factor");
}
}
}
}
}
Loading