Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly read nulls in int96 based timestamps in Parquet #4253

Merged
merged 5 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.time.Instant;
import java.time.ZoneId;

import static io.deephaven.util.QueryConstants.NULL_LONG;

/**
* Parquet {@link ToPage} implementation for {@link Instant}s stored as Int96s representing an Impala
* format Timestamp (nanoseconds of day and Julian date encoded as 8 bytes and 4 bytes, respectively)
Expand Down Expand Up @@ -91,6 +93,10 @@ public final long[] convertResult(@NotNull final Object result) {
final long[] resultLongs = new long[resultLength];

for (int ri = 0; ri < resultLength; ++ri) {
if (results[ri] == null) {
resultLongs[ri] = NULL_LONG;
continue;
}
final ByteBuffer resultBuffer = ByteBuffer.wrap(results[ri].getBytesUnsafe());
resultBuffer.order(java.nio.ByteOrder.LITTLE_ENDIAN);
final long nanos = resultBuffer.getLong();
Expand Down
31 changes: 27 additions & 4 deletions py/server/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,17 @@
import shutil
import unittest
import tempfile

import pandas
import pyarrow.parquet

from deephaven.pandas import to_pandas, to_table

from deephaven import empty_table, dtypes, new_table
from deephaven.column import InputColumn
from deephaven.parquet import write, batch_write, read, delete, ColumnInstruction
from deephaven.table import Table
from deephaven.time import epoch_nanos_to_instant

from tests.testbase import BaseTestCase


class ParquetTestCase(BaseTestCase):
""" Test cases for the deephaven.ParquetTools module (performed locally) """

Expand Down Expand Up @@ -147,6 +145,29 @@ def test_big_decimal(self):
self.assertTrue(os.path.exists(file_location))
shutil.rmtree(base_dir)

def test_int96_timestamps(self):
""" Tests for int96 timestamp values """
dh_table = empty_table(5).update(formulas=[
"nullInstantColumn = (Instant)null",
"someInstantColumn = DateTimeUtils.now() + i",
])
# Writing Int96 based timestamps are not supported in deephaven parquet code, therefore we use pyarrow to do that
dataframe = to_pandas(dh_table)
table = pyarrow.Table.from_pandas(dataframe)
pyarrow.parquet.write_table(table, 'data_from_pa.parquet', use_deprecated_int96_timestamps=True)
from_disk_int96 = read('data_from_pa.parquet')
self.assert_table_equals(dh_table, from_disk_int96)

# Read the parquet file as a pandas dataframe, and ensure all values are written as null
dataframe = pandas.read_parquet("data_from_pa.parquet")
dataframe_null_columns = dataframe[["nullInstantColumn"]]
self.assertTrue(dataframe_null_columns.isnull().values.all())

# Write the timestamps as int64 using deephaven writing code and compare with int96 table
write(dh_table, "data_from_dh.parquet")
from_disk_int64 = read('data_from_dh.parquet')
self.assert_table_equals(from_disk_int64, from_disk_int96)

def get_table_data(self):
# create a table with columns to test different types and edge cases
dh_table = empty_table(20).update(formulas=[
Expand Down Expand Up @@ -272,5 +293,7 @@ def round_trip_with_compression(self, compression_codec_name, dh_table, vector_c
# result_table = read('data_from_pandas.parquet')
# self.assert_table_equals(dh_table, result_table)



# Allow this test module to be executed directly (e.g. `python test_parquet.py`)
# in addition to being discovered by a test runner.
if __name__ == '__main__':
    unittest.main()
Loading