From 3fc2e2438ef95ee1e86bb68287bea28ea5072ca6 Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Mon, 23 Sep 2024 14:00:30 -0700
Subject: [PATCH 01/13] Initial commit

Signed-off-by: Balaji Veeramani
---
 .../interfaces/op_runtime_metrics.py          | 417 +++++++++---------
 python/ray/data/_internal/stats.py            |  10 +-
 2 files changed, 223 insertions(+), 204 deletions(-)

diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
index 68abc36796355..9c2826aea1631 100644
--- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
+++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
@@ -1,6 +1,7 @@
+import functools
 import time
-from dataclasses import dataclass, field, fields
-from typing import TYPE_CHECKING, Any, Dict, Optional
+from dataclasses import Field, dataclass, field, fields
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Protocol, TypedDict
 
 import ray
 from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
@@ -12,6 +13,75 @@
 )
 
 
+# A metadata key used to mark a dataclass field as a metric.
+_IS_FIELD_METRIC_KEY = "__is_field_metric__"
+
+_METRICS: List["Metric"] = []
+
+
+@dataclass(frozen=True)
+class Metric:
+    """Metadata for a metric.
+
+    Args:
+        name: The name of the metric.
+        description: A human-readable description of the metric, also used as the chart
+            description on the Ray Data dashboard.
+        metrics_group: The group of the metric, used to organize metrics into groups in
+            'StatsActor' and on the Ray Data dashboard.
+        map_only: Whether the metric is only measured for 'MapOperators'.
+    """
+    name: str
+    description: str
+    metrics_group: str
+    map_only: bool = False
+
+
+def metricfield(
+    *,
+    description: str,
+    metrics_group: str = None,
+    map_only: bool = False,
+    **field_kwargs,
+):
+    """A dataclass field that represents a metric."""
+    if "metadata" in field_kwargs:
+        raise ValueError("You can't use 'metadata' with 'metricfield'.")
+
+    metadata = {"map_only": map_only, _IS_FIELD_METRIC_KEY: True}
+    if description is not None:
+        metadata["description"] = description
+    if metrics_group is not None:
+        metadata["metrics_group"] = metrics_group
+
+    return field(
+        metadata=metadata,
+        **field_kwargs
+    )
+
+
+def metricproperty(
+    *,
+    description: str,
+    metrics_group: str,
+    map_only: bool = False,
+):
+    """A property that represents a metric."""
+    def wrap(func):
+        metric = Metric(
+            name=func.__name__,
+            description=description,
+            metrics_group=metrics_group,
+            map_only=map_only,
+        )
+
+        _METRICS.append(metric)
+
+        return property(func)
+
+    return wrap
+
+
 @dataclass
 class RunningTaskInfo:
     inputs: RefBundle
@@ -19,259 +89,209 @@ class RunningTaskInfo:
     bytes_outputs: int
 
 
+class OpRuntimesMetricsMeta(type):
+    def __init__(cls, name, bases, dict):
+        super().__init__(name, bases, dict)
+
+        # Iterate over the attributes and methods of 'OpRuntimeMetrics'.
+        for name, value in dict.items():
+            # If an attribute is a dataclass field and has _IS_FIELD_METRIC_KEY in its
+            # metadata, then create a metric from the field metadata and add it to the
+            # list of metrics. See also the 'metricfield' function.
+            if isinstance(value, Field) and value.metadata.get(_IS_FIELD_METRIC_KEY):
+                metric = Metric(
+                    name=name,
+                    description=value.metadata.get("description"),
+                    metrics_group=value.metadata.get("metrics_group"),
+                    map_only=value.metadata.get("map_only", False),
+                )
+                _METRICS.append(metric)
+
+
 @dataclass
-class OpRuntimeMetrics:
-    """Runtime metrics for a PhysicalOperator.
+class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
+    """Runtime metrics for a 'PhysicalOperator'.
 
     Metrics are updated dynamically during the execution of the Dataset.
     This class can be used for either observability or scheduling purposes.
 
     DO NOT modify the fields of this class directly. Instead, use the provided
     callback methods.
-
-    Metric metadata attributes:
-    - description (required): A human-readable description of the metric, also used as
-        the chart description on the Ray Data dashboard.
-    - metrics_group (required): The group of the metric, used to organize metrics
-        into groups in StatsActor and on the Ray Data dashboard.
-    - map_only (optional): Whether the metric is only measured for MapOperators.
     """
 
     # TODO(hchen): Fields tagged with "map_only" currently only work for MapOperator.
    # We should make them work for all operators by unifying the task execution code.
 
     # === Inputs-related metrics ===
-    num_inputs_received: int = field(
+    num_inputs_received: int = metricfield(
         default=0,
-        metadata={
-            "description": "Number of input blocks received by operator.",
-            "metrics_group": "inputs",
-        },
+        description="Number of input blocks received by operator.",
+        metrics_group="inputs",
     )
-    bytes_inputs_received: int = field(
+    bytes_inputs_received: int = metricfield(
         default=0,
-        metadata={
-            "description": "Byte size of input blocks received by operator.",
-            "metrics_group": "inputs",
-        },
+        description="Byte size of input blocks received by operator.",
+        metrics_group="inputs",
    )
-    num_task_inputs_processed: int = field(
+    num_task_inputs_processed: int = metricfield(
         default=0,
-        metadata={
-            "description": (
-                "Number of input blocks that operator's tasks "
-                "have finished processing."
-            ),
-            "metrics_group": "inputs",
-            "map_only": True,
-        },
+        description=(
+            "Number of input blocks that operator's tasks have finished processing."
+        ),
+        metrics_group="inputs",
+        map_only=True,
     )
-    bytes_task_inputs_processed: int = field(
+    bytes_task_inputs_processed: int = metricfield(
         default=0,
-        metadata={
-            "description": (
-                "Byte size of input blocks that operator's tasks "
-                "have finished processing."
-            ),
-            "metrics_group": "inputs",
-            "map_only": True,
-        },
+        description=(
+            "Byte size of input blocks that operator's tasks have finished processing."
+        ),
+        metrics_group="inputs",
+        map_only=True,
     )
-    bytes_inputs_of_submitted_tasks: int = field(
+    bytes_inputs_of_submitted_tasks: int = metricfield(
         default=0,
-        metadata={
-            "description": "Byte size of input blocks passed to submitted tasks.",
-            "metrics_group": "inputs",
-            "map_only": True,
-        },
+        description="Byte size of input blocks passed to submitted tasks.",
+        metrics_group="inputs",
+        map_only=True,
     )
 
     # === Outputs-related metrics ===
-    num_task_outputs_generated: int = field(
+    num_task_outputs_generated: int = metricfield(
         default=0,
-        metadata={
-            "description": "Number of output blocks generated by tasks.",
-            "metrics_group": "outputs",
-            "map_only": True,
-        },
+        description="Number of output blocks generated by tasks.",
+        metrics_group="outputs",
+        map_only=True,
     )
-    bytes_task_outputs_generated: int = field(
+    bytes_task_outputs_generated: int = metricfield(
         default=0,
-        metadata={
-            "description": "Byte size of output blocks generated by tasks.",
-            "metrics_group": "outputs",
-            "map_only": True,
-        },
+        description="Byte size of output blocks generated by tasks.",
+        metrics_group="outputs",
+        map_only=True,
     )
-    rows_task_outputs_generated: int = field(
+    rows_task_outputs_generated: int = metricfield(
         default=0,
-        metadata={
-            "description": ("Number of output rows generated by tasks."),
-            "metrics_group": "outputs",
-            "map_only": True,
-        },
+        description=("Number of output rows generated by tasks."),
+        metrics_group="outputs",
+        map_only=True,
     )
-    num_outputs_taken: int = field(
+    num_outputs_taken: int = metricfield(
         default=0,
-        metadata={
-            "description": (
-                "Number of output blocks that are already "
-                "taken by downstream operators."
-            ),
-            "metrics_group": "outputs",
-        },
+        description=(
+            "Number of output blocks that are already taken by downstream operators."
+        ),
+        metrics_group="outputs",
     )
-    bytes_outputs_taken: int = field(
+    bytes_outputs_taken: int = metricfield(
         default=0,
-        metadata={
-            "description": (
-                "Byte size of output blocks that are already "
-                "taken by downstream operators."
-            ),
-            "metrics_group": "outputs",
-        },
+        description=(
+            "Byte size of output blocks that are already taken by downstream operators."
+        ),
+        metrics_group="outputs",
     )
-    num_outputs_of_finished_tasks: int = field(
+    num_outputs_of_finished_tasks: int = metricfield(
         default=0,
-        metadata={
-            "description": (
-                "Number of generated output blocks that are from finished tasks."
-            ),
-            "metrics_group": "outputs",
-            "map_only": True,
-        },
+        description="Number of generated output blocks that are from finished tasks.",
+        metrics_group="outputs",
+        map_only=True,
     )
-    bytes_outputs_of_finished_tasks: int = field(
+    bytes_outputs_of_finished_tasks: int = metricfield(
         default=0,
-        metadata={
-            "description": (
-                "Byte size of generated output blocks that are from finished tasks."
-            ),
-            "metrics_group": "outputs",
-            "map_only": True,
-        },
+        description="Byte size of generated output blocks that are from finished tasks."
+        metrics_group="outputs",
+        map_only=True,
     )
 
     # === Tasks-related metrics ===
-    num_tasks_submitted: int = field(
+    num_tasks_submitted: int = metricfield(
         default=0,
-        metadata={
-            "description": "Number of submitted tasks.",
-            "metrics_group": "tasks",
-            "map_only": True,
-        },
+        description="Number of submitted tasks.",
+        metrics_group="tasks",
+        map_only=True,
     )
-    num_tasks_running: int = field(
+    num_tasks_running: int = metricfield(
         default=0,
-        metadata={
-            "description": "Number of running tasks.",
-            "metrics_group": "tasks",
-            "map_only": True,
-        },
+        description="Number of running tasks.",
+        metrics_group="tasks",
+        map_only=True,
     )
-    num_tasks_have_outputs: int = field(
+    num_tasks_have_outputs: int = metricfield(
         default=0,
-        metadata={
-            "description": "Number of tasks that already have output.",
-            "metrics_group": "tasks",
-            "map_only": True,
-        },
+        description="Number of tasks that already have output.",
+        metrics_group="tasks",
+        map_only=True,
    )
-    num_tasks_finished: int = field(
+    num_tasks_finished: int = metricfield(
         default=0,
-        metadata={
-            "description": "Number of finished tasks.",
-            "metrics_group": "tasks",
-            "map_only": True,
-        },
+        description="Number of finished tasks.",
+        metrics_group="tasks",
+        map_only=True,
     )
-    num_tasks_failed: int = field(
+    num_tasks_failed: int = metricfield(
         default=0,
-        metadata={
-            "description": "Number of failed tasks.",
-            "metrics_group": "tasks",
-            "map_only": True,
-        },
+        description="Number of failed tasks.",
+        metrics_group="tasks",
+        map_only=True,
     )
-    block_generation_time: float = field(
+    block_generation_time: float = metricfield(
         default=0,
-        metadata={
-            "description": "Time spent generating blocks in tasks.",
-            "metrics_group": "tasks",
-            "map_only": True,
-        },
+        description="Time spent generating blocks in tasks.",
+        metrics_group="tasks",
+        map_only=True,
     )
-    task_submission_backpressure_time: float = field(
+    task_submission_backpressure_time: float = metricfield(
         default=0,
-        metadata={
-            "description": "Time spent in task submission backpressure.",
-            "metrics_group": "tasks",
-        },
+        description="Time spent in task submission backpressure.",
+        metrics_group="tasks",
     )
 
     # === Object store memory metrics ===
-    obj_store_mem_internal_inqueue_blocks: int = field(
+    obj_store_mem_internal_inqueue_blocks: int = metricfield(
         default=0,
-        metadata={
-            "description": "Number of blocks in operator's internal input queue.",
-            "metrics_group": "object_store_memory",
-        },
+        description="Number of blocks in operator's internal input queue.",
+        metrics_group="object_store_memory",
     )
-    obj_store_mem_internal_inqueue: int = field(
+    obj_store_mem_internal_inqueue: int = metricfield(
         default=0,
-        metadata={
-            "description": (
-                "Byte size of input blocks in the operator's internal input queue."
-            ),
-            "metrics_group": "object_store_memory",
-        },
+        description=(
+            "Byte size of input blocks in the operator's internal input queue."
+        ),
+        metrics_group="object_store_memory",
     )
-    obj_store_mem_internal_outqueue_blocks: int = field(
+    obj_store_mem_internal_outqueue_blocks: int = metricfield(
         default=0,
-        metadata={
-            "description": "Number of blocks in the operator's internal output queue.",
-            "metrics_group": "object_store_memory",
-        },
+        description="Number of blocks in the operator's internal output queue.",
+        metrics_group="object_store_memory",
     )
-    obj_store_mem_internal_outqueue: int = field(
+    obj_store_mem_internal_outqueue: int = metricfield(
         default=0,
-        metadata={
-            "description": (
-                "Byte size of output blocks in the operator's internal output queue."
-            ),
-            "metrics_group": "object_store_memory",
-        },
+        description=(
+            "Byte size of output blocks in the operator's internal output queue."
+        ),
+        metrics_group="object_store_memory",
     )
-    obj_store_mem_pending_task_inputs: int = field(
+    obj_store_mem_pending_task_inputs: int = metricfield(
         default=0,
-        metadata={
-            "description": "Byte size of input blocks used by pending tasks.",
-            "metrics_group": "object_store_memory",
-            "map_only": True,
-        },
+        description="Byte size of input blocks used by pending tasks.",
+        metrics_group="object_store_memory",
+        map_only=True,
     )
-    obj_store_mem_freed: int = field(
+    obj_store_mem_freed: int = metricfield(
         default=0,
-        metadata={
-            "description": "Byte size of freed memory in object store.",
-            "metrics_group": "object_store_memory",
-            "map_only": True,
-        },
+        description="Byte size of freed memory in object store.",
+        metrics_group="object_store_memory",
+        map_only=True,
     )
-    obj_store_mem_spilled: int = field(
+    obj_store_mem_spilled: int = metricfield(
         default=0,
-        metadata={
-            "description": "Byte size of spilled memory in object store.",
-            "metrics_group": "object_store_memory",
-            "map_only": True,
-        },
+        description="Byte size of spilled memory in object store.",
+        metrics_group="object_store_memory",
+        map_only=True,
     )
-    obj_store_mem_used: int = field(
+    obj_store_mem_used: int = metricfield(
         default=0,
-        metadata={
-            "description": "Byte size of used memory in object store.",
-            "metrics_group": "object_store_memory",
-        },
+        description="Byte size of used memory in object store.",
+        metrics_group="object_store_memory",
     )
 
     # === Miscellaneous metrics ===
@@ -292,14 +312,18 @@ def extra_metrics(self) -> Dict[str, Any]:
         """Return a dict of extra metrics."""
         return self._extra_metrics
 
+    @classmethod
+    def get_metrics(self) -> List[Metric]:
+        return list(_METRICS)
+
     def as_dict(self):
         """Return a dict representation of the metrics."""
         result = []
-        for f in fields(self):
-            if not self._is_map and f.metadata.get("map_only", False):
+        for metric in self.get_metrics():
+            if not self._is_map and metric.map_only:
                 continue
-            value = getattr(self, f.name)
-            result.append((f.name, value))
+            value = getattr(self, metric.name)
+            result.append((metric.name, value))
 
         # TODO: record resource usage in OpRuntimeMetrics,
         # avoid calling self._op.current_processor_usage()
@@ -313,12 +337,7 @@ def as_dict(self):
         result.extend(self._extra_metrics.items())
         return dict(result)
 
-    @classmethod
-    def get_metric_keys(cls):
-        """Return a list of metric keys."""
-        return [f.name for f in fields(cls)] + ["cpu_usage", "gpu_usage"]
-
-    @property
+    @metricproperty
     def average_num_outputs_per_task(self) -> Optional[float]:
         """Average number of output blocks per task, or None if no task has finished."""
         if self.num_tasks_finished == 0:
@@ -326,7 +345,7 @@ def average_num_outputs_per_task(self) -> Optional[float]:
         else:
             return self.num_outputs_of_finished_tasks / self.num_tasks_finished
 
-    @property
+    @metricproperty
     def average_bytes_per_output(self) -> Optional[float]:
         """Average size in bytes of output blocks."""
         if self.num_task_outputs_generated == 0:
@@ -334,7 +353,7 @@ def average_bytes_per_output(self) -> Optional[float]:
         else:
             return self.bytes_task_outputs_generated / self.num_task_outputs_generated
 
-    @property
+    @metricproperty
     def obj_store_mem_pending_task_outputs(self) -> Optional[float]:
         """Estimated size in bytes of output blocks in Ray generator buffers.
 
@@ -359,7 +378,7 @@ def obj_store_mem_pending_task_outputs(self) -> Optional[float]:
 
         return num_tasks_running * per_task_output
 
-    @property
+    @metricproperty
     def obj_store_mem_max_pending_output_per_task(self) -> Optional[float]:
         """Estimated size in bytes of output blocks in a task's generator buffer."""
         context = ray.data.DataContext.get_current()
@@ -377,7 +396,7 @@ def obj_store_mem_max_pending_output_per_task(self) -> Optional[float]:
         )
         return bytes_per_output * num_pending_outputs
 
-    @property
+    @metricproperty
     def average_bytes_inputs_per_task(self) -> Optional[float]:
         """Average size in bytes of ref bundles passed to tasks, or ``None`` if no
         tasks have been submitted."""
@@ -386,7 +405,7 @@ def average_bytes_inputs_per_task(self) -> Optional[float]:
         else:
             return self.bytes_inputs_of_submitted_tasks / self.num_tasks_submitted
 
-    @property
+    @metricproperty
     def average_bytes_outputs_per_task(self) -> Optional[float]:
         """Average size in bytes of output blocks per task, or None if no task
         has finished."""
@@ -395,7 +414,7 @@ def average_bytes_outputs_per_task(self) -> Optional[float]:
         else:
             return self.bytes_outputs_of_finished_tasks / self.num_tasks_finished
 
-    @property
+    @metricproperty
     def average_bytes_change_per_task(self) -> Optional[float]:
         """Average size difference in bytes of input ref bundles and output ref
         bundles per task."""
diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py
index 334b6c229ed79..3b360d22909c2 100644
--- a/python/ray/data/_internal/stats.py
+++ b/python/ray/data/_internal/stats.py
@@ -267,12 +267,12 @@ def _create_prometheus_metrics_for_execution_metrics(
         self, metrics_group: str, tag_keys: Tuple[str, ...]
     ) -> Dict[str, Gauge]:
         metrics = {}
-        for field in fields(OpRuntimeMetrics):
-            if not field.metadata.get("metrics_group") == metrics_group:
+        for metric in OpRuntimeMetrics.get_metrics():
+            if not metric.metrics_group == metrics_group:
                 continue
-            metric_name = f"data_{field.name}"
-            metric_description = field.metadata.get("description")
-            metrics[field.name] = Gauge(
+            metric_name = f"data_{metric.name}"
+            metric_description = metric.description
+            metrics[metric.name] = Gauge(
                 metric_name,
                 description=metric_description,
                 tag_keys=tag_keys,

From 8c84864f889d8d2bf0eed962ee71b74a1e5b7ec2 Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Mon, 23 Sep 2024 14:00:49 -0700
Subject: [PATCH 02/13] Fix syntax error

Signed-off-by: Balaji Veeramani
---
 .../_internal/execution/interfaces/op_runtime_metrics.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
index 9c2826aea1631..ccc2d15bf591f 100644
--- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
+++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
@@ -31,6 +31,7 @@ class Metric:
         'StatsActor' and on the Ray Data dashboard.
         map_only: Whether the metric is only measured for 'MapOperators'.
     """
+
     name: str
     description: str
     metrics_group: str
@@ -54,10 +55,7 @@ def metricfield(
     if metrics_group is not None:
         metadata["metrics_group"] = metrics_group
 
-    return field(
-        metadata=metadata,
-        **field_kwargs
-    )
+    return field(metadata=metadata, **field_kwargs)
 
 
 def metricproperty(
@@ -67,6 +65,7 @@ def metricproperty(
     map_only: bool = False,
 ):
     """A property that represents a metric."""
+
     def wrap(func):
         metric = Metric(
             name=func.__name__,
@@ -197,7 +196,7 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
     )
     bytes_outputs_of_finished_tasks: int = metricfield(
         default=0,
-        description="Byte size of generated output blocks that are from finished tasks."
+        description="Byte size of generated output blocks that are from finished tasks.",
         metrics_group="outputs",
         map_only=True,
     )

From f7e7ef35912f6216521474ab8c3dabb3aa7ba39b Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Mon, 23 Sep 2024 14:01:18 -0700
Subject: [PATCH 03/13] Appease lint

Signed-off-by: Balaji Veeramani
---
 .../_internal/execution/interfaces/op_runtime_metrics.py | 5 ++---
 python/ray/data/_internal/stats.py                       | 2 +-
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
index ccc2d15bf591f..1805c6c55c44b 100644
--- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
+++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
@@ -1,7 +1,6 @@
-import functools
 import time
-from dataclasses import Field, dataclass, field, fields
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Protocol, TypedDict
+from dataclasses import Field, dataclass, field
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
 
 import ray
 from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py
index 3b360d22909c2..80e92f5fedee6 100644
--- a/python/ray/data/_internal/stats.py
+++ b/python/ray/data/_internal/stats.py
@@ -3,7 +3,7 @@
 import threading
 import time
 from contextlib import contextmanager
-from dataclasses import dataclass, fields
+from dataclasses import dataclass
 from typing import Any, Dict, List, Optional, Set, Tuple, Union
 from uuid import uuid4
 

From fcee9048322dad7c5eb6ea2ba7c71a165cce57bc Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Mon, 23 Sep 2024 14:01:41 -0700
Subject: [PATCH 04/13] Format files

Signed-off-by: Balaji Veeramani
---
 .../data/_internal/execution/interfaces/op_runtime_metrics.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
index 1805c6c55c44b..416c9e2f33238 100644
--- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
+++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
@@ -195,7 +195,9 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
     )
     bytes_outputs_of_finished_tasks: int = metricfield(
         default=0,
-        description="Byte size of generated output blocks that are from finished tasks.",
+        description=(
+            "Byte size of generated output blocks that are from finished tasks."
+        ),
         metrics_group="outputs",
         map_only=True,
     )

From 93136ac203998853ee3ddcdbb8c11db099a6d9ca Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Mon, 23 Sep 2024 14:23:22 -0700
Subject: [PATCH 05/13] Fix type error

Signed-off-by: Balaji Veeramani
---
 .../interfaces/op_runtime_metrics.py          | 40 ++++++++++---------
 .../_internal/execution/resource_manager.py   |  1 +
 2 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
index 416c9e2f33238..103fe493e1b90 100644
--- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
+++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
@@ -337,7 +337,11 @@ def as_dict(self):
         result.extend(self._extra_metrics.items())
         return dict(result)
 
-    @metricproperty
+    @metricproperty(
+        description="Average number of blocks generated per task.",
+        metrics_group="outputs",
+        map_only=True,
+    )
     def average_num_outputs_per_task(self) -> Optional[float]:
         """Average number of output blocks per task, or None if no task has finished."""
         if self.num_tasks_finished == 0:
@@ -345,7 +349,11 @@ def average_num_outputs_per_task(self) -> Optional[float]:
         else:
             return self.num_outputs_of_finished_tasks / self.num_tasks_finished
 
-    @metricproperty
+    @metricproperty(
+        description="Average size of task output in bytes.",
+        metrics_group="outputs",
+        map_only=True,
+    )
     def average_bytes_per_output(self) -> Optional[float]:
         """Average size in bytes of output blocks."""
         if self.num_task_outputs_generated == 0:
@@ -353,7 +361,7 @@ def average_bytes_per_output(self) -> Optional[float]:
         else:
             return self.bytes_task_outputs_generated / self.num_task_outputs_generated
 
-    @metricproperty
+    @property
     def obj_store_mem_pending_task_outputs(self) -> Optional[float]:
         """Estimated size in bytes of output blocks in Ray generator buffers.
 
@@ -378,7 +386,7 @@ def obj_store_mem_pending_task_outputs(self) -> Optional[float]:
 
         return num_tasks_running * per_task_output
 
-    @metricproperty
+    @property
     def obj_store_mem_max_pending_output_per_task(self) -> Optional[float]:
         """Estimated size in bytes of output blocks in a task's generator buffer."""
         context = ray.data.DataContext.get_current()
@@ -396,7 +404,11 @@ def obj_store_mem_max_pending_output_per_task(self) -> Optional[float]:
         )
         return bytes_per_output * num_pending_outputs
 
-    @metricproperty
+    @metricproperty(
+        description="Average size of task inputs in bytes.",
+        metrics_group="inputs",
+        map_only=True,
+    )
     def average_bytes_inputs_per_task(self) -> Optional[float]:
         """Average size in bytes of ref bundles passed to tasks, or ``None`` if no
         tasks have been submitted."""
@@ -405,7 +417,11 @@ def average_bytes_inputs_per_task(self) -> Optional[float]:
         else:
             return self.bytes_inputs_of_submitted_tasks / self.num_tasks_submitted
 
-    @metricproperty
+    @metricproperty(
+        description="Average total output size of task in bytes.",
+        metrics_group="outputs",
+        map_only=True,
+    )
     def average_bytes_outputs_per_task(self) -> Optional[float]:
         """Average size in bytes of output blocks per task, or None if no task
         has finished."""
@@ -414,18 +430,6 @@ def average_bytes_outputs_per_task(self) -> Optional[float]:
         else:
             return self.bytes_outputs_of_finished_tasks / self.num_tasks_finished
 
-    @metricproperty
-    def average_bytes_change_per_task(self) -> Optional[float]:
-        """Average size difference in bytes of input ref bundles and output ref
-        bundles per task."""
-        if (
-            self.average_bytes_inputs_per_task is None
-            or self.average_bytes_outputs_per_task is None
-        ):
-            return None
-
-        return self.average_bytes_outputs_per_task - self.average_bytes_inputs_per_task
-
     def on_input_received(self, input: RefBundle):
         """Callback when the operator receives a new input."""
         self.num_inputs_received += 1
diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py
index 1358e683536b7..ac838f8e129e6 100644
--- a/python/ray/data/_internal/execution/resource_manager.py
+++ b/python/ray/data/_internal/execution/resource_manager.py
@@ -91,6 +91,7 @@ def _estimate_object_store_memory(self, op, state) -> int:
         # Pending task outputs.
         mem_op_internal = op.metrics.obj_store_mem_pending_task_outputs or 0
         # Op's internal output buffers.
+        print(mem_op_internal, op.metrics.obj_store_mem_internal_outqueue)
         mem_op_internal += op.metrics.obj_store_mem_internal_outqueue
 
         # Op's external output buffer.

From 4db6eed03680669d18961a98b1f50c9de2b94519 Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Mon, 23 Sep 2024 14:24:08 -0700
Subject: [PATCH 06/13] Remove print statement

Signed-off-by: Balaji Veeramani
---
 python/ray/data/_internal/execution/resource_manager.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py
index ac838f8e129e6..1358e683536b7 100644
--- a/python/ray/data/_internal/execution/resource_manager.py
+++ b/python/ray/data/_internal/execution/resource_manager.py
@@ -91,7 +91,6 @@ def _estimate_object_store_memory(self, op, state) -> int:
         # Pending task outputs.
         mem_op_internal = op.metrics.obj_store_mem_pending_task_outputs or 0
         # Op's internal output buffers.
-        print(mem_op_internal, op.metrics.obj_store_mem_internal_outqueue)
         mem_op_internal += op.metrics.obj_store_mem_internal_outqueue
 
         # Op's external output buffer.
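
Note: patches 01-06 boil down to one pattern: a dataclass field is tagged as a metric through its `metadata` dict, and a metaclass collects the tagged fields into a module-level registry while the class is being created (at that point `@dataclass` hasn't run yet and `Field.name` is still unset, which is why registration can't happen inside `metricfield` itself). The sketch below is a minimal, self-contained illustration of that pattern, not code from these patches; `MetricsMeta`, `Metrics`, `_REGISTRY`, and `_IS_METRIC_KEY` are invented names.

from dataclasses import Field, dataclass, field
from typing import List

_IS_METRIC_KEY = "__is_metric"
_REGISTRY: List[str] = []  # names of fields tagged as metrics


def metric_field(*, description: str, **field_kwargs):
    # Tag the field via its metadata dict. 'Field.name' is not set yet at
    # this point, so registration has to happen later, in the metaclass.
    metadata = {_IS_METRIC_KEY: True, "description": description}
    return field(metadata=metadata, **field_kwargs)


class MetricsMeta(type):
    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        # The metaclass runs while the class body is being created, before the
        # @dataclass decorator processes it, so the raw Field objects produced
        # by metric_field() are still visible as class attributes here.
        for attr_name, attr_value in namespace.items():
            if isinstance(attr_value, Field) and attr_value.metadata.get(
                _IS_METRIC_KEY
            ):
                _REGISTRY.append(attr_name)


@dataclass
class Metrics(metaclass=MetricsMeta):
    num_rows: int = metric_field(default=0, description="Rows processed.")
    num_bytes: int = metric_field(default=0, description="Bytes processed.")


print(_REGISTRY)  # ['num_rows', 'num_bytes']
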
From 3b5f7d8f0a99484c8d60e0fbb5381e62c272fcc4 Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Mon, 23 Sep 2024 14:32:59 -0700
Subject: [PATCH 07/13] Add constants

Signed-off-by: Balaji Veeramani
---
 .../interfaces/op_runtime_metrics.py          | 27 ++++++++++---------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
index 103fe493e1b90..a4517e7f9c1e6 100644
--- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
+++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
@@ -13,7 +13,11 @@
 
 
 # A metadata key used to mark a dataclass field as a metric.
-_IS_FIELD_METRIC_KEY = "__is_field_metric__"
+_IS_FIELD_METRIC_KEY = "__is_metric"
+# Metadata keys used to store information about a metric.
+_METRIC_FIELD_DESCRIPTION_KEY = "__metric_description"
+_METRIC_FIELD_METRICS_GROUP_KEY = "__metric_metrics_group"
+_METRIC_FIELD_IS_MAP_ONLY_KEY = "__metric_is_map_only"
 
 _METRICS: List["Metric"] = []
 
@@ -40,19 +44,18 @@ class Metric:
 def metricfield(
     *,
     description: str,
-    metrics_group: str = None,
+    metrics_group: str,
     map_only: bool = False,
     **field_kwargs,
 ):
     """A dataclass field that represents a metric."""
-    if "metadata" in field_kwargs:
-        raise ValueError("You can't use 'metadata' with 'metricfield'.")
+    metadata = field_kwargs.get("metadata", {})
+
+    metadata[_IS_FIELD_METRIC_KEY] = True
 
-    metadata = {"map_only": map_only, _IS_FIELD_METRIC_KEY: True}
-    if description is not None:
-        metadata["description"] = description
-    if metrics_group is not None:
-        metadata["metrics_group"] = metrics_group
+    metadata[_METRIC_FIELD_DESCRIPTION_KEY] = description
+    metadata[_METRIC_FIELD_METRICS_GROUP_KEY] = metrics_group
+    metadata[_METRIC_FIELD_IS_MAP_ONLY_KEY] = map_only
 
     return field(metadata=metadata, **field_kwargs)
 
@@ -99,9 +102,9 @@ def __init__(cls, name, bases, dict):
             if isinstance(value, Field) and value.metadata.get(_IS_FIELD_METRIC_KEY):
                 metric = Metric(
                     name=name,
-                    description=value.metadata.get("description"),
-                    metrics_group=value.metadata.get("metrics_group"),
-                    map_only=value.metadata.get("map_only", False),
+                    description=value.metadata[_METRIC_FIELD_DESCRIPTION_KEY],
+                    metrics_group=value.metadata[_METRIC_FIELD_METRICS_GROUP_KEY],
+                    map_only=value.metadata[_METRIC_FIELD_IS_MAP_ONLY_KEY],
                 )
                 _METRICS.append(metric)

From 9d89c29dab6d6454e07ff144f625972ad51c38e6 Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Mon, 23 Sep 2024 15:53:11 -0700
Subject: [PATCH 08/13] Address review comments

Signed-off-by: Balaji Veeramani
---
 .../interfaces/op_runtime_metrics.py          | 145 ++++++++++--------
 python/ray/data/_internal/stats.py            |  17 +-
 2 files changed, 89 insertions(+), 73 deletions(-)

diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
index a4517e7f9c1e6..5fd7c4e4b67f1 100644
--- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
+++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
@@ -1,5 +1,6 @@
 import time
 from dataclasses import Field, dataclass, field
+from enum import Enum
 from typing import TYPE_CHECKING, Any, Dict, List, Optional
 
 import ray
@@ -22,6 +23,14 @@
 
 _METRICS: List["Metric"] = []
 
 
+class MetricsGroup(Enum):
+    INPUTS = "inputs"
+    OUTPUTS = "outputs"
+    TASKS = "tasks"
+    OBJECT_STORE_MEMORY = "object_store_memory"
+    MISC = "misc"
+
+
 @dataclass(frozen=True)
 class Metric:
     """Metadata for a metric.
@@ -41,7 +50,7 @@ class Metric:
     map_only: bool = False
 
 
-def metricfield(
+def metric_field(
     *,
     description: str,
     metrics_group: str,
@@ -55,10 +64,12 @@ def metric_field(
     metadata[_METRIC_FIELD_METRICS_GROUP_KEY] = metrics_group
     metadata[_METRIC_FIELD_IS_MAP_ONLY_KEY] = map_only
 
-    return field(metadata=metadata, **field_kwargs)
+    f = field(metadata=metadata, **field_kwargs)
+    print(f.name)
+    return f
 
 
-def metricproperty(
+def metric_property(
     *,
     description: str,
     metrics_group: str,
@@ -103,13 +114,15 @@ class RunningTaskInfo:
 
 class OpRuntimesMetricsMeta(type):
     def __init__(cls, name, bases, dict):
+        # NOTE: `Field.name` isn't set until the dataclass is created, so we can't
+        # create the metrics in `metric_field` directly.
         super().__init__(name, bases, dict)
 
         # Iterate over the attributes and methods of 'OpRuntimeMetrics'.
         for name, value in dict.items():
             # If an attribute is a dataclass field and has _IS_FIELD_METRIC_KEY in its
             # metadata, then create a metric from the field metadata and add it to the
-            # list of metrics. See also the 'metricfield' function.
+            # list of metrics. See also the 'metric_field' function.
             if isinstance(value, Field) and value.metadata.get(_IS_FIELD_METRIC_KEY):
                 metric = Metric(
                     name=name,
@@ -137,177 +150,177 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
     # We should make them work for all operators by unifying the task execution code.
 
     # === Inputs-related metrics ===
-    num_inputs_received: int = metricfield(
+    num_inputs_received: int = metric_field(
         default=0,
         description="Number of input blocks received by operator.",
-        metrics_group="inputs",
+        metrics_group=MetricsGroup.INPUTS,
     )
-    bytes_inputs_received: int = metricfield(
+    bytes_inputs_received: int = metric_field(
         default=0,
         description="Byte size of input blocks received by operator.",
-        metrics_group="inputs",
+        metrics_group=MetricsGroup.INPUTS,
     )
-    num_task_inputs_processed: int = metricfield(
+    num_task_inputs_processed: int = metric_field(
         default=0,
         description=(
             "Number of input blocks that operator's tasks have finished processing."
         ),
-        metrics_group="inputs",
+        metrics_group=MetricsGroup.INPUTS,
         map_only=True,
     )
-    bytes_task_inputs_processed: int = metricfield(
+    bytes_task_inputs_processed: int = metric_field(
         default=0,
         description=(
             "Byte size of input blocks that operator's tasks have finished processing."
         ),
-        metrics_group="inputs",
+        metrics_group=MetricsGroup.INPUTS,
         map_only=True,
     )
-    bytes_inputs_of_submitted_tasks: int = metricfield(
+    bytes_inputs_of_submitted_tasks: int = metric_field(
         default=0,
         description="Byte size of input blocks passed to submitted tasks.",
-        metrics_group="inputs",
+        metrics_group=MetricsGroup.INPUTS,
         map_only=True,
     )
 
     # === Outputs-related metrics ===
-    num_task_outputs_generated: int = metricfield(
+    num_task_outputs_generated: int = metric_field(
         default=0,
         description="Number of output blocks generated by tasks.",
-        metrics_group="outputs",
+        metrics_group=MetricsGroup.OUTPUTS,
         map_only=True,
     )
-    bytes_task_outputs_generated: int = metricfield(
+    bytes_task_outputs_generated: int = metric_field(
         default=0,
         description="Byte size of output blocks generated by tasks.",
-        metrics_group="outputs",
+        metrics_group=MetricsGroup.OUTPUTS,
         map_only=True,
     )
-    rows_task_outputs_generated: int = metricfield(
+    rows_task_outputs_generated: int = metric_field(
         default=0,
         description=("Number of output rows generated by tasks."),
-        metrics_group="outputs",
+        metrics_group=MetricsGroup.OUTPUTS,
         map_only=True,
     )
-    num_outputs_taken: int = metricfield(
+    num_outputs_taken: int = metric_field(
         default=0,
         description=(
             "Number of output blocks that are already taken by downstream operators."
         ),
-        metrics_group="outputs",
+        metrics_group=MetricsGroup.OUTPUTS,
     )
-    bytes_outputs_taken: int = metricfield(
+    bytes_outputs_taken: int = metric_field(
         default=0,
         description=(
             "Byte size of output blocks that are already taken by downstream operators."
         ),
-        metrics_group="outputs",
+        metrics_group=MetricsGroup.OUTPUTS,
     )
-    num_outputs_of_finished_tasks: int = metricfield(
+    num_outputs_of_finished_tasks: int = metric_field(
         default=0,
         description="Number of generated output blocks that are from finished tasks.",
-        metrics_group="outputs",
+        metrics_group=MetricsGroup.OUTPUTS,
         map_only=True,
     )
-    bytes_outputs_of_finished_tasks: int = metricfield(
+    bytes_outputs_of_finished_tasks: int = metric_field(
         default=0,
         description=(
             "Byte size of generated output blocks that are from finished tasks."
         ),
-        metrics_group="outputs",
+        metrics_group=MetricsGroup.OUTPUTS,
         map_only=True,
     )
 
     # === Tasks-related metrics ===
-    num_tasks_submitted: int = metricfield(
+    num_tasks_submitted: int = metric_field(
         default=0,
         description="Number of submitted tasks.",
-        metrics_group="tasks",
+        metrics_group=MetricsGroup.TASKS,
         map_only=True,
     )
-    num_tasks_running: int = metricfield(
+    num_tasks_running: int = metric_field(
         default=0,
         description="Number of running tasks.",
-        metrics_group="tasks",
+        metrics_group=MetricsGroup.TASKS,
         map_only=True,
     )
-    num_tasks_have_outputs: int = metricfield(
+    num_tasks_have_outputs: int = metric_field(
         default=0,
         description="Number of tasks that already have output.",
-        metrics_group="tasks",
+        metrics_group=MetricsGroup.TASKS,
         map_only=True,
     )
-    num_tasks_finished: int = metricfield(
+    num_tasks_finished: int = metric_field(
         default=0,
         description="Number of finished tasks.",
-        metrics_group="tasks",
+        metrics_group=MetricsGroup.TASKS,
         map_only=True,
     )
-    num_tasks_failed: int = metricfield(
+    num_tasks_failed: int = metric_field(
         default=0,
         description="Number of failed tasks.",
-        metrics_group="tasks",
+        metrics_group=MetricsGroup.TASKS,
         map_only=True,
     )
-    block_generation_time: float = metricfield(
+    block_generation_time: float = metric_field(
         default=0,
         description="Time spent generating blocks in tasks.",
-        metrics_group="tasks",
+        metrics_group=MetricsGroup.TASKS,
         map_only=True,
     )
-    task_submission_backpressure_time: float = metricfield(
+    task_submission_backpressure_time: float = metric_field(
         default=0,
         description="Time spent in task submission backpressure.",
-        metrics_group="tasks",
+        metrics_group=MetricsGroup.TASKS,
     )
 
     # === Object store memory metrics ===
-    obj_store_mem_internal_inqueue_blocks: int = metricfield(
+    obj_store_mem_internal_inqueue_blocks: int = metric_field(
         default=0,
         description="Number of blocks in operator's internal input queue.",
-        metrics_group="object_store_memory",
+        metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
     )
-    obj_store_mem_internal_inqueue: int = metricfield(
+    obj_store_mem_internal_inqueue: int = metric_field(
         default=0,
         description=(
             "Byte size of input blocks in the operator's internal input queue."
         ),
-        metrics_group="object_store_memory",
+        metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
     )
-    obj_store_mem_internal_outqueue_blocks: int = metricfield(
+    obj_store_mem_internal_outqueue_blocks: int = metric_field(
         default=0,
         description="Number of blocks in the operator's internal output queue.",
-        metrics_group="object_store_memory",
+        metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
     )
-    obj_store_mem_internal_outqueue: int = metricfield(
+    obj_store_mem_internal_outqueue: int = metric_field(
         default=0,
         description=(
             "Byte size of output blocks in the operator's internal output queue."
         ),
-        metrics_group="object_store_memory",
+        metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
     )
-    obj_store_mem_pending_task_inputs: int = metricfield(
+    obj_store_mem_pending_task_inputs: int = metric_field(
         default=0,
         description="Byte size of input blocks used by pending tasks.",
-        metrics_group="object_store_memory",
+        metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
         map_only=True,
     )
-    obj_store_mem_freed: int = metricfield(
+    obj_store_mem_freed: int = metric_field(
         default=0,
         description="Byte size of freed memory in object store.",
-        metrics_group="object_store_memory",
+        metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
         map_only=True,
     )
-    obj_store_mem_spilled: int = metricfield(
+    obj_store_mem_spilled: int = metric_field(
         default=0,
         description="Byte size of spilled memory in object store.",
-        metrics_group="object_store_memory",
+        metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
         map_only=True,
     )
-    obj_store_mem_used: int = metricfield(
+    obj_store_mem_used: int = metric_field(
         default=0,
         description="Byte size of used memory in object store.",
-        metrics_group="object_store_memory",
+        metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
     )
 
     # === Miscellaneous metrics ===
@@ -356,9 +369,9 @@ def as_dict(self):
         result.extend(self._extra_metrics.items())
         return dict(result)
 
-    @metricproperty(
+    @metric_property(
         description="Average number of blocks generated per task.",
-        metrics_group="outputs",
+        metrics_group=MetricsGroup.OUTPUTS,
         map_only=True,
     )
     def average_num_outputs_per_task(self) -> Optional[float]:
@@ -368,9 +381,9 @@ def average_num_outputs_per_task(self) -> Optional[float]:
         else:
             return self.num_outputs_of_finished_tasks / self.num_tasks_finished
 
-    @metricproperty(
+    @metric_property(
         description="Average size of task output in bytes.",
-        metrics_group="outputs",
+        metrics_group=MetricsGroup.OUTPUTS,
         map_only=True,
     )
     def average_bytes_per_output(self) -> Optional[float]:
@@ -419,9 +432,9 @@ def obj_store_mem_max_pending_output_per_task(self) -> Optional[float]:
         )
         return bytes_per_output * num_pending_outputs
 
-    @metricproperty(
+    @metric_property(
         description="Average size of task inputs in bytes.",
-        metrics_group="inputs",
+        metrics_group=MetricsGroup.INPUTS,
         map_only=True,
     )
     def average_bytes_inputs_per_task(self) -> Optional[float]:
@@ -432,9 +445,9 @@ def average_bytes_inputs_per_task(self) -> Optional[float]:
         else:
             return self.bytes_inputs_of_submitted_tasks / self.num_tasks_submitted
 
-    @metricproperty(
+    @metric_property(
         description="Average total output size of task in bytes.",
-        metrics_group="outputs",
+        metrics_group=MetricsGroup.OUTPUTS,
         map_only=True,
     )
     def average_bytes_outputs_per_task(self) -> Optional[float]:
diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py
index 80e92f5fedee6..a54810f9ab163 100644
--- a/python/ray/data/_internal/stats.py
+++ b/python/ray/data/_internal/stats.py
@@ -12,7 +12,10 @@
 import ray
 from ray.actor import ActorHandle
 from ray.data._internal.block_list import BlockList
-from ray.data._internal.execution.interfaces.op_runtime_metrics import OpRuntimeMetrics
+from ray.data._internal.execution.interfaces.op_runtime_metrics import (
+    MetricsGroup,
+    OpRuntimeMetrics,
+)
 from ray.data._internal.util import capfirst
 from ray.data.block import BlockMetadata
 from ray.data.context import DataContext
@@ -209,7 +212,7 @@ def __init__(self, max_stats=1000):
         # Inputs-related metrics
         self.execution_metrics_inputs = (
             self._create_prometheus_metrics_for_execution_metrics(
-                metrics_group="inputs",
+                metrics_group=MetricsGroup.INPUTS,
                 tag_keys=op_tags_keys,
             )
         )
@@ -217,7 +220,7 @@ def __init__(self, max_stats=1000):
         # Outputs-related metrics
         self.execution_metrics_outputs = (
             self._create_prometheus_metrics_for_execution_metrics(
-                metrics_group="outputs",
+                metrics_group=MetricsGroup.OUTPUTS,
                 tag_keys=op_tags_keys,
             )
         )
@@ -225,7 +228,7 @@ def __init__(self, max_stats=1000):
         # Task-related metrics
         self.execution_metrics_tasks = (
             self._create_prometheus_metrics_for_execution_metrics(
-                metrics_group="tasks",
+                metrics_group=MetricsGroup.TASKS,
                 tag_keys=op_tags_keys,
             )
         )
@@ -233,7 +236,7 @@ def __init__(self, max_stats=1000):
         # Object store memory-related metrics
         self.execution_metrics_obj_store_memory = (
             self._create_prometheus_metrics_for_execution_metrics(
-                metrics_group="object_store_memory",
+                metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
                 tag_keys=op_tags_keys,
             )
         )
@@ -241,7 +244,7 @@ def __init__(self, max_stats=1000):
         # Miscellaneous metrics
         self.execution_metrics_misc = (
             self._create_prometheus_metrics_for_execution_metrics(
-                metrics_group="misc",
+                metrics_group=MetricsGroup.MISC,
                 tag_keys=op_tags_keys,
             )
         )
@@ -264,7 +267,7 @@ def __init__(self, max_stats=1000):
         )
 
     def _create_prometheus_metrics_for_execution_metrics(
-        self, metrics_group: str, tag_keys: Tuple[str, ...]
+        self, metrics_group: MetricsGroup, tag_keys: Tuple[str, ...]
     ) -> Dict[str, Gauge]:
         metrics = {}
         for metric in OpRuntimeMetrics.get_metrics():

From 42f51da08db453236f3b60a438883d576bda84a5 Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Mon, 23 Sep 2024 15:54:13 -0700
Subject: [PATCH 09/13] Update
 python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py

Co-authored-by: Scott Lee
Signed-off-by: Balaji Veeramani
---
 .../data/_internal/execution/interfaces/op_runtime_metrics.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
index a4517e7f9c1e6..4e3f5892d069f 100644
--- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
+++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
@@ -172,7 +172,7 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
     )
     rows_task_outputs_generated: int = metricfield(
         default=0,
-        description=("Number of output rows generated by tasks."),
+        description="Number of output rows generated by tasks.",
         metrics_group="outputs",
         map_only=True,
     )

From 78a0abf565941b9a2e411ab5a5b9811d3026c95f Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Mon, 23 Sep 2024 15:54:24 -0700
Subject: [PATCH 10/13] Format files

Signed-off-by: Balaji Veeramani
---
 .../data/_internal/execution/interfaces/op_runtime_metrics.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
index 5fd7c4e4b67f1..168ce0eb3981c 100644
--- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
+++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
@@ -66,9 +66,7 @@ def metric_field(
     metadata[_METRIC_FIELD_METRICS_GROUP_KEY] = metrics_group
     metadata[_METRIC_FIELD_IS_MAP_ONLY_KEY] = map_only
 
-    f = field(metadata=metadata, **field_kwargs)
-    print(f.name)
-    return f
+    return field(metadata=metadata, **field_kwargs)
 
 
 def metric_property(

From 8c2d56f655f5169efe463530b4b17c4d3e2b05ef Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Tue, 24 Sep 2024 13:29:52 -0700
Subject: [PATCH 11/13] Address review comments

Signed-off-by: Balaji Veeramani
---
 .../execution/interfaces/op_runtime_metrics.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
index f8aa9912aa53c..c42a02aab7d68 100644
--- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
+++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
@@ -20,7 +20,7 @@
 _METRIC_FIELD_METRICS_GROUP_KEY = "__metric_metrics_group"
 _METRIC_FIELD_IS_MAP_ONLY_KEY = "__metric_is_map_only"
 
-_METRICS: List["Metric"] = []
+_METRICS: List["MetricDefinition"] = []
 
 
 class MetricsGroup(Enum):
@@ -32,7 +32,7 @@ class MetricsGroup(Enum):
 
 
 @dataclass(frozen=True)
-class Metric:
+class MetricDefinition:
     """Metadata for a metric.
 
     Args:
@@ -47,6 +47,8 @@ class MetricDefinition:
     name: str
     description: str
     metrics_group: str
+    # TODO: Let's refactor this parameter so it isn't tightly coupled with a specific
+    # operator type (MapOperator).
     map_only: bool = False
 
 
@@ -78,7 +80,7 @@ def metric_property(
     """A property that represents a metric."""
 
     def wrap(func):
-        metric = Metric(
+        metric = MetricDefinition(
             name=func.__name__,
             description=description,
             metrics_group=metrics_group,
@@ -111,7 +113,7 @@ def __init__(cls, name, bases, dict):
             # metadata, then create a metric from the field metadata and add it to the
             # list of metrics. See also the 'metric_field' function.
             if isinstance(value, Field) and value.metadata.get(_IS_FIELD_METRIC_KEY):
-                metric = Metric(
+                metric = MetricDefinition(
                     name=name,
                     description=value.metadata[_METRIC_FIELD_DESCRIPTION_KEY],
                     metrics_group=value.metadata[_METRIC_FIELD_METRICS_GROUP_KEY],
                     map_only=value.metadata[_METRIC_FIELD_IS_MAP_ONLY_KEY],
                 )
                 _METRICS.append(metric)
@@ -327,7 +329,7 @@ def extra_metrics(self) -> Dict[str, Any]:
         return self._extra_metrics
 
     @classmethod
-    def get_metrics(self) -> List[Metric]:
+    def get_metrics(self) -> List[MetricDefinition]:
         return list(_METRICS)

From 44d26d1bbf11f9585542ae7ea389b9c442566149 Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Tue, 24 Sep 2024 14:57:16 -0700
Subject: [PATCH 12/13] Fix bug

Signed-off-by: Balaji Veeramani
---
 python/ray/util/metrics.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/python/ray/util/metrics.py b/python/ray/util/metrics.py
index 526ad27ddb179..e838e30a651c6 100644
--- a/python/ray/util/metrics.py
+++ b/python/ray/util/metrics.py
@@ -282,15 +282,19 @@ def __init__(
         super().__init__(name, description, tag_keys)
         self._metric = CythonGauge(self._name, self._description, self._tag_keys)
 
-    def set(self, value: Union[int, float], tags: Dict[str, str] = None):
+    def set(self, value: Optional[Union[int, float]], tags: Dict[str, str] = None):
         """Set the gauge to the given `value`.
 
         Tags passed in will take precedence over the metric's default tags.
 
         Args:
-            value(int, float): Value to set the gauge to.
+            value(int, float): Value to set the gauge to. If `None`, this method is a
+                no-op.
             tags(Dict[str, str]): Tags to set or override for this gauge.
""" + if value is None: + return + if not isinstance(value, (int, float)): raise TypeError(f"value must be int or float, got {type(value)}.") From b607715bd77f1503fb7c7c2c8fa751df842c7b19 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 24 Sep 2024 18:24:17 -0700 Subject: [PATCH 13/13] Fix test Signed-off-by: Balaji Veeramani --- python/ray/data/tests/test_stats.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 775b3c9497028..28caa6f6773d0 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -41,6 +41,10 @@ def gen_expected_metrics( ): if is_map: metrics = [ + "'average_num_outputs_per_task': N", + "'average_bytes_per_output': N", + "'average_bytes_inputs_per_task': N", + "'average_bytes_outputs_per_task': N", "'num_inputs_received': N", "'bytes_inputs_received': N", "'num_task_inputs_processed': N", @@ -531,6 +535,10 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context): " base_name=ReadRange,\n" " number=N,\n" " extra_metrics={\n" + " average_num_outputs_per_task: N,\n" + " average_bytes_per_output: N,\n" + " average_bytes_inputs_per_task: N,\n" + " average_bytes_outputs_per_task: N,\n" " num_inputs_received: N,\n" " bytes_inputs_received: N,\n" " num_task_inputs_processed: N,\n" @@ -641,6 +649,10 @@ def check_stats(): " base_name=MapBatches(),\n" " number=N,\n" " extra_metrics={\n" + " average_num_outputs_per_task: N,\n" + " average_bytes_per_output: N,\n" + " average_bytes_inputs_per_task: N,\n" + " average_bytes_outputs_per_task: N,\n" " num_inputs_received: N,\n" " bytes_inputs_received: N,\n" " num_task_inputs_processed: N,\n" @@ -706,6 +718,10 @@ def check_stats(): " base_name=ReadRange,\n" " number=N,\n" " extra_metrics={\n" + " average_num_outputs_per_task: N,\n" + " average_bytes_per_output: N,\n" + " average_bytes_inputs_per_task: N,\n" + " average_bytes_outputs_per_task: N,\n" " num_inputs_received: N,\n" " bytes_inputs_received: N,\n" " num_task_inputs_processed: N,\n"