Skip to content

Commit

Permalink
Python API - delta.appendOnly enforcement (#590)
Browse files Browse the repository at this point in the history
* Add appendOnly check and test

* Add .coverage to .gitignore

* Readd metadata test, fix test

* Whitespace

* Remove unneeded concat

* Shorten condition - is this less clear?

* Fix configuration type annotation

* Move code to func, test existing config

* Fix formatting

* Add error string match, remove config to rely on existing config

* Fix linting for function

* Rename function

* Add version, add mode to error

* Add mode/data_store_type itertools product

* Sort imports
  • Loading branch information
WarSame authored May 4, 2022
1 parent 14a783e commit d890928
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 6 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ tlaplus/*.toolbox/*/[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*/
.vscode
.env
**/.DS_Store
**/.python-version
**/.python-version
.coverage
2 changes: 1 addition & 1 deletion python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def created_time(self) -> int:
return self._metadata.created_time

@property
def configuration(self) -> List[str]:
def configuration(self) -> Dict[str, str]:
"""Return the DeltaTable properties."""
return self._metadata.configuration

Expand Down
27 changes: 23 additions & 4 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ def write_deltalake(
when creating a new table.
:param filesystem: Optional filesystem to pass to PyArrow. If not provided will
be inferred from uri.
:param mode: How to handle existing data. Default is to error if table
already exists. If 'append', will add new data. If 'overwrite', will
replace table with new data. If 'ignore', will not write anything if
table already exists.
:param mode: How to handle existing data. Default is to error if table already exists.
If 'append', will add new data.
If 'overwrite', will replace table with new data.
If 'ignore', will not write anything if table already exists.
:param file_options: Optional write options for Parquet (ParquetFileWriteOptions).
Can be provided with defaults using ParquetFileWriteOptions().make_write_options().
Please refer to https://github.com/apache/arrow/blob/master/python/pyarrow/_dataset_parquet.pyx#L492-L533
Expand Down Expand Up @@ -115,6 +115,8 @@ def write_deltalake(
table = table_or_uri
table_uri = table_uri = table._table.table_uri()

__enforce_append_only(table=table, configuration=configuration, mode=mode)

# TODO: Pass through filesystem once it is complete
# if filesystem is None:
# filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri))
Expand Down Expand Up @@ -202,6 +204,23 @@ def visitor(written_file: Any) -> None:
)


def __enforce_append_only(
table: Optional[DeltaTable],
configuration: Optional[Mapping[str, Optional[str]]],
mode: str,
) -> None:
"""Throw ValueError if table configuration contains delta.appendOnly and mode is not append"""
if table:
configuration = table.metadata().configuration
config_delta_append_only = (
configuration and configuration.get("delta.appendOnly", "false") == "true"
)
if config_delta_append_only and mode != "append":
raise ValueError(
f"If configuration has delta.appendOnly = 'true', mode must be 'append'. Mode is currently {mode}"
)


class DeltaJSONEncoder(json.JSONEncoder):
def default(self, obj: Any) -> Any:
if isinstance(obj, bytes):
Expand Down
28 changes: 28 additions & 0 deletions python/tests/test_writer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import itertools
import json
import os
import pathlib
Expand Down Expand Up @@ -176,6 +177,33 @@ def test_write_modes(tmp_path: pathlib.Path, sample_data: pa.Table):
assert DeltaTable(path).to_pyarrow_table() == sample_data


def test_append_only_should_append_only_with_the_overwrite_mode(
tmp_path: pathlib.Path, sample_data: pa.Table
):
path = str(tmp_path)

config = {"delta.appendOnly": "true"}

write_deltalake(path, sample_data, mode="append", configuration=config)

table = DeltaTable(path)
write_deltalake(table, sample_data, mode="append")

data_store_types = [path, table]
fail_modes = ["overwrite", "ignore", "error"]

for data_store_type, mode in itertools.product(data_store_types, fail_modes):
with pytest.raises(
ValueError,
match=f"If configuration has delta.appendOnly = 'true', mode must be 'append'. Mode is currently {mode}",
):
write_deltalake(data_store_type, sample_data, mode=mode)

expected = pa.concat_tables([sample_data, sample_data])
assert table.to_pyarrow_table() == expected
assert table.version() == 1


def test_writer_with_table(existing_table: DeltaTable, sample_data: pa.Table):
write_deltalake(existing_table, sample_data, mode="overwrite")
existing_table.update_incremental()
Expand Down

0 comments on commit d890928

Please sign in to comment.