Interface for merge v2 (#182)
* Adding interface and definitions for merge step

* fix tests and merge logs
raghumdani authored Aug 8, 2023
1 parent 61413b5 commit 30ac113
Showing 6 changed files with 168 additions and 12 deletions.
Empty file.
115 changes: 115 additions & 0 deletions deltacat/compute/compactor_v2/model/merge_input.py
@@ -0,0 +1,115 @@
from __future__ import annotations

from ray.types import ObjectRef
from typing import Dict, List, Optional, Any
from deltacat.utils.metrics import MetricsConfig
from deltacat.utils.common import ReadKwargsProvider
from deltacat.io.object_store import IObjectStore
from deltacat.storage import (
Partition,
SortKey,
interface as unimplemented_deltacat_storage,
)
from deltacat.types.media import ContentType
from deltacat.compute.compactor.model.round_completion_info import RoundCompletionInfo
from deltacat.compute.compactor.model.delta_file_envelope import DeltaFileEnvelopeGroups


class MergeInput(Dict):
@staticmethod
def of(
dfe_groups_refs: List[ObjectRef[DeltaFileEnvelopeGroups]],
write_to_partition: Partition,
compacted_file_content_type: ContentType,
primary_keys: List[str],
sort_keys: Optional[List[SortKey]] = None,
dedupe_task_index: Optional[int] = 0,
max_records_per_output_file: Optional[int] = 4_000_000,
enable_profiler: Optional[bool] = False,
metrics_config: Optional[MetricsConfig] = None,
s3_table_writer_kwargs: Optional[Dict[str, Any]] = None,
read_kwargs_provider: Optional[ReadKwargsProvider] = None,
round_completion_info: Optional[RoundCompletionInfo] = None,
object_store: Optional[IObjectStore] = None,
deltacat_storage=unimplemented_deltacat_storage,
deltacat_storage_kwargs: Optional[Dict[str, Any]] = None,
) -> MergeInput:

result = MergeInput()
result["dfe_groups_refs"] = dfe_groups_refs
result["write_to_partition"] = write_to_partition
result["compacted_file_content_type"] = compacted_file_content_type
result["primary_keys"] = primary_keys
result["sort_keys"] = sort_keys
result["dedupe_task_index"] = dedupe_task_index
result["max_records_per_output_file"] = max_records_per_output_file
result["enable_profiler"] = enable_profiler
result["metrics_config"] = metrics_config
result["s3_table_writer_kwargs"] = s3_table_writer_kwargs or {}
result["read_kwargs_provider"] = read_kwargs_provider
result["round_completion_info"] = round_completion_info
result["object_store"] = object_store
result["deltacat_storage"] = deltacat_storage
result["deltacat_storage_kwargs"] = deltacat_storage_kwargs or {}

return result

@property
def dfe_groups_refs(self) -> List[ObjectRef[DeltaFileEnvelopeGroups]]:
return self["dfe_groups_refs"]

@property
def write_to_partition(self) -> Partition:
return self["write_to_partition"]

@property
def compacted_file_content_type(self) -> ContentType:
return self["compacted_file_content_type"]

@property
def primary_keys(self) -> List[str]:
return self["primary_keys"]

@property
def sort_keys(self) -> Optional[List[SortKey]]:
return self.get("sort_keys")

@property
def dedupe_task_index(self) -> int:
return self.get("dedupe_task_index")

@property
def max_records_per_output_file(self) -> int:
return self.get("max_records_per_output_file")

@property
def enable_profiler(self) -> bool:
return self.get("enable_profiler")

@property
def metrics_config(self) -> Optional[MetricsConfig]:
return self.get("metrics_config")

@property
def s3_table_writer_kwargs(self) -> Optional[Dict[str, Any]]:
return self.get("s3_table_writer_kwargs")

@property
def read_kwargs_provider(self) -> Optional[ReadKwargsProvider]:
return self.get("read_kwargs_provider")

@property
def round_completion_info(self) -> Optional[RoundCompletionInfo]:
return self.get("round_completion_info")

@property
def object_store(self) -> Optional[IObjectStore]:
return self.get("object_store")

@property
def deltacat_storage(self) -> unimplemented_deltacat_storage:
return self["deltacat_storage"]

@property
def deltacat_storage_kwargs(self) -> Optional[Dict[str, Any]]:
return self.get("deltacat_storage_kwargs")
12 changes: 12 additions & 0 deletions deltacat/compute/compactor_v2/model/merge_result.py
@@ -0,0 +1,12 @@
from typing import NamedTuple, List
from deltacat.compute.compactor.model.materialize_result import MaterializeResult

import numpy as np


class MergeResult(NamedTuple):
materialize_results: List[MaterializeResult]
deduped_record_count: np.int64
peak_memory_usage_bytes: np.double
telemetry_time_in_seconds: np.double
task_completed_at: np.double
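A short sketch, again hypothetical, of constructing the tuple; `time` is from the standard library:

import time
import numpy as np

# An empty result for a task that wrote nothing and deduped zero records.
result = MergeResult(
    materialize_results=[],
    deduped_record_count=np.int64(0),
    peak_memory_usage_bytes=np.double(0.0),
    telemetry_time_in_seconds=np.double(0.0),
    task_completed_at=np.double(time.time()),
)

Because NamedTuple fields are also positional, merge() in steps/merge.py below rebuilds the tuple by index in order to swap in the telemetry time.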
Empty file.
41 changes: 41 additions & 0 deletions deltacat/compute/compactor_v2/steps/merge.py
@@ -0,0 +1,41 @@
import logging
from deltacat.compute.compactor_v2.model.merge_input import MergeInput
import numpy as np
import ray
from deltacat import logs
from deltacat.compute.compactor_v2.model.merge_result import MergeResult
from deltacat.utils.performance import timed_invocation
from deltacat.utils.metrics import emit_timer_metrics

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))


def _timed_merge(input: MergeInput) -> MergeResult:
# TODO: Implementation goes here
pass


@ray.remote
def merge(input: MergeInput) -> MergeResult:

logger.info(f"Starting merge task...")
merge_result, duration = timed_invocation(func=_timed_merge, input=input)

emit_metrics_time = 0.0
if input.metrics_config:
emit_result, latency = timed_invocation(
func=emit_timer_metrics,
metrics_name="merge",
value=duration,
metrics_config=input.metrics_config,
)
emit_metrics_time = latency

logger.info(f"Finished merge task...")
return MergeResult(
merge_result[0],
merge_result[1],
merge_result[2],
np.double(emit_metrics_time),
merge_result[4],
)
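Once _timed_merge gains a real implementation, dispatch would presumably follow the standard Ray pattern; a hedged sketch, with `merge_input` standing in for a fully populated MergeInput (running this against the commit as-is would fail, since _timed_merge still returns None):

import ray

ray.init(ignore_reinit_error=True)

# Schedule the remote merge task and block until it completes.
result_ref = merge.remote(merge_input)
merge_result = ray.get(result_ref)
print(merge_result.deduped_record_count)

Note that the returned tuple's telemetry_time_in_seconds slot is deliberately overwritten with the emit_timer_metrics latency, so metrics-emission overhead is reported separately from the merge work itself.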
12 changes: 0 additions & 12 deletions deltacat/tests/test_repartition.py
@@ -176,18 +176,6 @@ def test_same_values_in_ranges(self):
)
self.assertEqual(len(result.range_deltas), 2)

def test_ranges_with_inf(self):
self.repartition_args["ranges"] = [1678665487112747, float("inf")]
result = repartition_range(
self.tables,
self.destination_partition,
self.repartition_args,
self.max_records_per_output_file,
self.repartitioned_file_content_type,
self.deltacat_storage,
)
self.assertEqual(len(result.range_deltas), 2)

def test_null_rows_are_not_dropped(self):
# Add null value to the first table
tables_with_null = [