Fixed the parquet uint32 reading failure #5495

Merged 6 commits on May 17, 2024
Changes from 3 commits
14 changes: 2 additions & 12 deletions Base/src/main/java/io/deephaven/base/FileUtils.java
@@ -16,19 +16,9 @@
 import java.util.regex.Pattern;

 public class FileUtils {
-    private final static FileFilter DIRECTORY_FILE_FILTER = new FileFilter() {
-        @Override
-        public boolean accept(File pathname) {
-            return pathname.isDirectory();
-        }
-    };
+    private final static FileFilter DIRECTORY_FILE_FILTER = File::isDirectory;
     private final static File[] EMPTY_DIRECTORY_ARRAY = new File[0];
-    private final static FilenameFilter DIRECTORY_FILENAME_FILTER = new FilenameFilter() {
-        @Override
-        public boolean accept(File dir, String name) {
-            return new File(dir, name).isDirectory();
-        }
-    };
+    private final static FilenameFilter DIRECTORY_FILENAME_FILTER = (dir, name) -> new File(dir, name).isDirectory();
     private final static String[] EMPTY_STRING_ARRAY = new String[0];

     public static final char URI_SEPARATOR_CHAR = '/';
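This hunk is a pure refactor: `FileFilter` and `FilenameFilter` are single-method interfaces, so the anonymous classes collapse to a method reference and a lambda with identical behavior. A minimal standalone sketch of that equivalence (the `FilterEquivalenceDemo` class name and probe path are arbitrary, not from the PR):

```java
import java.io.File;
import java.io.FileFilter;

public class FilterEquivalenceDemo {
    public static void main(String[] args) {
        // Anonymous-class form, as removed by this hunk.
        final FileFilter anonymous = new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return pathname.isDirectory();
            }
        };
        // Method-reference form, as added by this hunk.
        final FileFilter methodRef = File::isDirectory;

        // Both filters answer identically for any input.
        final File probe = new File(".");
        System.out.println(anonymous.accept(probe) == methodRef.accept(probe)); // true
    }
}
```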
ToLongPageFromUnsignedInt.java
@@ -7,13 +7,15 @@
 import io.deephaven.chunk.attributes.Any;
 import org.jetbrains.annotations.NotNull;

+import static io.deephaven.util.QueryConstants.NULL_INT;
 import static io.deephaven.util.QueryConstants.NULL_INT_BOXED;
+import static io.deephaven.util.QueryConstants.NULL_LONG;

 public class ToLongPageFromUnsignedInt<ATTR extends Any> implements ToPage<ATTR, long[]> {

     private static final ToLongPageFromUnsignedInt INSTANCE = new ToLongPageFromUnsignedInt<>();

-    public static <ATTR extends Any> ToLongPageFromUnsignedInt<ATTR> create(Class<?> nativeType) {
+    public static <ATTR extends Any> ToLongPageFromUnsignedInt<ATTR> create(final Class<?> nativeType) {
         if (nativeType == null || long.class.equals(nativeType)) {
             // noinspection unchecked
             return INSTANCE;
@@ -42,11 +44,11 @@ public final Object nullValue() {
     }

     @Override
-    public final long[] convertResult(Object result) {
-        int[] from = (int[]) result;
-        long[] to = new long[from.length];
+    public final long[] convertResult(final Object result) {
+        final int[] from = (int[]) result;
+        final long[] to = new long[from.length];
         for (int i = 0; i < from.length; ++i) {
-            to[i] = Integer.toUnsignedLong(from[i]);
+            to[i] = from[i] == NULL_INT ? NULL_LONG : Integer.toUnsignedLong(from[i]);
         }
         return to;
     }
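This one-line change in `convertResult` is the actual fix: Deephaven stores a null `int` cell as the `NULL_INT` sentinel, and `Integer.toUnsignedLong` would otherwise reinterpret that sentinel as a large unsigned value rather than propagating null. A minimal sketch of the before/after behavior, assuming the `QueryConstants` sentinels are `Integer.MIN_VALUE` and `Long.MIN_VALUE`:

```java
public class UnsignedNullDemo {
    // Assumed to mirror io.deephaven.util.QueryConstants.
    static final int NULL_INT = Integer.MIN_VALUE;
    static final long NULL_LONG = Long.MIN_VALUE;

    public static void main(String[] args) {
        final int nullCell = NULL_INT; // a null uint32 cell in the decoded int page

        // Old behavior: the sentinel is widened as if it were real data.
        System.out.println(Integer.toUnsignedLong(nullCell)); // 2147483648 -- bogus value

        // Fixed behavior: the int sentinel maps to the long sentinel.
        final long widened = nullCell == NULL_INT ? NULL_LONG : Integer.toUnsignedLong(nullCell);
        System.out.println(widened == NULL_LONG); // true -- null survives the conversion

        // Non-null values still widen correctly: -1 stored as uint32 reads back as its max.
        System.out.println(Integer.toUnsignedLong(-1)); // 4294967295
    }
}
```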
ParquetTableReadWriteTest.java
@@ -57,6 +57,7 @@
 import io.deephaven.vector.*;
 import junit.framework.TestCase;
 import org.apache.commons.lang3.mutable.*;
+import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -1581,6 +1582,50 @@ public void testReadOldParquetData() {
         readParquetFileFromGitLFS(new File(path)).select();
     }

+    /**
+     * The reference data is generated using:
+     *
+     * <pre>
+     * df = pandas.DataFrame.from_records(
+     *     data=[(-1, -1, -1), (2, 2, 2), (0, 0, 0), (5, 5, 5)],
+     *     columns=['uint8Col', 'uint16Col', 'uint32Col']
+     * )
+     * df['uint8Col'] = df['uint8Col'].astype(np.uint8)
+     * df['uint16Col'] = df['uint16Col'].astype(np.uint16)
+     * df['uint32Col'] = df['uint32Col'].astype(np.uint32)
+     *
+     * # Add some nulls
+     * df['uint8Col'][3] = df['uint16Col'][3] = df['uint32Col'][3] = None
+     * schema = pyarrow.schema([
+     *     pyarrow.field('uint8Col', pyarrow.uint8()),
+     *     pyarrow.field('uint16Col', pyarrow.uint16()),
+     *     pyarrow.field('uint32Col', pyarrow.uint32()),
+     * ])
+     * schema = schema.remove_metadata()
+     * table = pyarrow.Table.from_pandas(df, schema).replace_schema_metadata()
+     * writer = pyarrow.parquet.ParquetWriter('data_from_pyarrow.parquet', schema=schema)
+     * writer.write_table(table)
+     * </pre>
+     */
+    @Test
+    public void testReadUintParquetData() {
+        final String path = ParquetTableReadWriteTest.class.getResource("/ReferenceUintParquetData.parquet").getFile();
+        final Table fromDisk = readParquetFileFromGitLFS(new File(path)).select();
+
+        final ParquetMetadata metadata =
+                new ParquetTableLocationKey(new File(path).toURI(), 0, null, ParquetInstructions.EMPTY).getMetadata();
+        final List<ColumnDescriptor> columnsMetadata = metadata.getFileMetaData().getSchema().getColumns();
+        assertTrue(columnsMetadata.get(0).toString().contains("int32 uint8Col (INTEGER(8,false))"));
+        assertTrue(columnsMetadata.get(1).toString().contains("int32 uint16Col (INTEGER(16,false))"));
+        assertTrue(columnsMetadata.get(2).toString().contains("int32 uint32Col (INTEGER(32,false))"));
+
+        final Table expected = newTable(
+                charCol("uint8Col", (char) 255, (char) 2, (char) 0, NULL_CHAR),
+                charCol("uint16Col", (char) 65535, (char) 2, (char) 0, NULL_CHAR),
+                longCol("uint32Col", 4294967295L, 2L, 0L, NULL_LONG));
+        assertTableEquals(expected, fromDisk);
+    }
+
     @Test
     public void testVersionChecks() {
         assertFalse(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.0.0"));
ReferenceUintParquetData.parquet: Git LFS file not shown
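The expected table in the test above encodes the unsigned widening scheme being exercised: uint8 and uint16 values fit in Deephaven's 16-bit `char` column type, while uint32 must widen to a signed `long`. The wrap-around values (-1 reading back as 255, 65535, and 4294967295) follow from two's-complement reinterpretation; a small sketch of that arithmetic (class name is arbitrary):

```java
public class UnsignedWideningDemo {
    public static void main(String[] args) {
        // -1 reinterpreted at each unsigned width yields that width's maximum value.
        System.out.println((byte) -1 & 0xFF);           // 255        (uint8)
        System.out.println((short) -1 & 0xFFFF);        // 65535      (uint16)
        System.out.println(Integer.toUnsignedLong(-1)); // 4294967295 (uint32)

        // uint8 and uint16 maxima both fit in Java's unsigned 16-bit char...
        final char u8Max = (char) ((byte) -1 & 0xFF);
        final char u16Max = (char) ((short) -1 & 0xFFFF);
        // ...but uint32's maximum exceeds char and int, so it must widen to long.
        final long u32Max = Integer.toUnsignedLong(-1);

        System.out.println((int) u8Max + " " + (int) u16Max + " " + u32Max); // 255 65535 4294967295
    }
}
```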
26 changes: 25 additions & 1 deletion py/server/tests/test_parquet.py
@@ -8,12 +8,13 @@
 import unittest
 import fnmatch

+import numpy as np
 import pandas
 import pyarrow.parquet

 from deephaven import DHError, empty_table, dtypes, new_table
 from deephaven import arrow as dharrow
-from deephaven.column import InputColumn, Column, ColumnType, string_col, int_col
+from deephaven.column import InputColumn, Column, ColumnType, string_col, int_col, char_col, long_col
 from deephaven.pandas import to_pandas, to_table
 from deephaven.parquet import (write, batch_write, read, delete, ColumnInstruction, ParquetFileLayout,
                                write_partitioned)
@@ -709,5 +710,28 @@ def test_write_with_definition(self):
         with self.assertRaises(Exception):
             write(table, "data_from_dh.parquet", table_definition=table_definition, col_definitions=col_definitions)

+    def test_unsigned_ints(self):
+        df = pandas.DataFrame.from_records(
+            data=[(-1, -1, -1), (2, 2, 2), (0, 0, 0)],
+            columns=['uint8Col', 'uint16Col', 'uint32Col']
+        )
+        df['uint8Col'] = df['uint8Col'].astype(np.uint8)
+        df['uint16Col'] = df['uint16Col'].astype(np.uint16)
+        df['uint32Col'] = df['uint32Col'].astype(np.uint32)
+
+        pyarrow.parquet.write_table(pyarrow.Table.from_pandas(df), 'data_from_pyarrow.parquet')
+        schema_from_disk = pyarrow.parquet.read_metadata("data_from_pyarrow.parquet").schema.to_arrow_schema()
+        self.assertTrue(schema_from_disk.field('uint8Col').type.equals(pyarrow.uint8()))
+        self.assertTrue(schema_from_disk.field('uint16Col').type.equals(pyarrow.uint16()))
+        self.assertTrue(schema_from_disk.field('uint32Col').type.equals(pyarrow.uint32()))
+
+        table_from_disk = read("data_from_pyarrow.parquet")
+        expected = new_table([
+            char_col("uint8Col", [255, 2, 0]),
+            char_col("uint16Col", [65535, 2, 0]),
+            long_col("uint32Col", [4294967295, 2, 0]),
+        ])
+        self.assert_table_equals(table_from_disk, expected)
+
 if __name__ == '__main__':
     unittest.main()