Skip to content

Commit

Permalink
Refactor aggregate (opensearch-project#708)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
  • Loading branch information
OVI3D0 authored Dec 18, 2024
1 parent 29b11a9 commit 477aad9
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 60 deletions.
133 changes: 73 additions & 60 deletions osbenchmark/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
from typing import Any, Dict, List, Union
import uuid

from osbenchmark.metrics import FileTestExecutionStore
from osbenchmark.metrics import FileTestExecutionStore, TestExecution
from osbenchmark import metrics, workload, config
from osbenchmark.utils import io as rio

class Aggregator:
def __init__(self, cfg, test_executions_dict, args):
def __init__(self, cfg, test_executions_dict, args) -> None:
self.config = cfg
self.args = args
self.test_executions = test_executions_dict
Expand All @@ -21,69 +21,72 @@ def __init__(self, cfg, test_executions_dict, args):
self.test_procedure_name = None
self.loaded_workload = None

def count_iterations_for_each_op(self, test_execution) -> None:
matching_test_procedure = next((tp for tp in self.loaded_workload.test_procedures if tp.name == self.test_procedure_name), None)
def count_iterations_for_each_op(self, test_execution: TestExecution) -> None:
"""Count iterations for each operation in the test execution"""
workload_params = test_execution.workload_params if test_execution.workload_params else {}

test_execution_id = test_execution.test_execution_id
self.accumulated_iterations[test_execution_id] = {}

if matching_test_procedure:
for task in matching_test_procedure.schedule:
task_name = task.name
task_name_iterations = f"{task_name}_iterations"
if task_name_iterations in workload_params:
iterations = int(workload_params[task_name_iterations])
else:
iterations = task.iterations or 1
self.accumulated_iterations[test_execution_id][task_name] = iterations
else:
raise ValueError(f"Test procedure '{self.test_procedure_name}' not found in the loaded workload.")
for task in self.loaded_workload.find_test_procedure_or_default(self.test_procedure_name).schedule:
task_name = task.name
task_name_iterations = f"{task_name}_iterations"
iterations = int(workload_params.get(task_name_iterations, task.iterations or 1))
self.accumulated_iterations[test_execution_id][task_name] = iterations

def accumulate_results(self, test_execution: Any) -> None:
for item in test_execution.results.get("op_metrics", []):
task = item.get("task", "")
def accumulate_results(self, test_execution: TestExecution) -> None:
"""Accumulate results from a single test execution"""
for operation_metric in test_execution.results.get("op_metrics", []):
task = operation_metric.get("task", "")
self.accumulated_results.setdefault(task, {})
for metric in self.metrics:
self.accumulated_results[task].setdefault(metric, [])
self.accumulated_results[task][metric].append(item.get(metric))
self.accumulated_results[task][metric].append(operation_metric.get(metric))

def aggregate_json_by_key(self, key_path: Union[str, List[str]]) -> Any:
all_jsons = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()]

# retrieve nested value from a dictionary given a key path
def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any:
"""
Aggregates JSON results across multiple test executions using a specified key path.
Handles nested dictionary structures and calculates averages for numeric values
"""
all_json_results = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()]

def get_nested_value(json_data: Dict[str, Any], path: List[str]) -> Any:
"""
Retrieves a value from a nested dictionary structure using a path of keys.
"""
for key in path:
if isinstance(obj, dict):
obj = obj.get(key, {})
elif isinstance(obj, list) and key.isdigit():
obj = obj[int(key)] if int(key) < len(obj) else {}
if isinstance(json_data, dict):
json_data = json_data.get(key, {})
elif isinstance(json_data, list) and key.isdigit():
json_data = json_data[int(key)] if int(key) < len(json_data) else {}
else:
return None
return obj
return json_data

def aggregate_helper(objects: List[Any]) -> Any:
if not objects:
def aggregate_json_elements(json_elements: List[Any]) -> Any:
if not json_elements:
return None
if all(isinstance(obj, (int, float)) for obj in objects):
avg = sum(objects) / len(objects)
return avg
if all(isinstance(obj, dict) for obj in objects):
keys = set().union(*objects)
return {key: aggregate_helper([obj.get(key) for obj in objects]) for key in keys}
if all(isinstance(obj, list) for obj in objects):
max_length = max(len(obj) for obj in objects)
return [aggregate_helper([obj[i] if i < len(obj) else None for obj in objects]) for i in range(max_length)]
return next((obj for obj in objects if obj is not None), None)
# If all elements are numbers, calculate the average
if all(isinstance(obj, (int, float)) for obj in json_elements):
return sum(json_elements) / len(json_elements)
# If all elements are dictionaries, recursively aggregate their values
if all(isinstance(obj, dict) for obj in json_elements):
keys = set().union(*json_elements)
return {key: aggregate_json_elements([obj.get(key) for obj in json_elements]) for key in keys}
# If all elements are lists, recursively aggregate corresponding elements
if all(isinstance(obj, list) for obj in json_elements):
max_length = max(len(obj) for obj in json_elements)
return [aggregate_json_elements([obj[i] if i < len(obj) else None for obj in json_elements]) for i in range(max_length)]
# If elements are of mixed types, return the first non-None value
return next((obj for obj in json_elements if obj is not None), None)

if isinstance(key_path, str):
key_path = key_path.split('.')

values = [get_nested_value(json, key_path) for json in all_jsons]
return aggregate_helper(values)
nested_values = [get_nested_value(json_result, key_path) for json_result in all_json_results]
return aggregate_json_elements(nested_values)

def build_aggregated_results(self):
test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
def build_aggregated_results_dict(self) -> Dict[str, Any]:
"""Builds a dictionary of aggregated metrics from all test executions"""
aggregated_results = {
"op_metrics": [],
"correctness_metrics": self.aggregate_json_by_key("correctness_metrics"),
Expand Down Expand Up @@ -147,8 +150,30 @@ def build_aggregated_results(self):

aggregated_results["op_metrics"].append(op_metric)

# extract the necessary data from the first test execution, since the configurations should be identical for all test executions
return aggregated_results

def update_config_object(self, test_execution: TestExecution) -> None:
"""
Updates the configuration object with values from a test execution.
Uses the first test execution as reference since configurations should be identical
"""
current_timestamp = self.config.opts("system", "time.start")
self.config.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.names", test_execution.provision_config_instance)
self.config.add(config.Scope.applicationOverride, "system",
"env.name", test_execution.environment_name)
self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp)
self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_execution.pipeline)
self.config.add(config.Scope.applicationOverride, "workload", "params", test_execution.workload_params)
self.config.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.params", test_execution.provision_config_instance_params)
self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_execution.plugin_params)
self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_execution.latency_percentiles)
self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_execution.throughput_percentiles)

def build_aggregated_results(self) -> TestExecution:
test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
aggregated_results = self.build_aggregated_results_dict()

if hasattr(self.args, 'results_file') and self.args.results_file != "":
normalized_results_file = rio.normalize_path(self.args.results_file, self.cwd)
Expand All @@ -165,19 +190,7 @@ def build_aggregated_results(self):

print("Aggregate test execution ID: ", test_execution_id)

# add values to the configuration object
self.config.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.names", test_exe.provision_config_instance)
self.config.add(config.Scope.applicationOverride, "system",
"env.name", test_exe.environment_name)
self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp)
self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_exe.pipeline)
self.config.add(config.Scope.applicationOverride, "workload", "params", test_exe.workload_params)
self.config.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.params", test_exe.provision_config_instance_params)
self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_exe.plugin_params)
self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_exe.latency_percentiles)
self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_exe.throughput_percentiles)
self.update_config_object(test_exe)

loaded_workload = workload.load_workload(self.config)
test_procedure_object = loaded_workload.find_test_procedure_or_default(self.test_procedure_name)
Expand Down Expand Up @@ -223,7 +236,7 @@ def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], task_na

return weighted_metrics

def calculate_rsd(self, values: List[Union[int, float]], metric_name: str):
def calculate_rsd(self, values: List[Union[int, float]], metric_name: str) -> Union[float, str]:
if not values:
raise ValueError(f"Cannot calculate RSD for metric '{metric_name}': empty list of values")
if len(values) == 1:
Expand Down
1 change: 1 addition & 0 deletions tests/aggregator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def test_count_iterations_for_each_op(aggregator):
mock_test_procedure.schedule = mock_schedule
mock_workload.test_procedures = [mock_test_procedure]

mock_workload.find_test_procedure_or_default = Mock(return_value=mock_test_procedure)
mock_test_execution = Mock(test_execution_id="test1", workload_params={})

aggregator.loaded_workload = mock_workload
Expand Down

0 comments on commit 477aad9

Please sign in to comment.