adding add_files_overwrite method
use delete instead of overwrite

check history too
enkidulan committed Jun 12, 2024
1 parent 4cd67ac · commit 09193eb
Showing 3 changed files with 449 additions and 13 deletions.
mkdocs/docs/api.md: 5 additions & 0 deletions
@@ -672,7 +672,12 @@ file_paths = [
 tbl.add_files(file_paths=file_paths)
+# or if you want to overwrite
+tbl.add_files_overwrite(file_paths=file_paths)
 # A new snapshot is committed to the table with manifests pointing to the existing parquet files
 ```

 <!-- prettier-ignore-start -->
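A minimal end-to-end sketch of the documented call; the catalog name, table identifier, and file paths below are placeholders for illustration, not part of this diff:

```python
from pyiceberg.catalog import load_catalog

# Placeholder catalog and table names.
catalog = load_catalog("default")
tbl = catalog.load_table("db.events")

file_paths = [
    "s3://warehouse/db/events/data/part-00000.parquet",
    "s3://warehouse/db/events/data/part-00001.parquet",
]

# Logically replaces the table's current data with the given parquet
# files, without rewriting the files themselves.
tbl.add_files_overwrite(file_paths=file_paths)
```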
pyiceberg/table/__init__.py: 53 additions & 12 deletions
@@ -569,6 +569,27 @@ def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] =
             for data_file in data_files:
                 update_snapshot.append_data_file(data_file)
 
+    def add_files_overwrite(self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand API for adding files as data files and overwriting the table.
+
+        Args:
+            file_paths: The list of full file paths to be added as data files to the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+
+        Raises:
+            FileNotFoundError: If the file does not exist.
+        """
+        if self._table.name_mapping() is None:
+            self.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self._table.schema().name_mapping.model_dump_json()})
+        self.delete(delete_filter=ALWAYS_TRUE, snapshot_properties=snapshot_properties)
+        with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
+            data_files = _parquet_files_to_data_files(
+                table_metadata=self._table.metadata, file_paths=file_paths, io=self._table.io
+            )
+            for data_file in data_files:
+                update_snapshot.append_data_file(data_file)
+
     def update_spec(self) -> UpdateSpec:
         """Create a new UpdateSpec to update the partitioning of the table.
@@ -1480,6 +1501,20 @@ def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] =
         with self.transaction() as tx:
             tx.add_files(file_paths=file_paths, snapshot_properties=snapshot_properties)
 
+    def add_files_overwrite(self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand API for adding files as data files and overwriting the table.
+
+        Args:
+            file_paths: The list of full file paths to be added as data files to the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+
+        Raises:
+            FileNotFoundError: If the file does not exist.
+        """
+        with self.transaction() as tx:
+            tx.add_files_overwrite(file_paths=file_paths, snapshot_properties=snapshot_properties)
+
     def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
         return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)
 
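The Table-level method is a thin wrapper that opens a transaction and delegates to the Transaction method above, so `snapshot_properties` pass straight through to the snapshot summary. A small sketch; the property key and value are made up for illustration:

```python
# snapshot_properties end up in the summary of the snapshot(s)
# created by the overwrite; the key/value here is illustrative.
tbl.add_files_overwrite(
    file_paths=file_paths,
    snapshot_properties={"ingest-job": "backfill-2024-06"},
)
```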
@@ -3273,9 +3308,9 @@ def fast_append(self) -> FastAppendFiles:
     def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles:
         return OverwriteFiles(
             commit_uuid=commit_uuid,
-            operation=Operation.OVERWRITE
-            if self._transaction.table_metadata.current_snapshot() is not None
-            else Operation.APPEND,
+            operation=(
+                Operation.OVERWRITE if self._transaction.table_metadata.current_snapshot() is not None else Operation.APPEND
+            ),
             transaction=self._transaction,
             io=self._io,
             snapshot_properties=self._snapshot_properties,
@@ -3665,12 +3700,16 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                 "null_value_count": null_value_counts.get(field.field_id),
                 "nan_value_count": nan_value_counts.get(field.field_id),
                 # Makes them readable
-                "lower_bound": from_bytes(field.field_type, lower_bound)
-                if (lower_bound := lower_bounds.get(field.field_id))
-                else None,
-                "upper_bound": from_bytes(field.field_type, upper_bound)
-                if (upper_bound := upper_bounds.get(field.field_id))
-                else None,
+                "lower_bound": (
+                    from_bytes(field.field_type, lower_bound)
+                    if (lower_bound := lower_bounds.get(field.field_id))
+                    else None
+                ),
+                "upper_bound": (
+                    from_bytes(field.field_type, upper_bound)
+                    if (upper_bound := upper_bounds.get(field.field_id))
+                    else None
+                ),
             }
             for field in self.tbl.metadata.schema().fields
         }
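This hunk and the next are formatting-only (conditional expressions wrapped in parentheses, matching newer black output); behavior is unchanged. The struct built here decodes serialized column bounds into human-readable values for the metadata tables. A quick way to look at the result, assuming the `readable_metrics` column of the entries metadata table (name taken from current pyiceberg, not from this diff):

```python
# Sketch: readable_metrics includes the decoded lower/upper bounds
# per column; the column name is assumed from current pyiceberg.
entries = tbl.inspect.entries()
print(entries.column("readable_metrics")[0])
```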
@@ -3905,9 +3944,9 @@ def _partition_summaries_to_rows(
                 "added_delete_files_count": manifest.added_files_count if is_delete_file else 0,
                 "existing_delete_files_count": manifest.existing_files_count if is_delete_file else 0,
                 "deleted_delete_files_count": manifest.deleted_files_count if is_delete_file else 0,
-                "partition_summaries": _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
-                if manifest.partitions
-                else [],
+                "partition_summaries": (
+                    _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
+                    if manifest.partitions
+                    else []
+                ),
             })
 
         return pa.Table.from_pylist(
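Likewise a formatting-only change. These rows back a manifests metadata table; in current pyiceberg it is exposed as `tbl.inspect.manifests()` (the accessor name is an assumption here, not part of this diff):

```python
# Sketch: per-manifest rows including partition_summaries;
# accessor and column names assumed from current pyiceberg.
manifests = tbl.inspect.manifests()
print(manifests.select(["path", "partition_summaries"]).to_pylist())
```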