Fixed the parquet uint32 reading failure (#5495)
malhotrashivam authored and stanbrub committed May 17, 2024
1 parent 1316717 commit 02b11bf
Showing 6 changed files with 132 additions and 16 deletions.
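
The gist of the fix: a Parquet uint32 column is physically a 32-bit int page, so the old mapping to ToLongPage evidently could not read it (the "reading failure" in the title), and the widening from int to long must zero-extend rather than sign-extend, or any value at or above 2^31 would come back negative. A minimal sketch of the widening step (illustrative class name and values, not part of the commit):

    public class UnsignedWideningSketch {
        public static void main(String[] args) {
            // The uint32 value 4294967295 arrives from the Parquet decoder as
            // the signed int -1, since both share the same 32-bit pattern.
            final int raw = -1;
            // A sign-extending cast loses the unsigned interpretation.
            System.out.println((long) raw); // -1
            // Integer.toUnsignedLong zero-extends, recovering the uint32 value.
            System.out.println(Integer.toUnsignedLong(raw)); // 4294967295
        }
    }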
14 changes: 2 additions & 12 deletions Base/src/main/java/io/deephaven/base/FileUtils.java
@@ -14,19 +14,9 @@
 import java.util.regex.Pattern;

 public class FileUtils {
-    private final static FileFilter DIRECTORY_FILE_FILTER = new FileFilter() {
-        @Override
-        public boolean accept(File pathname) {
-            return pathname.isDirectory();
-        }
-    };
+    private final static FileFilter DIRECTORY_FILE_FILTER = File::isDirectory;
     private final static File[] EMPTY_DIRECTORY_ARRAY = new File[0];
-    private final static FilenameFilter DIRECTORY_FILENAME_FILTER = new FilenameFilter() {
-        @Override
-        public boolean accept(File dir, String name) {
-            return new File(dir, name).isDirectory();
-        }
-    };
+    private final static FilenameFilter DIRECTORY_FILENAME_FILTER = (dir, name) -> new File(dir, name).isDirectory();
     private final static String[] EMPTY_STRING_ARRAY = new String[0];

     public static final char URI_SEPARATOR_CHAR = '/';
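The FileUtils change is behavior-preserving cleanup: each anonymous single-method class becomes a lambda or method reference implementing the same functional interface. A small sketch of the equivalence (illustrative class name, not from the commit):

    import java.io.File;
    import java.io.FileFilter;

    public class FilterEquivalenceSketch {
        public static void main(String[] args) {
            // Both values implement FileFilter's single accept(File) method
            // identically, so they are interchangeable.
            final FileFilter verbose = new FileFilter() {
                @Override
                public boolean accept(File pathname) {
                    return pathname.isDirectory();
                }
            };
            final FileFilter concise = File::isDirectory;
            final File cwd = new File(".");
            System.out.println(verbose.accept(cwd) == concise.accept(cwd)); // true
        }
    }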
@@ -468,7 +468,6 @@ private static class LogicalTypeVisitor<ATTR extends Any>

         @Override
         public Optional<ToPage<ATTR, ?>> visit(final LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
-
             if (intLogicalType.isSigned()) {
                 switch (intLogicalType.getBitWidth()) {
                     case 8:
@@ -486,10 +485,9 @@ private static class LogicalTypeVisitor<ATTR extends Any>
                     case 16:
                         return Optional.of(ToCharPageFromInt.create(componentType));
                     case 32:
-                        return Optional.of(ToLongPage.create(componentType));
+                        return Optional.of(ToLongPageFromUnsignedInt.create(componentType));
                 }
             }
-
             return Optional.empty();
         }

56 changes: 56 additions & 0 deletions .../io/deephaven/parquet/table/pagestore/topage/ToLongPageFromUnsignedInt.java (new file)
@@ -0,0 +1,56 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table.pagestore.topage;

import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.attributes.Any;
import org.jetbrains.annotations.NotNull;

import static io.deephaven.util.QueryConstants.NULL_INT;
import static io.deephaven.util.QueryConstants.NULL_INT_BOXED;
import static io.deephaven.util.QueryConstants.NULL_LONG;

public class ToLongPageFromUnsignedInt<ATTR extends Any> implements ToPage<ATTR, long[]> {

    private static final ToLongPageFromUnsignedInt INSTANCE = new ToLongPageFromUnsignedInt<>();

    public static <ATTR extends Any> ToLongPageFromUnsignedInt<ATTR> create(final Class<?> nativeType) {
        if (nativeType == null || long.class.equals(nativeType)) {
            // noinspection unchecked
            return INSTANCE;
        }
        throw new IllegalArgumentException("The native type for a Long column is " + nativeType.getCanonicalName());
    }

    private ToLongPageFromUnsignedInt() {}

    @Override
    @NotNull
    public final Class<Long> getNativeType() {
        return long.class;
    }

    @Override
    @NotNull
    public final ChunkType getChunkType() {
        return ChunkType.Long;
    }

    @Override
    @NotNull
    public final Object nullValue() {
        // The raw page holds ints, so the null sentinel is the int null; it is
        // widened to NULL_LONG in convertResult.
        return NULL_INT_BOXED;
    }

    @Override
    public final long[] convertResult(final Object result) {
        final int[] from = (int[]) result;
        final long[] to = new long[from.length];
        for (int i = 0; i < from.length; ++i) {
            final int fromValue = from[i];
            // Zero-extend; a plain (long) cast would sign-extend and corrupt
            // values at or above 2^31.
            to[i] = fromValue == NULL_INT ? NULL_LONG : Integer.toUnsignedLong(fromValue);
        }
        return to;
    }
}
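
For a sense of convertResult's contract, a hedged usage sketch (assuming the class above is on the classpath; the values mirror the test expectations below):

    import io.deephaven.chunk.attributes.Any;
    import io.deephaven.parquet.table.pagestore.topage.ToLongPageFromUnsignedInt;

    import static io.deephaven.util.QueryConstants.NULL_INT;

    public class ConvertResultSketch {
        public static void main(String[] args) {
            // -1 carries the bit pattern of uint32 4294967295; NULL_INT marks a null cell.
            final int[] rawPage = {-1, 2, 0, NULL_INT};
            final long[] widened =
                    ToLongPageFromUnsignedInt.<Any>create(long.class).convertResult(rawPage);
            // -> [4294967295, 2, 0, NULL_LONG]
            System.out.println(java.util.Arrays.toString(widened));
        }
    }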
.../ParquetTableReadWriteTest.java
@@ -57,6 +57,7 @@
 import io.deephaven.vector.*;
 import junit.framework.TestCase;
 import org.apache.commons.lang3.mutable.*;
+import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -1581,6 +1582,50 @@ public void testReadOldParquetData() {
         readParquetFileFromGitLFS(new File(path)).select();
     }

+    /**
+     * The reference data is generated using:
+     *
+     * <pre>
+     * df = pandas.DataFrame.from_records(
+     *     data=[(-1, -1, -1), (2, 2, 2), (0, 0, 0), (5, 5, 5)],
+     *     columns=['uint8Col', 'uint16Col', 'uint32Col']
+     * )
+     * df['uint8Col'] = df['uint8Col'].astype(np.uint8)
+     * df['uint16Col'] = df['uint16Col'].astype(np.uint16)
+     * df['uint32Col'] = df['uint32Col'].astype(np.uint32)
+     *
+     * # Add some nulls
+     * df['uint8Col'][3] = df['uint16Col'][3] = df['uint32Col'][3] = None
+     * schema = pyarrow.schema([
+     *     pyarrow.field('uint8Col', pyarrow.uint8()),
+     *     pyarrow.field('uint16Col', pyarrow.uint16()),
+     *     pyarrow.field('uint32Col', pyarrow.uint32()),
+     * ])
+     * schema = schema.remove_metadata()
+     * table = pyarrow.Table.from_pandas(df, schema).replace_schema_metadata()
+     * writer = pyarrow.parquet.ParquetWriter('data_from_pyarrow.parquet', schema=schema)
+     * writer.write_table(table)
+     * </pre>
+     */
+    @Test
+    public void testReadUintParquetData() {
+        final String path = ParquetTableReadWriteTest.class.getResource("/ReferenceUintParquetData.parquet").getFile();
+        final Table fromDisk = readParquetFileFromGitLFS(new File(path)).select();
+
+        final ParquetMetadata metadata =
+                new ParquetTableLocationKey(new File(path).toURI(), 0, null, ParquetInstructions.EMPTY).getMetadata();
+        final List<ColumnDescriptor> columnsMetadata = metadata.getFileMetaData().getSchema().getColumns();
+        assertTrue(columnsMetadata.get(0).toString().contains("int32 uint8Col (INTEGER(8,false))"));
+        assertTrue(columnsMetadata.get(1).toString().contains("int32 uint16Col (INTEGER(16,false))"));
+        assertTrue(columnsMetadata.get(2).toString().contains("int32 uint32Col (INTEGER(32,false))"));
+
+        final Table expected = newTable(
+                charCol("uint8Col", (char) 255, (char) 2, (char) 0, NULL_CHAR),
+                charCol("uint16Col", (char) 65535, (char) 2, (char) 0, NULL_CHAR),
+                longCol("uint32Col", 4294967295L, 2L, 0L, NULL_LONG));
+        assertTableEquals(expected, fromDisk);
+    }
+
     @Test
     public void testVersionChecks() {
         assertFalse(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.0.0"));
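A note on the expected column types in the test above: Java's char is an unsigned 16-bit value, so uint8 and uint16 columns fit losslessly in Deephaven char columns, while uint32 needs a 64-bit long. A tiny sketch (illustrative class name):

    public class UnsignedFitSketch {
        public static void main(String[] args) {
            // Both unsigned maxima fit in char, the 16-bit unsigned carrier.
            final char u8Max = (char) 255;
            final char u16Max = (char) 65535;
            System.out.println((int) u8Max + " " + (int) u16Max); // 255 65535
            // uint32's maximum exceeds Integer.MAX_VALUE, hence the long mapping.
            System.out.println(Integer.toUnsignedLong(-1)); // 4294967295
        }
    }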
Git LFS file not shown (presumably the ReferenceUintParquetData.parquet reference file read by the test above)
26 changes: 25 additions & 1 deletion py/server/tests/test_parquet.py
@@ -8,12 +8,13 @@
 import unittest
 import fnmatch

+import numpy as np
 import pandas
 import pyarrow.parquet

 from deephaven import DHError, empty_table, dtypes, new_table
 from deephaven import arrow as dharrow
-from deephaven.column import InputColumn, Column, ColumnType, string_col, int_col
+from deephaven.column import InputColumn, Column, ColumnType, string_col, int_col, char_col, long_col
 from deephaven.pandas import to_pandas, to_table
 from deephaven.parquet import (write, batch_write, read, delete, ColumnInstruction, ParquetFileLayout,
                                write_partitioned)
@@ -709,5 +710,28 @@ def test_write_with_definition(self):
         with self.assertRaises(Exception):
             write(table, "data_from_dh.parquet", table_definition=table_definition, col_definitions=col_definitions)

+    def test_unsigned_ints(self):
+        df = pandas.DataFrame.from_records(
+            data=[(-1, -1, -1), (2, 2, 2), (0, 0, 0)],
+            columns=['uint8Col', 'uint16Col', 'uint32Col']
+        )
+        df['uint8Col'] = df['uint8Col'].astype(np.uint8)
+        df['uint16Col'] = df['uint16Col'].astype(np.uint16)
+        df['uint32Col'] = df['uint32Col'].astype(np.uint32)
+
+        pyarrow.parquet.write_table(pyarrow.Table.from_pandas(df), 'data_from_pyarrow.parquet')
+        schema_from_disk = pyarrow.parquet.read_metadata("data_from_pyarrow.parquet").schema.to_arrow_schema()
+        self.assertTrue(schema_from_disk.field('uint8Col').type.equals(pyarrow.uint8()))
+        self.assertTrue(schema_from_disk.field('uint16Col').type.equals(pyarrow.uint16()))
+        self.assertTrue(schema_from_disk.field('uint32Col').type.equals(pyarrow.uint32()))
+
+        table_from_disk = read("data_from_pyarrow.parquet")
+        expected = new_table([
+            char_col("uint8Col", [255, 2, 0]),
+            char_col("uint16Col", [65535, 2, 0]),
+            long_col("uint32Col", [4294967295, 2, 0]),
+        ])
+        self.assert_table_equals(table_from_disk, expected)
+
 if __name__ == '__main__':
     unittest.main()
