diff --git a/awswrangler/s3/_write_deltalake.py b/awswrangler/s3/_write_deltalake.py index fc81001ea..f423505ad 100644 --- a/awswrangler/s3/_write_deltalake.py +++ b/awswrangler/s3/_write_deltalake.py @@ -22,14 +22,25 @@ def _set_default_storage_options_kwargs( - boto3_session: boto3.Session | None, s3_additional_kwargs: dict[str, Any] | None, s3_allow_unsafe_rename: bool + boto3_session: boto3.Session | None, + s3_additional_kwargs: dict[str, Any] | None, + s3_allow_unsafe_rename: bool, + lock_dynamodb_table: str | None = None, ) -> dict[str, Any]: defaults = {key.upper(): value for key, value in _utils.boto3_to_primitives(boto3_session=boto3_session).items()} defaults["AWS_REGION"] = defaults.pop("REGION_NAME") + s3_additional_kwargs = s3_additional_kwargs or {} + + s3_lock_arguments = {} + if lock_dynamodb_table: + s3_lock_arguments["AWS_S3_LOCKING_PROVIDER"] = "dynamodb" + s3_lock_arguments["DELTA_DYNAMO_TABLE_NAME"] = lock_dynamodb_table + return { **defaults, **s3_additional_kwargs, + **s3_lock_arguments, "AWS_S3_ALLOW_UNSAFE_RENAME": "TRUE" if s3_allow_unsafe_rename else "FALSE", } @@ -44,9 +55,10 @@ def to_deltalake( dtype: dict[str, str] | None = None, partition_cols: list[str] | None = None, overwrite_schema: bool = False, + lock_dynamodb_table: str | None = None, + s3_allow_unsafe_rename: bool = False, boto3_session: boto3.Session | None = None, s3_additional_kwargs: dict[str, str] | None = None, - s3_allow_unsafe_rename: bool = False, ) -> None: """Write a DataFrame to S3 as a DeltaLake table. @@ -71,13 +83,19 @@ def to_deltalake( List of columns to partition the table by. Only required when creating a new table. overwrite_schema: bool If True, allows updating the schema of the table. + lock_dynamodb_table: str | None + DynamoDB table to use as a locking provider. + A locking mechanism is needed to prevent unsafe concurrent writes to a delta lake directory when writing to S3. 
+ If you don't want to use a locking mechanism, you can choose to set ``s3_allow_unsafe_rename`` to True. + + For information on how to set up the lock table, + please check `this page <https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/>`_. + s3_allow_unsafe_rename: bool + Allows using the default S3 backend without support for concurrent writers. boto3_session: boto3.Session, optional Boto3 Session. If None, the default boto3 session is used. pyarrow_additional_kwargs: dict[str, Any], optional Forwarded to the Delta Table class for the storage options of the S3 backend. - s3_allow_unsafe_rename: bool - Allows using the default S3 backend without support for concurrent writers. - Concurrent writing is currently not supported, so this option needs to be turned on explicitely. Examples -------- >>> Write a dataframe to the table ``my_table`` on the ``my_db`` database: >>> import awswrangler as wr >>> import pandas as pd >>> wr.s3.to_deltalake( - ... df=pd.DataFrame({'col': [1, 2, 3]}), - ... path='s3://bucket/prefix/', - ... s3_allow_unsafe_rename=True, + ... df=pd.DataFrame({"col": [1, 2, 3]}), + ... path="s3://bucket/prefix/", + ... lock_dynamodb_table="my-lock-table", + ... 
) See Also @@ -101,7 +119,12 @@ def to_deltalake( schema: pa.Schema = _data_types.pyarrow_schema_from_pandas(df=df, index=index, ignore_cols=None, dtype=dtype) table: pa.Table = _df_to_table(df, schema, index, dtype) - storage_options = _set_default_storage_options_kwargs(boto3_session, s3_additional_kwargs, s3_allow_unsafe_rename) + storage_options = _set_default_storage_options_kwargs( + boto3_session=boto3_session, + s3_additional_kwargs=s3_additional_kwargs, + s3_allow_unsafe_rename=s3_allow_unsafe_rename, + lock_dynamodb_table=lock_dynamodb_table, + ) deltalake.write_deltalake( table_or_uri=path, data=table, diff --git a/tests/unit/test_s3_deltalake.py b/tests/unit/test_s3_deltalake.py index 8225244c2..014d7ccb5 100644 --- a/tests/unit/test_s3_deltalake.py +++ b/tests/unit/test_s3_deltalake.py @@ -1,22 +1,66 @@ from __future__ import annotations -from typing import Any +from typing import Any, Iterator +import boto3 import pytest import awswrangler as wr import awswrangler.pandas as pd +from .._utils import ( + get_time_str_with_random_suffix, +) + + +@pytest.fixture(scope="session") +def lock_dynamodb_table() -> Iterator[str]: + name = f"deltalake_lock_{get_time_str_with_random_suffix()}" + print(f"Table name: {name}") + + dynamodb_client = boto3.client("dynamodb") + dynamodb_client.create_table( + TableName=name, + BillingMode="PAY_PER_REQUEST", + KeySchema=[ + {"AttributeName": "tablePath", "KeyType": "HASH"}, + {"AttributeName": "fileName", "KeyType": "RANGE"}, + ], + AttributeDefinitions=[ + {"AttributeName": "tablePath", "AttributeType": "S"}, + {"AttributeName": "fileName", "AttributeType": "S"}, + ], + ) + + dynamodb_client.get_waiter("table_exists").wait(TableName=name) + + yield name + + dynamodb_client.delete_table(TableName=name) + dynamodb_client.get_waiter("table_not_exists").wait(TableName=name) + print(f"Table {name} deleted.") + + +@pytest.fixture(params=["no_lock", "dynamodb_lock"], scope="session") +def lock_settings(request: 
pytest.FixtureRequest) -> dict[str, Any]: + if request.param == "no_lock": + return dict(s3_allow_unsafe_rename=True) + else: + return dict(lock_dynamodb_table=request.getfixturevalue("lock_dynamodb_table")) + @pytest.mark.parametrize("s3_additional_kwargs", [None, {"ServerSideEncryption": "AES256"}]) @pytest.mark.parametrize( "pyarrow_additional_kwargs", [{"safe": True, "deduplicate_objects": False, "types_mapper": None}] ) def test_read_deltalake( - path: str, s3_additional_kwargs: dict[str, Any] | None, pyarrow_additional_kwargs: dict[str, Any] + path: str, + lock_settings: dict[str, Any], + s3_additional_kwargs: dict[str, Any] | None, + pyarrow_additional_kwargs: dict[str, Any], ) -> None: df = pd.DataFrame({"c0": [1, 2, 3], "c1": ["foo", None, "bar"], "c2": [3.0, 4.0, 5.0], "c3": [True, False, None]}) - wr.s3.to_deltalake(path=path, df=df, s3_additional_kwargs=s3_additional_kwargs, s3_allow_unsafe_rename=True) + wr.s3.to_deltalake(path=path, df=df, s3_additional_kwargs=s3_additional_kwargs, **lock_settings) df2 = wr.s3.read_deltalake( path=path, s3_additional_kwargs=s3_additional_kwargs, pyarrow_additional_kwargs=pyarrow_additional_kwargs @@ -25,15 +69,17 @@ def test_read_deltalake( @pytest.mark.parametrize("pyarrow_additional_kwargs", [{"types_mapper": None}]) -def test_read_deltalake_versioned(path: str, pyarrow_additional_kwargs: dict[str, Any]) -> None: +def test_read_deltalake_versioned( + path: str, lock_settings: dict[str, Any], pyarrow_additional_kwargs: dict[str, Any] +) -> None: df = pd.DataFrame({"c0": [1, 2, 3], "c1": ["foo", "baz", "bar"]}) - wr.s3.to_deltalake(path=path, df=df, s3_allow_unsafe_rename=True) + wr.s3.to_deltalake(path=path, df=df, **lock_settings) df2 = wr.s3.read_deltalake(path=path, pyarrow_additional_kwargs=pyarrow_additional_kwargs) assert df2.equals(df) df["c2"] = [True, False, True] - wr.s3.to_deltalake(path=path, df=df, mode="overwrite", overwrite_schema=True, s3_allow_unsafe_rename=True) + wr.s3.to_deltalake(path=path, 
df=df, mode="overwrite", overwrite_schema=True, **lock_settings) df3 = wr.s3.read_deltalake(path=path, version=0, pyarrow_additional_kwargs=pyarrow_additional_kwargs) assert df3.equals(df.drop("c2", axis=1)) @@ -42,9 +88,9 @@ def test_read_deltalake_versioned(path: str, pyarrow_additional_kwargs: dict[str assert df4.equals(df) -def test_read_deltalake_partitions(path: str) -> None: +def test_read_deltalake_partitions(path: str, lock_settings: dict[str, Any]) -> None: df = pd.DataFrame({"c0": [1, 2, 3], "c1": [True, False, True], "par0": ["foo", "foo", "bar"], "par1": [1, 2, 2]}) - wr.s3.to_deltalake(path=path, df=df, partition_cols=["par0", "par1"], s3_allow_unsafe_rename=True) + wr.s3.to_deltalake(path=path, df=df, partition_cols=["par0", "par1"], **lock_settings) df2 = wr.s3.read_deltalake(path=path, columns=["c0"], partitions=[("par0", "=", "foo"), ("par1", "=", "1")]) assert df2.shape == (1, 1)