Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python API - delta.appendOnly enforcement #590

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ tlaplus/*.toolbox/*/[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*/
.vscode
.env
**/.DS_Store
**/.python-version
**/.python-version
.coverage
2 changes: 1 addition & 1 deletion python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def created_time(self) -> int:
return self._metadata.created_time

@property
def configuration(self) -> List[str]:
def configuration(self) -> Dict[str, str]:
"""Return the DeltaTable properties."""
return self._metadata.configuration

Expand Down
27 changes: 23 additions & 4 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ def write_deltalake(
when creating a new table.
:param filesystem: Optional filesystem to pass to PyArrow. If not provided will
be inferred from uri.
:param mode: How to handle existing data. Default is to error if table
already exists. If 'append', will add new data. If 'overwrite', will
replace table with new data. If 'ignore', will not write anything if
table already exists.
:param mode: How to handle existing data. Default is to error if table already exists.
If 'append', will add new data.
If 'overwrite', will replace table with new data.
If 'ignore', will not write anything if table already exists.
:param file_options: Optional write options for Parquet (ParquetFileWriteOptions).
Can be provided with defaults using ParquetFileWriteOptions().make_write_options().
Please refer to https://github.com/apache/arrow/blob/master/python/pyarrow/_dataset_parquet.pyx#L492-L533
Expand Down Expand Up @@ -115,6 +115,8 @@ def write_deltalake(
table = table_or_uri
table_uri = table_uri = table._table.table_uri()

__enforce_append_only(table=table, configuration=configuration, mode=mode)

# TODO: Pass through filesystem once it is complete
# if filesystem is None:
# filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri))
Expand Down Expand Up @@ -202,6 +204,23 @@ def visitor(written_file: Any) -> None:
)


def __enforce_append_only(
table: Optional[DeltaTable],
configuration: Optional[Mapping[str, Optional[str]]],
mode: str,
) -> None:
WarSame marked this conversation as resolved.
Show resolved Hide resolved
"""Throw ValueError if table configuration contains delta.appendOnly and mode is not append"""
if table:
configuration = table.metadata().configuration
config_delta_append_only = (
configuration and configuration.get("delta.appendOnly", "false") == "true"
)
if config_delta_append_only and mode != "append":
raise ValueError(
f"If configuration has delta.appendOnly = 'true', mode must be 'append'. Mode is currently {mode}"
)


class DeltaJSONEncoder(json.JSONEncoder):
def default(self, obj: Any) -> Any:
if isinstance(obj, bytes):
Expand Down
28 changes: 28 additions & 0 deletions python/tests/test_writer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import itertools
import json
import os
import pathlib
Expand Down Expand Up @@ -176,6 +177,33 @@ def test_write_modes(tmp_path: pathlib.Path, sample_data: pa.Table):
assert DeltaTable(path).to_pyarrow_table() == sample_data


def test_append_only_should_append_only_with_the_overwrite_mode(
tmp_path: pathlib.Path, sample_data: pa.Table
):
path = str(tmp_path)

config = {"delta.appendOnly": "true"}

write_deltalake(path, sample_data, mode="append", configuration=config)

table = DeltaTable(path)
write_deltalake(table, sample_data, mode="append")

data_store_types = [path, table]
fail_modes = ["overwrite", "ignore", "error"]

for data_store_type, mode in itertools.product(data_store_types, fail_modes):
with pytest.raises(
ValueError,
match=f"If configuration has delta.appendOnly = 'true', mode must be 'append'. Mode is currently {mode}",
):
write_deltalake(data_store_type, sample_data, mode=mode)

expected = pa.concat_tables([sample_data, sample_data])
assert table.to_pyarrow_table() == expected
WarSame marked this conversation as resolved.
Show resolved Hide resolved
assert table.version() == 1


def test_writer_with_table(existing_table: DeltaTable, sample_data: pa.Table):
write_deltalake(existing_table, sample_data, mode="overwrite")
existing_table.update_incremental()
Expand Down