diff --git a/osbenchmark/utils/dataset.py b/osbenchmark/utils/dataset.py index d8ad2b74f..41b303073 100644 --- a/osbenchmark/utils/dataset.py +++ b/osbenchmark/utils/dataset.py @@ -24,6 +24,8 @@ class Context(Enum): INDEX = 1 QUERY = 2 NEIGHBORS = 3 + MAX_DISTANCE_NEIGHBORS = 4 + MIN_SCORE_NEIGHBORS = 5 class DataSet(ABC): @@ -141,6 +143,12 @@ def parse_context(context: Context) -> str: if context == Context.QUERY: return "test" + if context == Context.MAX_DISTANCE_NEIGHBORS: + return "max_distance_neighbors" + + if context == Context.MIN_SCORE_NEIGHBORS: + return "min_score_neighbors" + raise Exception("Unsupported context") diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index cac5b6ef1..21ed83b6a 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -1232,13 +1232,6 @@ async def _vector_search_query_with_recall(opensearch, params): Perform vector search and report recall@k , recall@r and time taken to perform recall in ms as meta object. """ - result = { - "weight": 1, - "unit": "ops", - "success": True, - "recall@k": 0, - "recall@1": 0, - } def _is_empty_search_results(content): if content is None: @@ -1261,7 +1254,7 @@ def _get_field_value(content, field_name): return _get_field_value(content["_source"], field_name) return None - def calculate_recall(predictions, neighbors, top_k): + def calculate_topk_search_recall(predictions, neighbors, top_k): """ Calculates the recall by comparing top_k neighbors with predictions. recall = Sum of matched neighbors from predictions / total number of neighbors from ground truth @@ -1289,9 +1282,71 @@ def calculate_recall(predictions, neighbors, top_k): return correct / min_num_of_results + def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=False): + """ + Calculates the recall by comparing max_distance/min_score threshold neighbors with predictions. + recall = Sum of matched neighbors from predictions / total number of neighbors from ground truth + Args: + predictions: list containing ids of results returned by OpenSearch. + neighbors: list containing ids of the actual neighbors for a set of queries + enable_top_1_recall: boolean to calculate recall@1 + Returns: + Recall between predictions and top k neighbors from ground truth + """ + correct = 0.0 + try: + n = neighbors.index('-1') + # Slice the list to have a length of n + truth_set = neighbors[:n] + except ValueError: + # If '-1' is not found in the list, use the entire list + truth_set = neighbors + min_num_of_results = len(truth_set) + if min_num_of_results == 0: + self.logger.info("No neighbors are provided for recall calculation") + return 1 + + if enable_top_1_recall: + min_num_of_results = 1 + + for j in range(min_num_of_results): + if j >= len(predictions): + self.logger.info("No more neighbors in prediction to compare against ground truth.\n" + "Total neighbors in prediction: [%d].\n" + "Total neighbors in ground truth: [%d]", len(predictions), min_num_of_results) + break + if predictions[j] in truth_set: + correct += 1.0 + + return correct / min_num_of_results + + result = { + "weight": 1, + "unit": "ops", + "success": True, + } + # Add recall@k and recall@1 to the initial result only if k is present in the params + if "k" in params: + result.update({ + "recall@k": 0, + "recall@1": 0 + }) + # Add recall@max_distance and recall@max_distance_1 to the initial result only if max_distance is present in the params + elif "max_distance" in params: + result.update({ + "recall@max_distance": 0, + "recall@max_distance_1": 0 + }) + # Add recall@min_score and recall@min_score_1 to the initial result only if min_score is present in the params + elif "min_score" in params: + result.update({ + "recall@min_score": 0, + "recall@min_score_1": 0 + }) + doc_type = params.get("type") response = await self._raw_search(opensearch, doc_type, index, body, request_params, headers=headers) - recall_processing_start = time.perf_counter() + if detailed_results: props = parse(response, ["hits.total", "hits.total.value", "hits.total.relation", "timed_out", "took"]) hits_total = props.get("hits.total.value", props.get("hits.total", 0)) @@ -1305,6 +1360,8 @@ def calculate_recall(predictions, neighbors, top_k): "timed_out": timed_out, "took": took }) + + recall_processing_start = time.perf_counter() response_json = json.loads(response.getvalue()) if _is_empty_search_results(response_json): self.logger.info("Vector search query returned no results.") @@ -1318,12 +1375,23 @@ def calculate_recall(predictions, neighbors, top_k): continue candidates.append(field_value) neighbors_dataset = params["neighbors"] - num_neighbors = params.get("k", 1) - recall_k = calculate_recall(candidates, neighbors_dataset, num_neighbors) - result.update({"recall@k": recall_k}) - recall_1 = calculate_recall(candidates, neighbors_dataset, 1) - result.update({"recall@1": recall_1}) + if "k" in params: + num_neighbors = params.get("k", 1) + recall_top_k = calculate_topk_search_recall(candidates, neighbors_dataset, num_neighbors) + recall_top_1 = calculate_topk_search_recall(candidates, neighbors_dataset, 1) + result.update({"recall@k": recall_top_k}) + result.update({"recall@1": recall_top_1}) + + if "max_distance" in params or "min_score" in params: + recall_threshold = calculate_radial_search_recall(candidates, neighbors_dataset) + recall_top_1 = calculate_radial_search_recall(candidates, neighbors_dataset, True) + if "min_score" in params: + result.update({"recall@min_score": recall_threshold}) + result.update({"recall@min_score_1": recall_top_1}) + elif "max_distance" in params: + result.update({"recall@max_distance": recall_threshold}) + result.update({"recall@max_distance_1": recall_top_1}) recall_processing_end = time.perf_counter() recall_processing_time = convert.seconds_to_ms(recall_processing_end - recall_processing_start) diff --git a/osbenchmark/workload/params.py b/osbenchmark/workload/params.py index 59ebe27d1..d59ff557e 100644 --- a/osbenchmark/workload/params.py +++ b/osbenchmark/workload/params.py @@ -40,7 +40,7 @@ from osbenchmark import exceptions from osbenchmark.utils import io from osbenchmark.utils.dataset import DataSet, get_data_set, Context -from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter +from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter, parse_float_parameter from osbenchmark.workload import workload __PARAM_SOURCES_BY_OP = {} @@ -1027,6 +1027,8 @@ class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource): request-params: query parameters that can be passed to search request """ PARAMS_NAME_K = "k" + PARAMS_NAME_MAX_DISTANCE = "max_distance" + PARAMS_NAME_MIN_SCORE = "min_score" PARAMS_NAME_BODY = "body" PARAMS_NAME_SIZE = "size" PARAMS_NAME_QUERY = "query" @@ -1041,11 +1043,27 @@ class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource): PARAMS_NAME_REQUEST_PARAMS = "request-params" PARAMS_NAME_SOURCE = "_source" PARAMS_NAME_ALLOW_PARTIAL_RESULTS = "allow_partial_search_results" + MIN_SCORE_QUERY_TYPE = "min_score" + MAX_DISTANCE_QUERY_TYPE = "max_distance" + KNN_QUERY_TYPE = "knn" + DEFAULT_RADIAL_SEARCH_QUERY_RESULT_SIZE = 10000 def __init__(self, workloads, params, query_params, **kwargs): super().__init__(workloads, params, Context.QUERY, **kwargs) self.logger = logging.getLogger(__name__) - self.k = parse_int_parameter(self.PARAMS_NAME_K, params) + self.k = None + self.distance = None + self.score = None + if self.PARAMS_NAME_K in params: + self.k = parse_int_parameter(self.PARAMS_NAME_K, params) + self.query_type = self.KNN_QUERY_TYPE + if self.PARAMS_NAME_MAX_DISTANCE in params: + self.distance = parse_float_parameter(self.PARAMS_NAME_MAX_DISTANCE, params) + self.query_type = self.MAX_DISTANCE_QUERY_TYPE + if self.PARAMS_NAME_MIN_SCORE in params: + self.score = parse_float_parameter(self.PARAMS_NAME_MIN_SCORE, params) + self.query_type = self.MIN_SCORE_QUERY_TYPE + self._validate_query_type_parameters() self.repetitions = parse_int_parameter(self.PARAMS_NAME_REPETITIONS, params, 1) self.current_rep = 1 self.neighbors_data_set_format = parse_string_parameter( @@ -1058,10 +1076,21 @@ def __init__(self, workloads, params, query_params, **kwargs): self.PARAMS_VALUE_VECTOR_SEARCH) self.query_params = query_params self.query_params.update({ - self.PARAMS_NAME_K: self.k, self.PARAMS_NAME_OPERATION_TYPE: operation_type, self.PARAMS_NAME_ID_FIELD_NAME: params.get(self.PARAMS_NAME_ID_FIELD_NAME), }) + if self.PARAMS_NAME_K in params: + self.query_params.update({ + self.PARAMS_NAME_K: self.k + }) + if self.PARAMS_NAME_MAX_DISTANCE in params: + self.query_params.update({ + self.PARAMS_NAME_MAX_DISTANCE: self.distance + }) + if self.PARAMS_NAME_MIN_SCORE in params: + self.query_params.update({ + self.PARAMS_NAME_MIN_SCORE: self.score + }) if self.PARAMS_NAME_FILTER in params: self.query_params.update({ self.PARAMS_NAME_FILTER: params.get(self.PARAMS_NAME_FILTER) @@ -1072,6 +1101,10 @@ def __init__(self, workloads, params, query_params, **kwargs): neighbors_corpora = self.extract_corpora(self.neighbors_data_set_corpus, self.neighbors_data_set_format) self.corpora.extend(corpora for corpora in neighbors_corpora if corpora not in self.corpora) + def _validate_query_type_parameters(self): + if bool(self.k) + bool(self.distance) + bool(self.score) > 1: + raise ValueError("Only one of k, max_distance, or min_score can be specified in vector search.") + @staticmethod def _validate_neighbors_data_set(file_path, corpus): if file_path and corpus: @@ -1086,15 +1119,29 @@ def _update_request_params(self): self.PARAMS_NAME_ALLOW_PARTIAL_RESULTS, "false") self.query_params.update({self.PARAMS_NAME_REQUEST_PARAMS: request_params}) + def _get_query_neighbors(self): + if self.query_type == self.KNN_QUERY_TYPE: + return Context.NEIGHBORS + if self.query_type == self.MIN_SCORE_QUERY_TYPE: + return Context.MIN_SCORE_NEIGHBORS + if self.query_type == self.MAX_DISTANCE_QUERY_TYPE: + return Context.MAX_DISTANCE_NEIGHBORS + raise Exception("Unknown query type [%s]" % self.query_type) + + def _get_query_size(self): + if self.query_type == self.KNN_QUERY_TYPE: + return self.k + return self.DEFAULT_RADIAL_SEARCH_QUERY_RESULT_SIZE + def _update_body_params(self, vector): # accept body params if passed from workload, else, create empty dictionary body_params = self.query_params.get(self.PARAMS_NAME_BODY) or dict() if self.PARAMS_NAME_SIZE not in body_params: - body_params[self.PARAMS_NAME_SIZE] = self.k + body_params[self.PARAMS_NAME_SIZE] = self._get_query_size() if self.PARAMS_NAME_QUERY in body_params: self.logger.warning( "[%s] param from body will be replaced with vector search query.", self.PARAMS_NAME_QUERY) - efficient_filter=self.query_params.get(self.PARAMS_NAME_FILTER) + efficient_filter = self.query_params.get(self.PARAMS_NAME_FILTER) # override query params with vector search query body_params[self.PARAMS_NAME_QUERY] = self._build_vector_search_query_body(vector, efficient_filter) self.query_params.update({self.PARAMS_NAME_BODY: body_params}) @@ -1110,7 +1157,7 @@ def partition(self, partition_index, total_partitions): self.neighbors_data_set_path = self.data_set_path # add neighbor instance to partition partition.neighbors_data_set = get_data_set( - self.neighbors_data_set_format, self.neighbors_data_set_path, Context.NEIGHBORS) + self.neighbors_data_set_format, self.neighbors_data_set_path, self._get_query_neighbors()) partition.neighbors_data_set.seek(partition.offset) return partition @@ -1129,7 +1176,7 @@ def params(self): raise StopIteration vector = self.data_set.read(1)[0] neighbor = self.neighbors_data_set.read(1)[0] - true_neighbors = list(map(str, neighbor[:self.k])) + true_neighbors = list(map(str, neighbor[:self.k] if self.k else neighbor)) self.query_params.update({ "neighbors": true_neighbors, }) @@ -1140,17 +1187,34 @@ def params(self): return self.query_params def _build_vector_search_query_body(self, vector, efficient_filter=None) -> dict: - """Builds a k-NN request that can be used to execute an approximate nearest + """Builds a vector search request that can be used to execute an approximate nearest neighbor search against a k-NN plugin index Args: vector: vector used for query + efficient_filter: efficient filter used for query Returns: A dictionary containing the body used for search query """ - query = { + query = {} + if self.query_type == self.KNN_QUERY_TYPE: + query.update({ + "k": self.k, + }) + elif self.query_type == self.MIN_SCORE_QUERY_TYPE: + query.update({ + "min_score": self.score, + }) + elif self.query_type == self.MAX_DISTANCE_QUERY_TYPE: + query.update({ + "max_distance": self.distance, + }) + else: + raise Exception("Unknown query type [%s]" % self.query_type) + + query.update({ "vector": vector, - "k": self.k, - } + }) + if efficient_filter: query.update({ "filter": efficient_filter, diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 8800d7af5..fbf2ecff4 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -3121,6 +3121,164 @@ async def test_query_vector_search_with_custom_id_field_inside_source(self, open headers={"Accept-Encoding": "identity"} ) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_query_vector_radial_search_with_min_score(self, opensearch, on_client_request_start, on_client_request_end): + search_response = { + "timed_out": False, + "took": 5, + "hits": { + "total": { + "value": 3, + "relation": "eq" + }, + "hits": [ + { + "_id": 101, + "_score": 0.95 + }, + { + "_id": 102, + "_score": 0.88 + }, + { + "_id": 103, + "_score": 0.87 + } + ] + } + } + opensearch.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response))) + + query_runner = runner.Query() + + params = { + "index": "unittest", + "operation-type": "vector-search", + "detailed-results": True, + "response-compression-enabled": False, + "min_score": 0.80, + "neighbors": [101, 102, 103], + "body": { + "query": { + "knn": { + "location": { + "vector": [ + 5, + 4 + ], + "min_score": 0.80, + } + } + } + } + } + + async with query_runner: + result = await query_runner(opensearch, params) + + self.assertEqual(1, result["weight"]) + self.assertEqual("ops", result["unit"]) + self.assertEqual(3, result["hits"]) + self.assertEqual("eq", result["hits_relation"]) + self.assertFalse(result["timed_out"]) + self.assertEqual(5, result["took"]) + self.assertIn("recall_time_ms", result.keys()) + self.assertIn("recall@min_score", result.keys()) + self.assertEqual(result["recall@min_score"], 1.0) + self.assertIn("recall@min_score_1", result.keys()) + self.assertEqual(result["recall@min_score_1"], 1.0) + self.assertNotIn("error-type", result.keys()) + + opensearch.transport.perform_request.assert_called_once_with( + "GET", + "/unittest/_search", + params={}, + body=params["body"], + headers={"Accept-Encoding": "identity"} + ) + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_query_vector_radial_search_with_max_distance(self, opensearch, on_client_request_start, on_client_request_end): + search_response = { + "timed_out": False, + "took": 5, + "hits": { + "total": { + "value": 3, + "relation": "eq" + }, + "hits": [ + { + "_id": 101, + "_score": 0.95 + }, + { + "_id": 102, + "_score": 0.88 + }, + { + "_id": 103, + "_score": 0.87 + } + ] + } + } + opensearch.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response))) + + query_runner = runner.Query() + + params = { + "index": "unittest", + "operation-type": "vector-search", + "detailed-results": True, + "response-compression-enabled": False, + "max_distance": 15.0, + "neighbors": [101, 102, 103, 104], + "body": { + "query": { + "knn": { + "location": { + "vector": [ + 5, + 4 + ], + "max_distance": 15.0, + } + } + } + } + } + + async with query_runner: + result = await query_runner(opensearch, params) + + self.assertEqual(1, result["weight"]) + self.assertEqual("ops", result["unit"]) + self.assertEqual(3, result["hits"]) + self.assertEqual("eq", result["hits_relation"]) + self.assertFalse(result["timed_out"]) + self.assertEqual(5, result["took"]) + self.assertIn("recall_time_ms", result.keys()) + self.assertIn("recall@max_distance", result.keys()) + self.assertEqual(result["recall@max_distance"], 0.75) + self.assertIn("recall@max_distance_1", result.keys()) + self.assertEqual(result["recall@max_distance_1"], 1.0) + self.assertNotIn("error-type", result.keys()) + + opensearch.transport.perform_request.assert_called_once_with( + "GET", + "/unittest/_search", + params={}, + body=params["body"], + headers={"Accept-Encoding": "identity"} + ) + class PutPipelineRunnerTests(TestCase): @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') diff --git a/tests/workload/params_test.py b/tests/workload/params_test.py index 6deb3161f..7b3c61da0 100644 --- a/tests/workload/params_test.py +++ b/tests/workload/params_test.py @@ -2957,6 +2957,93 @@ def test_params_custom_body(self): with self.assertRaises(StopIteration): query_param_source_partition.params() + def test_params_when_multiple_query_type_provided_then_raise_exception(self): + # Create a data set + data_set_path = create_data_set( + self.DEFAULT_NUM_VECTORS, + self.DEFAULT_DIMENSION, + self.DEFAULT_TYPE, + Context.QUERY, + self.data_set_dir + ) + neighbors_data_set_path = create_data_set( + self.DEFAULT_NUM_VECTORS, + self.DEFAULT_DIMENSION, + self.DEFAULT_TYPE, + Context.NEIGHBORS, + self.data_set_dir + ) + + test_param_source_params_1 = { + "field": self.DEFAULT_FIELD_NAME, + "data_set_format": self.DEFAULT_TYPE, + "data_set_path": data_set_path, + "neighbors_data_set_path": neighbors_data_set_path, + "k": 10, + "min_score": 0.5, + } + + with self.assertRaisesRegex(ValueError, "Only one of k, max_distance, or min_score can be specified in vector search."): + query_param_source = VectorSearchPartitionParamSource( + workload.Workload(name="unit-test"), + test_param_source_params_1, { + "index": self.DEFAULT_INDEX_NAME, + "request-params": {}, + "body": { + "size": 100, + } + } + ) + # This line won't be executed if exception is raised during initialization + query_param_source.partition(0, 1) + + test_param_source_params_2 = { + "field": self.DEFAULT_FIELD_NAME, + "data_set_format": self.DEFAULT_TYPE, + "data_set_path": data_set_path, + "neighbors_data_set_path": neighbors_data_set_path, + "k": 10, + "max_distance": 100.0, + } + + with self.assertRaisesRegex(ValueError, "Only one of k, max_distance, or min_score can be specified in vector search."): + query_param_source = VectorSearchPartitionParamSource( + workload.Workload(name="unit-test"), + test_param_source_params_2, { + "index": self.DEFAULT_INDEX_NAME, + "request-params": {}, + "body": { + "size": 100, + } + } + ) + # This line won't be executed if exception is raised during initialization + query_param_source.partition(0, 1) + + test_param_source_params_3 = { + "field": self.DEFAULT_FIELD_NAME, + "data_set_format": self.DEFAULT_TYPE, + "data_set_path": data_set_path, + "neighbors_data_set_path": neighbors_data_set_path, + "min_score": 0.5, + "max_distance": 100.0, + "k": 10, + } + + with self.assertRaisesRegex(ValueError, "Only one of k, max_distance, or min_score can be specified in vector search."): + query_param_source = VectorSearchPartitionParamSource( + workload.Workload(name="unit-test"), + test_param_source_params_3, { + "index": self.DEFAULT_INDEX_NAME, + "request-params": {}, + "body": { + "size": 100, + } + } + ) + # This line won't be executed if exception is raised during initialization + query_param_source.partition(0, 1) + def _check_params( self, actual_params: dict,