feat: Add support for DeltaLake's DynamoDB lock mechanism #2705

Merged · 6 commits · Mar 14, 2024 · Changes from all commits
41 changes: 32 additions & 9 deletions awswrangler/s3/_write_deltalake.py
@@ -22,14 +22,25 @@
 
 
 def _set_default_storage_options_kwargs(
-    boto3_session: boto3.Session | None, s3_additional_kwargs: dict[str, Any] | None, s3_allow_unsafe_rename: bool
+    boto3_session: boto3.Session | None,
+    s3_additional_kwargs: dict[str, Any] | None,
+    s3_allow_unsafe_rename: bool,
+    lock_dynamodb_table: str | None = None,
 ) -> dict[str, Any]:
     defaults = {key.upper(): value for key, value in _utils.boto3_to_primitives(boto3_session=boto3_session).items()}
     defaults["AWS_REGION"] = defaults.pop("REGION_NAME")
 
     s3_additional_kwargs = s3_additional_kwargs or {}
 
+    s3_lock_arguments = {}
+    if lock_dynamodb_table:
+        s3_lock_arguments["AWS_S3_LOCKING_PROVIDER"] = "dynamodb"
+        s3_lock_arguments["DELTA_DYNAMO_TABLE_NAME"] = lock_dynamodb_table
+
     return {
         **defaults,
         **s3_additional_kwargs,
+        **s3_lock_arguments,
         "AWS_S3_ALLOW_UNSAFE_RENAME": "TRUE" if s3_allow_unsafe_rename else "FALSE",
     }
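To make the new wiring concrete, here is an illustrative sketch (not part of the diff) of what the helper above returns when a lock table is supplied. The credential keys come from _utils.boto3_to_primitives, and the region and table name below are placeholder values:

# Illustrative result of _set_default_storage_options_kwargs(..., lock_dynamodb_table="my-lock-table")
# for a hypothetical session in us-east-1 with no extra S3 kwargs:
{
    "AWS_ACCESS_KEY_ID": "...",                  # uppercased boto3 session primitives
    "AWS_SECRET_ACCESS_KEY": "...",
    "AWS_REGION": "us-east-1",                   # renamed from REGION_NAME
    "AWS_S3_LOCKING_PROVIDER": "dynamodb",       # set only when lock_dynamodb_table is given
    "DELTA_DYNAMO_TABLE_NAME": "my-lock-table",  # set only when lock_dynamodb_table is given
    "AWS_S3_ALLOW_UNSAFE_RENAME": "FALSE",
}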

@@ -44,9 +55,10 @@ def to_deltalake(
     dtype: dict[str, str] | None = None,
     partition_cols: list[str] | None = None,
     overwrite_schema: bool = False,
+    lock_dynamodb_table: str | None = None,
+    s3_allow_unsafe_rename: bool = False,
     boto3_session: boto3.Session | None = None,
     s3_additional_kwargs: dict[str, str] | None = None,
-    s3_allow_unsafe_rename: bool = False,
 ) -> None:
     """Write a DataFrame to S3 as a DeltaLake table.
 
@@ -71,13 +83,19 @@
         List of columns to partition the table by. Only required when creating a new table.
     overwrite_schema: bool
         If True, allows updating the schema of the table.
+    lock_dynamodb_table: str | None
+        DynamoDB table to use as a locking provider.
+        A locking mechanism is needed to prevent unsafe concurrent writes to a Delta Lake directory when writing to S3.
+        If you do not want to use a locking mechanism, you can set ``s3_allow_unsafe_rename`` to True instead.
+
+        For information on how to set up the lock table,
+        please check `this page <https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/#dynamodb>`_.
+    s3_allow_unsafe_rename: bool
+        Allows using the default S3 backend without support for concurrent writers.
     boto3_session: boto3.Session, optional
         Boto3 Session. If None, the default boto3 session is used.
     s3_additional_kwargs: dict[str, Any], optional
         Forwarded to the Delta Table class for the storage options of the S3 backend.
-    s3_allow_unsafe_rename: bool
-        Allows using the default S3 backend without support for concurrent writers.
-        Concurrent writing is currently not supported, so this option needs to be turned on explicitly.
 
     Examples
     --------
@@ -86,9 +104,9 @@
     >>> import awswrangler as wr
     >>> import pandas as pd
     >>> wr.s3.to_deltalake(
-    ...     df=pd.DataFrame({'col': [1, 2, 3]}),
-    ...     path='s3://bucket/prefix/',
-    ...     s3_allow_unsafe_rename=True,
+    ...     df=pd.DataFrame({"col": [1, 2, 3]}),
+    ...     path="s3://bucket/prefix/",
+    ...     lock_dynamodb_table="my-lock-table",
     ... )
 
     See Also
@@ -101,7 +119,12 @@
     schema: pa.Schema = _data_types.pyarrow_schema_from_pandas(df=df, index=index, ignore_cols=None, dtype=dtype)
     table: pa.Table = _df_to_table(df, schema, index, dtype)
 
-    storage_options = _set_default_storage_options_kwargs(boto3_session, s3_additional_kwargs, s3_allow_unsafe_rename)
+    storage_options = _set_default_storage_options_kwargs(
+        boto3_session=boto3_session,
+        s3_additional_kwargs=s3_additional_kwargs,
+        s3_allow_unsafe_rename=s3_allow_unsafe_rename,
+        lock_dynamodb_table=lock_dynamodb_table,
+    )
     deltalake.write_deltalake(
         table_or_uri=path,
         data=table,
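For context, AWS_S3_LOCKING_PROVIDER and DELTA_DYNAMO_TABLE_NAME are delta-rs storage options rather than awswrangler inventions. A minimal standalone sketch of the direct delta-rs call that the wrapper above assembles (bucket, region, and table names are placeholders):

import deltalake
import pandas as pd

# Hypothetical direct equivalent of wr.s3.to_deltalake(..., lock_dynamodb_table=...):
# awswrangler builds storage_options like these and forwards them to delta-rs.
deltalake.write_deltalake(
    table_or_uri="s3://bucket/prefix/",
    data=pd.DataFrame({"col": [1, 2, 3]}),
    storage_options={
        "AWS_REGION": "us-east-1",
        "AWS_S3_LOCKING_PROVIDER": "dynamodb",
        "DELTA_DYNAMO_TABLE_NAME": "my-lock-table",
    },
)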
62 changes: 54 additions & 8 deletions tests/unit/test_s3_deltalake.py
@@ -1,22 +1,66 @@
 from __future__ import annotations
 
-from typing import Any
+from typing import Any, Iterator
 
+import boto3
 import pytest
 
 import awswrangler as wr
 import awswrangler.pandas as pd
 
+from .._utils import (
+    get_time_str_with_random_suffix,
+)
+
+
+@pytest.fixture(scope="session")
+def lock_dynamodb_table() -> Iterator[str]:
+    name = f"deltalake_lock_{get_time_str_with_random_suffix()}"
+    print(f"Table name: {name}")
+
+    dynamodb_client = boto3.client("dynamodb")
+    dynamodb_client.create_table(
+        TableName=name,
+        BillingMode="PAY_PER_REQUEST",
+        KeySchema=[
+            {"AttributeName": "tablePath", "KeyType": "HASH"},
+            {"AttributeName": "fileName", "KeyType": "RANGE"},
+        ],
+        AttributeDefinitions=[
+            {"AttributeName": "tablePath", "AttributeType": "S"},
+            {"AttributeName": "fileName", "AttributeType": "S"},
+        ],
+    )
+
+    dynamodb_client.get_waiter("table_exists").wait(TableName=name)
+
+    yield name
+
+    dynamodb_client.delete_table(TableName=name)
+    dynamodb_client.get_waiter("table_not_exists").wait(TableName=name)
+    print(f"Table {name} deleted.")
+
+
+@pytest.fixture(params=["no_lock", "dynamodb_lock"], scope="session")
+def lock_settings(request: pytest.FixtureRequest) -> dict[str, Any]:
+    if request.param == "no_lock":
+        return dict(s3_allow_unsafe_rename=True)
+    else:
+        return dict(lock_dynamodb_table=request.getfixturevalue("lock_dynamodb_table"))
+
+
 @pytest.mark.parametrize("s3_additional_kwargs", [None, {"ServerSideEncryption": "AES256"}])
 @pytest.mark.parametrize(
     "pyarrow_additional_kwargs", [{"safe": True, "deduplicate_objects": False, "types_mapper": None}]
 )
 def test_read_deltalake(
-    path: str, s3_additional_kwargs: dict[str, Any] | None, pyarrow_additional_kwargs: dict[str, Any]
+    path: str,
+    lock_settings: dict[str, Any],
+    s3_additional_kwargs: dict[str, Any] | None,
+    pyarrow_additional_kwargs: dict[str, Any],
 ) -> None:
     df = pd.DataFrame({"c0": [1, 2, 3], "c1": ["foo", None, "bar"], "c2": [3.0, 4.0, 5.0], "c3": [True, False, None]})
-    wr.s3.to_deltalake(path=path, df=df, s3_additional_kwargs=s3_additional_kwargs, s3_allow_unsafe_rename=True)
+    wr.s3.to_deltalake(path=path, df=df, s3_additional_kwargs=s3_additional_kwargs, **lock_settings)
 
     df2 = wr.s3.read_deltalake(
         path=path, s3_additional_kwargs=s3_additional_kwargs, pyarrow_additional_kwargs=pyarrow_additional_kwargs
@@ -25,15 +69,17 @@ def test_read_deltalake(
 
 
 @pytest.mark.parametrize("pyarrow_additional_kwargs", [{"types_mapper": None}])
-def test_read_deltalake_versioned(path: str, pyarrow_additional_kwargs: dict[str, Any]) -> None:
+def test_read_deltalake_versioned(
+    path: str, lock_settings: dict[str, Any], pyarrow_additional_kwargs: dict[str, Any]
+) -> None:
     df = pd.DataFrame({"c0": [1, 2, 3], "c1": ["foo", "baz", "bar"]})
-    wr.s3.to_deltalake(path=path, df=df, s3_allow_unsafe_rename=True)
+    wr.s3.to_deltalake(path=path, df=df, **lock_settings)
 
     df2 = wr.s3.read_deltalake(path=path, pyarrow_additional_kwargs=pyarrow_additional_kwargs)
     assert df2.equals(df)
 
     df["c2"] = [True, False, True]
-    wr.s3.to_deltalake(path=path, df=df, mode="overwrite", overwrite_schema=True, s3_allow_unsafe_rename=True)
+    wr.s3.to_deltalake(path=path, df=df, mode="overwrite", overwrite_schema=True, **lock_settings)
 
     df3 = wr.s3.read_deltalake(path=path, version=0, pyarrow_additional_kwargs=pyarrow_additional_kwargs)
     assert df3.equals(df.drop("c2", axis=1))
@@ -42,9 +88,9 @@ def test_read_deltalake_versioned(
     assert df4.equals(df)
 
 
-def test_read_deltalake_partitions(path: str) -> None:
+def test_read_deltalake_partitions(path: str, lock_settings: dict[str, Any]) -> None:
     df = pd.DataFrame({"c0": [1, 2, 3], "c1": [True, False, True], "par0": ["foo", "foo", "bar"], "par1": [1, 2, 2]})
-    wr.s3.to_deltalake(path=path, df=df, partition_cols=["par0", "par1"], s3_allow_unsafe_rename=True)
+    wr.s3.to_deltalake(path=path, df=df, partition_cols=["par0", "par1"], **lock_settings)
 
     df2 = wr.s3.read_deltalake(path=path, columns=["c0"], partitions=[("par0", "=", "foo"), ("par1", "=", "1")])
     assert df2.shape == (1, 1)
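The parametrized lock_settings fixture above drives each test through both write paths. Outside the test suite, the same either/or choice looks like this hedged sketch (bucket and lock-table names are placeholders):

import pandas as pd

import awswrangler as wr

df = pd.DataFrame({"col": [1, 2, 3]})

# Option 1: DynamoDB locking provider, safe when several writers target the same path.
wr.s3.to_deltalake(df=df, path="s3://bucket/locked/", lock_dynamodb_table="my-lock-table")

# Option 2: no locking provider; only safe for a single writer and must be enabled explicitly.
wr.s3.to_deltalake(df=df, path="s3://bucket/unlocked/", s3_allow_unsafe_rename=True)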