fix KeyError raised by add_files when parquet file does not have column stats (#1354)

* fix KeyError by switching del to pop

* added unit test

* update test

* fix python 3.9 compatibility, and refactor test

* update test
binayakd authored Nov 25, 2024
1 parent cc1ab2c commit ab43c6c
Showing 2 changed files with 43 additions and 4 deletions.
4 changes: 2 additions & 2 deletions pyiceberg/io/pyarrow.py
@@ -2397,8 +2397,8 @@ def data_file_statistics_from_parquet_metadata(
     split_offsets.sort()
 
     for field_id in invalidate_col:
-        del col_aggs[field_id]
-        del null_value_counts[field_id]
+        col_aggs.pop(field_id, None)
+        null_value_counts.pop(field_id, None)
 
     return DataFileStatistics(
         record_count=parquet_metadata.num_rows,
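
In plain terms (a minimal standalone sketch, not the library code): when a Parquet file carries no statistics for a column, its field_id can end up in invalidate_col without a matching entry in col_aggs or null_value_counts, so `del` raised a KeyError while `dict.pop(field_id, None)` simply skips the missing key. The dictionary contents below are illustrative only.

    col_aggs = {1: "aggregate for field 1"}
    null_value_counts = {}   # field 1 never received a null count (no stats written)
    invalidate_col = {1}

    for field_id in invalidate_col:
        col_aggs.pop(field_id, None)           # entry present: removed
        null_value_counts.pop(field_id, None)  # entry missing: silently ignored
        # the previous `del null_value_counts[field_id]` raised KeyError here
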
43 changes: 41 additions & 2 deletions tests/io/test_pyarrow_stats.py
@@ -81,7 +81,9 @@ class TestStruct:
     y: Optional[float]
 
 
-def construct_test_table() -> Tuple[pq.FileMetaData, Union[TableMetadataV1, TableMetadataV2]]:
+def construct_test_table(
+    write_statistics: Union[bool, List[str]] = True,
+) -> Tuple[pq.FileMetaData, Union[TableMetadataV1, TableMetadataV2]]:
     table_metadata = {
         "format-version": 2,
         "location": "s3://bucket/test/location",
@@ -169,7 +171,9 @@ def construct_test_table() -> Tuple[pq.FileMetaData, Union[TableMetadataV1, TableMetadataV2]]:
     metadata_collector: List[Any] = []
 
     with pa.BufferOutputStream() as f:
-        with pq.ParquetWriter(f, table.schema, metadata_collector=metadata_collector) as writer:
+        with pq.ParquetWriter(
+            f, table.schema, metadata_collector=metadata_collector, write_statistics=write_statistics
+        ) as writer:
             writer.write_table(table)
 
     return metadata_collector[0], table_metadata
@@ -681,6 +685,41 @@ def test_stats_types(table_schema_nested: Schema) -> None:
     ]
 
 
+def test_read_missing_statistics() -> None:
+    # write statistics only for the "strings" column
+    metadata, table_metadata = construct_test_table(write_statistics=["strings"])
+
+    # expect only the "strings" column to have statistics in metadata
+    # and all other columns to have no statistics
+    for r in range(metadata.num_row_groups):
+        for pos in range(metadata.num_columns):
+            if metadata.row_group(r).column(pos).path_in_schema == "strings":
+                assert metadata.row_group(r).column(pos).is_stats_set is True
+                assert metadata.row_group(r).column(pos).statistics is not None
+            else:
+                assert metadata.row_group(r).column(pos).is_stats_set is False
+                assert metadata.row_group(r).column(pos).statistics is None
+
+    schema = get_current_schema(table_metadata)
+    statistics = data_file_statistics_from_parquet_metadata(
+        parquet_metadata=metadata,
+        stats_columns=compute_statistics_plan(schema, table_metadata.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(schema),
+    )
+
+    datafile = DataFile(**statistics.to_serialized_dict())
+
+    # expect only the "strings" column values to be reflected in the
+    # upper_bound, lower_bound and null_value_counts props of datafile
+    string_col_idx = 1
+    assert len(datafile.lower_bounds) == 1
+    assert datafile.lower_bounds[string_col_idx].decode() == "aaaaaaaaaaaaaaaa"
+    assert len(datafile.upper_bounds) == 1
+    assert datafile.upper_bounds[string_col_idx].decode() == "zzzzzzzzzzzzzzz{"
+    assert len(datafile.null_value_counts) == 1
+    assert datafile.null_value_counts[string_col_idx] == 1
+
+
 # This is commented out for now because write_to_dataset drops the partition
 # columns making it harder to calculate the mapping from the column index to
 # datatype id
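
For context, a standalone sketch (column names are illustrative and independent of the test fixtures above) of how pyarrow produces a Parquet file without per-column statistics, the kind of file that previously triggered the KeyError in add_files:

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({"strings": ["aaaa", "zzzz", None], "ints": [1, 2, 3]})

    buf = pa.BufferOutputStream()
    # write_statistics accepts a bool or a list of column names; here only
    # the "strings" column gets statistics, mirroring the new unit test
    pq.write_table(table, buf, write_statistics=["strings"])

    metadata = pq.read_metadata(pa.BufferReader(buf.getvalue()))
    for pos in range(metadata.num_columns):
        col = metadata.row_group(0).column(pos)
        print(col.path_in_schema, col.is_stats_set)
    # strings True
    # ints False  <- columns like this one used to cause the KeyError
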
