From a7d55a83850e5363d8f9f52eb631ea311fb636f3 Mon Sep 17 00:00:00 2001 From: Raghavendra Dani Date: Mon, 24 Jul 2023 17:23:12 -0700 Subject: [PATCH 1/4] Allow s3 client kwargs as argument of compact_partition --- deltacat/__init__.py | 2 +- .../compute/compactor/compaction_session.py | 27 ++++++++++++++++--- .../compactor/utils/round_completion_file.py | 15 ++++++++--- 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/deltacat/__init__.py b/deltacat/__init__.py index 178ec1f4..c1947b25 100644 --- a/deltacat/__init__.py +++ b/deltacat/__init__.py @@ -43,7 +43,7 @@ deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__)) -__version__ = "0.1.18b4" +__version__ = "0.1.18b7" __all__ = [ diff --git a/deltacat/compute/compactor/compaction_session.py b/deltacat/compute/compactor/compaction_session.py index cd7d1fa7..9762e350 100644 --- a/deltacat/compute/compactor/compaction_session.py +++ b/deltacat/compute/compactor/compaction_session.py @@ -115,6 +115,7 @@ def compact_partition( read_kwargs_provider: Optional[ReadKwargsProvider] = None, s3_table_writer_kwargs: Optional[Dict[str, Any]] = None, object_store: Optional[IObjectStore] = RayPlasmaObjectStore(), + s3_client_kwargs: Optional[Dict[str, Any]] = None, deltacat_storage=unimplemented_deltacat_storage, **kwargs, ) -> Optional[str]: @@ -155,6 +156,7 @@ def compact_partition( read_kwargs_provider, s3_table_writer_kwargs, object_store, + s3_client_kwargs, deltacat_storage, **kwargs, ) @@ -201,6 +203,7 @@ def _execute_compaction_round( read_kwargs_provider: Optional[ReadKwargsProvider], s3_table_writer_kwargs: Optional[Dict[str, Any]], object_store: Optional[IObjectStore], + s3_client_kwargs: Optional[Dict[str, Any]], deltacat_storage=unimplemented_deltacat_storage, **kwargs, ) -> Tuple[Optional[Partition], Optional[RoundCompletionInfo], Optional[str]]: @@ -330,7 +333,11 @@ def _execute_compaction_round( delta_discovery_end - delta_discovery_start ) - s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit))) + s3_utils.upload( + compaction_audit.audit_url, + str(json.dumps(compaction_audit)), + **s3_client_kwargs, + ) if not input_deltas: logger.info("No input deltas found to compact.") @@ -424,7 +431,11 @@ def _execute_compaction_round( hb_end - hb_start, ) - s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit))) + s3_utils.upload( + compaction_audit.audit_url, + str(json.dumps(compaction_audit)), + **s3_client_kwargs, + ) all_hash_group_idx_to_obj_id = defaultdict(list) for hb_result in hb_results: @@ -539,7 +550,11 @@ def _execute_compaction_round( # parallel step 3: # materialize records to keep by index - s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit))) + s3_utils.upload( + compaction_audit.audit_url, + str(json.dumps(compaction_audit)), + **s3_client_kwargs, + ) materialize_start = time.monotonic() @@ -641,7 +656,11 @@ def _execute_compaction_round( mat_results, telemetry_time_hb + telemetry_time_dd + telemetry_time_materialize ) - s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit))) + s3_utils.upload( + compaction_audit.audit_url, + str(json.dumps(compaction_audit)), + **s3_client_kwargs, + ) new_round_completion_info = RoundCompletionInfo.of( last_stream_position_compacted, diff --git a/deltacat/compute/compactor/utils/round_completion_file.py b/deltacat/compute/compactor/utils/round_completion_file.py index 382e99a5..48167d58 100644 --- a/deltacat/compute/compactor/utils/round_completion_file.py +++ b/deltacat/compute/compactor/utils/round_completion_file.py @@ -1,6 +1,6 @@ import json import logging - +from typing import Dict, Any from deltacat import logs from deltacat.compute.compactor import RoundCompletionInfo from deltacat.storage import PartitionLocator @@ -19,7 +19,9 @@ def get_round_completion_file_s3_url( def read_round_completion_file( - bucket: str, source_partition_locator: PartitionLocator + bucket: str, + source_partition_locator: PartitionLocator, + **s3_client_kwargs: Optional[Dict[str, Any]], ) -> RoundCompletionInfo: round_completion_file_url = get_round_completion_file_s3_url( @@ -28,7 +30,7 @@ def read_round_completion_file( ) logger.info(f"reading round completion file from: {round_completion_file_url}") round_completion_info = None - result = s3_utils.download(round_completion_file_url, False) + result = s3_utils.download(round_completion_file_url, False, **s3_client_kwargs) if result: json_str = result["Body"].read().decode("utf-8") round_completion_info = RoundCompletionInfo(json.loads(json_str)) @@ -41,6 +43,7 @@ def write_round_completion_file( source_partition_locator: Optional[PartitionLocator], round_completion_info: RoundCompletionInfo, completion_file_s3_url: str = None, + **s3_client_kwargs: Optional[Dict[str, Any]], ) -> str: if bucket is None and completion_file_s3_url is None: raise AssertionError("Either bucket or completion_file_s3_url must be passed") @@ -52,6 +55,10 @@ def write_round_completion_file( source_partition_locator, ) logger.info(f"writing round completion file to: {completion_file_s3_url}") - s3_utils.upload(completion_file_s3_url, str(json.dumps(round_completion_info))) + s3_utils.upload( + completion_file_s3_url, + str(json.dumps(round_completion_info)), + **s3_client_kwargs, + ) logger.info(f"round completion file written to: {completion_file_s3_url}") return completion_file_s3_url From 702407eb8a46a00f86202398fd86234d7f0c1d98 Mon Sep 17 00:00:00 2001 From: Raghavendra Dani Date: Mon, 24 Jul 2023 19:04:59 -0700 Subject: [PATCH 2/4] Changing default as empty dict --- deltacat/compute/compactor/compaction_session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deltacat/compute/compactor/compaction_session.py b/deltacat/compute/compactor/compaction_session.py index 9762e350..78399459 100644 --- a/deltacat/compute/compactor/compaction_session.py +++ b/deltacat/compute/compactor/compaction_session.py @@ -115,7 +115,7 @@ def compact_partition( read_kwargs_provider: Optional[ReadKwargsProvider] = None, s3_table_writer_kwargs: Optional[Dict[str, Any]] = None, object_store: Optional[IObjectStore] = RayPlasmaObjectStore(), - s3_client_kwargs: Optional[Dict[str, Any]] = None, + s3_client_kwargs: Optional[Dict[str, Any]] = {}, deltacat_storage=unimplemented_deltacat_storage, **kwargs, ) -> Optional[str]: From 3227817621d5a61db898ac1c20bbb19b7d1c52d1 Mon Sep 17 00:00:00 2001 From: Raghavendra Dani Date: Mon, 24 Jul 2023 19:56:26 -0700 Subject: [PATCH 3/4] add s3_client_kwargs to rcf --- deltacat/compute/compactor/compaction_session.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deltacat/compute/compactor/compaction_session.py b/deltacat/compute/compactor/compaction_session.py index 78399459..5808811a 100644 --- a/deltacat/compute/compactor/compaction_session.py +++ b/deltacat/compute/compactor/compaction_session.py @@ -176,6 +176,7 @@ def compact_partition( compaction_artifact_s3_bucket, new_rcf_partition_locator, new_rci, + **s3_client_kwargs, ) logger.info(f"Completed compaction session for: {source_partition_locator}") return round_completion_file_s3_url @@ -287,7 +288,7 @@ def _execute_compaction_round( round_completion_info = None if not rebase_source_partition_locator: round_completion_info = rcf.read_round_completion_file( - compaction_artifact_s3_bucket, source_partition_locator + compaction_artifact_s3_bucket, source_partition_locator, **s3_client_kwargs ) if not round_completion_info: logger.info( From 33c41df62073bc34a95a36a67ecc39aabdc5fd3e Mon Sep 17 00:00:00 2001 From: Raghavendra Dani Date: Mon, 24 Jul 2023 20:19:58 -0700 Subject: [PATCH 4/4] Bunping up version to 0.1.18b8 --- deltacat/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deltacat/__init__.py b/deltacat/__init__.py index c1947b25..57be4916 100644 --- a/deltacat/__init__.py +++ b/deltacat/__init__.py @@ -43,7 +43,7 @@ deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__)) -__version__ = "0.1.18b7" +__version__ = "0.1.18b8" __all__ = [