From 3703342fe9f7a4e1b64aa2d80033ebea8d78c841 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Tue, 14 May 2024 11:18:53 -0600 Subject: [PATCH 1/3] Barrage Ingestion: Support TimeUnit Conversion on Instant and ZonedDateTime Columns --- .../chunk/ChunkInputStreamGenerator.java | 20 ++- .../extensions/barrage/util/BarrageUtil.java | 40 +++-- .../test/FlightMessageRoundTripTest.java | 169 +++++++++++++++++- 3 files changed, 211 insertions(+), 18 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java index f2caf17d734..92c62dd1d00 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java @@ -235,7 +235,7 @@ static WritableChunk extractChunkFromInputStream( } return LongChunkInputStreamGenerator.extractChunkFromInputStreamWithConversion( Long.BYTES, options, - (long v) -> (v * factor), + (long v) -> v == QueryConstants.NULL_LONG ? QueryConstants.NULL_LONG : (v * factor), fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); case Float: return FloatChunkInputStreamGenerator.extractChunkFromInputStream( @@ -289,13 +289,25 @@ static WritableChunk extractChunkFromInputStream( } if (type == Instant.class) { return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion( - Long.BYTES, options, io -> DateTimeUtils.epochNanosToInstant(io.readLong()), + Long.BYTES, options, io -> { + final long value = io.readLong(); + if (value == QueryConstants.NULL_LONG) { + return null; + } + return DateTimeUtils.epochNanosToInstant(value * factor); + }, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); } if (type == ZonedDateTime.class) { return FixedWidthChunkInputStreamGenerator.extractChunkFromInputStreamWithTypeConversion( - Long.BYTES, options, - io -> DateTimeUtils.epochNanosToZonedDateTime(io.readLong(), DateTimeUtils.timeZone()), + Long.BYTES, options, io -> { + final long value = io.readLong(); + if (value == QueryConstants.NULL_LONG) { + return null; + } + return DateTimeUtils.epochNanosToZonedDateTime( + value * factor, DateTimeUtils.timeZone()); + }, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); } if (type == Byte.class) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java index c7910fcbe22..2911153a9d3 100755 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java @@ -311,26 +311,32 @@ public static void putMetadata(final Map metadata, final String metadata.put(ATTR_DH_PREFIX + key, value); } - private static boolean maybeConvertForTimeUnit(final TimeUnit unit, final ConvertedArrowSchema result, - final int i) { + private static boolean maybeConvertForTimeUnit( + final TimeUnit unit, + final ConvertedArrowSchema result, + final int columnOffset) { switch (unit) { case NANOSECOND: return true; case MICROSECOND: - setConversionFactor(result, i, 1000); + setConversionFactor(result, columnOffset, 1000); return true; case MILLISECOND: - setConversionFactor(result, i, 1000 * 1000); + setConversionFactor(result, columnOffset, 1000 * 1000); return true; case SECOND: - setConversionFactor(result, i, 1000 * 1000 * 1000); + setConversionFactor(result, columnOffset, 1000 * 1000 * 1000); return true; default: return false; } } - private static Class getDefaultType(final ArrowType arrowType, final ConvertedArrowSchema result, final int i) { + private static Class getDefaultType( + final ArrowType arrowType, + final ConvertedArrowSchema result, + final int columnOffset, + final Class explicitType) { final String exMsg = "Schema did not include `" + ATTR_DH_PREFIX + ATTR_TYPE_TAG + "` metadata for field "; switch (arrowType.getTypeID()) { case Int: @@ -365,7 +371,7 @@ private static Class getDefaultType(final ArrowType arrowType, final Converte case Duration: final ArrowType.Duration durationType = (ArrowType.Duration) arrowType; final TimeUnit durationUnit = durationType.getUnit(); - if (maybeConvertForTimeUnit(durationUnit, result, i)) { + if (maybeConvertForTimeUnit(durationUnit, result, columnOffset)) { return long.class; } throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg + @@ -375,10 +381,13 @@ private static Class getDefaultType(final ArrowType arrowType, final Converte final String tz = timestampType.getTimezone(); final TimeUnit timestampUnit = timestampType.getUnit(); if (tz == null || "UTC".equals(tz)) { - if (maybeConvertForTimeUnit(timestampUnit, result, i)) { + if (maybeConvertForTimeUnit(timestampUnit, result, columnOffset)) { return Instant.class; } } + if (explicitType != null) { + return explicitType; + } throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg + " of timestampType(Timezone=" + tz + ", Unit=" + timestampUnit.toString() + ")"); @@ -397,6 +406,9 @@ private static Class getDefaultType(final ArrowType arrowType, final Converte case Utf8: return java.lang.String.class; default: + if (explicitType != null) { + return explicitType; + } throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, exMsg + " of type " + arrowType.getTypeID().toString()); } @@ -431,12 +443,15 @@ public Class[] computeWireComponentTypes() { } } - private static void setConversionFactor(final ConvertedArrowSchema result, final int i, final int factor) { + private static void setConversionFactor( + final ConvertedArrowSchema result, + final int columnOffset, + final int factor) { if (result.conversionFactors == null) { result.conversionFactors = new int[result.nCols]; Arrays.fill(result.conversionFactors, 1); } - result.conversionFactors[i] = factor; + result.conversionFactors[columnOffset] = factor; } public static TableDefinition convertTableDefinition(final ExportedTableCreationResponse response) { @@ -514,13 +529,14 @@ private static ConvertedArrowSchema convertArrowSchema( } }); + // this has side effects such as setting the conversion factor; must call even if dest type is well known + Class defaultType = getDefaultType(getArrowType.apply(i), result, i, type.getValue()); + if (type.getValue() == null) { - Class defaultType = getDefaultType(getArrowType.apply(i), result, i); type.setValue(defaultType); } else if (type.getValue() == boolean.class || type.getValue() == Boolean.class) { // check existing barrage clients that might be sending int8 instead of bool // TODO (deephaven-core#3403) widen this check for better assurances - Class defaultType = getDefaultType(getArrowType.apply(i), result, i); Assert.eq(Boolean.class, "deephaven column type", defaultType, "arrow inferred type"); // force to boxed boolean to allow nullability in the column sources type.setValue(Boolean.class); diff --git a/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java b/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java index a24db602c64..1ad661b1b1a 100644 --- a/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java +++ b/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java @@ -39,11 +39,10 @@ import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; import io.deephaven.extensions.barrage.util.BarrageChunkAppendingMarshaller; import io.deephaven.extensions.barrage.util.BarrageUtil; -import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.LogBuffer; import io.deephaven.io.logger.LogBufferGlobal; -import io.deephaven.io.logger.Logger; import io.deephaven.plugin.Registration; +import io.deephaven.proto.backplane.grpc.ExportNotification; import io.deephaven.proto.backplane.grpc.SortTableRequest; import io.deephaven.proto.backplane.grpc.WrappedAuthenticationRequest; import io.deephaven.proto.backplane.script.grpc.BindTableToVariableRequest; @@ -63,6 +62,7 @@ import io.deephaven.server.table.TableModule; import io.deephaven.server.test.TestAuthModule.FakeBearer; import io.deephaven.server.util.Scheduler; +import io.deephaven.util.QueryConstants; import io.deephaven.util.SafeCloseable; import io.deephaven.auth.AuthContext; import io.grpc.*; @@ -74,14 +74,22 @@ import org.apache.arrow.flight.grpc.CredentialCallOption; import org.apache.arrow.flight.impl.Flight; import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.DateMilliVector; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.TimeNanoVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.arrow.vector.TimeStampSecVector; +import org.apache.arrow.vector.TimeStampVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableInt; @@ -98,8 +106,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.time.LocalDate; import java.time.LocalTime; +import java.time.ZonedDateTime; import java.util.*; import java.util.concurrent.*; import java.util.function.Consumer; @@ -1132,4 +1142,159 @@ public void testColumnsAsListFeature() throws Exception { } } } + + @Test + public void testLongColumnWithFactor() { + testLongColumnWithFactor(org.apache.arrow.vector.types.TimeUnit.SECOND, 1_000_000_000L); + testLongColumnWithFactor(org.apache.arrow.vector.types.TimeUnit.MILLISECOND, 1_000_000L); + testLongColumnWithFactor(org.apache.arrow.vector.types.TimeUnit.MICROSECOND, 1_000L); + testLongColumnWithFactor(org.apache.arrow.vector.types.TimeUnit.NANOSECOND, 1L); + } + + private void testLongColumnWithFactor(org.apache.arrow.vector.types.TimeUnit timeUnit, long factor) { + final int exportId = nextTicket++; + final Field field = Field.notNullable("Duration", new ArrowType.Duration(timeUnit)); + try (final RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + final BigIntVector vector = new BigIntVector(field, allocator); + final VectorSchemaRoot root = new VectorSchemaRoot(List.of(field), List.of(vector))) { + final FlightClient.ClientStreamListener stream = flightClient.startPut( + FlightDescriptor.path("export", Integer.toString(exportId)), root, new SyncPutListener()); + + final int numRows = 12; + vector.allocateNew(numRows); + for (int ii = 0; ii < numRows; ++ii) { + vector.set(ii, ii % 3 == 0 ? QueryConstants.NULL_LONG : ii); + } + vector.setValueCount(numRows); + + root.setRowCount(numRows); + stream.putNext(); + stream.completed(); + stream.getResult(); + + final SessionState.ExportObject result = currentSession.getExport(exportId); + Assert.eq(result.getState(), "result.getState()", + ExportNotification.State.EXPORTED, "ExportNotification.State.EXPORTED"); + Assert.eq(result.get().size(), "result.get().size()", numRows); + final ColumnSource duration = result.get().getColumnSource("Duration"); + + for (int ii = 0; ii < numRows; ++ii) { + if (ii % 3 == 0) { + Assert.eq(duration.getLong(ii), "duration.getLong(ii)", QueryConstants.NULL_LONG, + "QueryConstants.NULL_LONG"); + } else { + Assert.eq(duration.getLong(ii), "duration.getLong(ii)", ii * factor, "ii * factor"); + } + } + } + } + + @Test + public void testInstantColumnWithFactor() { + testInstantColumnWithFactor( + org.apache.arrow.vector.types.TimeUnit.SECOND, 1_000_000_000L, TimeStampSecVector::new); + testInstantColumnWithFactor( + org.apache.arrow.vector.types.TimeUnit.MILLISECOND, 1_000_000L, TimeStampMilliVector::new); + testInstantColumnWithFactor( + org.apache.arrow.vector.types.TimeUnit.MICROSECOND, 1_000L, TimeStampMicroVector::new); + testInstantColumnWithFactor( + org.apache.arrow.vector.types.TimeUnit.NANOSECOND, 1L, TimeStampNanoVector::new); + } + + private interface TimeVectorFactory { + TimeStampVector create(Field field, BufferAllocator allocator); + } + + private void testInstantColumnWithFactor( + org.apache.arrow.vector.types.TimeUnit timeUnit, long factor, TimeVectorFactory factory) { + final int exportId = nextTicket++; + final Field field = Field.notNullable("Time", new ArrowType.Timestamp(timeUnit, null)); + try (final RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + final TimeStampVector vector = factory.create(field, allocator); + final VectorSchemaRoot root = new VectorSchemaRoot(List.of(field), List.of(vector))) { + final FlightClient.ClientStreamListener stream = flightClient.startPut( + FlightDescriptor.path("export", Integer.toString(exportId)), root, new SyncPutListener()); + + final int numRows = 12; + vector.allocateNew(numRows); + for (int ii = 0; ii < numRows; ++ii) { + vector.set(ii, ii % 3 == 0 ? QueryConstants.NULL_LONG : ii); + } + vector.setValueCount(numRows); + + root.setRowCount(numRows); + stream.putNext(); + stream.completed(); + stream.getResult(); + + final SessionState.ExportObject
result = currentSession.getExport(exportId); + Assert.eq(result.getState(), "result.getState()", + ExportNotification.State.EXPORTED, "ExportNotification.State.EXPORTED"); + Assert.eq(result.get().size(), "result.get().size()", numRows); + final ColumnSource time = result.get().getColumnSource("Time"); + + for (int ii = 0; ii < numRows; ++ii) { + if (ii % 3 == 0) { + Assert.eqNull(time.get(ii), "time.get(ii)"); + } else { + final long value = time.get(ii).getEpochSecond() * 1_000_000_000 + time.get(ii).getNano(); + Assert.eq(value, "value", ii * factor, "ii * factor"); + } + } + } + } + + @Test + public void testZonedDateTimeColumnWithFactor() { + testZonedDateTimeColumnWithFactor( + org.apache.arrow.vector.types.TimeUnit.SECOND, 1_000_000_000L, TimeStampSecVector::new); + testZonedDateTimeColumnWithFactor( + org.apache.arrow.vector.types.TimeUnit.MILLISECOND, 1_000_000L, TimeStampMilliVector::new); + testZonedDateTimeColumnWithFactor( + org.apache.arrow.vector.types.TimeUnit.MICROSECOND, 1_000L, TimeStampMicroVector::new); + testZonedDateTimeColumnWithFactor( + org.apache.arrow.vector.types.TimeUnit.NANOSECOND, 1L, TimeStampNanoVector::new); + } + + private void testZonedDateTimeColumnWithFactor( + org.apache.arrow.vector.types.TimeUnit timeUnit, long factor, TimeVectorFactory factory) { + final int exportId = nextTicket++; + final FieldType type = new FieldType( + false, new ArrowType.Timestamp(timeUnit, null), null, + Collections.singletonMap("deephaven:type", "java.time.ZonedDateTime")); + final Field field = new Field("Time", type, null); + try (final RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + final TimeStampVector vector = factory.create(field, allocator); + final VectorSchemaRoot root = new VectorSchemaRoot(List.of(field), List.of(vector))) { + final FlightClient.ClientStreamListener stream = flightClient.startPut( + FlightDescriptor.path("export", Integer.toString(exportId)), root, new SyncPutListener()); + + final int numRows = 12; + vector.allocateNew(numRows); + for (int ii = 0; ii < numRows; ++ii) { + vector.set(ii, ii % 3 == 0 ? QueryConstants.NULL_LONG : ii); + } + vector.setValueCount(numRows); + + root.setRowCount(numRows); + stream.putNext(); + stream.completed(); + stream.getResult(); + + final SessionState.ExportObject
result = currentSession.getExport(exportId); + Assert.eq(result.getState(), "result.getState()", + ExportNotification.State.EXPORTED, "ExportNotification.State.EXPORTED"); + Assert.eq(result.get().size(), "result.get().size()", numRows); + final ColumnSource time = result.get().getColumnSource("Time"); + + for (int ii = 0; ii < numRows; ++ii) { + if (ii % 3 == 0) { + Assert.eqNull(time.get(ii), "time.get(ii)"); + } else { + final long value = time.get(ii).toEpochSecond() * 1_000_000_000 + time.get(ii).getNano(); + Assert.eq(value, "value", ii * factor, "ii * factor"); + } + } + } + } } From 5bca609ea3ef46aff5bbdb3e619b69f45cb783c6 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Tue, 14 May 2024 11:27:39 -0600 Subject: [PATCH 2/3] Fix Conversion Factor when not UTC; even if TZ is lost --- .../io/deephaven/extensions/barrage/util/BarrageUtil.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java index 2911153a9d3..2bb0709898a 100755 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java @@ -380,10 +380,9 @@ private static Class getDefaultType( final ArrowType.Timestamp timestampType = (ArrowType.Timestamp) arrowType; final String tz = timestampType.getTimezone(); final TimeUnit timestampUnit = timestampType.getUnit(); - if (tz == null || "UTC".equals(tz)) { - if (maybeConvertForTimeUnit(timestampUnit, result, columnOffset)) { - return Instant.class; - } + boolean conversionSuccess = maybeConvertForTimeUnit(timestampUnit, result, columnOffset); + if ((tz == null || "UTC".equals(tz)) && conversionSuccess) { + return Instant.class; } if (explicitType != null) { return explicitType; From aea1b6bb9ba78c78a0108edf1430f51bee887082 Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Tue, 14 May 2024 14:18:04 -0600 Subject: [PATCH 3/3] Require DoPut Export to Resolve Before gRPC Call --- .../io/deephaven/server/arrow/ArrowFlightUtil.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java index 0a447636d6f..d54600dd3f3 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java @@ -260,12 +260,13 @@ public void onCompleted() { } localResultTable.dropReference(); - // let's finally export the table to our listener - localExportBuilder.submit(() -> { - GrpcUtil.safelyComplete(observer); - session.removeOnCloseCallback(this); - return localResultTable; - }); + // let's finally export the table to our destination export + localExportBuilder + .onSuccess(() -> GrpcUtil.safelyComplete(observer)) + .submit(() -> { + session.removeOnCloseCallback(this); + return localResultTable; + }); } @Override