Skip to content

Commit

Permalink
Add partition stats in snapshot summary (#521)
Browse files Browse the repository at this point in the history
* add partition stats in snapshot summary

* clean up

* enhance unit test

* clean up for review
  • Loading branch information
jqin61 committed Mar 18, 2024
1 parent a077c73 commit c311dac
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 110 deletions.
15 changes: 14 additions & 1 deletion pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ class TableProperties:

METRICS_MODE_COLUMN_CONF_PREFIX = "write.metadata.metrics.column"

WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit"
WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0

DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
FORMAT_VERSION = "format-version"
DEFAULT_FORMAT_VERSION = 2
Expand Down Expand Up @@ -2628,9 +2631,19 @@ def _write_delete_manifest() -> List[ManifestFile]:

def _summary(self) -> Summary:
ssc = SnapshotSummaryCollector()
partition_summary_limit = int(
self._transaction.table_metadata.properties.get(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
)
)
ssc.set_partition_summary_limit(partition_summary_limit)

for data_file in self._added_data_files:
ssc.add_file(data_file=data_file)
ssc.add_file(
data_file=data_file,
partition_spec=self._transaction.table_metadata.spec(),
schema=self._transaction.table_metadata.schema(),
)

previous_snapshot = (
self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
Expand Down
236 changes: 141 additions & 95 deletions pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@
# specific language governing permissions and limitations
# under the License.
import time
from collections import defaultdict
from enum import Enum
from typing import (
Any,
Dict,
List,
Mapping,
Optional,
)
from typing import Any, DefaultDict, Dict, List, Mapping, Optional

from pydantic import Field, PrivateAttr, model_serializer

from pyiceberg.io import FileIO
from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.typedef import IcebergBaseModel

ADDED_DATA_FILES = 'added-data-files'
Expand All @@ -52,8 +49,8 @@
TOTAL_DELETE_FILES = 'total-delete-files'
TOTAL_RECORDS = 'total-records'
TOTAL_FILE_SIZE = 'total-files-size'


CHANGED_PARTITION_COUNT_PROP = 'changed-partition-count'
CHANGED_PARTITION_PREFIX = "partitions."
OPERATION = "operation"


Expand All @@ -77,6 +74,97 @@ def __repr__(self) -> str:
return f"Operation.{self.name}"


class UpdateMetrics:
added_file_size: int
removed_file_size: int
added_data_files: int
removed_data_files: int
added_eq_delete_files: int
removed_eq_delete_files: int
added_pos_delete_files: int
removed_pos_delete_files: int
added_delete_files: int
removed_delete_files: int
added_records: int
deleted_records: int
added_pos_deletes: int
removed_pos_deletes: int
added_eq_deletes: int
removed_eq_deletes: int

def __init__(self) -> None:
self.added_file_size = 0
self.removed_file_size = 0
self.added_data_files = 0
self.removed_data_files = 0
self.added_eq_delete_files = 0
self.removed_eq_delete_files = 0
self.added_pos_delete_files = 0
self.removed_pos_delete_files = 0
self.added_delete_files = 0
self.removed_delete_files = 0
self.added_records = 0
self.deleted_records = 0
self.added_pos_deletes = 0
self.removed_pos_deletes = 0
self.added_eq_deletes = 0
self.removed_eq_deletes = 0

def add_file(self, data_file: DataFile) -> None:
self.added_file_size += data_file.file_size_in_bytes

if data_file.content == DataFileContent.DATA:
self.added_data_files += 1
self.added_records += data_file.record_count
elif data_file.content == DataFileContent.POSITION_DELETES:
self.added_delete_files += 1
self.added_pos_delete_files += 1
self.added_pos_deletes += data_file.record_count
elif data_file.content == DataFileContent.EQUALITY_DELETES:
self.added_delete_files += 1
self.added_eq_delete_files += 1
self.added_eq_deletes += data_file.record_count
else:
raise ValueError(f"Unknown data file content: {data_file.content}")

def remove_file(self, data_file: DataFile) -> None:
self.removed_file_size += data_file.file_size_in_bytes

if data_file.content == DataFileContent.DATA:
self.removed_data_files += 1
self.deleted_records += data_file.record_count
elif data_file.content == DataFileContent.POSITION_DELETES:
self.removed_delete_files += 1
self.removed_pos_delete_files += 1
self.removed_pos_deletes += data_file.record_count
elif data_file.content == DataFileContent.EQUALITY_DELETES:
self.removed_delete_files += 1
self.removed_eq_delete_files += 1
self.removed_eq_deletes += data_file.record_count
else:
raise ValueError(f"Unknown data file content: {data_file.content}")

def to_dict(self) -> Dict[str, str]:
properties: Dict[str, str] = {}
set_when_positive(properties, self.added_file_size, ADDED_FILE_SIZE)
set_when_positive(properties, self.removed_file_size, REMOVED_FILE_SIZE)
set_when_positive(properties, self.added_data_files, ADDED_DATA_FILES)
set_when_positive(properties, self.removed_data_files, DELETED_DATA_FILES)
set_when_positive(properties, self.added_eq_delete_files, ADDED_EQUALITY_DELETE_FILES)
set_when_positive(properties, self.removed_eq_delete_files, REMOVED_EQUALITY_DELETE_FILES)
set_when_positive(properties, self.added_pos_delete_files, ADDED_POSITION_DELETE_FILES)
set_when_positive(properties, self.removed_pos_delete_files, REMOVED_POSITION_DELETE_FILES)
set_when_positive(properties, self.added_delete_files, ADDED_DELETE_FILES)
set_when_positive(properties, self.removed_delete_files, REMOVED_DELETE_FILES)
set_when_positive(properties, self.added_records, ADDED_RECORDS)
set_when_positive(properties, self.deleted_records, DELETED_RECORDS)
set_when_positive(properties, self.added_pos_deletes, ADDED_POSITION_DELETES)
set_when_positive(properties, self.removed_pos_deletes, REMOVED_POSITION_DELETES)
set_when_positive(properties, self.added_eq_deletes, ADDED_EQUALITY_DELETES)
set_when_positive(properties, self.removed_eq_deletes, REMOVED_EQUALITY_DELETES)
return properties


class Summary(IcebergBaseModel, Mapping[str, str]):
"""A class that stores the summary information for a Snapshot.
Expand Down Expand Up @@ -172,100 +260,53 @@ class SnapshotLogEntry(IcebergBaseModel):


class SnapshotSummaryCollector:
added_file_size: int
removed_file_size: int
added_data_files: int
removed_data_files: int
added_eq_delete_files: int
removed_eq_delete_files: int
added_pos_delete_files: int
removed_pos_delete_files: int
added_delete_files: int
removed_delete_files: int
added_records: int
deleted_records: int
added_pos_deletes: int
removed_pos_deletes: int
added_eq_deletes: int
removed_eq_deletes: int
metrics: UpdateMetrics
partition_metrics: DefaultDict[str, UpdateMetrics]
max_changed_partitions_for_summaries: int

def __init__(self) -> None:
self.added_file_size = 0
self.removed_file_size = 0
self.added_data_files = 0
self.removed_data_files = 0
self.added_eq_delete_files = 0
self.removed_eq_delete_files = 0
self.added_pos_delete_files = 0
self.removed_pos_delete_files = 0
self.added_delete_files = 0
self.removed_delete_files = 0
self.added_records = 0
self.deleted_records = 0
self.added_pos_deletes = 0
self.removed_pos_deletes = 0
self.added_eq_deletes = 0
self.removed_eq_deletes = 0

def add_file(self, data_file: DataFile) -> None:
self.added_file_size += data_file.file_size_in_bytes

if data_file.content == DataFileContent.DATA:
self.added_data_files += 1
self.added_records += data_file.record_count
elif data_file.content == DataFileContent.POSITION_DELETES:
self.added_delete_files += 1
self.added_pos_delete_files += 1
self.added_pos_deletes += data_file.record_count
elif data_file.content == DataFileContent.EQUALITY_DELETES:
self.added_delete_files += 1
self.added_eq_delete_files += 1
self.added_eq_deletes += data_file.record_count
else:
raise ValueError(f"Unknown data file content: {data_file.content}")

def remove_file(self, data_file: DataFile) -> None:
self.removed_file_size += data_file.file_size_in_bytes

if data_file.content == DataFileContent.DATA:
self.removed_data_files += 1
self.deleted_records += data_file.record_count
elif data_file.content == DataFileContent.POSITION_DELETES:
self.removed_delete_files += 1
self.removed_pos_delete_files += 1
self.removed_pos_deletes += data_file.record_count
elif data_file.content == DataFileContent.EQUALITY_DELETES:
self.removed_delete_files += 1
self.removed_eq_delete_files += 1
self.removed_eq_deletes += data_file.record_count
self.metrics = UpdateMetrics()
self.partition_metrics = defaultdict(UpdateMetrics)
self.max_changed_partitions_for_summaries = 0

def set_partition_summary_limit(self, limit: int) -> None:
self.max_changed_partitions_for_summaries = limit

def add_file(self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC) -> None:
self.metrics.add_file(data_file)
if len(data_file.partition.record_fields()) != 0:
self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=True, schema=schema)

def remove_file(
self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC
) -> None:
self.metrics.remove_file(data_file)
if len(data_file.partition.record_fields()) != 0:
self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=False, schema=schema)

def update_partition_metrics(self, partition_spec: PartitionSpec, file: DataFile, is_add_file: bool, schema: Schema) -> None:
partition_path = partition_spec.partition_to_path(file.partition, schema)
partition_metrics: UpdateMetrics = self.partition_metrics[partition_path]

if is_add_file:
partition_metrics.add_file(file)
else:
raise ValueError(f"Unknown data file content: {data_file.content}")
partition_metrics.remove_file(file)

def build(self) -> Dict[str, str]:
def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
if num > 0:
properties[property_name] = str(num)

properties: Dict[str, str] = {}
set_when_positive(properties, self.added_file_size, ADDED_FILE_SIZE)
set_when_positive(properties, self.removed_file_size, REMOVED_FILE_SIZE)
set_when_positive(properties, self.added_data_files, ADDED_DATA_FILES)
set_when_positive(properties, self.removed_data_files, DELETED_DATA_FILES)
set_when_positive(properties, self.added_eq_delete_files, ADDED_EQUALITY_DELETE_FILES)
set_when_positive(properties, self.removed_eq_delete_files, REMOVED_EQUALITY_DELETE_FILES)
set_when_positive(properties, self.added_pos_delete_files, ADDED_POSITION_DELETE_FILES)
set_when_positive(properties, self.removed_pos_delete_files, REMOVED_POSITION_DELETE_FILES)
set_when_positive(properties, self.added_delete_files, ADDED_DELETE_FILES)
set_when_positive(properties, self.removed_delete_files, REMOVED_DELETE_FILES)
set_when_positive(properties, self.added_records, ADDED_RECORDS)
set_when_positive(properties, self.deleted_records, DELETED_RECORDS)
set_when_positive(properties, self.added_pos_deletes, ADDED_POSITION_DELETES)
set_when_positive(properties, self.removed_pos_deletes, REMOVED_POSITION_DELETES)
set_when_positive(properties, self.added_eq_deletes, ADDED_EQUALITY_DELETES)
set_when_positive(properties, self.removed_eq_deletes, REMOVED_EQUALITY_DELETES)
properties = self.metrics.to_dict()
changed_partitions_size = len(self.partition_metrics)
set_when_positive(properties, changed_partitions_size, CHANGED_PARTITION_COUNT_PROP)
if changed_partitions_size <= self.max_changed_partitions_for_summaries:
for partition_path, update_metrics_partition in self.partition_metrics.items():
if (summary := self._partition_summary(update_metrics_partition)) and len(summary) != 0:
properties[CHANGED_PARTITION_PREFIX + partition_path] = summary

return properties

def _partition_summary(self, update_metrics: UpdateMetrics) -> str:
return ",".join([f"{prop}={val}" for prop, val in update_metrics.to_dict().items()])


def _truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary:
for prop in {
Expand Down Expand Up @@ -366,3 +407,8 @@ def _update_totals(total_property: str, added_property: str, removed_property: s
)

return summary


def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
if num > 0:
properties[property_name] = str(num)
11 changes: 7 additions & 4 deletions tests/integration/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,12 +525,15 @@ def test_summaries_with_only_nulls(
'total-records': '2',
}

assert summaries[0] == {
'total-data-files': '0',
'total-delete-files': '0',
assert summaries[2] == {
'removed-files-size': '4239',
'total-equality-deletes': '0',
'total-files-size': '0',
'total-position-deletes': '0',
'deleted-data-files': '1',
'total-delete-files': '0',
'total-files-size': '0',
'deleted-records': '2',
'total-data-files': '0',
'total-records': '0',
}

Expand Down
Loading

0 comments on commit c311dac

Please sign in to comment.