Change min/max to overall_min/overall_max + update comparison results publisher (#692)

Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
OVI3D0 authored Nov 15, 2024
1 parent 03f1b76 commit 2d14ed4
Showing 4 changed files with 152 additions and 50 deletions.
23 changes: 13 additions & 10 deletions osbenchmark/aggregator.py
@@ -193,24 +193,27 @@ def build_aggregated_results(self):

def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterations: int) -> Dict[str, Any]:
weighted_metrics = {}
num_executions = len(next(iter(task_metrics.values())))
total_iterations = iterations * num_executions

for metric, values in task_metrics.items():
if isinstance(values[0], dict):
weighted_metrics[metric] = {}
for item_key in values[0].keys():
if item_key == 'unit':
weighted_metrics[metric][item_key] = values[0][item_key]
for metric_field in values[0].keys():
if metric_field == 'unit':
weighted_metrics[metric][metric_field] = values[0][metric_field]
elif metric_field == 'min':
weighted_metrics[metric]['overall_min'] = min(value.get(metric_field, 0) for value in values)
elif metric_field == 'max':
weighted_metrics[metric]['overall_max'] = max(value.get(metric_field, 0) for value in values)
else:
item_values = [value.get(item_key, 0) for value in values]
# for items like median or containing percentile values
item_values = [value.get(metric_field, 0) for value in values]
weighted_sum = sum(value * iterations for value in item_values)
total_iterations = iterations * len(item_values)
weighted_avg = weighted_sum / total_iterations
weighted_metrics[metric][item_key] = weighted_avg
weighted_metrics[metric][metric_field] = weighted_sum / total_iterations
else:
weighted_sum = sum(value * iterations for value in values)
total_iterations = iterations * len(values)
weighted_avg = weighted_sum / total_iterations
weighted_metrics[metric] = weighted_avg
weighted_metrics[metric] = weighted_sum / total_iterations

return weighted_metrics

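For context, the updated weighting logic in this hunk can be sketched as a standalone function (the function name and the sample input below are illustrative, not part of the commit): `min`/`max` across executions are surfaced as `overall_min`/`overall_max`, while the remaining numeric fields keep the iteration-weighted average.

```python
# Standalone sketch of the updated calculate_weighted_average logic above:
# 'min'/'max' are reported as 'overall_min'/'overall_max' across executions,
# while the other numeric fields keep the iteration-weighted average.
from typing import Any, Dict, List

def weighted_average(task_metrics: Dict[str, List[Any]], iterations: int) -> Dict[str, Any]:
    aggregated: Dict[str, Any] = {}
    num_executions = len(next(iter(task_metrics.values())))
    total_iterations = iterations * num_executions
    for metric, values in task_metrics.items():
        if isinstance(values[0], dict):
            aggregated[metric] = {}
            for field in values[0].keys():
                if field == "unit":
                    aggregated[metric][field] = values[0][field]
                elif field == "min":
                    aggregated[metric]["overall_min"] = min(v.get(field, 0) for v in values)
                elif field == "max":
                    aggregated[metric]["overall_max"] = max(v.get(field, 0) for v in values)
                else:
                    field_values = [v.get(field, 0) for v in values]
                    weighted_sum = sum(v * iterations for v in field_values)
                    aggregated[metric][field] = weighted_sum / total_iterations
        else:
            weighted_sum = sum(v * iterations for v in values)
            aggregated[metric] = weighted_sum / total_iterations
    return aggregated

# Two executions of the same task with 5 iterations each (made-up numbers):
print(weighted_average(
    {"throughput": [
        {"min": 90, "max": 210, "mean": 150, "unit": "ops/s"},
        {"min": 110, "max": 190, "mean": 160, "unit": "ops/s"},
    ]},
    iterations=5,
))
# {'throughput': {'overall_min': 90, 'overall_max': 210, 'mean': 155.0, 'unit': 'ops/s'}}
```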
8 changes: 4 additions & 4 deletions osbenchmark/results_publisher.py
@@ -464,16 +464,16 @@ def _write_results(self, metrics_table, metrics_table_console):
data_plain=metrics_table, data_rich=metrics_table_console)

def _publish_throughput(self, baseline_stats, contender_stats, task):
b_min = baseline_stats.metrics(task)["throughput"]["min"]
b_min = baseline_stats.metrics(task)["throughput"].get("overall_min") or baseline_stats.metrics(task)["throughput"]["min"]
b_mean = baseline_stats.metrics(task)["throughput"]["mean"]
b_median = baseline_stats.metrics(task)["throughput"]["median"]
b_max = baseline_stats.metrics(task)["throughput"]["max"]
b_max = baseline_stats.metrics(task)["throughput"].get("overall_max") or baseline_stats.metrics(task)["throughput"]["max"]
b_unit = baseline_stats.metrics(task)["throughput"]["unit"]

c_min = contender_stats.metrics(task)["throughput"]["min"]
c_min = contender_stats.metrics(task)["throughput"].get("overall_min") or contender_stats.metrics(task)["throughput"]["min"]
c_mean = contender_stats.metrics(task)["throughput"]["mean"]
c_median = contender_stats.metrics(task)["throughput"]["median"]
c_max = contender_stats.metrics(task)["throughput"]["max"]
c_max = contender_stats.metrics(task)["throughput"].get("overall_max") or contender_stats.metrics(task)["throughput"]["max"]

return self._join(
self._line("Min Throughput", b_min, c_min, task, b_unit, treat_increase_as_improvement=True),
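The `.get("overall_min") or ...` fallback keeps the comparison publisher working for results produced before this change; a minimal sketch with made-up throughput dicts:

```python
# Minimal illustration of the fallback used in _publish_throughput above:
# aggregated results carry overall_min/overall_max, regular results do not.
regular = {"min": 100, "max": 200, "mean": 150, "median": 160, "unit": "ops/s"}
aggregated = {"overall_min": 95, "overall_max": 205,
              "min": 100, "max": 200, "mean": 150, "median": 160, "unit": "ops/s"}

for throughput in (regular, aggregated):
    t_min = throughput.get("overall_min") or throughput["min"]
    t_max = throughput.get("overall_max") or throughput["max"]
    print(t_min, t_max)
# 100 200   <- regular result falls back to min/max
# 95 205    <- aggregated result uses overall_min/overall_max
```

One consequence of using `or` rather than a key-presence check is that a falsy value (for example an `overall_min` of exactly 0) would also fall back to `min`.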
113 changes: 78 additions & 35 deletions tests/aggregator_test.py
@@ -1,13 +1,12 @@
from unittest.mock import patch, Mock
from unittest.mock import Mock, patch, mock_open
import pytest

from osbenchmark import config
from osbenchmark.aggregator import Aggregator
from osbenchmark.aggregator import Aggregator, AggregatedResults

@pytest.fixture
def mock_config():
mock_cfg = Mock(spec=config.Config)
mock_cfg.opts.side_effect = lambda *args: "/path/to/root" if args == ("node", "root.dir") else None
mock_cfg.opts.side_effect = lambda *args: "test_procedure_name" if args == ("workload", "test_procedure.name") else "/path/to/root"
return mock_cfg

@pytest.fixture
@@ -29,8 +28,8 @@ def mock_args():
def mock_test_store():
mock_store = Mock()
mock_store.find_by_test_execution_id.side_effect = [
Mock(results={"key1": {"nested": 10}}),
Mock(results={"key1": {"nested": 20}})
Mock(results={"key1": {"nested": 10}}, workload="workload1", test_procedure="test_proc1"),
Mock(results={"key1": {"nested": 20}}, workload="workload1", test_procedure="test_proc1")
]
return mock_store

@@ -40,28 +39,35 @@ def aggregator(mock_config, mock_test_executions, mock_args, mock_test_store):
aggregator.test_store = mock_test_store
return aggregator

def test_iterations(aggregator, mock_args):
def test_count_iterations_for_each_op(aggregator):
mock_workload = Mock()
mock_schedule = [Mock(name="op1", iterations=5)]
mock_task = Mock(name="task1", schedule=mock_schedule)
mock_workload.test_procedures = [mock_task]

# Mock the config.opts call to return the same test_procedure.name
aggregator.config.opts.side_effect = lambda *args: mock_task.name if args == ("workload", "test_procedure.name") else None

mock_task = Mock(spec=['name', 'iterations'])
mock_task.name = "op1"
mock_task.iterations = 5
mock_schedule = [mock_task]
mock_test_procedure = Mock(spec=['name', 'schedule'])
mock_test_procedure.name = "test_procedure_name"
mock_test_procedure.schedule = mock_schedule
mock_workload.test_procedures = [mock_test_procedure]

# Update the config mock to return the correct test_procedure_name
aggregator.config.opts.side_effect = lambda *args: \
mock_test_procedure.name if args == ("workload", "test_procedure.name") else "/path/to/root"
with patch('osbenchmark.workload.load_workload', return_value=mock_workload):
aggregator.count_iterations_for_each_op()

assert list(aggregator.accumulated_iterations.values())[0] == 5
print(f"accumulated_iterations: {aggregator.accumulated_iterations}") # Debug print
assert "op1" in aggregator.accumulated_iterations, "op1 not found in accumulated_iterations"
assert aggregator.accumulated_iterations["op1"] == 5

def test_results(aggregator):
def test_accumulate_results(aggregator):
mock_test_execution = Mock()
mock_test_execution.results = {
"op_metrics": [
{
"task": "task1",
"throughput": 100,
"latency": 10,
"latency": {"avg": 10, "unit": "ms"},
"service_time": 5,
"client_processing_time": 2,
"processing_time": 3,
@@ -74,9 +80,19 @@ def test_results(aggregator):
aggregator.accumulate_results(mock_test_execution)

assert "task1" in aggregator.accumulated_results
assert all(metric in aggregator.accumulated_results["task1"] for metric in
["throughput", "latency", "service_time", "client_processing_time",
"processing_time", "error_rate", "duration"])
assert all(metric in aggregator.accumulated_results["task1"] for metric in aggregator.metrics)

def test_test_execution_compatibility_check(aggregator):
mock_test_store = Mock()
mock_test_store.find_by_test_execution_id.side_effect = [
Mock(workload="workload1", test_procedure="test_proc1"),
Mock(workload="workload1", test_procedure="test_proc1"),
Mock(workload="workload1", test_procedure="test_proc1"), # Add one more mock response
]
aggregator.test_store = mock_test_store
aggregator.test_executions = {"test1": Mock(), "test2": Mock()}

assert aggregator.test_execution_compatibility_check()

def test_aggregate_json_by_key(aggregator):
result = aggregator.aggregate_json_by_key("key1.nested")
@@ -95,25 +111,52 @@ def test_calculate_weighted_average(aggregator):
assert result["latency"]["avg"] == 15
assert result["latency"]["unit"] == "ms"

def test_compatibility_check(aggregator):
mock_test_procedure = Mock(name="test_procedure")
mock_test_store = Mock()
mock_test_store.find_by_test_execution_id.side_effect = [
Mock(workload="workload1", test_procedure=mock_test_procedure),
Mock(workload="workload1", test_procedure=mock_test_procedure),
Mock(workload="workload1", test_procedure=mock_test_procedure)
]
aggregator.test_store = mock_test_store
assert aggregator.test_execution_compatibility_check()

def test_calculate_rsd(aggregator):
values = [1, 2, 3, 4, 5]
rsd = aggregator.calculate_rsd(values, "test_metric")
assert isinstance(rsd, float)

def test_compatibility_check_incompatible(aggregator):
def test_test_execution_compatibility_check_incompatible(aggregator):
mock_test_store = Mock()
mock_test_store.find_by_test_execution_id.side_effect = [
Mock(workload="workload1"),
Mock(workload="workload2"),
Mock(workload="workload1")
Mock(workload="workload1", test_procedure="test_proc1"),
Mock(workload="workload2", test_procedure="test_proc1"),
]
aggregator.test_store = mock_test_store
aggregator.test_executions = {"test1": Mock(), "test2": Mock()}
with pytest.raises(ValueError):
aggregator.test_execution_compatibility_check()

def test_aggregate(aggregator):
mock_aggregated_results = Mock(test_execution_id="mock_id", as_dict=lambda: {})

with patch.object(aggregator, 'test_execution_compatibility_check', return_value=True), \
patch.object(aggregator, 'count_iterations_for_each_op'), \
patch.object(aggregator, 'accumulate_results'), \
patch.object(aggregator, 'build_aggregated_results', return_value=mock_aggregated_results) as mock_build, \
patch('osbenchmark.aggregator.FileTestExecutionStore') as mock_store_class, \
patch('osbenchmark.utils.io.ensure_dir') as mock_ensure_dir, \
patch('builtins.open', mock_open()) as mock_file:

mock_store = mock_store_class.return_value
mock_store.store_aggregated_execution.side_effect = lambda x: print(f"Storing aggregated execution: {x}")

aggregator.aggregate()

print(f"mock_build called: {mock_build.called}")
print(f"mock_store.store_aggregated_execution called: {mock_store.store_aggregated_execution.called}")

assert mock_build.called, "build_aggregated_results was not called"
mock_store.store_aggregated_execution.assert_called_once_with(mock_aggregated_results)

print(f"ensure_dir called: {mock_ensure_dir.called}")
print(f"ensure_dir call args: {mock_ensure_dir.call_args_list}")
print(f"open called: {mock_file.called}")
print(f"open call args: {mock_file.call_args_list}")

assert mock_store.store_aggregated_execution.called, "store_aggregated_execution was not called"

def test_aggregated_results():
results = {"key": "value"}
agg_results = AggregatedResults(results)
assert agg_results.as_dict() == results
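As an aside, the config mocking these tests rely on reduces to dispatching on the positional arguments passed to `opts`; a stripped-down sketch of the pattern (reusing the made-up paths from the fixtures above):

```python
# Reduced sketch of the config-mocking pattern used in mock_config and
# test_count_iterations_for_each_op: opts() dispatches on its arguments.
from unittest.mock import Mock

cfg = Mock()
cfg.opts.side_effect = lambda *args: (
    "test_procedure_name" if args == ("workload", "test_procedure.name") else "/path/to/root"
)

print(cfg.opts("workload", "test_procedure.name"))  # -> test_procedure_name
print(cfg.opts("node", "root.dir"))                 # -> /path/to/root
```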
58 changes: 57 additions & 1 deletion tests/results_publisher_test.py
@@ -23,10 +23,11 @@
# under the License.

from unittest import TestCase
from unittest.mock import Mock, patch

from osbenchmark import results_publisher


# pylint: disable=protected-access
class FormatterTests(TestCase):
def setUp(self):
self.empty_header = ["Header"]
@@ -57,3 +58,58 @@ def test_formats_as_csv(self):
formatted = results_publisher.format_as_csv(self.metrics_header, self.metrics_data)
# 1 header line, no separation line + 3 data lines
self.assertEqual(1 + 3, len(formatted.splitlines()))

@patch('osbenchmark.results_publisher.convert.to_bool')
def test_publish_throughput_handles_different_metrics(self, mock_to_bool):
config = Mock()

# Configure mock to return appropriate values for different calls
def config_opts_side_effect(*args, **kwargs):
if args[0] == "results_publishing":
if args[1] == "output.processingtime":
return False
elif args[1] == "percentiles":
return None
return Mock()

config.opts.side_effect = config_opts_side_effect

publisher = results_publisher.ComparisonResultsPublisher(config)

# Mock for regular test execution
regular_stats = Mock()
regular_stats.metrics.return_value = {
"throughput": {
"min": 100,
"max": 200,
"mean": 150,
"median": 160,
"unit": "ops/s"
}
}

# Mock for aggregated test execution
aggregated_stats = Mock()
aggregated_stats.metrics.return_value = {
"throughput": {
"overall_min": 95,
"overall_max": 205,
"min": 100,
"max": 200,
"mean": 150,
"median": 160,
"unit": "ops/s"
}
}

# Test with regular stats
result_regular = publisher._publish_throughput(regular_stats, regular_stats, "test_task")
self.assertEqual(len(result_regular), 4)
self.assertEqual(result_regular[0][2], 100) # baseline min
self.assertEqual(result_regular[3][3], 200) # contender max

# Test with aggregated stats
result_aggregated = publisher._publish_throughput(aggregated_stats, aggregated_stats, "test_task")
self.assertEqual(len(result_aggregated), 4)
self.assertEqual(result_aggregated[0][2], 95) # baseline overall_min
self.assertEqual(result_aggregated[3][3], 205) # contender overall_max
