Support partial deletes (apache#569)
* Add option to delete datafiles

This is done through the Iceberg metadata, resulting
in efficient deletes if the data is partitioned correctly

* Pull in main

* WIP

* Change DataScan to accept Metadata and io

For the partial deletes I want to do a scan on in-memory metadata. Changing this API allows this (see the sketch after the commit message).

* fix name-mapping issue

* WIP

* WIP

* Moar tests

* Oops

* Cleanup

* WIP

* WIP

* Fix summary generation

* Last few bits

* Fix the requirement

* Make ruff happy

* Comments, thanks Kevin!

* Comments

* Append rather than truncate

* Fix merge conflicts

* Make the tests pass

* Add another test

* Conflicts

* Add docs (apache#33)

* docs

* docs

* Add a partitioned overwrite test

* Fix comment

* Skip empty manifests

---------

Co-authored-by: HonahX <honahx@apache.org>
Co-authored-by: Sung Yun <107272191+syun64@users.noreply.github.com>
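
The "Change DataScan to accept Metadata and io" step above reworks the scan API so a scan can run against in-memory `TableMetadata` and a `FileIO` rather than requiring a full `Table` object. A minimal sketch of what that enables; the keyword names are an assumption based on the commit message, and `tbl` is an already-loaded table:

```python
# Illustrative sketch: the keyword names below assume the post-change
# DataScan(table_metadata, io, ...) constructor described in the commit message.
from pyiceberg.table import DataScan

scan = DataScan(
    table_metadata=tbl.metadata,  # in-memory metadata, not a live Table
    io=tbl.io,
    row_filter="city == 'Paris'",
)
matching = scan.to_arrow()  # rows that would be affected by a partial delete
```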
3 people authored Jul 9, 2024
1 parent cdc3e54 commit 3f574d3
Showing 14 changed files with 1,025 additions and 139 deletions.
21 changes: 17 additions & 4 deletions mkdocs/docs/api.md
@@ -331,12 +331,25 @@ df = pa.Table.from_pylist(
table.append(df)
```

<!-- prettier-ignore-start -->
You can delete some of the data from the table by calling `tbl.delete()` with a desired `delete_filter`.

```python
tbl.delete(delete_filter="city == 'Paris'")
```

!!! example "Under development"
Writing using PyIceberg is still under development. Support for [partial overwrites](https://github.com/apache/iceberg-python/issues/268) and writing to [partitioned tables](https://github.com/apache/iceberg-python/issues/208) is planned and being worked on.
In the above example, any records where the city field value equals `Paris` will be deleted.
Running `tbl.scan().to_arrow()` will now yield:

<!-- prettier-ignore-end -->
```
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254],[53.21917]]
long: [[4.896029,-122.431297,6.0989],[6.56667]]
```
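
For illustration only (not part of the committed docs): the same delete can be written with the expression API, and multiple predicates can be combined. The sketch assumes `delete_filter` accepts a `BooleanExpression` from `pyiceberg.expressions` as well as a filter string, and that `tbl` is the table loaded above.

```python
# Illustrative sketch: assumes delete_filter also accepts a BooleanExpression.
from pyiceberg.expressions import EqualTo, Or

# Equivalent to the string filter "city == 'Paris' or city == 'Drachten'"
tbl.delete(delete_filter=Or(EqualTo("city", "Paris"), EqualTo("city", "Drachten")))
```

As the commit message notes, the delete goes through the Iceberg metadata: data files whose rows all match the filter can be dropped outright, while files that match only partially are rewritten without the matching rows, so deletes are cheapest when the table is partitioned on the filtered column.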

## Inspecting tables

64 changes: 62 additions & 2 deletions pyiceberg/io/pyarrow.py
@@ -31,6 +31,7 @@
import logging
import os
import re
import uuid
from abc import ABC, abstractmethod
from concurrent.futures import Future
from copy import copy
@@ -126,7 +127,6 @@
visit,
visit_with_partner,
)
from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.transforms import TruncateTransform
@@ -159,7 +159,7 @@
from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string

if TYPE_CHECKING:
from pyiceberg.table import FileScanTask
from pyiceberg.table import FileScanTask, WriteTask

logger = logging.getLogger(__name__)

@@ -1563,6 +1563,8 @@ class PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]
_default_mode: str

def __init__(self, schema: Schema, properties: Dict[str, str]):
from pyiceberg.table import TableProperties

self._schema = schema
self._properties = properties
self._default_mode = self._properties.get(
@@ -1598,6 +1600,8 @@ def map(
return k + v

def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]:
from pyiceberg.table import TableProperties

column_name = self._schema.find_column_name(self._field_id)
if column_name is None:
return []
@@ -1895,6 +1899,8 @@ def data_file_statistics_from_parquet_metadata(


def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
from pyiceberg.table import PropertyUtil, TableProperties

parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
row_group_size = PropertyUtil.property_as_int(
properties=table_metadata.properties,
@@ -2005,6 +2011,8 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_


def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
from pyiceberg.table import PropertyUtil, TableProperties

for key_pattern in [
TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
TableProperties.PARQUET_PAGE_ROW_LIMIT,
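
A note on the recurring change in the hunks above: the module-level `from pyiceberg.table import ...` statements move into the bodies of the functions that need them, deferring the import to call time, which is a standard way to avoid a circular import between `pyiceberg.io.pyarrow` and `pyiceberg.table`. A minimal, generic sketch of the pattern (module names are hypothetical, not PyIceberg code):

```python
# module_a.py (hypothetical)
# A top-level "import module_b" would complete an import cycle if
# module_b also imports module_a while it is being loaded.


def build_report() -> str:
    # Deferred import: module_b is resolved only when the function runs,
    # by which time both modules have finished importing.
    from module_b import summarize

    return summarize([1, 2, 3])
```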
@@ -2042,3 +2050,55 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
default=TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT,
),
}


def _dataframe_to_data_files(
table_metadata: TableMetadata,
df: pa.Table,
io: FileIO,
write_uuid: Optional[uuid.UUID] = None,
counter: Optional[itertools.count[int]] = None,
) -> Iterable[DataFile]:
"""Convert a PyArrow table into a DataFile.
Returns:
An iterable that supplies datafiles that represent the table.
"""
from pyiceberg.table import PropertyUtil, TableProperties, WriteTask

counter = counter or itertools.count(0)
write_uuid = write_uuid or uuid.uuid4()
target_file_size: int = PropertyUtil.property_as_int( # type: ignore # The property is set with non-None value.
properties=table_metadata.properties,
property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
)

if table_metadata.spec().is_unpartitioned():
yield from write_file(
io=io,
table_metadata=table_metadata,
tasks=iter([
WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=table_metadata.schema())
for batches in bin_pack_arrow_table(df, target_file_size)
]),
)
else:
from pyiceberg.table import _determine_partitions

partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
yield from write_file(
io=io,
table_metadata=table_metadata,
tasks=iter([
WriteTask(
write_uuid=write_uuid,
task_id=next(counter),
record_batches=batches,
partition_key=partition.partition_key,
schema=table_metadata.schema(),
)
for partition in partitions
for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
]),
)
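
The new `_dataframe_to_data_files` helper bin-packs an Arrow table into `WriteTask`s (grouped per partition when the spec is partitioned) and yields the `DataFile`s written through the table's `FileIO`. Below is a usage sketch for illustration only: the catalog name and table identifier are assumptions, and it assumes the loaded `Table` exposes `.metadata` and `.io`. Inside PyIceberg this private helper is driven by the append, overwrite, and delete code paths rather than called directly.

```python
# Illustrative sketch: catalog name and table identifier are assumptions.
import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.io.pyarrow import _dataframe_to_data_files

catalog = load_catalog("default")
tbl = catalog.load_table("docs_example.cities")

df = pa.Table.from_pylist([{"city": "Utrecht", "lat": 52.0907, "long": 5.1214}])

# Each yielded DataFile describes one Parquet file already written via tbl.io;
# the files only become visible once committed as part of a snapshot update.
for data_file in _dataframe_to_data_files(table_metadata=tbl.metadata, df=df, io=tbl.io):
    print(data_file.file_path, data_file.record_count)
```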
2 changes: 1 addition & 1 deletion pyiceberg/manifest.py
@@ -341,7 +341,7 @@ class DataFile(Record):
split_offsets: Optional[List[int]]
equality_ids: Optional[List[int]]
sort_order_id: Optional[int]
spec_id: Optional[int]
spec_id: int

def __setattr__(self, name: str, value: Any) -> None:
"""Assign a key/value to a DataFile."""
(Diffs for the remaining changed files did not load and are not shown here.)
