diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java index a8793f7b9de..16d2ca9e397 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java @@ -16,6 +16,8 @@ import java.time.Instant; import java.time.ZoneId; +import static io.deephaven.util.QueryConstants.NULL_LONG; + /** * Parquet {@link ToPage} implementation for {@link Instant}s stored as Int96s representing an Impala * format Timestamp (nanoseconds of day and Julian date encoded as 8 bytes and 4 bytes, respectively) @@ -91,6 +93,10 @@ public final long[] convertResult(@NotNull final Object result) { final long[] resultLongs = new long[resultLength]; for (int ri = 0; ri < resultLength; ++ri) { + if (results[ri] == null) { + resultLongs[ri] = NULL_LONG; + continue; + } final ByteBuffer resultBuffer = ByteBuffer.wrap(results[ri].getBytesUnsafe()); resultBuffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); final long nanos = resultBuffer.getLong(); diff --git a/py/server/tests/test_parquet.py b/py/server/tests/test_parquet.py index a77ec408ccb..de706e08d4d 100644 --- a/py/server/tests/test_parquet.py +++ b/py/server/tests/test_parquet.py @@ -6,19 +6,17 @@ import shutil import unittest import tempfile - import pandas +import pyarrow.parquet + from deephaven.pandas import to_pandas, to_table from deephaven import empty_table, dtypes, new_table from deephaven.column import InputColumn from deephaven.parquet import write, batch_write, read, delete, ColumnInstruction -from deephaven.table import Table -from deephaven.time import epoch_nanos_to_instant from tests.testbase import BaseTestCase - class ParquetTestCase(BaseTestCase): """ Test cases for the 
deephaven.ParquetTools module (performed locally) """ @@ -147,6 +145,29 @@ def test_big_decimal(self): self.assertTrue(os.path.exists(file_location)) shutil.rmtree(base_dir) + def test_int96_timestamps(self): + """ Tests for int96 timestamp values """ + dh_table = empty_table(5).update(formulas=[ + "nullInstantColumn = (Instant)null", + "someInstantColumn = DateTimeUtils.now() + i", + ]) + # Writing Int96-based timestamps is not supported in deephaven parquet code, therefore we use pyarrow to do that + dataframe = to_pandas(dh_table) + table = pyarrow.Table.from_pandas(dataframe) + pyarrow.parquet.write_table(table, 'data_from_pa.parquet', use_deprecated_int96_timestamps=True) + from_disk_int96 = read('data_from_pa.parquet') + self.assert_table_equals(dh_table, from_disk_int96) + + # Read the parquet file as a pandas dataframe, and ensure all values are written as null + dataframe = pandas.read_parquet("data_from_pa.parquet") + dataframe_null_columns = dataframe[["nullInstantColumn"]] + self.assertTrue(dataframe_null_columns.isnull().values.all()) + + # Write the timestamps as int64 using deephaven writing code and compare with int96 table + write(dh_table, "data_from_dh.parquet") + from_disk_int64 = read('data_from_dh.parquet') + self.assert_table_equals(from_disk_int64, from_disk_int96) + def get_table_data(self): # create a table with columns to test different types and edge cases dh_table = empty_table(20).update(formulas=[ @@ -272,5 +293,7 @@ def round_trip_with_compression(self, compression_codec_name, dh_table, vector_c # result_table = read('data_from_pandas.parquet') # self.assert_table_equals(dh_table, result_table) + + if __name__ == '__main__': unittest.main()