Add partition stats in snapshot summary #521

Merged · 4 commits · Mar 18, 2024
Changes from 3 commits

17 changes: 16 additions & 1 deletion pyiceberg/table/__init__.py
@@ -212,6 +212,9 @@ class TableProperties:

METRICS_MODE_COLUMN_CONF_PREFIX = "write.metadata.metrics.column"

WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit"
WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0

DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
FORMAT_VERSION = "format-version"
DEFAULT_FORMAT_VERSION = 2
@@ -2569,9 +2572,21 @@ def _write_delete_manifest() -> List[ManifestFile]:

def _summary(self) -> Summary:
ssc = SnapshotSummaryCollector()
partition_summary_limit = self._transaction.table_metadata.properties.get(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
)
if isinstance(partition_summary_limit, str):
raise ValueError(
f"WRITE_PARTITION_SUMMARY_LIMIT in table properties should be int but get str: {partition_summary_limit}"
)
ssc.set_partition_summary_limit(partition_summary_limit)

for data_file in self._added_data_files:
ssc.add_file(data_file=data_file)
ssc.add_file(
data_file=data_file,
partition_spec=self._transaction.table_metadata.spec(),
schema=self._transaction.table_metadata.schema(),
)

previous_snapshot = (
self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
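
For readers following this hunk, below is a standalone sketch of how `_summary` resolves the new `write.summary.partition-limit` property at this commit; the helper name and the bare properties dict are illustrative, not pyiceberg API. An unset property falls back to the default of 0 (partition summaries stay off), and a raw string value is rejected rather than parsed to an int.

# Illustrative, self-contained restatement of the property lookup in _summary above;
# resolve_partition_summary_limit is a hypothetical helper, not library code.
from typing import Dict, Union

WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit"
WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0


def resolve_partition_summary_limit(properties: Dict[str, Union[str, int]]) -> int:
    limit = properties.get(WRITE_PARTITION_SUMMARY_LIMIT, WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT)
    if isinstance(limit, str):
        # As in the diff above, a string value is rejected instead of being parsed.
        raise ValueError(
            f"WRITE_PARTITION_SUMMARY_LIMIT in table properties should be an int but got a str: {limit}"
        )
    return limit


assert resolve_partition_summary_limit({}) == 0  # default keeps partition summaries disabled
assert resolve_partition_summary_limit({WRITE_PARTITION_SUMMARY_LIMIT: 100}) == 100
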
240 changes: 145 additions & 95 deletions pyiceberg/table/snapshots.py
@@ -15,19 +15,16 @@
# specific language governing permissions and limitations
# under the License.
import time
from collections import defaultdict
from enum import Enum
from typing import (
Any,
Dict,
List,
Mapping,
Optional,
)
from typing import Any, DefaultDict, Dict, List, Mapping, Optional

from pydantic import Field, PrivateAttr, model_serializer

from pyiceberg.io import FileIO
from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.typedef import IcebergBaseModel

ADDED_DATA_FILES = 'added-data-files'
@@ -52,8 +49,8 @@
TOTAL_DELETE_FILES = 'total-delete-files'
TOTAL_RECORDS = 'total-records'
TOTAL_FILE_SIZE = 'total-files-size'


CHANGED_PARTITION_COUNT_PROP = 'changed-partition-count'
CHANGED_PARTITION_PREFIX = "partitions."
OPERATION = "operation"


Expand All @@ -77,6 +74,97 @@ def __repr__(self) -> str:
return f"Operation.{self.name}"


class UpdateMetrics:
added_file_size: int
removed_file_size: int
added_data_files: int
removed_data_files: int
added_eq_delete_files: int
removed_eq_delete_files: int
added_pos_delete_files: int
removed_pos_delete_files: int
added_delete_files: int
removed_delete_files: int
added_records: int
deleted_records: int
added_pos_deletes: int
removed_pos_deletes: int
added_eq_deletes: int
removed_eq_deletes: int

def __init__(self) -> None:
self.added_file_size = 0
self.removed_file_size = 0
self.added_data_files = 0
self.removed_data_files = 0
self.added_eq_delete_files = 0
self.removed_eq_delete_files = 0
self.added_pos_delete_files = 0
self.removed_pos_delete_files = 0
self.added_delete_files = 0
self.removed_delete_files = 0
self.added_records = 0
self.deleted_records = 0
self.added_pos_deletes = 0
self.removed_pos_deletes = 0
self.added_eq_deletes = 0
self.removed_eq_deletes = 0

def add_file(self, data_file: DataFile) -> None:
self.added_file_size += data_file.file_size_in_bytes

if data_file.content == DataFileContent.DATA:
self.added_data_files += 1
self.added_records += data_file.record_count
elif data_file.content == DataFileContent.POSITION_DELETES:
self.added_delete_files += 1
self.added_pos_delete_files += 1
self.added_pos_deletes += data_file.record_count
elif data_file.content == DataFileContent.EQUALITY_DELETES:
self.added_delete_files += 1
self.added_eq_delete_files += 1
self.added_eq_deletes += data_file.record_count
else:
raise ValueError(f"Unknown data file content: {data_file.content}")

def remove_file(self, data_file: DataFile) -> None:
self.removed_file_size += data_file.file_size_in_bytes

if data_file.content == DataFileContent.DATA:
self.removed_data_files += 1
self.deleted_records += data_file.record_count
elif data_file.content == DataFileContent.POSITION_DELETES:
self.removed_delete_files += 1
self.removed_pos_delete_files += 1
self.removed_pos_deletes += data_file.record_count
elif data_file.content == DataFileContent.EQUALITY_DELETES:
self.removed_delete_files += 1
self.removed_eq_delete_files += 1
self.removed_eq_deletes += data_file.record_count
else:
raise ValueError(f"Unknown data file content: {data_file.content}")

def to_dict(self) -> Dict[str, str]:
properties: Dict[str, str] = {}
set_when_positive(properties, self.added_file_size, ADDED_FILE_SIZE)
set_when_positive(properties, self.removed_file_size, REMOVED_FILE_SIZE)
set_when_positive(properties, self.added_data_files, ADDED_DATA_FILES)
set_when_positive(properties, self.removed_data_files, DELETED_DATA_FILES)
set_when_positive(properties, self.added_eq_delete_files, ADDED_EQUALITY_DELETE_FILES)
set_when_positive(properties, self.removed_eq_delete_files, REMOVED_EQUALITY_DELETE_FILES)
set_when_positive(properties, self.added_pos_delete_files, ADDED_POSITION_DELETE_FILES)
set_when_positive(properties, self.removed_pos_delete_files, REMOVED_POSITION_DELETE_FILES)
set_when_positive(properties, self.added_delete_files, ADDED_DELETE_FILES)
set_when_positive(properties, self.removed_delete_files, REMOVED_DELETE_FILES)
set_when_positive(properties, self.added_records, ADDED_RECORDS)
set_when_positive(properties, self.deleted_records, DELETED_RECORDS)
set_when_positive(properties, self.added_pos_deletes, ADDED_POSITION_DELETES)
set_when_positive(properties, self.removed_pos_deletes, REMOVED_POSITION_DELETES)
set_when_positive(properties, self.added_eq_deletes, ADDED_EQUALITY_DELETES)
set_when_positive(properties, self.removed_eq_deletes, REMOVED_EQUALITY_DELETES)
return properties


class Summary(IcebergBaseModel, Mapping[str, str]):
"""A class that stores the summary information for a Snapshot.

@@ -172,100 +260,57 @@ class SnapshotLogEntry(IcebergBaseModel):


class SnapshotSummaryCollector:
added_file_size: int
removed_file_size: int
added_data_files: int
removed_data_files: int
added_eq_delete_files: int
removed_eq_delete_files: int
added_pos_delete_files: int
removed_pos_delete_files: int
added_delete_files: int
removed_delete_files: int
added_records: int
deleted_records: int
added_pos_deletes: int
removed_pos_deletes: int
added_eq_deletes: int
removed_eq_deletes: int
metrics: UpdateMetrics
partition_metrics: DefaultDict[str, UpdateMetrics]
max_changed_partitions_for_summaries: int

def __init__(self) -> None:
self.added_file_size = 0
self.removed_file_size = 0
self.added_data_files = 0
self.removed_data_files = 0
self.added_eq_delete_files = 0
self.removed_eq_delete_files = 0
self.added_pos_delete_files = 0
self.removed_pos_delete_files = 0
self.added_delete_files = 0
self.removed_delete_files = 0
self.added_records = 0
self.deleted_records = 0
self.added_pos_deletes = 0
self.removed_pos_deletes = 0
self.added_eq_deletes = 0
self.removed_eq_deletes = 0

def add_file(self, data_file: DataFile) -> None:
self.added_file_size += data_file.file_size_in_bytes

if data_file.content == DataFileContent.DATA:
self.added_data_files += 1
self.added_records += data_file.record_count
elif data_file.content == DataFileContent.POSITION_DELETES:
self.added_delete_files += 1
self.added_pos_delete_files += 1
self.added_pos_deletes += data_file.record_count
elif data_file.content == DataFileContent.EQUALITY_DELETES:
self.added_delete_files += 1
self.added_eq_delete_files += 1
self.added_eq_deletes += data_file.record_count
else:
raise ValueError(f"Unknown data file content: {data_file.content}")

def remove_file(self, data_file: DataFile) -> None:
self.removed_file_size += data_file.file_size_in_bytes

if data_file.content == DataFileContent.DATA:
self.removed_data_files += 1
self.deleted_records += data_file.record_count
elif data_file.content == DataFileContent.POSITION_DELETES:
self.removed_delete_files += 1
self.removed_pos_delete_files += 1
self.removed_pos_deletes += data_file.record_count
elif data_file.content == DataFileContent.EQUALITY_DELETES:
self.removed_delete_files += 1
self.removed_eq_delete_files += 1
self.removed_eq_deletes += data_file.record_count
self.metrics = UpdateMetrics()
self.partition_metrics = defaultdict(UpdateMetrics)
self.max_changed_partitions_for_summaries = 0

def set_partition_summary_limit(self, limit: int) -> None:
self.max_changed_partitions_for_summaries = limit

def add_file(
self, data_file: DataFile, partition_spec: Optional[PartitionSpec] = None, schema: Optional[Schema] = None
) -> None:
self.metrics.add_file(data_file)
if getattr(data_file, "partition", None) is not None and len(data_file.partition.record_fields()) != 0:
if partition_spec is None or schema is None:
raise ValueError("add data file with partition but without specifying the partiton_spec and schema")
self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=True, schema=schema)

def remove_file(self, data_file: DataFile, partition_spec: Optional[PartitionSpec], schema: Optional[Schema]) -> None:
self.metrics.remove_file(data_file)
if getattr(data_file, "partition", None) is not None and len(data_file.partition.record_fields()) != 0:
if partition_spec is None or schema is None:
raise ValueError("remove data file with partition but without specifying the partiton_spec and schema")
self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=False, schema=schema)

def update_partition_metrics(self, partition_spec: PartitionSpec, file: DataFile, is_add_file: bool, schema: Schema) -> None:
partition_path = partition_spec.partition_to_path(file.partition, schema)
partition_metrics: UpdateMetrics = self.partition_metrics[partition_path]

if is_add_file:
partition_metrics.add_file(file)
else:
raise ValueError(f"Unknown data file content: {data_file.content}")
partition_metrics.remove_file(file)

def build(self) -> Dict[str, str]:
def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
if num > 0:
properties[property_name] = str(num)

properties: Dict[str, str] = {}
set_when_positive(properties, self.added_file_size, ADDED_FILE_SIZE)
set_when_positive(properties, self.removed_file_size, REMOVED_FILE_SIZE)
set_when_positive(properties, self.added_data_files, ADDED_DATA_FILES)
set_when_positive(properties, self.removed_data_files, DELETED_DATA_FILES)
set_when_positive(properties, self.added_eq_delete_files, ADDED_EQUALITY_DELETE_FILES)
set_when_positive(properties, self.removed_eq_delete_files, REMOVED_EQUALITY_DELETE_FILES)
set_when_positive(properties, self.added_pos_delete_files, ADDED_POSITION_DELETE_FILES)
set_when_positive(properties, self.removed_pos_delete_files, REMOVED_POSITION_DELETE_FILES)
set_when_positive(properties, self.added_delete_files, ADDED_DELETE_FILES)
set_when_positive(properties, self.removed_delete_files, REMOVED_DELETE_FILES)
set_when_positive(properties, self.added_records, ADDED_RECORDS)
set_when_positive(properties, self.deleted_records, DELETED_RECORDS)
set_when_positive(properties, self.added_pos_deletes, ADDED_POSITION_DELETES)
set_when_positive(properties, self.removed_pos_deletes, REMOVED_POSITION_DELETES)
set_when_positive(properties, self.added_eq_deletes, ADDED_EQUALITY_DELETES)
set_when_positive(properties, self.removed_eq_deletes, REMOVED_EQUALITY_DELETES)
properties = self.metrics.to_dict()
changed_partitions_size = len(self.partition_metrics)
set_when_positive(properties, changed_partitions_size, CHANGED_PARTITION_COUNT_PROP)
if changed_partitions_size <= self.max_changed_partitions_for_summaries:
for partition_path, update_metrics_partition in self.partition_metrics.items():
if (summary := self.partition_summary(update_metrics_partition)) and len(summary) != 0:
properties[CHANGED_PARTITION_PREFIX + partition_path] = summary

return properties

def partition_summary(self, update_metrics: UpdateMetrics) -> str:
return ",".join([f"{prop}={val}" for prop, val in update_metrics.to_dict().items()])


def _truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary:
for prop in {
@@ -366,3 +411,8 @@ def _update_totals(total_property: str, added_property: str, removed_property: s
)

return summary


def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
if num > 0:
properties[property_name] = str(num)
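
To make the new output concrete, here is an illustrative example (hypothetical file counts, sizes, and partition values, not taken from this PR's tests) of what `SnapshotSummaryCollector.build()` emits for an append that touches two partitions of a table partitioned by a `dt` field, with the limit set to at least 2: the overall metrics, a `changed-partition-count` entry, and one `partitions.<partition-path>` entry per changed partition, whose value is the comma-joined per-partition metrics.

# Illustrative only: hypothetical build() result for an append touching two 'dt' partitions,
# assuming write.summary.partition-limit >= 2. Keys follow the constants defined above.
expected_summary = {
    "added-files-size": "5120",
    "added-data-files": "2",
    "added-records": "200",
    "changed-partition-count": "2",
    "partitions.dt=2024-03-17": "added-files-size=2560,added-data-files=1,added-records=100",
    "partitions.dt=2024-03-18": "added-files-size=2560,added-data-files=1,added-records=100",
}

The totals seen in the test below (`total-records`, `total-files-size`, and so on) are added later by the snapshot-summary merge logic (`_update_totals`), not by the collector itself.
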
11 changes: 7 additions & 4 deletions tests/integration/test_writes.py
@@ -525,12 +525,15 @@ def test_summaries_with_only_nulls(
'total-records': '2',
}

assert summaries[0] == {
'total-data-files': '0',
'total-delete-files': '0',
assert summaries[2] == {
'removed-files-size': '4239',
'total-equality-deletes': '0',
'total-files-size': '0',
'total-position-deletes': '0',
'deleted-data-files': '1',
'total-delete-files': '0',
'total-files-size': '0',
'deleted-records': '2',
'total-data-files': '0',
'total-records': '0',
}
