Add aggregate command #638

Merged · 4 commits · Sep 18, 2024 · Changes from 3 commits
238 changes: 238 additions & 0 deletions osbenchmark/aggregator.py
@@ -0,0 +1,238 @@
import os
from typing import Any, Dict, List, Union
import uuid

from osbenchmark.metrics import FileTestExecutionStore
from osbenchmark import metrics, workload, config
from osbenchmark.utils import io as rio

class Aggregator:
def __init__(self, cfg, test_executions_dict, args):
self.config = cfg
self.args = args
self.test_executions = test_executions_dict
self.accumulated_results: Dict[str, Dict[str, List[Any]]] = {}
self.accumulated_iterations: Dict[str, int] = {}
self.statistics = ["throughput", "latency", "service_time", "client_processing_time", "processing_time", "error_rate", "duration"]
Collaborator:
Nit: we use metrics instead of statistics, which would standardize on official documentation terminology: https://opensearch.org/docs/latest/benchmark/reference/metrics/metric-keys/

Member Author:
Makes sense, updated this name

self.test_store = metrics.test_execution_store(self.config)
self.cwd = cfg.opts("node", "benchmark.cwd")

def count_iterations_for_each_op(self) -> None:
loaded_workload = workload.load_workload(self.config)
for test_procedure in loaded_workload.test_procedures:
if test_procedure.name == self.config.opts("workload", "test_procedure.name"):
Collaborator:
It would be good practice to add error handling here in case the test_procedure from the test executions is not found in the loaded_workload.

Even though all test iterations might have the same test procedure by this point, it's possible that users are using a workload (e.g., a custom workload or a modified official workload) where a test procedure isn't available.

Member Author:
Added this!
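A minimal sketch of the kind of guard being discussed, for illustration only (the helper name, the use of a `name` attribute on the workload and test procedure objects, and the exact message are assumptions rather than the merged code):

```python
# Hypothetical guard: fail fast if the requested test procedure is absent from the workload.
def find_test_procedure_or_fail(loaded_workload, test_procedure_name):
    for test_procedure in loaded_workload.test_procedures:
        if test_procedure.name == test_procedure_name:
            return test_procedure
    raise ValueError(
        f"Test procedure '{test_procedure_name}' not found in workload '{loaded_workload.name}'. "
        "Ensure that all test execution IDs reference the same test procedure from the same workload."
    )
```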

for task in test_procedure.schedule:
task_name = task.name
iterations = task.iterations or 1
self.accumulated_iterations[task_name] = self.accumulated_iterations.get(task_name, 0) + iterations

def accumulate_results(self, test_execution: Any) -> None:
for item in test_execution.results.get("op_metrics", []):
task = item.get("task", "")
self.accumulated_results.setdefault(task, {})
for metric in self.statistics:
Collaborator:
This is clean!

self.accumulated_results[task].setdefault(metric, [])
self.accumulated_results[task][metric].append(item.get(metric))

def aggregate_json_by_key(self, key_path: Union[str, List[str]]) -> Any:
all_jsons = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()]

# retrieve nested value from a dictionary given a key path
def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any:
for key in path:
if isinstance(obj, dict):
obj = obj.get(key, {})
elif isinstance(obj, list) and key.isdigit():
obj = obj[int(key)] if int(key) < len(obj) else {}
else:
return None
return obj

def aggregate_helper(objects: List[Any]) -> Any:
if not objects:
return None
if all(isinstance(obj, (int, float)) for obj in objects):
avg = sum(objects) / len(objects)
return avg
if all(isinstance(obj, dict) for obj in objects):
keys = set().union(*objects)
return {key: aggregate_helper([obj.get(key) for obj in objects]) for key in keys}
if all(isinstance(obj, list) for obj in objects):
max_length = max(len(obj) for obj in objects)
return [aggregate_helper([obj[i] if i < len(obj) else None for obj in objects]) for i in range(max_length)]
return next((obj for obj in objects if obj is not None), None)

if isinstance(key_path, str):
key_path = key_path.split('.')

values = [get_nested_value(json, key_path) for json in all_jsons]
return aggregate_helper(values)
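To make the recursive merge above concrete, here is a self-contained illustration that reproduces the `aggregate_helper` logic outside the class and applies it to two made-up result documents (the keys and values are invented for demonstration):

```python
from typing import Any, List

def aggregate_helper(objects: List[Any]) -> Any:
    # Average numeric leaves, recurse into dicts and lists, otherwise keep the first non-None value.
    if not objects:
        return None
    if all(isinstance(obj, (int, float)) for obj in objects):
        return sum(objects) / len(objects)
    if all(isinstance(obj, dict) for obj in objects):
        keys = set().union(*objects)
        return {key: aggregate_helper([obj.get(key) for obj in objects]) for key in keys}
    if all(isinstance(obj, list) for obj in objects):
        max_length = max(len(obj) for obj in objects)
        return [aggregate_helper([obj[i] if i < len(obj) else None for obj in objects]) for i in range(max_length)]
    return next((obj for obj in objects if obj is not None), None)

results_a = {"total_time": 100, "young_gc_time": [1, 2], "store_size": "small"}
results_b = {"total_time": 200, "young_gc_time": [3, 6]}
print(aggregate_helper([results_a, results_b]))
# e.g. {'total_time': 150.0, 'young_gc_time': [2.0, 4.0], 'store_size': 'small'} (key order may vary)
```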

def build_aggregated_results(self):
test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
aggregated_results = {
"op-metrics": [],
"correctness_metrics": self.aggregate_json_by_key("correctness_metrics"),
"total_time": self.aggregate_json_by_key("total_time"),
"total_time_per_shard": self.aggregate_json_by_key("total_time_per_shard"),
"indexing_throttle_time": self.aggregate_json_by_key("indexing_throttle_time"),
"indexing_throttle_time_per_shard": self.aggregate_json_by_key("indexing_throttle_time_per_shard"),
"merge_time": self.aggregate_json_by_key("merge_time"),
"merge_time_per_shard": self.aggregate_json_by_key("merge_time_per_shard"),
"merge_count": self.aggregate_json_by_key("merge_count"),
"refresh_time": self.aggregate_json_by_key("refresh_time"),
"refresh_time_per_shard": self.aggregate_json_by_key("refresh_time_per_shard"),
"refresh_count": self.aggregate_json_by_key("refresh_count"),
"flush_time": self.aggregate_json_by_key("flush_time"),
"flush_time_per_shard": self.aggregate_json_by_key("flush_time_per_shard"),
"flush_count": self.aggregate_json_by_key("flush_count"),
"merge_throttle_time": self.aggregate_json_by_key("merge_throttle_time"),
"merge_throttle_time_per_shard": self.aggregate_json_by_key("merge_throttle_time_per_shard"),
"ml_processing_time": self.aggregate_json_by_key("ml_processing_time"),
"young_gc_time": self.aggregate_json_by_key("young_gc_time"),
"young_gc_count": self.aggregate_json_by_key("young_gc_count"),
"old_gc_time": self.aggregate_json_by_key("old_gc_time"),
"old_gc_count": self.aggregate_json_by_key("old_gc_count"),
"memory_segments": self.aggregate_json_by_key("memory_segments"),
"memory_doc_values": self.aggregate_json_by_key("memory_doc_values"),
"memory_terms": self.aggregate_json_by_key("memory_terms"),
"memory_norms": self.aggregate_json_by_key("memory_norms"),
"memory_points": self.aggregate_json_by_key("memory_points"),
"memory_stored_fields": self.aggregate_json_by_key("memory_stored_fields"),
"store_size": self.aggregate_json_by_key("store_size"),
"translog_size": self.aggregate_json_by_key("translog_size"),
"segment_count": self.aggregate_json_by_key("segment_count"),
"total_transform_search_times": self.aggregate_json_by_key("total_transform_search_times"),
"total_transform_index_times": self.aggregate_json_by_key("total_transform_index_times"),
"total_transform_processing_times": self.aggregate_json_by_key("total_transform_processing_times"),
"total_transform_throughput": self.aggregate_json_by_key("total_transform_throughput")
}

for task, task_metrics in self.accumulated_results.items():
iterations = self.accumulated_iterations.get(task, 1)
aggregated_task_metrics = self.calculate_weighted_average(task_metrics, iterations)
op_metric = {
"task": task,
"operation": task,
"throughput": aggregated_task_metrics["throughput"],
"latency": aggregated_task_metrics["latency"],
"service_time": aggregated_task_metrics["service_time"],
"client_processing_time": aggregated_task_metrics["client_processing_time"],
"processing_time": aggregated_task_metrics["processing_time"],
"error_rate": aggregated_task_metrics["error_rate"],
"duration": aggregated_task_metrics["duration"]
}
aggregated_results["op-metrics"].append(op_metric)

# extract the necessary data from the first test execution, since the configurations should be identical for all test executions
current_timestamp = self.config.opts("system", "time.start")

if hasattr(self.args, 'results_file') and self.args.results_file != "":
normalized_results_file = rio.normalize_path(self.args.results_file, self.cwd)
# ensure that the parent folder already exists when we try to write the file...
rio.ensure_dir(rio.dirname(normalized_results_file))
test_execution_id = os.path.basename(normalized_results_file)
self.config.add(config.Scope.applicationOverride, "system", "test_execution.id", normalized_results_file)
elif hasattr(self.args, 'test_execution_id') and self.args.test_execution_id:
test_execution_id = f"aggregate_results_{test_exe.workload}_{self.args.test_execution_id}"
self.config.add(config.Scope.applicationOverride, "system", "test_execution.id", test_execution_id)
else:
test_execution_id = f"aggregate_results_{test_exe.workload}_{str(uuid.uuid4())}"
self.config.add(config.Scope.applicationOverride, "system", "test_execution.id", test_execution_id)

print("Aggregate test execution ID: ", test_execution_id)

# add values to the configuration object
self.config.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.names", test_exe.provision_config_instance)
self.config.add(config.Scope.applicationOverride, "system",
"env.name", test_exe.environment_name)
self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp)
self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_exe.pipeline)
self.config.add(config.Scope.applicationOverride, "workload", "params", test_exe.workload_params)
self.config.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.params", test_exe.provision_config_instance_params)
self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_exe.plugin_params)
self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_exe.latency_percentiles)
self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_exe.throughput_percentiles)

loaded_workload = workload.load_workload(self.config)
test_procedure = loaded_workload.find_test_procedure_or_default(test_exe.test_procedure)

test_execution = metrics.create_test_execution(self.config, loaded_workload, test_procedure, test_exe.workload_revision)
test_execution.add_results(AggregatedResults(aggregated_results))
test_execution.distribution_version = test_exe.distribution_version
test_execution.revision = test_exe.revision
test_execution.distribution_flavor = test_exe.distribution_flavor
test_execution.provision_config_revision = test_exe.provision_config_revision

return test_execution

def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterations: int) -> Dict[str, Any]:
weighted_metrics = {}

for metric, values in task_metrics.items():
weighted_metrics[metric] = {}
if isinstance(values[0], dict):
for item_key in values[0].keys():
if item_key == 'unit':
weighted_metrics[metric][item_key] = values[0][item_key]
else:
item_values = [value.get(item_key, 0) for value in values]
if iterations > 1:
weighted_sum = sum(value * iterations for value in item_values)
total_iterations = iterations * len(values)
weighted_metrics[metric][item_key] = weighted_sum / total_iterations
else:
weighted_metrics[metric][item_key] = sum(item_values) / len(item_values)
else:
if iterations > 1:
weighted_sum = sum(value * iterations for value in values)
total_iterations = iterations * len(values)
weighted_metrics[metric] = weighted_sum / total_iterations
else:
weighted_metrics[metric] = sum(values) / len(values)
return weighted_metrics
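A small worked example of the weighting arithmetic above, using a made-up latency entry from two test executions and an assumed per-task iteration count of 3 (the percentile key `50_0` and the values are illustrative):

```python
# Illustration of the weighting math only; the dict shape loosely mirrors an OSB latency entry.
values = [{"50_0": 10.0, "unit": "ms"}, {"50_0": 20.0, "unit": "ms"}]
iterations = 3

item_values = [value.get("50_0", 0) for value in values]         # [10.0, 20.0]
weighted_sum = sum(value * iterations for value in item_values)   # 90.0
total_iterations = iterations * len(values)                       # 6
print(weighted_sum / total_iterations)                            # 15.0; the 'unit' key is copied through unchanged
```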

def test_execution_compatibility_check(self) -> None:
first_test_execution = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
workload = first_test_execution.workload
test_procedure = first_test_execution.test_procedure
for id in self.test_executions.keys():
test_execution = self.test_store.find_by_test_execution_id(id)
if test_execution:
if test_execution.workload != workload:
Collaborator (@IanHoang), Sep 12, 2024:
On top of checking the workload, we should also verify that the first test execution ID's test_procedure matches the rest, since some workloads have multiple test_procedures.

For example, NYC Taxis has 4 test procedures (3 from default.json): https://github.com/opensearch-project/opensearch-benchmark-workloads/blob/main/nyc_taxis/test_procedures/default.json

If a user aggregated a group of test execution IDs that use the same workload but differ in test procedures, we could still run into the issue of comparing different operations.

Member Author:
Added a check for this👍

raise ValueError(f"Incompatible workload: test {id} has workload '{test_execution.workload}' instead of '{workload}'")
if test_execution.test_procedure != test_procedure:
raise ValueError(
f"Incompatible test procedure: test {id} has test procedure '{test_execution.test_procedure}'\n"
f"instead of '{test_procedure}'"
Collaborator (@IanHoang), Sep 18, 2024:
Nit: It's good that we stated what's wrong but it'd also be nice to point the user in the right direction:
f"Ensure that all test ids have the same test procedure from the same workload"

This is especially useful to inexperienced users who are not familiar with how OSB works. This can be applied to both ValueErrors in line 205 and 208-209.

Member Author:
Added this to all the error messages

)
else:
raise ValueError("Test execution not found: ", id)

self.config.add(config.Scope.applicationOverride, "workload", "test_procedure.name", first_test_execution.test_procedure)
return True

def aggregate(self) -> None:
if self.test_execution_compatibility_check():
for id in self.test_executions.keys():
test_execution = self.test_store.find_by_test_execution_id(id)
if test_execution:
self.config.add(config.Scope.applicationOverride, "workload", "repository.name", self.args.workload_repository)
self.config.add(config.Scope.applicationOverride, "workload", "workload.name", test_execution.workload)
self.count_iterations_for_each_op()
self.accumulate_results(test_execution)

aggregated_results = self.build_aggregated_results()
file_test_exe_store = FileTestExecutionStore(self.config)
file_test_exe_store.store_test_execution(aggregated_results)
Collaborator:
A couple of questions:

  • Will we be storing the aggregated results to ~/.benchmark/benchmarks/test_executions or to a separate directory for aggregated results?
  • Will we also store this in an OSTestExecutionStore if the user has their benchmark.ini file configured to use an external metrics data store? If so, let's implement this in a separate PR.

Member Author:
For now, we'll store the results to the benchmarks test_executions folder, but I can add a separate folder in a future PR.

I did some testing and this does store in an OSTestExecutionStore when my benchmark.ini file is configured to use it!

Collaborator:
Awesome, sounds good!

else:
raise ValueError("Incompatible test execution results")

class AggregatedResults:
def __init__(self, results):
self.results = results

def as_dict(self):
return self.results
36 changes: 35 additions & 1 deletion osbenchmark/benchmark.py
@@ -40,6 +40,7 @@
from osbenchmark.builder import provision_config, builder
from osbenchmark.workload_generator import workload_generator
from osbenchmark.utils import io, convert, process, console, net, opts, versions
from osbenchmark import aggregator


def create_arg_parser():
@@ -221,6 +222,26 @@ def add_workload_source(subparser):
help="Whether to include the comparison in the results file.",
default=True)

aggregate_parser = subparsers.add_parser("aggregate", help="Aggregate multiple test_executions")
aggregate_parser.add_argument(
"--test-executions",
"--t",
type=non_empty_list,
required=True,
help="Comma-separated list of TestExecution IDs to aggregate")
aggregate_parser.add_argument(
"--test-execution-id",
help="Define a unique id for this aggregated test_execution.",
default="")
aggregate_parser.add_argument(
"--results-file",
help="Write the aggregated results to the provided file.",
default="")
aggregate_parser.add_argument(
"--workload-repository",
help="Define the repository from where OSB will load workloads (default: default).",
default="default")

Collaborator:
We should include other common options like --test-execution-id and --results-file

Member Author:
Added these! Let me know if I should add others as well
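For reference, a typical invocation of the new subcommand would then look like `opensearch-benchmark aggregate --test-executions="id1,id2" --test-execution-id="my-aggregate"`, where the two IDs are placeholders for existing test execution IDs.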

download_parser = subparsers.add_parser("download", help="Downloads an artifact")
download_parser.add_argument(
"--provision-config-repository",
@@ -613,7 +634,7 @@ def add_workload_source(subparser):
action="store_true",
default=False)

for p in [list_parser, test_execution_parser, compare_parser, download_parser, install_parser,
for p in [list_parser, test_execution_parser, compare_parser, aggregate_parser, download_parser, install_parser,
start_parser, stop_parser, info_parser, create_workload_parser]:
# This option is needed to support a separate configuration for the integration tests on the same machine
p.add_argument(
Expand Down Expand Up @@ -832,6 +853,15 @@ def configure_results_publishing_params(args, cfg):
cfg.add(config.Scope.applicationOverride, "results_publishing", "output.path", args.results_file)
cfg.add(config.Scope.applicationOverride, "results_publishing", "numbers.align", args.results_numbers_align)

def prepare_test_executions_dict(args, cfg):
cfg.add(config.Scope.applicationOverride, "results_publishing", "output.path", args.results_file)
test_executions_dict = {}
if args.test_executions:
for execution in args.test_executions:
execution = execution.strip()
if execution:
test_executions_dict[execution] = None
return test_executions_dict

def print_test_execution_id(args):
console.info(f"[Test Execution ID]: {args.test_execution_id}")
@@ -847,6 +877,10 @@ def dispatch_sub_command(arg_parser, args, cfg):
configure_results_publishing_params(args, cfg)
cfg.add(config.Scope.applicationOverride, "results_publishing", "percentiles", args.percentiles)
results_publisher.compare(cfg, args.baseline, args.contender)
elif sub_command == "aggregate":
test_executions_dict = prepare_test_executions_dict(args, cfg)
aggregator_instance = aggregator.Aggregator(cfg, test_executions_dict, args)
aggregator_instance.aggregate()
elif sub_command == "list":
cfg.add(config.Scope.applicationOverride, "system", "list.config.option", args.configuration)
cfg.add(config.Scope.applicationOverride, "system", "list.test_executions.max_results", args.limit)