
Capturing all the performance metrics in an audit #146

Merged (11 commits) on Jun 30, 2023
157 changes: 129 additions & 28 deletions deltacat/compute/compactor/compaction_session.py
@@ -3,6 +3,10 @@
import functools
import logging
import ray
import time
import json
from deltacat.aws import s3u as s3_utils
import deltacat
from deltacat import logs
import pyarrow as pa
from deltacat.compute.compactor import (
@@ -12,6 +16,7 @@
)
from deltacat.compute.compactor.model.dedupe_result import DedupeResult
from deltacat.compute.compactor.model.hash_bucket_result import HashBucketResult
from deltacat.compute.compactor.model.materialize_result import MaterializeResult
from deltacat.compute.stats.models.delta_stats import DeltaStats
from deltacat.storage import (
Delta,
@@ -37,7 +42,11 @@
from typing import List, Set, Optional, Tuple, Dict, Any
from collections import defaultdict
from deltacat.utils.metrics import MetricsConfig
from deltacat.utils.resources import log_current_cluster_utilization
from deltacat.compute.compactor.model.compaction_session_audit_info import (
CompactionSessionAuditInfo,
)
from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes


if importlib.util.find_spec("memray"):
import memray
@@ -188,6 +197,23 @@ def _execute_compaction_round(
**kwargs,
) -> Tuple[Optional[Partition], Optional[RoundCompletionInfo], Optional[str]]:

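# write the audit next to the other compaction artifacts, keyed by the rebase
# source partition when a rebase is in progress and the source partition otherwise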
rcf_source_partition_locator = (
rebase_source_partition_locator
if rebase_source_partition_locator
else source_partition_locator
)

base_audit_url = rcf_source_partition_locator.path(
f"s3://{compaction_artifact_s3_bucket}/compaction-audit"
)
audit_url = f"{base_audit_url}.json"

logger.info(f"Compaction audit will be written to {audit_url}")

compaction_audit = CompactionSessionAuditInfo(deltacat.__version__, audit_url)

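# durations below are measured with time.monotonic(); wall-clock time.time()
# is only captured where values from different nodes must be comparable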
compaction_start = time.monotonic()

if not primary_keys:
# TODO (pdames): run simple rebatch to reduce all deltas into 1 delta
# with normalized manifest entry sizes
@@ -230,6 +256,7 @@ def _execute_compaction_round(
f"{node_resource_keys}"
)

compaction_audit.set_cluster_cpu_max(cluster_cpus)
# create a remote options provider to round-robin tasks across all nodes or allocated bundles
logger.info(f"Setting round robin scheduling with node id:{node_resource_keys}")
round_robin_opt_provider = functools.partial(
@@ -268,6 +295,7 @@ def _execute_compaction_round(
round_completion_info.high_watermark if round_completion_info else None
)

delta_discovery_start = time.monotonic()
(
input_deltas,
previous_last_stream_position_compacted_on_destination_table,
@@ -282,6 +310,13 @@ def _execute_compaction_round(
**list_deltas_kwargs,
)

delta_discovery_end = time.monotonic()
compaction_audit.set_delta_discovery_time_in_seconds(
delta_discovery_end - delta_discovery_start
)

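# flush the audit to S3 after each major step so the metrics collected so far
# are persisted even if a later step fails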
s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit)))

if not input_deltas:
logger.info("No input deltas found to compact.")
return None, None, None
@@ -298,6 +333,7 @@ def _execute_compaction_round(
io.fit_input_deltas(
input_deltas,
cluster_resources,
compaction_audit,
hash_bucket_count,
deltacat_storage=deltacat_storage,
)
@@ -307,11 +343,14 @@ def _execute_compaction_round(
cluster_resources,
hash_bucket_count,
min_hash_bucket_chunk_size,
compaction_audit=compaction_audit,
input_deltas_stats=input_deltas_stats,
deltacat_storage=deltacat_storage,
)
)

compaction_audit.set_uniform_deltas_created(len(uniform_deltas))

assert hash_bucket_count is not None and hash_bucket_count > 0, (
f"Expected hash bucket count to be a positive integer, but found "
f"`{hash_bucket_count}`"
@@ -335,6 +374,8 @@ def _execute_compaction_round(
"Multiple rounds are not supported. Please increase the cluster size and run again."
)

hb_start = time.monotonic()

hb_tasks_pending = invoke_parallel(
items=uniform_deltas,
ray_task=hb.hash_bucket,
@@ -350,9 +391,25 @@ def _execute_compaction_round(
read_kwargs_provider=read_kwargs_provider,
deltacat_storage=deltacat_storage,
)

hb_invoke_end = time.monotonic()

logger.info(f"Getting {len(hb_tasks_pending)} hash bucket results...")
hb_results: List[HashBucketResult] = ray.get(hb_tasks_pending)
logger.info(f"Got {len(hb_results)} hash bucket results.")
hb_end = time.monotonic()
hb_results_retrieved_at = time.time()

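# save_step_stats records the per-task results for this step in the audit and
# returns the time spent gathering telemetry, which is rolled up into the
# round completion stats at the end of the session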
telemetry_time_hb = compaction_audit.save_step_stats(
CompactionSessionAuditInfo.HASH_BUCKET_STEP_NAME,
hb_results,
hb_results_retrieved_at,
hb_invoke_end - hb_start,
hb_end - hb_start,
)

s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit)))

all_hash_group_idx_to_obj_id = defaultdict(list)
for hb_result in hb_results:
for hash_group_index, object_id in enumerate(
@@ -367,6 +424,8 @@ def _execute_compaction_round(
f"Got {total_hb_record_count} hash bucket records from hash bucketing step..."
)

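# .item() converts the aggregated record count to a plain Python int so the
# audit remains JSON-serializable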
compaction_audit.set_input_records(total_hb_record_count.item())

# TODO (pdames): when resources are freed during the last round of hash
# bucketing, start running dedupe tasks that read existing dedupe
# output from S3 then wait for hash bucketing to finish before continuing
@@ -389,6 +448,9 @@ def _execute_compaction_round(
# identify the index of records to keep or drop based on sort keys
num_materialize_buckets = max_parallelism
logger.info(f"Materialize Bucket Count: {num_materialize_buckets}")

dedupe_start = time.monotonic()

dd_tasks_pending = invoke_parallel(
items=all_hash_group_idx_to_obj_id.values(),
ray_task=dd.dedupe,
@@ -403,11 +465,31 @@ def _execute_compaction_round(
enable_profiler=enable_profiler,
metrics_config=metrics_config,
)

dedupe_invoke_end = time.monotonic()
logger.info(f"Getting {len(dd_tasks_pending)} dedupe results...")
dd_results: List[DedupeResult] = ray.get(dd_tasks_pending)
logger.info(f"Got {len(dd_results)} dedupe results.")

# we use time.time() here because time.monotonic() has an arbitrary reference
# point, whereas time.time() measures epoch seconds, so timestamps captured
# on different nodes can be compared meaningfully.
dedupe_results_retrieved_at = time.time()
dedupe_end = time.monotonic()

total_dd_record_count = sum([ddr.deduped_record_count for ddr in dd_results])
logger.info(f"Deduped {total_dd_record_count} records...")

telemetry_time_dd = compaction_audit.save_step_stats(
CompactionSessionAuditInfo.DEDUPE_STEP_NAME,
dd_results,
dedupe_results_retrieved_at,
dedupe_invoke_end - dedupe_start,
dedupe_end - dedupe_start,
)

compaction_audit.set_records_deduped(total_dd_record_count.item())

all_mat_buckets_to_obj_id = defaultdict(list)
for dd_result in dd_results:
for (
@@ -420,6 +502,8 @@ def _execute_compaction_round(
logger.info(f"Getting {len(dd_tasks_pending)} dedupe result stat(s)...")
logger.info(f"Materialize buckets created: " f"{len(all_mat_buckets_to_obj_id)}")

compaction_audit.set_materialize_buckets(len(all_mat_buckets_to_obj_id))

# TODO(pdames): when resources are freed during the last round of deduping
# start running materialize tasks that read materialization source file
# tables from S3 then wait for deduping to finish before continuing
@@ -432,6 +516,11 @@ def _execute_compaction_round(

# parallel step 3:
# materialize records to keep by index

s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit)))

materialize_start = time.monotonic()

mat_tasks_pending = invoke_parallel(
items=all_mat_buckets_to_obj_id.items(),
ray_task=mat.materialize,
@@ -453,30 +542,24 @@ def _execute_compaction_round(
s3_table_writer_kwargs=s3_table_writer_kwargs,
deltacat_storage=deltacat_storage,
)

materialize_invoke_end = time.monotonic()

logger.info(f"Getting {len(mat_tasks_pending)} materialize result(s)...")
mat_results = ray.get(mat_tasks_pending)
total_count_of_src_dfl_not_touched = sum(
m.count_of_src_dfl_not_touched for m in mat_results
)
total_length_src_dfl = sum(m.count_of_src_dfl for m in mat_results)
logger.info(
f"Got total of {total_count_of_src_dfl_not_touched} manifest files not touched."
)
logger.info(
f"Got total of {total_length_src_dfl} manifest files during compaction."
)
manifest_entry_copied_by_reference_ratio = (
(round(total_count_of_src_dfl_not_touched / total_length_src_dfl, 4) * 100)
if total_length_src_dfl != 0
else None
)
logger.info(
f"{manifest_entry_copied_by_reference_ratio} percent of manifest files are copied by reference during materialize."
)
mat_results: List[MaterializeResult] = ray.get(mat_tasks_pending)

logger.info(f"Got {len(mat_results)} materialize result(s).")

log_current_cluster_utilization(log_identifier="post_materialize")
materialize_end = time.monotonic()
materialize_results_retrieved_at = time.time()

telemetry_time_materialize = compaction_audit.save_step_stats(
CompactionSessionAuditInfo.MATERIALIZE_STEP_NAME,
mat_results,
materialize_results_retrieved_at,
materialize_invoke_end - materialize_start,
materialize_end - materialize_start,
)

mat_results = sorted(mat_results, key=lambda m: m.task_index)
deltas = [m.delta for m in mat_results]
@@ -494,6 +577,7 @@ def _execute_compaction_round(
f" Materialized records: {merged_delta.meta.record_count}"
)
logger.info(record_info_msg)

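# sanity check: materialized record count must equal input records minus deduped records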
assert (
total_hb_record_count - total_dd_record_count == merged_delta.meta.record_count
), (
@@ -506,6 +590,9 @@ def _execute_compaction_round(
)
logger.info(f"Committed compacted delta: {compacted_delta}")

compaction_end = time.monotonic()
compaction_audit.set_compaction_time_in_seconds(compaction_end - compaction_start)

new_compacted_delta_locator = DeltaLocator.of(
new_compacted_partition_locator,
compacted_delta.stream_position,
@@ -516,24 +603,38 @@ def _execute_compaction_round(
if round_completion_info
else None
)

pyarrow_write_result = PyArrowWriteResult.union(
[m.pyarrow_write_result for m in mat_results]
)

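# capture the peak memory used on the node driving this compaction session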
session_peak_memory = get_current_node_peak_memory_usage_in_bytes()
compaction_audit.set_peak_memory_used_bytes_by_compaction_session_process(
session_peak_memory
)

compaction_audit.save_round_completion_stats(
mat_results, telemetry_time_hb + telemetry_time_dd + telemetry_time_materialize
)

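# final audit flush including the end-to-end and round completion metrics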
s3_utils.upload(compaction_audit.audit_url, str(json.dumps(compaction_audit)))

new_round_completion_info = RoundCompletionInfo.of(
last_stream_position_compacted,
new_compacted_delta_locator,
PyArrowWriteResult.union([m.pyarrow_write_result for m in mat_results]),
pyarrow_write_result,
bit_width_of_sort_keys,
last_rebase_source_partition_locator,
manifest_entry_copied_by_reference_ratio,
)
rcf_source_partition_locator = (
rebase_source_partition_locator
if rebase_source_partition_locator
else source_partition_locator
compaction_audit.untouched_file_ratio,
audit_url,
)

logger.info(
f"partition-{source_partition_locator.partition_values},"
f"compacted at: {last_stream_position_compacted},"
f"last position: {last_stream_position_to_compact}"
)

return (
partition,
new_round_completion_info,