From 32b47686a6ed4cda9aff49f72bfbbf2df4a1c62b Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Tue, 23 Jan 2024 10:45:41 +0100
Subject: [PATCH] add save mode

Make the Delta Lake write mode configurable instead of always overwriting.

- io_manager.py: add a `mode` option to the IO manager config, typed with
  the new `_DeltaWriteMode` enum (error / append / overwrite / ignore) and
  defaulting to "overwrite", which was the previous hard-coded behaviour.
- handler.py: pass the configured mode to `write_deltalake` and log it.
  Fall back to "overwrite" when the resource config is absent or has no
  `mode` key, so `mode=None` is never forwarded.
- resource.py: give `DeltaTableResource.version` an explicit default of
  `None` and a description.

---
Note: this revision of the patch was edited by hand; the blob ids on the
"index" lines below are carried over from the original revision.

 .../dagster-deltalake/dagster_deltalake/handler.py    |  6 +++++-
 .../dagster-deltalake/dagster_deltalake/io_manager.py | 19 +++++++++++++++++++
 .../dagster-deltalake/dagster_deltalake/resource.py   |  2 +-
 3 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/python_modules/libraries/dagster-deltalake/dagster_deltalake/handler.py b/python_modules/libraries/dagster-deltalake/dagster_deltalake/handler.py
index 52a0272e61f05..6807347bac163 100644
--- a/python_modules/libraries/dagster-deltalake/dagster_deltalake/handler.py
+++ b/python_modules/libraries/dagster-deltalake/dagster_deltalake/handler.py
@@ -69,12 +69,16 @@ def handle_output(
 
         # TODO make robust and move to function
         partition_columns = [dim.partition_expr for dim in table_slice.partition_dimensions]
+        # resource_config may be None or lack "mode"; fall back to the previous
+        # hard-coded behaviour rather than forwarding mode=None to the writer.
+        save_mode = (context.resource_config or {}).get("mode", "overwrite")
+        context.log.info("The save mode that will be used %s", save_mode)
 
         write_deltalake(
             table_or_uri=connection.table_uri,
             data=reader,
             storage_options=connection.storage_options,
-            mode="overwrite",
+            mode=save_mode,
             partition_filters=partition_filters,
             partition_by=partition_columns,
             **delta_params,
diff --git a/python_modules/libraries/dagster-deltalake/dagster_deltalake/io_manager.py b/python_modules/libraries/dagster-deltalake/dagster_deltalake/io_manager.py
index 7901992cac0fe..dabf8d3998f00 100644
--- a/python_modules/libraries/dagster-deltalake/dagster_deltalake/io_manager.py
+++ b/python_modules/libraries/dagster-deltalake/dagster_deltalake/io_manager.py
@@ -15,6 +15,7 @@
     TableSlice,
 )
 from pydantic import Field
+from enum import Enum
 
 if sys.version_info >= (3, 8):
     from typing import TypedDict
@@ -45,9 +46,22 @@ class _StorageOptionsConfig(TypedDict, total=False):
     azure: Dict[str, str]
     gcs: Dict[str, str]
 
 
+class _DeltaWriteMode(str, Enum):
+    """Allowed values for the IO manager's ``mode`` option.
+
+    Mixes in ``str`` so each member compares equal to its plain string value.
+    """
+
+    error = "error"
+    append = "append"
+    overwrite = "overwrite"
+    ignore = "ignore"
+
+
 class _DeltaTableIOManagerResourceConfig(TypedDict):
     root_uri: str
+    mode: _DeltaWriteMode
     storage_options: _StorageOptionsConfig
     client_options: NotRequired[Dict[str, str]]
     table_config: NotRequired[Dict[str, str]]
@@ -107,6 +121,11 @@ def my_table_a(my_table: pd.DataFrame):
 
     root_uri: str = Field(description="Storage location where Delta tables are stored.")
 
+    mode: _DeltaWriteMode = Field(
+        default=_DeltaWriteMode.overwrite.value,
+        description="The write mode passed to save the output.",
+    )
+
     storage_options: Union[AzureConfig, S3Config, LocalConfig, GcsConfig] = Field(
         discriminator="provider",
         description="Configuration for accessing storage location.",
diff --git a/python_modules/libraries/dagster-deltalake/dagster_deltalake/resource.py b/python_modules/libraries/dagster-deltalake/dagster_deltalake/resource.py
index 7050db162b4b7..9122440e14e0d 100644
--- a/python_modules/libraries/dagster-deltalake/dagster_deltalake/resource.py
+++ b/python_modules/libraries/dagster-deltalake/dagster_deltalake/resource.py
@@ -42,7 +42,7 @@ def my_table(delta_table: DeltaTableResource):
         default=None, description="Additional configuration passed to http client."
     )
 
-    version: Optional[int]
+    version: Optional[int] = Field(default=None, description="Version to load delta table.")
 
     def load(self) -> DeltaTable:
         storage_options = self.storage_options.dict() if self.storage_options else {}