diff --git a/deltacat/compute/compactor/compaction_session.py b/deltacat/compute/compactor/compaction_session.py index f4f98c47..a24db182 100644 --- a/deltacat/compute/compactor/compaction_session.py +++ b/deltacat/compute/compactor/compaction_session.py @@ -3,6 +3,10 @@ import functools import logging import ray +import time +import json +from deltacat.aws import s3u as s3_utils +import deltacat from deltacat import logs import pyarrow as pa from deltacat.compute.compactor import ( @@ -12,6 +16,7 @@ ) from deltacat.compute.compactor.model.dedupe_result import DedupeResult from deltacat.compute.compactor.model.hash_bucket_result import HashBucketResult +from deltacat.compute.compactor.model.materialize_result import MaterializeResult from deltacat.compute.stats.models.delta_stats import DeltaStats from deltacat.storage import ( Delta, @@ -37,7 +42,11 @@ from typing import List, Set, Optional, Tuple, Dict, Any from collections import defaultdict from deltacat.utils.metrics import MetricsConfig -from deltacat.utils.resources import log_current_cluster_utilization +from deltacat.compute.compactor.model.compaction_session_audit_info import ( + CompactionSessionAuditInfo, +) +from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes + if importlib.util.find_spec("memray"): import memray @@ -188,6 +197,23 @@ def _execute_compaction_round( **kwargs, ) -> Tuple[Optional[Partition], Optional[RoundCompletionInfo], Optional[str]]: + rcf_source_partition_locator = ( + rebase_source_partition_locator + if rebase_source_partition_locator + else source_partition_locator + ) + + base_audit_url = rcf_source_partition_locator.path( + f"s3://{compaction_artifact_s3_bucket}/compaction-audit" + ) + audit_url = f"{base_audit_url}.json" + + logger.info(f"Compaction audit will be written to {audit_url}") + + compaction_audit = CompactionSessionAuditInfo(deltacat.__version__, audit_url) + + compaction_start = time.monotonic() + if not primary_keys: # TODO (pdames): run simple rebatch to reduce all deltas into 1 delta # with normalized manifest entry sizes @@ -230,6 +256,7 @@ def _execute_compaction_round( f"{node_resource_keys}" ) + compaction_audit.set_cluster_cpu_max(cluster_cpus) # create a remote options provider to round-robin tasks across all nodes or allocated bundles logger.info(f"Setting round robin scheduling with node id:{node_resource_keys}") round_robin_opt_provider = functools.partial( @@ -268,6 +295,7 @@ def _execute_compaction_round( round_completion_info.high_watermark if round_completion_info else None ) + delta_discovery_start = time.monotonic() ( input_deltas, previous_last_stream_position_compacted_on_destination_table, @@ -282,6 +310,13 @@ def _execute_compaction_round( **list_deltas_kwargs, ) + delta_discovery_end = time.monotonic() + compaction_audit.set_delta_discovery_time_in_seconds( + delta_discovery_end - delta_discovery_start + ) + + s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit))) + if not input_deltas: logger.info("No input deltas found to compact.") return None, None, None @@ -298,6 +333,7 @@ def _execute_compaction_round( io.fit_input_deltas( input_deltas, cluster_resources, + compaction_audit, hash_bucket_count, deltacat_storage=deltacat_storage, ) @@ -307,11 +343,14 @@ def _execute_compaction_round( cluster_resources, hash_bucket_count, min_hash_bucket_chunk_size, + compaction_audit=compaction_audit, input_deltas_stats=input_deltas_stats, deltacat_storage=deltacat_storage, ) ) + 
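Side note on the audit uploads above: CompactionSessionAuditInfo subclasses dict, which is why `str(json.dumps(compaction_audit))` works without a custom encoder, and why the uploaded audit can be inspected with plain S3 + JSON tooling. A minimal sketch, assuming boto3 credentials and a hypothetical bucket/key (the real key is derived from the RCF source partition locator):

import json
import boto3

# Hypothetical location; the session writes to
# s3://<compaction_artifact_s3_bucket>/compaction-audit/<partition path>.json
s3 = boto3.client("s3")
obj = s3.get_object(Bucket="my-compaction-artifacts", Key="compaction-audit/example.json")
audit = json.loads(obj["Body"].read())

# Keys mirror the camelCase fields written by the audit's setters.
print(audit.get("deltaDiscoveryTimeInSeconds"), audit.get("uniformDeltasCreated"))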
compaction_audit.set_uniform_deltas_created(len(uniform_deltas)) + assert hash_bucket_count is not None and hash_bucket_count > 0, ( f"Expected hash bucket count to be a positive integer, but found " f"`{hash_bucket_count}`" @@ -335,6 +374,8 @@ def _execute_compaction_round( "Multiple rounds are not supported. Please increase the cluster size and run again." ) + hb_start = time.monotonic() + hb_tasks_pending = invoke_parallel( items=uniform_deltas, ray_task=hb.hash_bucket, @@ -350,9 +391,25 @@ def _execute_compaction_round( read_kwargs_provider=read_kwargs_provider, deltacat_storage=deltacat_storage, ) + + hb_invoke_end = time.monotonic() + logger.info(f"Getting {len(hb_tasks_pending)} hash bucket results...") hb_results: List[HashBucketResult] = ray.get(hb_tasks_pending) logger.info(f"Got {len(hb_results)} hash bucket results.") + hb_end = time.monotonic() + hb_results_retrieved_at = time.time() + + telemetry_time_hb = compaction_audit.save_step_stats( + CompactionSessionAuditInfo.HASH_BUCKET_STEP_NAME, + hb_results, + hb_results_retrieved_at, + hb_invoke_end - hb_start, + hb_end - hb_start, + ) + + s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit))) + all_hash_group_idx_to_obj_id = defaultdict(list) for hb_result in hb_results: for hash_group_index, object_id in enumerate( @@ -367,6 +424,8 @@ def _execute_compaction_round( f"Got {total_hb_record_count} hash bucket records from hash bucketing step..." ) + compaction_audit.set_input_records(total_hb_record_count.item()) + # TODO (pdames): when resources are freed during the last round of hash # bucketing, start running dedupe tasks that read existing dedupe # output from S3 then wait for hash bucketing to finish before continuing @@ -389,6 +448,9 @@ def _execute_compaction_round( # identify the index of records to keep or drop based on sort keys num_materialize_buckets = max_parallelism logger.info(f"Materialize Bucket Count: {num_materialize_buckets}") + + dedupe_start = time.monotonic() + dd_tasks_pending = invoke_parallel( items=all_hash_group_idx_to_obj_id.values(), ray_task=dd.dedupe, @@ -403,11 +465,31 @@ def _execute_compaction_round( enable_profiler=enable_profiler, metrics_config=metrics_config, ) + + dedupe_invoke_end = time.monotonic() logger.info(f"Getting {len(dd_tasks_pending)} dedupe results...") dd_results: List[DedupeResult] = ray.get(dd_tasks_pending) logger.info(f"Got {len(dd_results)} dedupe results.") + + # we use time.time() here because time.monotonic() has no reference point + # whereas time.time() measures epoch seconds. Hence, it will be reasonable + # to compare time.time()s captured in different nodes. 
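A standalone illustration of the distinction drawn in the comment above, using only the standard library: monotonic clocks are only meaningful as differences within one process, while epoch timestamps can be compared across nodes (subject to clock skew).

import time

# Durations: use the monotonic clock. Its absolute value is relative to an
# arbitrary per-process reference point and means nothing across processes.
start = time.monotonic()
time.sleep(0.1)
elapsed = time.monotonic() - start  # ~0.1s, valid within this process only

# Cross-node comparisons: use epoch seconds. A completion timestamp captured
# on a worker can be compared with a timestamp captured later on the driver.
task_completed_at = time.time()      # captured on the worker
results_retrieved_at = time.time()   # captured on the driver
wait_time = results_retrieved_at - task_completed_at  # approximate, modulo clock skew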
+ dedupe_results_retrieved_at = time.time() + dedupe_end = time.monotonic() + total_dd_record_count = sum([ddr.deduped_record_count for ddr in dd_results]) logger.info(f"Deduped {total_dd_record_count} records...") + + telemetry_time_dd = compaction_audit.save_step_stats( + CompactionSessionAuditInfo.DEDUPE_STEP_NAME, + dd_results, + dedupe_results_retrieved_at, + dedupe_invoke_end - dedupe_start, + dedupe_end - dedupe_start, + ) + + compaction_audit.set_records_deduped(total_dd_record_count.item()) + all_mat_buckets_to_obj_id = defaultdict(list) for dd_result in dd_results: for ( @@ -420,6 +502,8 @@ def _execute_compaction_round( logger.info(f"Getting {len(dd_tasks_pending)} dedupe result stat(s)...") logger.info(f"Materialize buckets created: " f"{len(all_mat_buckets_to_obj_id)}") + compaction_audit.set_materialize_buckets(len(all_mat_buckets_to_obj_id)) + # TODO(pdames): when resources are freed during the last round of deduping # start running materialize tasks that read materialization source file # tables from S3 then wait for deduping to finish before continuing @@ -432,6 +516,11 @@ def _execute_compaction_round( # parallel step 3: # materialize records to keep by index + + s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit))) + + materialize_start = time.monotonic() + mat_tasks_pending = invoke_parallel( items=all_mat_buckets_to_obj_id.items(), ray_task=mat.materialize, @@ -453,30 +542,24 @@ def _execute_compaction_round( s3_table_writer_kwargs=s3_table_writer_kwargs, deltacat_storage=deltacat_storage, ) + + materialize_invoke_end = time.monotonic() + logger.info(f"Getting {len(mat_tasks_pending)} materialize result(s)...") - mat_results = ray.get(mat_tasks_pending) - total_count_of_src_dfl_not_touched = sum( - m.count_of_src_dfl_not_touched for m in mat_results - ) - total_length_src_dfl = sum(m.count_of_src_dfl for m in mat_results) - logger.info( - f"Got total of {total_count_of_src_dfl_not_touched} manifest files not touched." - ) - logger.info( - f"Got total of {total_length_src_dfl} manifest files during compaction." - ) - manifest_entry_copied_by_reference_ratio = ( - (round(total_count_of_src_dfl_not_touched / total_length_src_dfl, 4) * 100) - if total_length_src_dfl != 0 - else None - ) - logger.info( - f"{manifest_entry_copied_by_reference_ratio} percent of manifest files are copied by reference during materialize." 
- ) + mat_results: List[MaterializeResult] = ray.get(mat_tasks_pending) logger.info(f"Got {len(mat_results)} materialize result(s).") - log_current_cluster_utilization(log_identifier="post_materialize") + materialize_end = time.monotonic() + materialize_results_retrieved_at = time.time() + + telemetry_time_materialize = compaction_audit.save_step_stats( + CompactionSessionAuditInfo.MATERIALIZE_STEP_NAME, + mat_results, + materialize_results_retrieved_at, + materialize_invoke_end - materialize_start, + materialize_end - materialize_start, + ) mat_results = sorted(mat_results, key=lambda m: m.task_index) deltas = [m.delta for m in mat_results] @@ -494,6 +577,7 @@ def _execute_compaction_round( f" Materialized records: {merged_delta.meta.record_count}" ) logger.info(record_info_msg) + assert ( total_hb_record_count - total_dd_record_count == merged_delta.meta.record_count ), ( @@ -506,6 +590,9 @@ def _execute_compaction_round( ) logger.info(f"Committed compacted delta: {compacted_delta}") + compaction_end = time.monotonic() + compaction_audit.set_compaction_time_in_seconds(compaction_end - compaction_start) + new_compacted_delta_locator = DeltaLocator.of( new_compacted_partition_locator, compacted_delta.stream_position, @@ -516,24 +603,38 @@ def _execute_compaction_round( if round_completion_info else None ) + + pyarrow_write_result = PyArrowWriteResult.union( + [m.pyarrow_write_result for m in mat_results] + ) + + session_peak_memory = get_current_node_peak_memory_usage_in_bytes() + compaction_audit.set_peak_memory_used_bytes_by_compaction_session_process( + session_peak_memory + ) + + compaction_audit.save_round_completion_stats( + mat_results, telemetry_time_hb + telemetry_time_dd + telemetry_time_materialize + ) + + s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit))) + new_round_completion_info = RoundCompletionInfo.of( last_stream_position_compacted, new_compacted_delta_locator, - PyArrowWriteResult.union([m.pyarrow_write_result for m in mat_results]), + pyarrow_write_result, bit_width_of_sort_keys, last_rebase_source_partition_locator, - manifest_entry_copied_by_reference_ratio, - ) - rcf_source_partition_locator = ( - rebase_source_partition_locator - if rebase_source_partition_locator - else source_partition_locator + compaction_audit.untouched_file_ratio, + audit_url, ) + logger.info( f"partition-{source_partition_locator.partition_values}," f"compacted at: {last_stream_position_compacted}," f"last position: {last_stream_position_to_compact}" ) + return ( partition, new_round_completion_info, diff --git a/deltacat/compute/compactor/model/compaction_session_audit_info.py b/deltacat/compute/compactor/model/compaction_session_audit_info.py new file mode 100644 index 00000000..bf165295 --- /dev/null +++ b/deltacat/compute/compactor/model/compaction_session_audit_info.py @@ -0,0 +1,725 @@ +# Allow classes to use self-referencing Type hints in Python 3.7. 
+from __future__ import annotations
+import logging
+from deltacat import logs
+from typing import List, Union
+from deltacat.compute.compactor.model.hash_bucket_result import HashBucketResult
+from deltacat.compute.compactor.model.dedupe_result import DedupeResult
+from deltacat.compute.compactor.model.materialize_result import MaterializeResult
+from deltacat.utils.performance import timed_invocation
+from deltacat.utils.resources import ClusterUtilization, get_size_of_object_in_bytes
+from deltacat.compute.compactor import PyArrowWriteResult
+
+logger = logs.configure_deltacat_logger(logging.getLogger(__name__))
+
+
+class CompactionSessionAuditInfo(dict):
+
+    DEDUPE_STEP_NAME = "dedupe"
+    MATERIALIZE_STEP_NAME = "materialize"
+    HASH_BUCKET_STEP_NAME = "hashBucket"
+
+    def __init__(self, deltacat_version: str, audit_url: str):
+        self.set_deltacat_version(deltacat_version)
+        self.set_audit_url(audit_url)
+
+    @property
+    def audit_url(self) -> str:
+        return self.get("auditUrl")
+
+    @property
+    def deltacat_version(self) -> str:
+        """
+        The deltacat version used to run the compaction job.
+        """
+        return self.get("deltacatVersion")
+
+    @property
+    def input_records(self) -> int:
+        """
+        The total number of records from input deltas that need to be compacted
+        (before deduplication).
+        """
+        return self.get("inputRecords")
+
+    @property
+    def input_file_count(self) -> int:
+        """
+        The total number of input files that need to be compacted.
+        """
+        return self.get("inputFileCount")
+
+    @property
+    def uniform_deltas_created(self) -> int:
+        """
+        The total number of uniform deltas fed into the hash bucket step.
+        """
+        return self.get("uniformDeltasCreated")
+
+    @property
+    def records_deduped(self) -> int:
+        """
+        The total number of records that were deduplicated. For example,
+        if there are 100 records with a particular primary key, 99 records
+        will be deduplicated.
+        """
+        return self.get("recordsDeduped")
+
+    @property
+    def input_size_bytes(self) -> float:
+        """
+        The on-disk size in bytes of the input.
+        """
+        return self.get("inputSizeBytes")
+
+    @property
+    def hash_bucket_count(self) -> int:
+        """
+        Total number of hash buckets used during compaction.
+        """
+        return self.get("hashBucketCount")
+
+    @property
+    def cluster_cpu_max(self) -> float:
+        """
+        Total cluster CPUs allocated for the compaction job. For an autoscaling
+        cluster, the maximum CPUs observed at any time is reported.
+        """
+        return self.get("clusterCpuMax")
+
+    @property
+    def compaction_time_in_seconds(self) -> float:
+        """
+        The total time taken by the compaction session to complete.
+        """
+        return self.get("compactionTimeInSeconds")
+
+    @property
+    def total_object_store_memory_used_bytes(self) -> float:
+        """
+        The total object store memory used by the compaction session across all
+        nodes in the entire cluster.
+        """
+        return self.get("totalObjectStoreMemoryUsedBytes")
+
+    @property
+    def peak_memory_used_bytes_per_task(self) -> float:
+        """
+        The peak memory used by a single process in the compaction job. Note that
+        Ray creates a single process to run each hash bucketing, dedupe and
+        materialize task and the process is reused. Hence, you may see
+        monotonically increasing values. Peak memory is important because
+        the cluster must be scaled to handle the peak memory per node even
+        though average memory usage is low.
+        """
+        return self.get("peakMemoryUsedBytesPerTask")
+
+    @property
+    def peak_memory_used_bytes_per_hash_bucket_task(self) -> float:
+        """
+        The peak memory used by a single hash bucketing process. For example,
+        if the peak usage of a hash bucketing process is 40GB, it is not safe to
+        run more than 3 hash bucketing tasks on a node with 120GB of memory, to
+        avoid crashing due to memory overflow.
+        """
+        return self.get("hashBucketTaskPeakMemoryUsedBytes")
+
+    @property
+    def peak_memory_used_bytes_per_dedupe_task(self) -> float:
+        """
+        The peak memory used by a single dedupe python process. Note that the
+        result may be the max across dedupe and hash bucketing, as processes
+        are reused by Ray to run both steps.
+        """
+        return self.get("dedupeTaskPeakMemoryUsedBytes")
+
+    @property
+    def peak_memory_used_bytes_per_materialize_task(self) -> float:
+        """
+        The peak memory used by a single materialize python process. Note that
+        the result may be the max across materialize, dedupe and hash bucketing,
+        as processes are reused by Ray to run all compaction steps.
+        """
+        return self.get("materializeTaskPeakMemoryUsedBytes")
+
+    @property
+    def hash_bucket_post_object_store_memory_used_bytes(self) -> float:
+        """
+        The total object store memory used across the cluster by the hash
+        bucketing step, before dedupe is run.
+        """
+        return self.get("hashBucketPostObjectStoreMemoryUsedBytes")
+
+    @property
+    def dedupe_post_object_store_memory_used_bytes(self) -> float:
+        """
+        The total object store memory used after the dedupe step, before materialize is run.
+        """
+        return self.get("dedupePostObjectStoreMemoryUsedBytes")
+
+    @property
+    def materialize_post_object_store_memory_used_bytes(self) -> float:
+        """
+        The total object store memory used after the materialize step.
+        """
+        return self.get("materializePostObjectStoreMemoryUsedBytes")
+
+    @property
+    def materialize_buckets(self) -> int:
+        """
+        The total number of materialize buckets created.
+        """
+        return self.get("materializeBuckets")
+
+    @property
+    def hash_bucket_time_in_seconds(self) -> float:
+        """
+        The time taken by the hash bucketing step. This includes all hash bucket
+        tasks as well as the invoke time.
+        """
+        return self.get("hashBucketTimeInSeconds")
+
+    @property
+    def hash_bucket_invoke_time_in_seconds(self) -> float:
+        """
+        The time taken to invoke and create all hash bucketing tasks.
+        """
+        return self.get("hashBucketInvokeTimeInSeconds")
+
+    @property
+    def hash_bucket_result_wait_time_in_seconds(self) -> float:
+        """
+        The time it takes ray.get() to resolve after the last hash bucket task has completed.
+        This value may not be accurate at less than 1 second precision.
+        """
+        return self.get("hashBucketResultWaitTimeInSeconds")
+
+    @property
+    def dedupe_time_in_seconds(self) -> float:
+        """
+        The time taken by the dedupe step. This includes all dedupe tasks.
+        """
+        return self.get("dedupeTimeInSeconds")
+
+    @property
+    def dedupe_invoke_time_in_seconds(self) -> float:
+        """
+        The time taken to invoke all dedupe tasks.
+        """
+        return self.get("dedupeInvokeTimeInSeconds")
+
+    @property
+    def dedupe_result_wait_time_in_seconds(self) -> float:
+        """
+        The time it takes ray.get() to resolve after the last dedupe task has completed.
+        This value may not be accurate at less than 1 second precision.
+        """
+        return self.get("dedupeResultWaitTimeInSeconds")
+
+    @property
+    def materialize_time_in_seconds(self) -> float:
+        """
+        The time taken by the materialize step. This includes all materialize tasks.
+        """
+        return self.get("materializeTimeInSeconds")
+
+    @property
+    def materialize_invoke_time_in_seconds(self) -> float:
+        """
+        The time taken to invoke all materialize tasks.
+ """ + return self.get("materializeInvokeTimeInSeconds") + + @property + def materialize_result_wait_time_in_seconds(self) -> float: + """ + The time it takes ray.get() to resolve after the last hash bucket task has completed. + This value may not be accurate at less than 1 second precision. + """ + return self.get("materializeResultWaitTimeInSeconds") + + @property + def delta_discovery_time_in_seconds(self) -> float: + """ + The time taken by delta discovery step which is mostly run before hash bucketing is started. + """ + return self.get("deltaDiscoveryTimeInSeconds") + + @property + def output_file_count(self) -> int: + """ + The total number of files in the compacted output (includes untouched files). + """ + return self.get("outputFileCount") + + @property + def output_size_bytes(self) -> float: + """ + The on-disk size of the compacted output including any untouched files. + """ + return self.get("outputSizeBytes") + + @property + def output_size_pyarrow_bytes(self) -> float: + """ + The pyarrow in-memory size of compacted output including any untouched files. + """ + return self.get("outputSizePyarrowBytes") + + @property + def total_cluster_memory_bytes(self) -> float: + """ + The total memory allocated to the cluster. + """ + return self.get("totalClusterMemoryBytes") + + @property + def total_cluster_object_store_memory_bytes(self) -> float: + """ + The total object store memory allocated to the cluster. + """ + return self.get("totalClusterObjectStoreMemoryBytes") + + @property + def untouched_file_count(self) -> int: + """ + The total number of files that were untouched by materialize step. + """ + return self.get("untouchedFileCount") + + @property + def untouched_file_ratio(self) -> float: + """ + The ratio between total number of files untouched and total number of files in the compacted output. + """ + return self.get("untouchedFileRatio") + + @property + def untouched_record_count(self) -> int: + """ + The total number of records untouched during materialization. + """ + return self.get("untouchedRecordCount") + + @property + def untouched_size_bytes(self) -> float: + """ + The on-disk size of the data untouched during materialization. + """ + return self.get("untouchedSizeBytes") + + @property + def telemetry_time_in_seconds(self) -> float: + """ + The total time taken by all the telemetry activity across the nodes in the cluster. This includes + collecting cluster resources information, emitting metrics, etc. + """ + return self.get("telemetryTimeInSeconds") + + @property + def hash_bucket_result_size_bytes(self) -> float: + """ + The size of the results returned by hash bucket step. + """ + return self.get("hashBucketResultSize") + + @property + def dedupe_result_size_bytes(self) -> float: + """ + The size of the results returned by dedupe step. + """ + return self.get("dedupeResultSize") + + @property + def materialize_result_size(self) -> float: + """ + The size of the results returned by materialize step. + """ + return self.get("materializeResultSize") + + @property + def peak_memory_used_bytes_by_compaction_session_process(self) -> float: + """ + The peak memory used by the entrypoint for compaction_session. 
+ """ + return self.get("peakMemoryUsedBytesCompactionSessionProcess") + + # Setters follow + + def set_audit_url(self, audit_url: str) -> CompactionSessionAuditInfo: + self["auditUrl"] = audit_url + return self + + def set_deltacat_version(self, version: str) -> CompactionSessionAuditInfo: + self["deltacatVersion"] = version + return self + + def set_input_records(self, input_records: int) -> CompactionSessionAuditInfo: + self["inputRecords"] = input_records + return self + + def set_input_file_count(self, input_file_count: int) -> CompactionSessionAuditInfo: + self["inputFileCount"] = input_file_count + return self + + def set_uniform_deltas_created( + self, uniform_deltas_created: int + ) -> CompactionSessionAuditInfo: + self["uniformDeltasCreated"] = uniform_deltas_created + return self + + def set_records_deduped(self, records_deduped: int) -> CompactionSessionAuditInfo: + self["recordsDeduped"] = records_deduped + return self + + def set_input_size_bytes( + self, input_size_bytes: float + ) -> CompactionSessionAuditInfo: + self["inputSizeBytes"] = input_size_bytes + return self + + def set_hash_bucket_count( + self, hash_bucket_count: int + ) -> CompactionSessionAuditInfo: + self["hashBucketCount"] = hash_bucket_count + return self + + def set_cluster_cpu_max(self, cluster_cpu_max: float) -> CompactionSessionAuditInfo: + self["clusterCpuMax"] = cluster_cpu_max + return self + + def set_compaction_time_in_seconds( + self, compaction_time_in_seconds: float + ) -> CompactionSessionAuditInfo: + self["compactionTimeInSeconds"] = compaction_time_in_seconds + return self + + def set_total_object_store_memory_used_bytes( + self, total_object_store_memory_used_bytes: float + ) -> CompactionSessionAuditInfo: + self["totalObjectStoreMemoryUsedBytes"] = total_object_store_memory_used_bytes + return self + + def set_peak_memory_used_bytes_per_task( + self, peak_memory_used_bytes: float + ) -> CompactionSessionAuditInfo: + self["peakMemoryUsedBytesPerTask"] = peak_memory_used_bytes + return self + + def set_peak_memory_used_bytes_per_hash_bucket_task( + self, peak_memory_used_bytes_per_hash_bucket_task: float + ) -> CompactionSessionAuditInfo: + self[ + "hashBucketTaskPeakMemoryUsedBytes" + ] = peak_memory_used_bytes_per_hash_bucket_task + return self + + def set_peak_memory_used_bytes_per_dedupe_task( + self, peak_memory_used_bytes_per_dedupe_task: float + ) -> CompactionSessionAuditInfo: + self["dedupeTaskPeakMemoryUsedBytes"] = peak_memory_used_bytes_per_dedupe_task + return self + + def set_peak_memory_used_bytes_per_materialize_task( + self, peak_memory_used_bytes_per_materialize_task: float + ) -> CompactionSessionAuditInfo: + self[ + "materializeTaskPeakMemoryUsedBytes" + ] = peak_memory_used_bytes_per_materialize_task + return self + + def set_hash_bucket_post_object_store_memory_used_bytes( + self, object_store_memory_used_bytes_by_hb: float + ) -> CompactionSessionAuditInfo: + self[ + "hashBucketPostObjectStoreMemoryUsedBytes" + ] = object_store_memory_used_bytes_by_hb + return self + + def set_dedupe_post_object_store_memory_used_bytes( + self, object_store_memory_used_bytes_by_dedupe: float + ) -> CompactionSessionAuditInfo: + self[ + "dedupePostObjectStoreMemoryUsedBytes" + ] = object_store_memory_used_bytes_by_dedupe + return self + + def set_materialize_post_object_store_memory_used_bytes( + self, object_store_memory_used_bytes_by_dedupe: float + ) -> CompactionSessionAuditInfo: + self[ + "materializePostObjectStoreMemoryUsedBytes" + ] = object_store_memory_used_bytes_by_dedupe + 
        return self
+
+    def set_materialize_buckets(
+        self, materialize_buckets: int
+    ) -> CompactionSessionAuditInfo:
+        self["materializeBuckets"] = materialize_buckets
+        return self
+
+    def set_hash_bucket_time_in_seconds(
+        self, hash_bucket_time_in_seconds: float
+    ) -> CompactionSessionAuditInfo:
+        self["hashBucketTimeInSeconds"] = hash_bucket_time_in_seconds
+        return self
+
+    def set_hash_bucket_invoke_time_in_seconds(
+        self, hash_bucket_invoke_time: float
+    ) -> CompactionSessionAuditInfo:
+        self["hashBucketInvokeTimeInSeconds"] = hash_bucket_invoke_time
+        return self
+
+    def set_hash_bucket_result_wait_time_in_seconds(
+        self, wait_time: float
+    ) -> CompactionSessionAuditInfo:
+        self["hashBucketResultWaitTimeInSeconds"] = wait_time
+        return self
+
+    def set_dedupe_time_in_seconds(
+        self, dedupe_time_in_seconds: float
+    ) -> CompactionSessionAuditInfo:
+        self["dedupeTimeInSeconds"] = dedupe_time_in_seconds
+        return self
+
+    def set_dedupe_invoke_time_in_seconds(
+        self, dedupe_invoke_time: float
+    ) -> CompactionSessionAuditInfo:
+        self["dedupeInvokeTimeInSeconds"] = dedupe_invoke_time
+        return self
+
+    def set_dedupe_result_wait_time_in_seconds(
+        self, wait_time: float
+    ) -> CompactionSessionAuditInfo:
+        self["dedupeResultWaitTimeInSeconds"] = wait_time
+        return self
+
+    def set_materialize_time_in_seconds(
+        self, materialize_time_in_seconds: float
+    ) -> CompactionSessionAuditInfo:
+        self["materializeTimeInSeconds"] = materialize_time_in_seconds
+        return self
+
+    def set_materialize_invoke_time_in_seconds(
+        self, materialize_invoke_time: float
+    ) -> CompactionSessionAuditInfo:
+        self["materializeInvokeTimeInSeconds"] = materialize_invoke_time
+        return self
+
+    def set_materialize_result_wait_time_in_seconds(
+        self, wait_time: float
+    ) -> CompactionSessionAuditInfo:
+        self["materializeResultWaitTimeInSeconds"] = wait_time
+        return self
+
+    def set_delta_discovery_time_in_seconds(
+        self, delta_discovery_time_in_seconds: float
+    ) -> CompactionSessionAuditInfo:
+        self["deltaDiscoveryTimeInSeconds"] = delta_discovery_time_in_seconds
+        return self
+
+    def set_output_file_count(
+        self, output_file_count: float
+    ) -> CompactionSessionAuditInfo:
+        self["outputFileCount"] = output_file_count
+        return self
+
+    def set_output_size_bytes(
+        self, output_size_bytes: float
+    ) -> CompactionSessionAuditInfo:
+        self["outputSizeBytes"] = output_size_bytes
+        return self
+
+    def set_output_size_pyarrow_bytes(
+        self, output_size_pyarrow_bytes: float
+    ) -> CompactionSessionAuditInfo:
+        self["outputSizePyarrowBytes"] = output_size_pyarrow_bytes
+        return self
+
+    def set_total_cluster_memory_bytes(
+        self, total_cluster_memory_bytes: float
+    ) -> CompactionSessionAuditInfo:
+        self["totalClusterMemoryBytes"] = total_cluster_memory_bytes
+        return self
+
+    def set_total_cluster_object_store_memory_bytes(
+        self, total_cluster_object_store_memory_bytes: float
+    ) -> CompactionSessionAuditInfo:
+        self[
+            "totalClusterObjectStoreMemoryBytes"
+        ] = total_cluster_object_store_memory_bytes
+        return self
+
+    def set_untouched_file_count(
+        self, untouched_file_count: int
+    ) -> CompactionSessionAuditInfo:
+        self["untouchedFileCount"] = untouched_file_count
+        return self
+
+    def set_untouched_file_ratio(
+        self, untouched_file_ratio: float
+    ) -> CompactionSessionAuditInfo:
+        self["untouchedFileRatio"] = untouched_file_ratio
+        return self
+
+    def set_untouched_record_count(
+        self, untouched_record_count: int
+    ) -> CompactionSessionAuditInfo:
+
self["untouchedRecordCount"] = untouched_record_count + return self + + def set_untouched_size_bytes( + self, untouched_size_bytes: float + ) -> CompactionSessionAuditInfo: + self["untouchedSizeBytes"] = untouched_size_bytes + return self + + def set_telemetry_time_in_seconds( + self, telemetry_time_in_seconds: float + ) -> CompactionSessionAuditInfo: + self["telemetryTimeInSeconds"] = telemetry_time_in_seconds + return self + + def set_hash_bucket_result_size_bytes( + self, hash_bucket_result_size_bytes: float + ) -> CompactionSessionAuditInfo: + self["hashBucketResultSize"] = hash_bucket_result_size_bytes + return self + + def set_dedupe_result_size_bytes( + self, dedupe_result_size_bytes: float + ) -> CompactionSessionAuditInfo: + self["dedupeResultSize"] = dedupe_result_size_bytes + return self + + def set_materialize_result_size_bytes( + self, materialize_result_size_bytes: float + ) -> CompactionSessionAuditInfo: + self["materializeResultSize"] = materialize_result_size_bytes + return self + + def set_peak_memory_used_bytes_by_compaction_session_process( + self, peak_memory: float + ) -> CompactionSessionAuditInfo: + self["peakMemoryUsedBytesCompactionSessionProcess"] = peak_memory + return self + + # High level methods to save stats + def save_step_stats( + self, + step_name: str, + task_results: Union[ + List[HashBucketResult], List[DedupeResult], List[MaterializeResult] + ], + task_results_retrieved_at: float, + invoke_time_in_seconds: float, + task_time_in_seconds: float, + ) -> float: + """ + Saves the stats by calling individual setters and returns the cluster telemetry time. + """ + + last_task_completed_at = max( + result.task_completed_at for result in task_results + ) + + self[f"{step_name}ResultWaitTimeInSeconds"] = ( + task_results_retrieved_at - last_task_completed_at.item() + ) + self[f"{step_name}TimeInSeconds"] = task_time_in_seconds + self[f"{step_name}InvokeTimeInSeconds"] = invoke_time_in_seconds + + self[f"{step_name}ResultSize"] = get_size_of_object_in_bytes(task_results) + + ( + cluster_utilization_after_task, + cluster_util_after_task_latency, + ) = timed_invocation(ClusterUtilization.get_current_cluster_utilization) + + self.set_total_cluster_object_store_memory_bytes( + cluster_utilization_after_task.total_object_store_memory_bytes + ) + self.set_total_cluster_memory_bytes( + cluster_utilization_after_task.total_memory_bytes + ) + self.set_total_object_store_memory_used_bytes( + cluster_utilization_after_task.used_object_store_memory_bytes + ) + + self[ + f"{step_name}PostObjectStoreMemoryUsedBytes" + ] = cluster_utilization_after_task.used_object_store_memory_bytes + + peak_task_memory = max( + result.peak_memory_usage_bytes for result in task_results + ) + + telemetry_time = sum( + result.telemetry_time_in_seconds for result in task_results + ) + + self[f"{step_name}TaskPeakMemoryUsedBytes"] = peak_task_memory.item() + + return cluster_util_after_task_latency + telemetry_time + + def save_round_completion_stats( + self, mat_results: List[MaterializeResult], total_telemetry_time: float + ) -> None: + """ + This method saves all the relevant stats after all the steps are completed. + """ + pyarrow_write_result = PyArrowWriteResult.union( + [m.pyarrow_write_result for m in mat_results] + ) + + total_count_of_src_dfl_not_touched = sum( + m.referenced_pyarrow_write_result.files for m in mat_results + ) + + logger.info( + f"Got total of {total_count_of_src_dfl_not_touched} manifest files not touched." 
+ ) + logger.info( + f"Got total of {pyarrow_write_result.files} manifest files during compaction." + ) + manifest_entry_copied_by_reference_ratio = ( + ( + round( + total_count_of_src_dfl_not_touched / pyarrow_write_result.files, 4 + ) + * 100 + ) + if pyarrow_write_result.files != 0 + else None + ) + logger.info( + f"{manifest_entry_copied_by_reference_ratio} percent of manifest files are copied by reference during materialize." + ) + + untouched_file_record_count = sum( + m.referenced_pyarrow_write_result.records for m in mat_results + ) + untouched_file_size_bytes = sum( + m.referenced_pyarrow_write_result.file_bytes for m in mat_results + ) + + self.set_untouched_file_count(total_count_of_src_dfl_not_touched) + self.set_untouched_file_ratio(manifest_entry_copied_by_reference_ratio) + self.set_untouched_record_count(untouched_file_record_count) + self.set_untouched_size_bytes(untouched_file_size_bytes) + + self.set_output_file_count(pyarrow_write_result.files) + self.set_output_size_bytes(pyarrow_write_result.file_bytes) + self.set_output_size_pyarrow_bytes(pyarrow_write_result.pyarrow_bytes) + + self.set_peak_memory_used_bytes_per_task( + max( + [ + self.peak_memory_used_bytes_per_hash_bucket_task, + self.peak_memory_used_bytes_per_dedupe_task, + self.peak_memory_used_bytes_per_materialize_task, + ] + ) + ) + + self.set_telemetry_time_in_seconds(total_telemetry_time) diff --git a/deltacat/compute/compactor/model/dedupe_result.py b/deltacat/compute/compactor/model/dedupe_result.py index 3728c42f..8c297652 100644 --- a/deltacat/compute/compactor/model/dedupe_result.py +++ b/deltacat/compute/compactor/model/dedupe_result.py @@ -6,3 +6,6 @@ class DedupeResult(NamedTuple): mat_bucket_idx_to_obj_id: Dict[int, Tuple] deduped_record_count: np.int64 + peak_memory_usage_bytes: np.double + telemetry_time_in_seconds: np.double + task_completed_at: np.double diff --git a/deltacat/compute/compactor/model/hash_bucket_result.py b/deltacat/compute/compactor/model/hash_bucket_result.py index d9e8c95e..e0e6043c 100644 --- a/deltacat/compute/compactor/model/hash_bucket_result.py +++ b/deltacat/compute/compactor/model/hash_bucket_result.py @@ -6,3 +6,6 @@ class HashBucketResult(NamedTuple): hash_bucket_group_to_obj_id: np.ndarray hb_record_count: np.int64 + peak_memory_usage_bytes: np.double + telemetry_time_in_seconds: np.double + task_completed_at: np.double diff --git a/deltacat/compute/compactor/model/materialize_result.py b/deltacat/compute/compactor/model/materialize_result.py index 1b12059a..a6d38bc8 100644 --- a/deltacat/compute/compactor/model/materialize_result.py +++ b/deltacat/compute/compactor/model/materialize_result.py @@ -2,6 +2,7 @@ from __future__ import annotations from typing import Any, Dict, Optional +import numpy as np from deltacat.compute.compactor.model.pyarrow_write_result import PyArrowWriteResult from deltacat.storage import Delta @@ -13,15 +14,19 @@ def of( delta: Delta, task_index: int, pyarrow_write_result: PyArrowWriteResult, - count_of_src_dfl_not_touched: Optional[int] = 0, - count_of_src_dfl: Optional[int] = 0, + referenced_pyarrow_write_result: Optional[PyArrowWriteResult] = None, + peak_memory_usage_bytes: Optional[np.double] = None, + telemetry_time_in_seconds: Optional[np.double] = None, + task_completed_at: Optional[np.double] = None, ) -> MaterializeResult: materialize_result = MaterializeResult() materialize_result["delta"] = delta materialize_result["taskIndex"] = task_index materialize_result["paWriteResult"] = pyarrow_write_result - 
materialize_result["countOfSrcFileNotTouched"] = count_of_src_dfl_not_touched - materialize_result["countOfSrcFile"] = count_of_src_dfl + materialize_result["referencedPaWriteResult"] = referenced_pyarrow_write_result + materialize_result["peakMemoryUsageBytes"] = peak_memory_usage_bytes + materialize_result["telemetryTimeInSeconds"] = telemetry_time_in_seconds + materialize_result["taskCompletedAt"] = task_completed_at return materialize_result @property @@ -35,6 +40,14 @@ def delta(self) -> Delta: def task_index(self) -> int: return self["taskIndex"] + @property + def peak_memory_usage_bytes(self) -> Optional[np.double]: + return self["peakMemoryUsageBytes"] + + @property + def telemetry_time_in_seconds(self) -> Optional[np.double]: + return self["telemetryTimeInSeconds"] + @property def pyarrow_write_result(self) -> PyArrowWriteResult: val: Dict[str, Any] = self.get("paWriteResult") @@ -47,5 +60,13 @@ def count_of_src_dfl_not_touched(self) -> int: return self["countOfSrcFileNotTouched"] @property - def count_of_src_dfl(self) -> int: - return self["countOfSrcFile"] + def referenced_pyarrow_write_result(self) -> PyArrowWriteResult: + val: Dict[str, Any] = self.get("referencedPaWriteResult") + if val is not None and not isinstance(val, PyArrowWriteResult): + self["referencedPaWriteResult"] = val = PyArrowWriteResult(val) + + return val + + @property + def task_completed_at(self) -> Optional[np.double]: + return self["taskCompletedAt"] diff --git a/deltacat/compute/compactor/model/round_completion_info.py b/deltacat/compute/compactor/model/round_completion_info.py index ce7102e6..9fbd4235 100644 --- a/deltacat/compute/compactor/model/round_completion_info.py +++ b/deltacat/compute/compactor/model/round_completion_info.py @@ -3,6 +3,9 @@ from deltacat.storage import DeltaLocator, PartitionLocator from deltacat.compute.compactor.model.pyarrow_write_result import PyArrowWriteResult +from deltacat.compute.compactor.model.compaction_session_audit_info import ( + CompactionSessionAuditInfo, +) from typing import Any, Dict, Optional @@ -39,6 +42,7 @@ def of( sort_keys_bit_width: int, rebase_source_partition_locator: Optional[PartitionLocator], manifest_entry_copied_by_reference_ratio: Optional[float] = None, + compaction_audit_url: Optional[str] = None, ) -> RoundCompletionInfo: rci = RoundCompletionInfo() @@ -50,6 +54,7 @@ def of( rci[ "manifestEntryCopiedByReferenceRatio" ] = manifest_entry_copied_by_reference_ratio + rci["compactionAuditUrl"] = compaction_audit_url return rci @property @@ -81,6 +86,10 @@ def compacted_pyarrow_write_result(self) -> PyArrowWriteResult: def sort_keys_bit_width(self) -> int: return self["sortKeysBitWidth"] + @property + def compaction_audit(self) -> Optional[CompactionSessionAuditInfo]: + return self.get("compactionAudit") + @property def rebase_source_partition_locator(self) -> Optional[PartitionLocator]: return self.get("rebaseSourcePartitionLocator") diff --git a/deltacat/compute/compactor/steps/dedupe.py b/deltacat/compute/compactor/steps/dedupe.py index bf0714ae..13f33e72 100644 --- a/deltacat/compute/compactor/steps/dedupe.py +++ b/deltacat/compute/compactor/steps/dedupe.py @@ -1,9 +1,9 @@ import importlib import logging +import time from collections import defaultdict from contextlib import nullcontext from typing import Any, Dict, List, Tuple - import numpy as np import pyarrow as pa import pyarrow.compute as pc @@ -25,6 +25,7 @@ ) from deltacat.utils.performance import timed_invocation from deltacat.utils.metrics import emit_timer_metrics, MetricsConfig 
+from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes if importlib.util.find_spec("memray"): import memray @@ -231,8 +232,14 @@ def _timed_dedupe( f"{len(mat_bucket_to_dd_idx_obj_id)}" ) + peak_memory_usage_bytes = get_current_node_peak_memory_usage_in_bytes() + return DedupeResult( - mat_bucket_to_dd_idx_obj_id, np.int64(total_deduped_records) + mat_bucket_to_dd_idx_obj_id, + np.int64(total_deduped_records), + np.double(peak_memory_usage_bytes), + np.double(0.0), + np.double(time.time()), ) @@ -254,10 +261,22 @@ def dedupe( dedupe_task_index=dedupe_task_index, enable_profiler=enable_profiler, ) + + emit_metrics_time = 0.0 if metrics_config: - emit_timer_metrics( - metrics_name="dedupe", value=duration, metrics_config=metrics_config + emit_result, latency = timed_invocation( + func=emit_timer_metrics, + metrics_name="dedupe", + value=duration, + metrics_config=metrics_config, ) + emit_metrics_time = latency logger.info(f"[Dedupe task index {dedupe_task_index}] Finished dedupe task...") - return dedupe_result + return DedupeResult( + dedupe_result[0], + dedupe_result[1], + dedupe_result[2], + np.double(emit_metrics_time), + dedupe_result[4], + ) diff --git a/deltacat/compute/compactor/steps/hash_bucket.py b/deltacat/compute/compactor/steps/hash_bucket.py index f3f66d91..c7fc3ce1 100644 --- a/deltacat/compute/compactor/steps/hash_bucket.py +++ b/deltacat/compute/compactor/steps/hash_bucket.py @@ -1,5 +1,6 @@ import importlib import logging +import time from contextlib import nullcontext from itertools import chain from typing import Generator, List, Optional, Tuple @@ -30,6 +31,7 @@ from deltacat.utils.common import ReadKwargsProvider from deltacat.utils.performance import timed_invocation from deltacat.utils.metrics import emit_timer_metrics, MetricsConfig +from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes if importlib.util.find_spec("memray"): import memray @@ -207,8 +209,14 @@ def _timed_hash_bucket( num_buckets, num_groups, ) + + peak_memory_usage_bytes = get_current_node_peak_memory_usage_in_bytes() return HashBucketResult( - hash_bucket_group_to_obj_id, np.int64(total_record_count) + hash_bucket_group_to_obj_id, + np.int64(total_record_count), + np.double(peak_memory_usage_bytes), + np.double(0.0), + np.double(time.time()), ) @@ -239,9 +247,22 @@ def hash_bucket( read_kwargs_provider=read_kwargs_provider, deltacat_storage=deltacat_storage, ) + + emit_metrics_time = 0.0 if metrics_config: - emit_timer_metrics( - metrics_name="hash_bucket", value=duration, metrics_config=metrics_config + emit_result, latency = timed_invocation( + func=emit_timer_metrics, + metrics_name="hash_bucket", + value=duration, + metrics_config=metrics_config, ) + emit_metrics_time = latency + logger.info(f"Finished hash bucket task...") - return hash_bucket_result + return HashBucketResult( + hash_bucket_result[0], + hash_bucket_result[1], + hash_bucket_result[2], + np.double(emit_metrics_time), + hash_bucket_result[4], + ) diff --git a/deltacat/compute/compactor/steps/materialize.py b/deltacat/compute/compactor/steps/materialize.py index b69fd120..1a2fbe6e 100644 --- a/deltacat/compute/compactor/steps/materialize.py +++ b/deltacat/compute/compactor/steps/materialize.py @@ -7,6 +7,7 @@ from itertools import chain, repeat from typing import List, Optional, Tuple, Dict, Any, Union import pyarrow as pa +import numpy as np import ray from ray import cloudpickle from deltacat import logs @@ -46,6 +47,7 @@ get_current_ray_worker_id, ) from 
deltacat.utils.metrics import emit_timer_metrics, MetricsConfig +from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes if importlib.util.find_spec("memray"): import memray @@ -253,7 +255,7 @@ def _materialize(compacted_tables: List[pa.Table]) -> MaterializeResult: untouched_src_manifest_entry = manifest.entries[src_file_idx_np.item()] manifest_entry_list_reference.append(untouched_src_manifest_entry) referenced_pyarrow_write_result = PyArrowWriteResult.of( - len(untouched_src_manifest_entry.entries), + 1, TABLE_CLASS_TO_SIZE_FUNC[type(pa_table)](pa_table), manifest.meta.content_length, len(pa_table), @@ -274,15 +276,11 @@ def _materialize(compacted_tables: List[pa.Table]) -> MaterializeResult: referenced_manifest_delta = ( _stage_delta_from_manifest_entry_reference_list( - manifest_entry_list_reference + manifest_entry_list_reference, partition ) if manifest_entry_list_reference else None ) - if referenced_manifest_delta: - logger.info( - f"Got delta with {len(referenced_manifest_delta.manifest.entries)} referenced manifest entries" - ) merged_materialized_delta = [mr.delta for mr in materialized_results] merged_materialized_delta.append(referenced_manifest_delta) @@ -290,33 +288,58 @@ def _materialize(compacted_tables: List[pa.Table]) -> MaterializeResult: [d for d in merged_materialized_delta if d is not None] ) - write_results_union = referenced_pyarrow_write_results + write_results_union = [*referenced_pyarrow_write_results] if materialized_results: for mr in materialized_results: write_results_union.append(mr.pyarrow_write_result) write_result = PyArrowWriteResult.union(write_results_union) + referenced_write_result = PyArrowWriteResult.union( + referenced_pyarrow_write_results + ) + + if referenced_manifest_delta: + logger.info( + f"Got delta with {len(referenced_manifest_delta.manifest.entries)} referenced manifest entries" + ) + assert referenced_write_result.files == len( + referenced_manifest_delta.manifest.entries + ), "The files referenced must match with the entries in the delta" + + assert write_result.files == len( + merged_delta.manifest.entries + ), "The total number of files written by materialize must match manifest entries" logger.debug( - f"{len(write_results_union)} files written" - f" with records: {[wr.records for wr in write_results_union]}" - ) - # Merge all new deltas into one for this materialize bucket index - merged_materialize_result = MaterializeResult.of( - merged_delta, - mat_bucket_index, - write_result, - len(manifest_entry_list_reference), - count_of_src_dfl, + f"{write_result.files} files written" + f" with records: {write_result.records}" ) logger.info(f"Finished materialize task...") end = time.time() duration = end - start + + emit_metrics_time = 0.0 if metrics_config: - emit_timer_metrics( + emit_result, latency = timed_invocation( + func=emit_timer_metrics, metrics_name="materialize", value=duration, metrics_config=metrics_config, ) + emit_metrics_time = latency logger.info(f"Materialize task ended in {end - start}s") + + peak_memory_usage_bytes = get_current_node_peak_memory_usage_in_bytes() + + # Merge all new deltas into one for this materialize bucket index + merged_materialize_result = MaterializeResult.of( + merged_delta, + mat_bucket_index, + write_result, + referenced_write_result, + np.double(peak_memory_usage_bytes), + np.double(emit_metrics_time), + np.double(time.time()), + ) + return merged_materialize_result diff --git a/deltacat/compute/compactor/utils/io.py b/deltacat/compute/compactor/utils/io.py index 
1c9c1b1a..c30649d5 100644 --- a/deltacat/compute/compactor/utils/io.py +++ b/deltacat/compute/compactor/utils/io.py @@ -16,6 +16,9 @@ from deltacat.compute.compactor import DeltaAnnotated from typing import Dict, List, Optional, Tuple, Union from deltacat.compute.compactor import HighWatermark +from deltacat.compute.compactor.model.compaction_session_audit_info import ( + CompactionSessionAuditInfo, +) logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) @@ -94,6 +97,7 @@ def limit_input_deltas( hash_bucket_count: int, user_hash_bucket_chunk_size: int, input_deltas_stats: Dict[int, DeltaStats], + compaction_audit: CompactionSessionAuditInfo, deltacat_storage=unimplemented_deltacat_storage, ) -> Tuple[List[DeltaAnnotated], int, HighWatermark, bool]: # TODO (pdames): when row counts are available in metadata, use them @@ -236,6 +240,11 @@ def limit_input_deltas( # TODO (pdames): Test and add value for min_file_counts ) + compaction_audit.set_input_size_bytes(delta_bytes) + compaction_audit.set_input_file_count(delta_manifest_entries) + compaction_audit.set_total_cluster_memory_bytes(worker_task_mem) + compaction_audit.set_hash_bucket_count(hash_bucket_count) + logger.info(f"Hash bucket chunk size: {hash_bucket_chunk_size}") logger.info(f"Hash bucket count: {hash_bucket_count}") logger.info(f"Input uniform delta count: {len(rebatched_da_list)}") @@ -246,6 +255,7 @@ def limit_input_deltas( def fit_input_deltas( input_deltas: List[Delta], cluster_resources: Dict[str, float], + compaction_audit: CompactionSessionAuditInfo, hash_bucket_count: Optional[int], deltacat_storage=unimplemented_deltacat_storage, ) -> Tuple[List[DeltaAnnotated], int, HighWatermark, bool]: @@ -314,6 +324,11 @@ def estimate_size(content_length): math.ceil(total_memory / MEMORY_TO_HASH_BUCKET_COUNT_RATIO) ) + compaction_audit.set_input_file_count(total_files) + compaction_audit.set_input_size_bytes(delta_bytes) + compaction_audit.set_total_cluster_memory_bytes(total_memory) + compaction_audit.set_hash_bucket_count(hash_bucket_count) + logger.info( f"Input delta bytes: {delta_bytes}, Total files: {total_files}, The worker_cpus: {worker_cpus}, " f" total_memory: {total_memory}, and hash_bucket_count: {hash_bucket_count}" diff --git a/deltacat/compute/compactor/utils/round_completion_file.py b/deltacat/compute/compactor/utils/round_completion_file.py index e602ff52..382e99a5 100644 --- a/deltacat/compute/compactor/utils/round_completion_file.py +++ b/deltacat/compute/compactor/utils/round_completion_file.py @@ -42,6 +42,8 @@ def write_round_completion_file( round_completion_info: RoundCompletionInfo, completion_file_s3_url: str = None, ) -> str: + if bucket is None and completion_file_s3_url is None: + raise AssertionError("Either bucket or completion_file_s3_url must be passed") logger.info(f"writing round completion file contents: {round_completion_info}") if completion_file_s3_url is None: diff --git a/deltacat/storage/model/delta.py b/deltacat/storage/model/delta.py index 2696ea53..6ccf62a5 100644 --- a/deltacat/storage/model/delta.py +++ b/deltacat/storage/model/delta.py @@ -256,7 +256,8 @@ def stream_position(self) -> Optional[int]: class DeltaLocator(Locator, dict): @staticmethod def of( - partition_locator: Optional[PartitionLocator], stream_position: Optional[int] + partition_locator: Optional[PartitionLocator] = None, + stream_position: Optional[int] = None, ) -> DeltaLocator: """ Creates a partition delta locator. 
Stream Position, if provided, should diff --git a/deltacat/tests/compactor/utils/test_io.py b/deltacat/tests/compactor/utils/test_io.py index 80a41e3c..ccfa8477 100644 --- a/deltacat/tests/compactor/utils/test_io.py +++ b/deltacat/tests/compactor/utils/test_io.py @@ -9,6 +9,12 @@ def setUpClass(cls): cls.module_patcher = mock.patch.dict("sys.modules", {"ray": mock.MagicMock()}) cls.module_patcher.start() + from deltacat.compute.compactor.model.compaction_session_audit_info import ( + CompactionSessionAuditInfo, + ) + + cls.COMPACTION_AUDIT = CompactionSessionAuditInfo("1.0", "test") + super().setUpClass() def test_sanity(self): @@ -19,12 +25,18 @@ def test_sanity(self): hash_bucket_count, high_watermark, require_multiple_rounds, - ) = io.fit_input_deltas([TEST_DELTA], {"CPU": 1, "memory": 20000000}, None) + ) = io.fit_input_deltas( + [TEST_DELTA], {"CPU": 1, "memory": 20000000}, self.COMPACTION_AUDIT, None + ) self.assertIsNotNone(hash_bucket_count) self.assertTrue(1, len(delta_list)) self.assertIsNotNone(high_watermark) self.assertFalse(require_multiple_rounds) + self.assertIsNotNone(hash_bucket_count, self.COMPACTION_AUDIT.hash_bucket_count) + self.assertIsNotNone(self.COMPACTION_AUDIT.input_file_count) + self.assertIsNotNone(self.COMPACTION_AUDIT.input_size_bytes) + self.assertIsNotNone(self.COMPACTION_AUDIT.total_cluster_memory_bytes) def test_when_hash_bucket_count_overridden(self): from deltacat.compute.compactor.utils import io @@ -34,7 +46,9 @@ def test_when_hash_bucket_count_overridden(self): hash_bucket_count, high_watermark, require_multiple_rounds, - ) = io.fit_input_deltas([TEST_DELTA], {"CPU": 1, "memory": 20000000}, 20) + ) = io.fit_input_deltas( + [TEST_DELTA], {"CPU": 1, "memory": 20000000}, self.COMPACTION_AUDIT, 20 + ) self.assertEqual(20, hash_bucket_count) self.assertEqual(1, len(delta_list)) @@ -49,7 +63,9 @@ def test_when_not_enough_memory_splits_manifest_entries(self): hash_bucket_count, high_watermark, require_multiple_rounds, - ) = io.fit_input_deltas([TEST_DELTA], {"CPU": 2, "memory": 10}, 20) + ) = io.fit_input_deltas( + [TEST_DELTA], {"CPU": 2, "memory": 10}, self.COMPACTION_AUDIT, 20 + ) self.assertIsNotNone(hash_bucket_count) self.assertTrue(2, len(delta_list)) @@ -60,10 +76,12 @@ def test_when_no_input_deltas(self): from deltacat.compute.compactor.utils import io with self.assertRaises(AssertionError): - io.fit_input_deltas([], {"CPU": 100, "memory": 20000.0}, None) + io.fit_input_deltas( + [], {"CPU": 100, "memory": 20000.0}, self.COMPACTION_AUDIT, None + ) def test_when_cpu_resources_is_not_passed(self): from deltacat.compute.compactor.utils import io with self.assertRaises(KeyError): - io.fit_input_deltas([], {}, None) + io.fit_input_deltas([], {}, self.COMPACTION_AUDIT, None) diff --git a/deltacat/tests/utils/test_resources.py b/deltacat/tests/utils/test_resources.py index 33220ed7..2c6da46b 100644 --- a/deltacat/tests/utils/test_resources.py +++ b/deltacat/tests/utils/test_resources.py @@ -1,5 +1,6 @@ import unittest from unittest import mock +import sys class TestGetCurrentClusterUtilization(unittest.TestCase): @@ -20,6 +21,10 @@ def setUpClass(cls): cls.module_patcher = mock.patch.dict("sys.modules", {"ray": cls.ray_mock}) cls.module_patcher.start() + # delete reference to reload from mocked ray + if "deltacat.utils.resources" in sys.modules: + del sys.modules["deltacat.utils.resources"] + super().setUpClass() def test_sanity(self): diff --git a/deltacat/utils/ray_utils/concurrency.py b/deltacat/utils/ray_utils/concurrency.py index f9aa7efa..2cdf8efb 
100644 --- a/deltacat/utils/ray_utils/concurrency.py +++ b/deltacat/utils/ray_utils/concurrency.py @@ -7,7 +7,6 @@ from ray.types import ObjectRef from deltacat.utils.ray_utils.runtime import current_node_resource_key -from deltacat.utils.resources import log_current_cluster_utilization def invoke_parallel( @@ -47,7 +46,6 @@ def invoke_parallel( Returns: List of Ray object references returned from the submitted tasks. """ - log_current_cluster_utilization(log_identifier=ray_task.__name__) if max_parallelism is not None and max_parallelism <= 0: raise ValueError(f"Max parallelism ({max_parallelism}) must be > 0.") pending_ids = [] diff --git a/deltacat/utils/resources.py b/deltacat/utils/resources.py index 60070eb4..5b368ae5 100644 --- a/deltacat/utils/resources.py +++ b/deltacat/utils/resources.py @@ -2,11 +2,15 @@ from __future__ import annotations import ray +import sys from typing import Dict, Any from dataclasses import dataclass from deltacat import logs import logging -from deltacat.utils.performance import timed_invocation +from resource import getrusage, RUSAGE_SELF +import platform +import psutil + logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) @@ -52,21 +56,29 @@ def get_current_cluster_utilization() -> ClusterUtilization: ) -def log_current_cluster_utilization(log_identifier: str): - cluster_utilization, latency = timed_invocation( - ClusterUtilization.get_current_cluster_utilization - ) - logger.info(f"Retrieved cluster utilization metrics. Took {latency}s") +def get_current_node_peak_memory_usage_in_bytes(): + """ + Returns the peak memory usage of the node in bytes. This method works across + Windows, Darwin and Linux platforms. + """ + current_platform = platform.system() + if current_platform != "Windows": + usage = getrusage(RUSAGE_SELF).ru_maxrss + if current_platform == "Linux": + usage = usage * 1024 + return usage + else: + return psutil.Process().memory_info().peak_wset + - logger.info( - f"Log ID={log_identifier} | Cluster Object store memory used: {cluster_utilization.used_object_store_memory_bytes} " - f"which is {cluster_utilization.used_object_store_memory_percent}%" - ) - logger.info( - f"Log ID={log_identifier} | Total Cluster Memory used: {cluster_utilization.used_memory_bytes} which is " - f"{cluster_utilization.used_memory_percent}%" - ) - logger.info( - f"Log ID={log_identifier} | Total Cluster CPU used: {cluster_utilization.used_cpu} which is " - f"{cluster_utilization.used_cpu_percent}%" - ) +def get_size_of_object_in_bytes(obj: object) -> float: + size = sys.getsizeof(obj) + if isinstance(obj, dict): + return ( + size + + sum(map(get_size_of_object_in_bytes, obj.keys())) + + sum(map(get_size_of_object_in_bytes, obj.values())) + ) + if isinstance(obj, (list, tuple, set, frozenset)): + return size + sum(map(get_size_of_object_in_bytes, obj)) + return size
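A short usage sketch of the two helpers added above, assuming it runs on Linux or macOS (on Windows the peak-memory path reads psutil's peak_wset instead of ru_maxrss):

from deltacat.utils.resources import (
    get_current_node_peak_memory_usage_in_bytes,
    get_size_of_object_in_bytes,
)

# Peak RSS of the current process, normalized to bytes across platforms.
peak_bytes = get_current_node_peak_memory_usage_in_bytes()
print(f"peak memory: {peak_bytes / (1024 ** 3):.2f} GiB")

# Recursive sys.getsizeof() over containers; used by the audit to report the
# size of the lists of step results (e.g. HashBucketResult) returned by ray.get().
results = [{"taskIndex": i, "records": i * 1000} for i in range(3)]
print(get_size_of_object_in_bytes(results))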