diff --git a/deltacat/compute/compactor_v2/model/__init__.py b/deltacat/compute/compactor_v2/model/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/deltacat/compute/compactor_v2/model/merge_input.py b/deltacat/compute/compactor_v2/model/merge_input.py
new file mode 100644
index 00000000..d5317056
--- /dev/null
+++ b/deltacat/compute/compactor_v2/model/merge_input.py
@@ -0,0 +1,115 @@
+from __future__ import annotations
+
+from ray.types import ObjectRef
+from typing import Dict, List, Optional, Any
+from deltacat.utils.metrics import MetricsConfig
+from deltacat.utils.common import ReadKwargsProvider
+from deltacat.io.object_store import IObjectStore
+from deltacat.storage import (
+    Partition,
+    SortKey,
+    interface as unimplemented_deltacat_storage,
+)
+from deltacat.types.media import ContentType
+from deltacat.compute.compactor.model.round_completion_info import RoundCompletionInfo
+from deltacat.compute.compactor.model.delta_file_envelope import DeltaFileEnvelopeGroups
+
+
+class MergeInput(Dict):
+    @staticmethod
+    def of(
+        dfe_groups_refs: List[ObjectRef[DeltaFileEnvelopeGroups]],
+        write_to_partition: Partition,
+        compacted_file_content_type: ContentType,
+        primary_keys: List[str],
+        sort_keys: Optional[List[SortKey]] = None,
+        dedupe_task_index: Optional[int] = 0,
+        max_records_per_output_file: Optional[int] = 4_000_000,
+        enable_profiler: Optional[bool] = False,
+        metrics_config: Optional[MetricsConfig] = None,
+        s3_table_writer_kwargs: Optional[Dict[str, Any]] = None,
+        read_kwargs_provider: Optional[ReadKwargsProvider] = None,
+        round_completion_info: Optional[RoundCompletionInfo] = None,
+        object_store: Optional[IObjectStore] = None,
+        deltacat_storage=unimplemented_deltacat_storage,
+        deltacat_storage_kwargs: Optional[Dict[str, Any]] = None,
+    ) -> MergeInput:
+
+        result = MergeInput()
+        result["dfe_groups_refs"] = dfe_groups_refs
+        result["write_to_partition"] = write_to_partition
+        result["compacted_file_content_type"] = compacted_file_content_type
+        result["primary_keys"] = primary_keys
+        result["sort_keys"] = sort_keys
+        result["dedupe_task_index"] = dedupe_task_index
+        result["max_records_per_output_file"] = max_records_per_output_file
+        result["enable_profiler"] = enable_profiler
+        result["metrics_config"] = metrics_config
+        result["s3_table_writer_kwargs"] = s3_table_writer_kwargs or {}
+        result["read_kwargs_provider"] = read_kwargs_provider
+        result["round_completion_info"] = round_completion_info
+        result["object_store"] = object_store
+        result["deltacat_storage"] = deltacat_storage
+        result["deltacat_storage_kwargs"] = deltacat_storage_kwargs or {}
+
+        return result
+
+    @property
+    def dfe_groups_refs(self) -> List[ObjectRef[DeltaFileEnvelopeGroups]]:
+        return self["dfe_groups_refs"]
+
+    @property
+    def write_to_partition(self) -> Partition:
+        return self["write_to_partition"]
+
+    @property
+    def compacted_file_content_type(self) -> ContentType:
+        return self["compacted_file_content_type"]
+
+    @property
+    def primary_keys(self) -> List[str]:
+        return self["primary_keys"]
+
+    @property
+    def sort_keys(self) -> Optional[List[SortKey]]:
+        return self.get("sort_keys")
+
+    @property
+    def dedupe_task_index(self) -> int:
+        return self.get("dedupe_task_index")
+
+    @property
+    def max_records_per_output_file(self) -> int:
+        return self.get("max_records_per_output_file")
+
+    @property
+    def enable_profiler(self) -> bool:
+        return self.get("enable_profiler")
+
+    @property
+    def metrics_config(self) -> Optional[MetricsConfig]:
+        return self.get("metrics_config")
self.get("metrics_config") + + @property + def s3_table_writer_kwargs(self) -> Optional[Dict[str, Any]]: + return self.get("s3_table_writer_kwargs") + + @property + def read_kwargs_provider(self) -> Optional[ReadKwargsProvider]: + return self.get("read_kwargs_provider") + + @property + def round_completion_info(self) -> Optional[RoundCompletionInfo]: + return self.get("round_completion_info") + + @property + def object_store(self) -> Optional[IObjectStore]: + return self.get("object_store") + + @property + def deltacat_storage(self) -> unimplemented_deltacat_storage: + return self["deltacat_storage"] + + @property + def deltacat_storage_kwargs(self) -> Optional[Dict[str, Any]]: + return self.get("deltacat_storage_kwargs") diff --git a/deltacat/compute/compactor_v2/model/merge_result.py b/deltacat/compute/compactor_v2/model/merge_result.py new file mode 100644 index 00000000..91a24c6e --- /dev/null +++ b/deltacat/compute/compactor_v2/model/merge_result.py @@ -0,0 +1,12 @@ +from typing import NamedTuple, List +from deltacat.compute.compactor.model.materialize_result import MaterializeResult + +import numpy as np + + +class MergeResult(NamedTuple): + materialize_results: List[MaterializeResult] + deduped_record_count: np.int64 + peak_memory_usage_bytes: np.double + telemetry_time_in_seconds: np.double + task_completed_at: np.double diff --git a/deltacat/compute/compactor_v2/steps/__init__.py b/deltacat/compute/compactor_v2/steps/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/deltacat/compute/compactor_v2/steps/merge.py b/deltacat/compute/compactor_v2/steps/merge.py new file mode 100644 index 00000000..6e0693a7 --- /dev/null +++ b/deltacat/compute/compactor_v2/steps/merge.py @@ -0,0 +1,41 @@ +import logging +from deltacat.compute.compactor_v2.model.merge_input import MergeInput +import numpy as np +import ray +from deltacat import logs +from deltacat.compute.compactor_v2.model.merge_result import MergeResult +from deltacat.utils.performance import timed_invocation +from deltacat.utils.metrics import emit_timer_metrics + +logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) + + +def _timed_merge(input: MergeInput) -> MergeResult: + # TODO: Implementation goes here + pass + + +@ray.remote +def merge(input: MergeInput) -> MergeResult: + + logger.info(f"Starting merge task...") + merge_result, duration = timed_invocation(func=_timed_merge, input=input) + + emit_metrics_time = 0.0 + if input.metrics_config: + emit_result, latency = timed_invocation( + func=emit_timer_metrics, + metrics_name="merge", + value=duration, + metrics_config=input.metrics_config, + ) + emit_metrics_time = latency + + logger.info(f"Finished merge task...") + return MergeResult( + merge_result[0], + merge_result[1], + merge_result[2], + np.double(emit_metrics_time), + merge_result[4], + ) diff --git a/deltacat/tests/test_repartition.py b/deltacat/tests/test_repartition.py index 215e2b2a..c1208c64 100644 --- a/deltacat/tests/test_repartition.py +++ b/deltacat/tests/test_repartition.py @@ -176,18 +176,6 @@ def test_same_values_in_ranges(self): ) self.assertEqual(len(result.range_deltas), 2) - def test_ranges_with_inf(self): - self.repartition_args["ranges"] = [1678665487112747, float("inf")] - result = repartition_range( - self.tables, - self.destination_partition, - self.repartition_args, - self.max_records_per_output_file, - self.repartitioned_file_content_type, - self.deltacat_storage, - ) - self.assertEqual(len(result.range_deltas), 2) - def test_null_rows_are_not_dropped(self): # 
         tables_with_null = [
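
Note (outside the diff): a minimal sketch of how the new merge step might be driven once `_timed_merge` is implemented. The `submit_merge_task` helper is hypothetical and not part of this change; `dfe_groups_refs` and `partition` stand in for outputs of an upstream hash-bucketing step and the storage layer, and `"id"` is a placeholder primary key.

```python
import ray

from deltacat.compute.compactor_v2.model.merge_input import MergeInput
from deltacat.compute.compactor_v2.model.merge_result import MergeResult
from deltacat.compute.compactor_v2.steps.merge import merge
from deltacat.storage import Partition
from deltacat.types.media import ContentType


def submit_merge_task(dfe_groups_refs, partition: Partition) -> MergeResult:
    # Hypothetical driver: dfe_groups_refs would hold object-store references
    # to delta file envelope groups produced by an upstream hash-bucketing
    # step; partition is the destination for the compacted output.
    merge_input = MergeInput.of(
        dfe_groups_refs=dfe_groups_refs,
        write_to_partition=partition,
        compacted_file_content_type=ContentType.PARQUET,
        primary_keys=["id"],  # placeholder primary key column
    )
    # merge is a @ray.remote task, so .remote() returns an ObjectRef;
    # ray.get blocks until the MergeResult is available.
    return ray.get(merge.remote(merge_input))
```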