Skip to content

Commit

Permalink
Add calculate-recall parameter to vector search and skip calculating …
Browse files Browse the repository at this point in the history
…recall if number clients > cpu cores (#626)

Signed-off-by: Finn Roblin <finnrobl@amazon.com>
  • Loading branch information
finnroblin authored Dec 5, 2024
1 parent d4954f5 commit dcc4bf6
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 27 deletions.
61 changes: 43 additions & 18 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from functools import total_ordering
from io import BytesIO
from os.path import commonprefix
import multiprocessing
from typing import List, Optional

import ijson
Expand Down Expand Up @@ -1344,29 +1345,49 @@ def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=F

return correct / min_num_of_results

def _set_initial_recall_values(params: dict, result: dict) -> None:
# Add recall@k and recall@1 to the initial result only if k is present in the params and calculate_recall is true
if "k" in params:
result.update({
"recall@k": 0,
"recall@1": 0
})
# Add recall@max_distance and recall@max_distance_1 to the initial result only if max_distance is present in the params
elif "max_distance" in params:
result.update({
"recall@max_distance": 0,
"recall@max_distance_1": 0
})
# Add recall@min_score and recall@min_score_1 to the initial result only if min_score is present in the params
elif "min_score" in params:
result.update({
"recall@min_score": 0,
"recall@min_score_1": 0
})

def _get_should_calculate_recall(params: dict) -> bool:
# set in global config (benchmark.ini) and passed by AsyncExecutor
num_clients = params.get("num_clients", 0)
if num_clients == 0:
self.logger.debug("Expected num_clients to be specified but was not.")
# default is set for runner unit tests based on default logic for available.cores in worker_coordinator
cpu_count = params.get("num_cores", multiprocessing.cpu_count())
if cpu_count < num_clients:
self.logger.warning("Number of clients, %s, specified is greater than the number of CPUs, %s, available."\
"This will lead to unperformant context switching on load generation host. Performance "\
"metrics may not be accurate. Skipping recall calculation.", num_clients, cpu_count)
return False
return params.get("calculate-recall", True)

result = {
"weight": 1,
"unit": "ops",
"success": True,
}
# Add recall@k and recall@1 to the initial result only if k is present in the params
if "k" in params:
result.update({
"recall@k": 0,
"recall@1": 0
})
# Add recall@max_distance and recall@max_distance_1 to the initial result only if max_distance is present in the params
elif "max_distance" in params:
result.update({
"recall@max_distance": 0,
"recall@max_distance_1": 0
})
# Add recall@min_score and recall@min_score_1 to the initial result only if min_score is present in the params
elif "min_score" in params:
result.update({
"recall@min_score": 0,
"recall@min_score_1": 0
})
# deal with clients here. Need to get num_clients
should_calculate_recall = _get_should_calculate_recall(params)
if should_calculate_recall:
_set_initial_recall_values(params, result)

doc_type = params.get("type")
response = await self._raw_search(opensearch, doc_type, index, body, request_params, headers=headers)
Expand All @@ -1390,6 +1411,10 @@ def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=F
if _is_empty_search_results(response_json):
self.logger.info("Vector search query returned no results.")
return result

if not should_calculate_recall:
return result

id_field = parse_string_parameter("id-field-name", params, "_id")
candidates = []
for hit in response_json['hits']['hits']:
Expand Down
13 changes: 11 additions & 2 deletions osbenchmark/worker_coordinator/worker_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1525,7 +1525,7 @@ def os_clients(all_hosts, all_client_options):
schedule = schedule_for(task, task_allocation.client_index_in_task, params_per_task[task])
async_executor = AsyncExecutor(
client_id, task, schedule, opensearch, self.sampler, self.cancel, self.complete,
task.error_behavior(self.abort_on_error))
task.error_behavior(self.abort_on_error), self.cfg)
final_executor = AsyncProfiler(async_executor) if self.profiling_enabled else async_executor
aws.append(final_executor())
run_start = time.perf_counter()
Expand Down Expand Up @@ -1577,7 +1577,7 @@ async def __call__(self, *args, **kwargs):


class AsyncExecutor:
def __init__(self, client_id, task, schedule, opensearch, sampler, cancel, complete, on_error):
def __init__(self, client_id, task, schedule, opensearch, sampler, cancel, complete, on_error, config=None):
"""
Executes tasks according to the schedule for a given operation.
Expand All @@ -1599,6 +1599,7 @@ def __init__(self, client_id, task, schedule, opensearch, sampler, cancel, compl
self.complete = complete
self.on_error = on_error
self.logger = logging.getLogger(__name__)
self.cfg = config

async def __call__(self, *args, **kwargs):
task_completes_parent = self.task.completes_parent
Expand All @@ -1624,6 +1625,14 @@ async def __call__(self, *args, **kwargs):
processing_start = time.perf_counter()
self.schedule_handle.before_request(processing_start)
async with self.opensearch["default"].new_request_context() as request_context:
# add num_clients to the parameter so that vector search runner can skip calculating recall
# if num_clients > cpu_count().
if params:
if params.get("operation-type") == "vector-search":
available_cores = int(self.cfg.opts("system", "available.cores", mandatory=False,
default_value=multiprocessing.cpu_count()))
params.update({"num_clients": self.task.clients, "num_cores": available_cores})

total_ops, total_ops_unit, request_meta_data = await execute_single(runner, self.opensearch, params, self.on_error)
request_start = request_context.request_start
request_end = request_context.request_end
Expand Down
4 changes: 4 additions & 0 deletions osbenchmark/workload/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ def __init__(self, workload, params, **kwargs):
f"'type' not supported with 'data-stream' for operation '{kwargs.get('operation_name')}'")
request_cache = params.get("cache", None)
detailed_results = params.get("detailed-results", False)
calculate_recall = params.get("calculate-recall", True)
query_body = params.get("body", None)
pages = params.get("pages", None)
results_per_page = params.get("results-per-page", None)
Expand All @@ -561,6 +562,7 @@ def __init__(self, workload, params, **kwargs):
"type": type_name,
"cache": request_cache,
"detailed-results": detailed_results,
"calculate-recall": calculate_recall,
"request-params": request_params,
"response-compression-enabled": response_compression_enabled,
"body": query_body
Expand Down Expand Up @@ -850,6 +852,8 @@ def params(self):

class VectorSearchParamSource(SearchParamSource):
def __init__(self, workload, params, **kwargs):
# print workload
logging.getLogger(__name__).info("Workload: [%s], params: [%s]", workload, params)
super().__init__(workload, params, **kwargs)
self.delegate_param_source = VectorSearchPartitionParamSource(workload, params, self.query_params, **kwargs)
self.corpora = self.delegate_param_source.corpora
Expand Down
Loading

0 comments on commit dcc4bf6

Please sign in to comment.