Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add calculate-recall parameter to vector search and skip calculating recall if number of clients > CPU cores #626

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 43 additions & 18 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from functools import total_ordering
from io import BytesIO
from os.path import commonprefix
import multiprocessing
from typing import List, Optional

import ijson
Expand Down Expand Up @@ -1320,29 +1321,49 @@ def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=F

return correct / min_num_of_results

def _set_initial_recall_values(params: dict, result: dict) -> None:
# Add recall@k and recall@1 to the initial result only if k is present in the params and calculate_recall is true
if "k" in params:
result.update({
"recall@k": 0,
"recall@1": 0
})
# Add recall@max_distance and recall@max_distance_1 to the initial result only if max_distance is present in the params
elif "max_distance" in params:
result.update({
"recall@max_distance": 0,
"recall@max_distance_1": 0
})
# Add recall@min_score and recall@min_score_1 to the initial result only if min_score is present in the params
elif "min_score" in params:
result.update({
"recall@min_score": 0,
"recall@min_score_1": 0
})

def _get_should_calculate_recall(params: dict) -> bool:
# set in global config (benchmark.ini) and passed by AsyncExecutor
IanHoang marked this conversation as resolved.
Show resolved Hide resolved
num_clients = params.get("num_clients", 0)
if num_clients == 0:
self.logger.debug("Expected num_clients to be specified but was not.")
# default is set for runner unit tests based on default logic for available.cores in worker_coordinator
cpu_count = params.get("num_cores", multiprocessing.cpu_count())
if cpu_count < num_clients:
self.logger.warning("Number of clients, %s, specified is greater than the number of CPUs, %s, available."\
"This will lead to unperformant context switching on load generation host. Performance "\
"metrics may not be accurate. Skipping recall calculation.", num_clients, cpu_count)
return False
return params.get("calculate-recall", True)

result = {
"weight": 1,
"unit": "ops",
"success": True,
}
# Add recall@k and recall@1 to the initial result only if k is present in the params
if "k" in params:
result.update({
"recall@k": 0,
"recall@1": 0
})
# Add recall@max_distance and recall@max_distance_1 to the initial result only if max_distance is present in the params
elif "max_distance" in params:
result.update({
"recall@max_distance": 0,
"recall@max_distance_1": 0
})
# Add recall@min_score and recall@min_score_1 to the initial result only if min_score is present in the params
elif "min_score" in params:
result.update({
"recall@min_score": 0,
"recall@min_score_1": 0
})
# deal with clients here. Need to get num_clients
should_calculate_recall = _get_should_calculate_recall(params)
if should_calculate_recall:
_set_initial_recall_values(params, result)

doc_type = params.get("type")
response = await self._raw_search(opensearch, doc_type, index, body, request_params, headers=headers)
Expand All @@ -1366,6 +1387,10 @@ def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=F
if _is_empty_search_results(response_json):
self.logger.info("Vector search query returned no results.")
return result

if not should_calculate_recall:
return result

id_field = parse_string_parameter("id-field-name", params, "_id")
candidates = []
for hit in response_json['hits']['hits']:
Expand Down
13 changes: 11 additions & 2 deletions osbenchmark/worker_coordinator/worker_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1525,7 +1525,7 @@ def os_clients(all_hosts, all_client_options):
schedule = schedule_for(task, task_allocation.client_index_in_task, params_per_task[task])
async_executor = AsyncExecutor(
client_id, task, schedule, opensearch, self.sampler, self.cancel, self.complete,
task.error_behavior(self.abort_on_error))
task.error_behavior(self.abort_on_error), self.cfg)
final_executor = AsyncProfiler(async_executor) if self.profiling_enabled else async_executor
aws.append(final_executor())
run_start = time.perf_counter()
Expand Down Expand Up @@ -1577,7 +1577,7 @@ async def __call__(self, *args, **kwargs):


class AsyncExecutor:
def __init__(self, client_id, task, schedule, opensearch, sampler, cancel, complete, on_error):
def __init__(self, client_id, task, schedule, opensearch, sampler, cancel, complete, on_error, config=None):
"""
Executes tasks according to the schedule for a given operation.

Expand All @@ -1599,6 +1599,7 @@ def __init__(self, client_id, task, schedule, opensearch, sampler, cancel, compl
self.complete = complete
self.on_error = on_error
self.logger = logging.getLogger(__name__)
self.cfg = config

async def __call__(self, *args, **kwargs):
task_completes_parent = self.task.completes_parent
Expand All @@ -1624,6 +1625,14 @@ async def __call__(self, *args, **kwargs):
processing_start = time.perf_counter()
self.schedule_handle.before_request(processing_start)
async with self.opensearch["default"].new_request_context() as request_context:
# add num_clients to the parameter so that vector search runner can skip calculating recall
# if num_clients > cpu_count().
if params:
if params.get("operation-type") == "vector-search":
available_cores = int(self.cfg.opts("system", "available.cores", mandatory=False,
default_value=multiprocessing.cpu_count()))
params.update({"num_clients": self.task.clients, "num_cores": available_cores})

total_ops, total_ops_unit, request_meta_data = await execute_single(runner, self.opensearch, params, self.on_error)
request_start = request_context.request_start
request_end = request_context.request_end
Expand Down
4 changes: 4 additions & 0 deletions osbenchmark/workload/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ def __init__(self, workload, params, **kwargs):
f"'type' not supported with 'data-stream' for operation '{kwargs.get('operation_name')}'")
request_cache = params.get("cache", None)
detailed_results = params.get("detailed-results", False)
calculate_recall = params.get("calculate-recall", True)
query_body = params.get("body", None)
pages = params.get("pages", None)
results_per_page = params.get("results-per-page", None)
Expand All @@ -561,6 +562,7 @@ def __init__(self, workload, params, **kwargs):
"type": type_name,
"cache": request_cache,
"detailed-results": detailed_results,
"calculate-recall": calculate_recall,
"request-params": request_params,
"response-compression-enabled": response_compression_enabled,
"body": query_body
Expand Down Expand Up @@ -850,6 +852,8 @@ def params(self):

class VectorSearchParamSource(SearchParamSource):
def __init__(self, workload, params, **kwargs):
    """Initialize the vector-search param source and its partition delegate."""
    # NOTE(review): logs the full workload and params at INFO on every construction;
    # presumably left over from debugging — consider demoting to DEBUG or removing.
    logging.getLogger(__name__).info("Workload: [%s], params: [%s]", workload, params)
    super().__init__(workload, params, **kwargs)
    # The delegate performs the actual per-partition vector query generation.
    self.delegate_param_source = VectorSearchPartitionParamSource(workload, params, self.query_params, **kwargs)
    # Expose the delegate's corpora so callers can inspect the data sets in use.
    self.corpora = self.delegate_param_source.corpora
Expand Down
Loading
Loading