Add manifests metadata table #717

Merged · 1 commit · May 23, 2024
50 changes: 50 additions & 0 deletions mkdocs/docs/api.md
@@ -606,6 +606,56 @@ min_snapshots_to_keep: [[null,10]]
max_snapshot_age_in_ms: [[null,604800000]]
```

### Manifests

To show a table's current file manifests:

```python
table.inspect.manifests()
```

```
pyarrow.Table
content: int8 not null
path: string not null
length: int64 not null
partition_spec_id: int32 not null
added_snapshot_id: int64 not null
added_data_files_count: int32 not null
existing_data_files_count: int32 not null
deleted_data_files_count: int32 not null
added_delete_files_count: int32 not null
existing_delete_files_count: int32 not null
deleted_delete_files_count: int32 not null
partition_summaries: list<item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>> not null
child 0, item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>
child 0, contains_null: bool not null
child 1, contains_nan: bool
child 2, lower_bound: string
child 3, upper_bound: string
----
content: [[0]]
path: [["s3://warehouse/default/table_metadata_manifests/metadata/3bf5b4c6-a7a4-4b43-a6ce-ca2b4887945a-m0.avro"]]
length: [[6886]]
partition_spec_id: [[0]]
added_snapshot_id: [[3815834705531553721]]
added_data_files_count: [[1]]
existing_data_files_count: [[0]]
deleted_data_files_count: [[0]]
added_delete_files_count: [[0]]
existing_delete_files_count: [[0]]
deleted_delete_files_count: [[0]]
partition_summaries: [[ -- is_valid: all not null
-- child 0 type: bool
[false]
-- child 1 type: bool
[false]
-- child 2 type: string
["test"]
-- child 3 type: string
["test"]]]
```
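
The result is a plain `pyarrow.Table`, so the usual PyArrow and Pandas tooling applies. Below is a minimal sketch of filtering it down to data manifests; the catalog name and table identifier are placeholders, and the catalog is assumed to be configured:

```python
import pyarrow.compute as pc

from pyiceberg.catalog import load_catalog

# Placeholder catalog/table names; assumes the catalog is already configured.
catalog = load_catalog("default")
table = catalog.load_table("default.my_table")

manifests = table.inspect.manifests()

# Keep only data manifests (content == 0) and hand the result to pandas.
data_manifests = manifests.filter(pc.equal(manifests["content"], 0))
print(data_manifests.to_pandas())
```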

## Add Files

Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
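
A minimal sketch of how this can look with the table's `add_files` API, assuming `table` was loaded from a catalog and the placeholder Parquet paths already match the table's schema:

```python
# Register existing Parquet files as data files without rewriting them.
# Paths are placeholders; the files must conform to the table's schema.
table.add_files(file_paths=[
    "s3://warehouse/default/data/existing-1.parquet",
    "s3://warehouse/default/data/existing-2.parquet",
])
```
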
89 changes: 89 additions & 0 deletions pyiceberg/table/__init__.py
@@ -71,6 +71,7 @@
    ManifestEntry,
    ManifestEntryStatus,
    ManifestFile,
    PartitionFieldSummary,
    write_manifest,
    write_manifest_list,
)
@@ -3537,6 +3538,94 @@ def update_partitions_map(
            schema=table_schema,
        )

    def manifests(self) -> "pa.Table":
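        """Return the manifest files of the table's current snapshot as a pyarrow.Table."""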
        import pyarrow as pa

        from pyiceberg.conversions import from_bytes

        partition_summary_schema = pa.struct([
            pa.field("contains_null", pa.bool_(), nullable=False),
            pa.field("contains_nan", pa.bool_(), nullable=True),
            pa.field("lower_bound", pa.string(), nullable=True),
            pa.field("upper_bound", pa.string(), nullable=True),
        ])

        manifest_schema = pa.schema([
            pa.field('content', pa.int8(), nullable=False),
            pa.field('path', pa.string(), nullable=False),
            pa.field('length', pa.int64(), nullable=False),
            pa.field('partition_spec_id', pa.int32(), nullable=False),
            pa.field('added_snapshot_id', pa.int64(), nullable=False),
            pa.field('added_data_files_count', pa.int32(), nullable=False),
            pa.field('existing_data_files_count', pa.int32(), nullable=False),
            pa.field('deleted_data_files_count', pa.int32(), nullable=False),
            pa.field('added_delete_files_count', pa.int32(), nullable=False),
            pa.field('existing_delete_files_count', pa.int32(), nullable=False),
            pa.field('deleted_delete_files_count', pa.int32(), nullable=False),
            pa.field('partition_summaries', pa.list_(partition_summary_schema), nullable=False),
        ])

        def _partition_summaries_to_rows(
            spec: PartitionSpec, partition_summaries: List[PartitionFieldSummary]
        ) -> List[Dict[str, Any]]:
            rows = []
            for i, field_summary in enumerate(partition_summaries):
                field = spec.fields[i]
                partition_field_type = spec.partition_type(self.tbl.schema()).fields[i].field_type
                lower_bound = (
                    field.transform.to_human_string(
                        partition_field_type, from_bytes(partition_field_type, field_summary.lower_bound)
                    )
                    if field_summary.lower_bound
                    else None
                )
                upper_bound = (
                    field.transform.to_human_string(
                        partition_field_type, from_bytes(partition_field_type, field_summary.upper_bound)
                    )
                    if field_summary.upper_bound
                    else None
                )
                rows.append({
                    'contains_null': field_summary.contains_null,
                    'contains_nan': field_summary.contains_nan,
                    'lower_bound': lower_bound,
                    'upper_bound': upper_bound,
                })
            return rows

        specs = self.tbl.metadata.specs()
        manifests = []
        if snapshot := self.tbl.metadata.current_snapshot():
            for manifest in snapshot.manifests(self.tbl.io):
                is_data_file = manifest.content == ManifestContent.DATA
                is_delete_file = manifest.content == ManifestContent.DELETES
                manifests.append({
                    'content': manifest.content,
                    'path': manifest.manifest_path,
                    'length': manifest.manifest_length,
                    'partition_spec_id': manifest.partition_spec_id,
                    'added_snapshot_id': manifest.added_snapshot_id,
                    'added_data_files_count': manifest.added_files_count if is_data_file else 0,
                    'existing_data_files_count': manifest.existing_files_count if is_data_file else 0,
                    'deleted_data_files_count': manifest.deleted_files_count if is_data_file else 0,
                    'added_delete_files_count': manifest.added_files_count if is_delete_file else 0,
                    'existing_delete_files_count': manifest.existing_files_count if is_delete_file else 0,
                    'deleted_delete_files_count': manifest.deleted_files_count if is_delete_file else 0,
                    'partition_summaries': _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
                    if manifest.partitions
                    else [],
                })

        return pa.Table.from_pylist(
            manifests,
            schema=manifest_schema,
        )


@dataclass(frozen=True)
class TablePartition:
83 changes: 83 additions & 0 deletions tests/integration/test_inspect_table.py
@@ -445,3 +445,86 @@ def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None:
        df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id)
        spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}")
        check_pyiceberg_df_equals_spark_df(df, spark_df)


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_manifests(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
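    """Check that inspect.manifests() matches Spark's <table>.manifests metadata table."""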
identifier = "default.table_metadata_manifests"
try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

spark.sql(
f"""
CREATE TABLE {identifier} (
id int,
data string
)
PARTITIONED BY (data)
"""
)

spark.sql(
f"""
INSERT INTO {identifier} VALUES (1, "a")
"""
)

spark.sql(
f"""
INSERT INTO {identifier} VALUES (2, "b")
"""
)

df = session_catalog.load_table(identifier).inspect.manifests()

assert df.column_names == [
'content',
'path',
'length',
'partition_spec_id',
'added_snapshot_id',
'added_data_files_count',
'existing_data_files_count',
'deleted_data_files_count',
'added_delete_files_count',
'existing_delete_files_count',
'deleted_delete_files_count',
'partition_summaries',
]

int_cols = [
'content',
'length',
'partition_spec_id',
'added_snapshot_id',
'added_data_files_count',
'existing_data_files_count',
'deleted_data_files_count',
'added_delete_files_count',
'existing_delete_files_count',
'deleted_delete_files_count',
]

for column in int_cols:
for value in df[column]:
assert isinstance(value.as_py(), int)

for value in df["path"]:
assert isinstance(value.as_py(), str)

for value in df["partition_summaries"]:
assert isinstance(value.as_py(), list)
for row in value:
assert isinstance(row["contains_null"].as_py(), bool)
assert isinstance(row["contains_nan"].as_py(), (bool, type(None)))
assert isinstance(row["lower_bound"].as_py(), (str, type(None)))
assert isinstance(row["upper_bound"].as_py(), (str, type(None)))

lhs = spark.table(f"{identifier}.manifests").toPandas()
rhs = df.to_pandas()
for column in df.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
assert left == right, f"Difference in column {column}: {left} != {right}"