From 278785baefc09a5d125061f1aa9102efef0fc4c1 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Mon, 31 Jul 2023 20:01:49 -0500 Subject: [PATCH 1/5] Initial fix --- .../topage/ToInstantPageFromInt96.java | 19 ++++++++++++++----- py/server/tests/test_parquet.py | 6 ++++++ 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java index a8793f7b9de..6d6d7d5e77b 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java @@ -16,6 +16,8 @@ import java.time.Instant; import java.time.ZoneId; +import static io.deephaven.util.QueryConstants.NULL_LONG_BOXED; + /** * Parquet {@link ToPage} implementation for {@link Instant}s stored as Int96s representing an Impala * format Timestamp (nanoseconds of day and Julian date encoded as 8 bytes and 4 bytes, respectively) @@ -91,11 +93,18 @@ public final long[] convertResult(@NotNull final Object result) { final long[] resultLongs = new long[resultLength]; for (int ri = 0; ri < resultLength; ++ri) { - final ByteBuffer resultBuffer = ByteBuffer.wrap(results[ri].getBytesUnsafe()); - resultBuffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); - final long nanos = resultBuffer.getLong(); - final int julianDate = resultBuffer.getInt(); - resultLongs[ri] = (julianDate - JULIAN_OFFSET_TO_UNIX_EPOCH_DAYS) * (NANOS_PER_DAY) + nanos + offset; + if (results[ri] != null) { + final ByteBuffer resultBuffer = ByteBuffer.wrap(results[ri].getBytesUnsafe()); + resultBuffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); + final long nanos = resultBuffer.getLong(); + final int julianDate = resultBuffer.getInt(); + resultLongs[ri] = (julianDate - JULIAN_OFFSET_TO_UNIX_EPOCH_DAYS) * (NANOS_PER_DAY) + nanos + offset; + } + else { + // TODO What is the difference between NULL_LONG and BOXED? + // BOXED version is used for int64 instants, so using the same here + resultLongs[ri] = NULL_LONG_BOXED; + } } return resultLongs; } diff --git a/py/server/tests/test_parquet.py b/py/server/tests/test_parquet.py index a77ec408ccb..9cb6c03a1b4 100644 --- a/py/server/tests/test_parquet.py +++ b/py/server/tests/test_parquet.py @@ -147,6 +147,10 @@ def test_big_decimal(self): self.assertTrue(os.path.exists(file_location)) shutil.rmtree(base_dir) + # def test_int96_timestamps(self): + # TODO Write a test for writing null and non-null int96 timestamp values and verifying the values + # Also, compare it with regular int64 timestamp values and make sure both are similar + def get_table_data(self): # create a table with columns to test different types and edge cases dh_table = empty_table(20).update(formulas=[ @@ -272,5 +276,7 @@ def round_trip_with_compression(self, compression_codec_name, dh_table, vector_c # result_table = read('data_from_pandas.parquet') # self.assert_table_equals(dh_table, result_table) + + if __name__ == '__main__': unittest.main() From d52926ebbab068681baeb47ffccc471eba4486ae Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Tue, 1 Aug 2023 10:58:34 -0500 Subject: [PATCH 2/5] Added tests --- py/server/tests/test_parquet.py | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/py/server/tests/test_parquet.py b/py/server/tests/test_parquet.py index 9cb6c03a1b4..de706e08d4d 100644 --- a/py/server/tests/test_parquet.py +++ b/py/server/tests/test_parquet.py @@ -6,19 +6,17 @@ import shutil import unittest import tempfile - import pandas +import pyarrow.parquet + from deephaven.pandas import to_pandas, to_table from deephaven import empty_table, dtypes, new_table from deephaven.column import InputColumn from deephaven.parquet import write, batch_write, read, delete, ColumnInstruction -from deephaven.table import Table -from deephaven.time import epoch_nanos_to_instant from tests.testbase import BaseTestCase - class ParquetTestCase(BaseTestCase): """ Test cases for the deephaven.ParquetTools module (performed locally) """ @@ -147,9 +145,28 @@ def test_big_decimal(self): self.assertTrue(os.path.exists(file_location)) shutil.rmtree(base_dir) - # def test_int96_timestamps(self): - # TODO Write a test for writing null and non-null int96 timestamp values and verifying the values - # Also, compare it with regular int64 timestamp values and make sure both are similar + def test_int96_timestamps(self): + """ Tests for int96 timestamp values """ + dh_table = empty_table(5).update(formulas=[ + "nullInstantColumn = (Instant)null", + "someInstantColumn = DateTimeUtils.now() + i", + ]) + # Writing Int96 based timestamps are not supported in deephaven parquet code, therefore we use pyarrow to do that + dataframe = to_pandas(dh_table) + table = pyarrow.Table.from_pandas(dataframe) + pyarrow.parquet.write_table(table, 'data_from_pa.parquet', use_deprecated_int96_timestamps=True) + from_disk_int96 = read('data_from_pa.parquet') + self.assert_table_equals(dh_table, from_disk_int96) + + # Read the parquet file as a pandas dataframe, and ensure all values are written as null + dataframe = pandas.read_parquet("data_from_pa.parquet") + dataframe_null_columns = dataframe[["nullInstantColumn"]] + self.assertTrue(dataframe_null_columns.isnull().values.all()) + + # Write the timestamps as int64 using deephaven writing code and compare with int96 table + write(dh_table, "data_from_dh.parquet") + from_disk_int64 = read('data_from_dh.parquet') + self.assert_table_equals(from_disk_int64, from_disk_int96) def get_table_data(self): # create a table with columns to test different types and edge cases From 2ecf28a0926e914772c9e93c368464467df17d8b Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Tue, 1 Aug 2023 11:00:13 -0500 Subject: [PATCH 3/5] Minor changes --- .../table/pagestore/topage/ToInstantPageFromInt96.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java index 6d6d7d5e77b..007f133ec89 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java @@ -16,7 +16,7 @@ import java.time.Instant; import java.time.ZoneId; -import static io.deephaven.util.QueryConstants.NULL_LONG_BOXED; +import static io.deephaven.util.QueryConstants.NULL_LONG; /** * Parquet {@link ToPage} implementation for {@link Instant}s stored as Int96s representing an Impala @@ -101,9 +101,7 @@ public final long[] convertResult(@NotNull final Object result) { resultLongs[ri] = (julianDate - JULIAN_OFFSET_TO_UNIX_EPOCH_DAYS) * (NANOS_PER_DAY) + nanos + offset; } else { - // TODO What is the difference between NULL_LONG and BOXED? - // BOXED version is used for int64 instants, so using the same here - resultLongs[ri] = NULL_LONG_BOXED; + resultLongs[ri] = NULL_LONG; } } return resultLongs; From e8c3021c0f067ad1610c0a12278e706826552756 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Tue, 1 Aug 2023 11:04:59 -0500 Subject: [PATCH 4/5] Spotless --- .../parquet/table/pagestore/topage/ToInstantPageFromInt96.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java index 007f133ec89..8ab2b322d05 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java @@ -99,8 +99,7 @@ public final long[] convertResult(@NotNull final Object result) { final long nanos = resultBuffer.getLong(); final int julianDate = resultBuffer.getInt(); resultLongs[ri] = (julianDate - JULIAN_OFFSET_TO_UNIX_EPOCH_DAYS) * (NANOS_PER_DAY) + nanos + offset; - } - else { + } else { resultLongs[ri] = NULL_LONG; } } From 36f8c9ad8d66c58732953f57fbb33ba34d345d4b Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Tue, 1 Aug 2023 11:41:37 -0500 Subject: [PATCH 5/5] Review update --- .../pagestore/topage/ToInstantPageFromInt96.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java index 8ab2b322d05..16d2ca9e397 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToInstantPageFromInt96.java @@ -93,15 +93,15 @@ public final long[] convertResult(@NotNull final Object result) { final long[] resultLongs = new long[resultLength]; for (int ri = 0; ri < resultLength; ++ri) { - if (results[ri] != null) { - final ByteBuffer resultBuffer = ByteBuffer.wrap(results[ri].getBytesUnsafe()); - resultBuffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); - final long nanos = resultBuffer.getLong(); - final int julianDate = resultBuffer.getInt(); - resultLongs[ri] = (julianDate - JULIAN_OFFSET_TO_UNIX_EPOCH_DAYS) * (NANOS_PER_DAY) + nanos + offset; - } else { + if (results[ri] == null) { resultLongs[ri] = NULL_LONG; + continue; } + final ByteBuffer resultBuffer = ByteBuffer.wrap(results[ri].getBytesUnsafe()); + resultBuffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); + final long nanos = resultBuffer.getLong(); + final int julianDate = resultBuffer.getInt(); + resultLongs[ri] = (julianDate - JULIAN_OFFSET_TO_UNIX_EPOCH_DAYS) * (NANOS_PER_DAY) + nanos + offset; } return resultLongs; }