
Commit 8f07c3a

Revert "Add benchmark support for vector radial search (opensearch-project#546)"

This reverts commit 1eb5171.
finnroblin committed Aug 13, 2024
1 parent 96f63d1 commit 8f07c3a
Showing 3 changed files with 22 additions and 229 deletions.
96 changes: 14 additions & 82 deletions osbenchmark/worker_coordinator/runner.py
@@ -1230,6 +1230,13 @@ async def _vector_search_query_with_recall(opensearch, params):
Perform vector search and report recall@k, recall@r and time taken to perform recall in ms as
meta object.
"""
result = {
"weight": 1,
"unit": "ops",
"success": True,
"recall@k": 0,
"recall@1": 0,
}

def _is_empty_search_results(content):
if content is None:
@@ -1263,7 +1270,7 @@ def binary_search_for_last_negative_1(neighbors):
low = mid + 1
return low - 1
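For context: binary_search_for_last_negative_1 operates on ground-truth neighbor lists that are padded with '-1' sentinels, and the tail shown above is the closing step of a boundary search over that padding. A minimal sketch of the same idea, under the assumption that all real ids precede the padding (first_padding_index is an illustrative name, not the reverted code):

def first_padding_index(neighbors):
    # Assumes neighbors = [real ids ...] + ["-1", "-1", ...]; returns the index of
    # the first "-1", i.e. the number of real ground-truth neighbors.
    low, high = 0, len(neighbors)
    while low < high:
        mid = (low + high) // 2
        if neighbors[mid] == "-1":
            high = mid
        else:
            low = mid + 1
    return low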

def calculate_topk_search_recall(predictions, neighbors, top_k):
def calculate_recall(predictions, neighbors, top_k):
"""
Calculates the recall by comparing top_k neighbors with predictions.
recall = Sum of matched neighbors from predictions / total number of neighbors from ground truth
@@ -1321,71 +1328,9 @@ def calculate_topk_search_recall(predictions, neighbors, top_k):
self.logger.info("Number correct: %s, length of truth set: %s, truth_set: %s", correct, len(truth_set), truth_set)
return correct / len(truth_set) # TP / (TP + FN), but ground truth includes all TP and FN.

def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=False):
"""
Calculates the recall by comparing max_distance/min_score threshold neighbors with predictions.
recall = Sum of matched neighbors from predictions / total number of neighbors from ground truth
Args:
predictions: list containing ids of results returned by OpenSearch.
neighbors: list containing ids of the actual neighbors for a set of queries
enable_top_1_recall: boolean to calculate recall@1
Returns:
Recall between predictions and top k neighbors from ground truth
"""
correct = 0.0
try:
n = neighbors.index('-1')
# Slice the list to have a length of n
truth_set = neighbors[:n]
except ValueError:
# If '-1' is not found in the list, use the entire list
truth_set = neighbors
min_num_of_results = len(truth_set)
if min_num_of_results == 0:
self.logger.info("No neighbors are provided for recall calculation")
return 1

if enable_top_1_recall:
min_num_of_results = 1

for j in range(min_num_of_results):
if j >= len(predictions):
self.logger.info("No more neighbors in prediction to compare against ground truth.\n"
"Total neighbors in prediction: [%d].\n"
"Total neighbors in ground truth: [%d]", len(predictions), min_num_of_results)
break
if predictions[j] in truth_set:
correct += 1.0

return correct / min_num_of_results
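For context: both recall helpers above reduce to the same ratio, the number of returned ids that appear in the ground truth divided by the size of the ground-truth set, with the ground truth truncated at the first '-1' sentinel. A small worked example with made-up ids:

# Illustrative values only; ids are strings, as in the dataset readers.
neighbors = ["3", "7", "9", "-1", "-1"]          # ground truth padded with '-1'
predictions = ["3", "9", "12"]                   # ids returned by OpenSearch

truth_set = neighbors[:neighbors.index("-1")]    # ["3", "7", "9"]
matched = sum(1 for p in predictions if p in truth_set)   # 2 ("3" and "9")
recall = matched / len(truth_set)                # 2 / 3 = 0.666...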

result = {
"weight": 1,
"unit": "ops",
"success": True,
}
# Add recall@k and recall@1 to the initial result only if k is present in the params
if "k" in params:
result.update({
"recall@k": 0,
"recall@1": 0
})
# Add recall@max_distance and recall@max_distance_1 to the initial result only if max_distance is present in the params
elif "max_distance" in params:
result.update({
"recall@max_distance": 0,
"recall@max_distance_1": 0
})
# Add recall@min_score and recall@min_score_1 to the initial result only if min_score is present in the params
elif "min_score" in params:
result.update({
"recall@min_score": 0,
"recall@min_score_1": 0
})

doc_type = params.get("type")
response = await self._raw_search(opensearch, doc_type, index, body, request_params, headers=headers)

recall_processing_start = time.perf_counter()
if detailed_results:
props = parse(response, ["hits.total", "hits.total.value", "hits.total.relation", "timed_out", "took"])
hits_total = props.get("hits.total.value", props.get("hits.total", 0))
@@ -1399,8 +1344,6 @@ def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=False):
"timed_out": timed_out,
"took": took
})

recall_processing_start = time.perf_counter()
response_json = json.loads(response.getvalue())
self.logger.info("Response json %s", response_json)
if _is_empty_search_results(response_json):
@@ -1415,23 +1358,12 @@ def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=False):
continue
candidates.append(field_value)
neighbors_dataset = params["neighbors"]
num_neighbors = params.get("k", 1)
recall_k = calculate_recall(candidates, neighbors_dataset, num_neighbors)
result.update({"recall@k": recall_k})

if "k" in params:
num_neighbors = params.get("k", 1)
recall_top_k = calculate_topk_search_recall(candidates, neighbors_dataset, num_neighbors)
recall_top_1 = calculate_topk_search_recall(candidates, neighbors_dataset, 1)
result.update({"recall@k": recall_top_k})
result.update({"recall@1": recall_top_1})

if "max_distance" in params or "min_score" in params:
recall_threshold = calculate_radial_search_recall(candidates, neighbors_dataset)
recall_top_1 = calculate_radial_search_recall(candidates, neighbors_dataset, True)
if "min_score" in params:
result.update({"recall@min_score": recall_threshold})
result.update({"recall@min_score_1": recall_top_1})
elif "max_distance" in params:
result.update({"recall@max_distance": recall_threshold})
result.update({"recall@max_distance_1": recall_top_1})
recall_1 = calculate_recall(candidates, neighbors_dataset, 1)
result.update({"recall@1": recall_1})

recall_processing_end = time.perf_counter()
recall_processing_time = convert.seconds_to_ms(recall_processing_end - recall_processing_start)
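For context: after the revert, the meta object assembled in this runner always carries the bookkeeping fields plus the two recall metrics, and the detailed-results branch adds hit counts and timings. An illustrative final shape, with made-up values; the hits/hits_relation key names are assumed from the surrounding runner code rather than shown in this hunk:

result = {
    "weight": 1,
    "unit": "ops",
    "success": True,
    "recall@k": 0.95,        # calculate_recall(candidates, neighbors, k)
    "recall@1": 1.0,         # calculate_recall(candidates, neighbors, 1)
    # present only when detailed_results is enabled:
    "hits": 10,
    "hits_relation": "eq",   # assumed key name
    "timed_out": False,
    "took": 7,
}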
68 changes: 8 additions & 60 deletions osbenchmark/workload/params.py
@@ -1041,8 +1041,6 @@ class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource):
request-params: query parameters that can be passed to search request
"""
PARAMS_NAME_K = "k"
PARAMS_NAME_MAX_DISTANCE = "max_distance"
PARAMS_NAME_MIN_SCORE = "min_score"
PARAMS_NAME_BODY = "body"
PARAMS_NAME_SIZE = "size"
PARAMS_NAME_QUERY = "query"
@@ -1059,27 +1057,11 @@ class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource):
PARAMS_NAME_REQUEST_PARAMS = "request-params"
PARAMS_NAME_SOURCE = "_source"
PARAMS_NAME_ALLOW_PARTIAL_RESULTS = "allow_partial_search_results"
MIN_SCORE_QUERY_TYPE = "min_score"
MAX_DISTANCE_QUERY_TYPE = "max_distance"
KNN_QUERY_TYPE = "knn"
DEFAULT_RADIAL_SEARCH_QUERY_RESULT_SIZE = 10000

def __init__(self, workloads, params, query_params, **kwargs):
super().__init__(workloads, params, Context.QUERY, **kwargs)
self.logger = logging.getLogger(__name__)
self.k = None
self.distance = None
self.score = None
if self.PARAMS_NAME_K in params:
self.k = parse_int_parameter(self.PARAMS_NAME_K, params)
self.query_type = self.KNN_QUERY_TYPE
if self.PARAMS_NAME_MAX_DISTANCE in params:
self.distance = parse_float_parameter(self.PARAMS_NAME_MAX_DISTANCE, params)
self.query_type = self.MAX_DISTANCE_QUERY_TYPE
if self.PARAMS_NAME_MIN_SCORE in params:
self.score = parse_float_parameter(self.PARAMS_NAME_MIN_SCORE, params)
self.query_type = self.MIN_SCORE_QUERY_TYPE
self._validate_query_type_parameters()
self.k = parse_int_parameter(self.PARAMS_NAME_K, params)
self.repetitions = parse_int_parameter(self.PARAMS_NAME_REPETITIONS, params, 1)
self.current_rep = 1
self.neighbors_data_set_format = parse_string_parameter(
@@ -1092,6 +1074,7 @@ def __init__(self, workloads, params, query_params, **kwargs):
self.PARAMS_VALUE_VECTOR_SEARCH)
self.query_params = query_params
self.query_params.update({
self.PARAMS_NAME_K: self.k,
self.PARAMS_NAME_OPERATION_TYPE: operation_type,
self.PARAMS_NAME_ID_FIELD_NAME: params.get(self.PARAMS_NAME_ID_FIELD_NAME),
})
@@ -1120,10 +1103,6 @@ def __init__(self, workloads, params, query_params, **kwargs):
neighbors_corpora = self.extract_corpora(self.neighbors_data_set_corpus, self.neighbors_data_set_format)
self.corpora.extend(corpora for corpora in neighbors_corpora if corpora not in self.corpora)

def _validate_query_type_parameters(self):
if bool(self.k) + bool(self.distance) + bool(self.score) > 1:
raise ValueError("Only one of k, max_distance, or min_score can be specified in vector search.")

@staticmethod
def _validate_neighbors_data_set(file_path, corpus):
if file_path and corpus:
@@ -1138,25 +1117,11 @@ def _update_request_params(self):
self.PARAMS_NAME_ALLOW_PARTIAL_RESULTS, "false")
self.query_params.update({self.PARAMS_NAME_REQUEST_PARAMS: request_params})

def _get_query_neighbors(self):
if self.query_type == self.KNN_QUERY_TYPE:
return Context.NEIGHBORS
if self.query_type == self.MIN_SCORE_QUERY_TYPE:
return Context.MIN_SCORE_NEIGHBORS
if self.query_type == self.MAX_DISTANCE_QUERY_TYPE:
return Context.MAX_DISTANCE_NEIGHBORS
raise Exception("Unknown query type [%s]" % self.query_type)

def _get_query_size(self):
if self.query_type == self.KNN_QUERY_TYPE:
return self.k
return self.DEFAULT_RADIAL_SEARCH_QUERY_RESULT_SIZE

def _update_body_params(self, vector):
# accept body params if passed from workload, else, create empty dictionary
body_params = self.query_params.get(self.PARAMS_NAME_BODY) or dict()
if self.PARAMS_NAME_SIZE not in body_params:
body_params[self.PARAMS_NAME_SIZE] = self._get_query_size()
body_params[self.PARAMS_NAME_SIZE] = self.k
if self.PARAMS_NAME_QUERY in body_params:
self.logger.warning(
"[%s] param from body will be replaced with vector search query.", self.PARAMS_NAME_QUERY)
@@ -1190,7 +1155,7 @@ def partition(self, partition_index, total_partitions):
self.neighbors_data_set_path = self.data_set_path
# add neighbor instance to partition
partition.neighbors_data_set = get_data_set(
self.neighbors_data_set_format, self.neighbors_data_set_path, self._get_query_neighbors())
self.neighbors_data_set_format, self.neighbors_data_set_path, Context.NEIGHBORS)
partition.neighbors_data_set.seek(partition.offset)
return partition

@@ -1209,7 +1174,7 @@ def params(self):
raise StopIteration
vector = self.data_set.read(1)[0]
neighbor = self.neighbors_data_set.read(1)[0]
true_neighbors = list(map(str, neighbor[:self.k] if self.k else neighbor))
true_neighbors = list(map(str, neighbor[:self.k]))
self.query_params.update({
"neighbors": true_neighbors,
})
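For context: each call to params() above pairs one query vector with its ground-truth ids (truncated to the first k after this revert) and hands both to the runner via the "neighbors" key. An illustrative query_params dict, with made-up values and the standard k-NN plugin query shape assumed for the body:

query_params = {
    "index": "target_index",
    "request-params": {},
    "neighbors": ["3", "7", "9"],   # ground-truth ids as strings, first k entries
    "body": {
        "size": 3,
        "query": {"knn": {"target_field": {"vector": [0.1, 0.2, 0.3], "k": 3}}},
    },
}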
@@ -1224,30 +1189,13 @@ def _build_vector_search_query_body(self, vector, efficient_filter=None, filter_
neighbor search against a k-NN plugin index
Args:
vector: vector used for query
efficient_filter: efficient filter used for query
Returns:
A dictionary containing the body used for search query
"""
query = {}
if self.query_type == self.KNN_QUERY_TYPE:
query.update({
"k": self.k,
})
elif self.query_type == self.MIN_SCORE_QUERY_TYPE:
query.update({
"min_score": self.score,
})
elif self.query_type == self.MAX_DISTANCE_QUERY_TYPE:
query.update({
"max_distance": self.distance,
})
else:
raise Exception("Unknown query type [%s]" % self.query_type)

query.update({
query = {
"vector": vector,
})

"k": self.k,
}
if efficient_filter:
query.update({
"filter": efficient_filter,
87 changes: 0 additions & 87 deletions tests/workload/params_test.py
@@ -3194,93 +3194,6 @@ def test_script_score_filter(self):



def test_params_when_multiple_query_type_provided_then_raise_exception(self):
# Create a data set
data_set_path = create_data_set(
self.DEFAULT_NUM_VECTORS,
self.DEFAULT_DIMENSION,
self.DEFAULT_TYPE,
Context.QUERY,
self.data_set_dir
)
neighbors_data_set_path = create_data_set(
self.DEFAULT_NUM_VECTORS,
self.DEFAULT_DIMENSION,
self.DEFAULT_TYPE,
Context.NEIGHBORS,
self.data_set_dir
)

test_param_source_params_1 = {
"field": self.DEFAULT_FIELD_NAME,
"data_set_format": self.DEFAULT_TYPE,
"data_set_path": data_set_path,
"neighbors_data_set_path": neighbors_data_set_path,
"k": 10,
"min_score": 0.5,
}

with self.assertRaisesRegex(ValueError, "Only one of k, max_distance, or min_score can be specified in vector search."):
query_param_source = VectorSearchPartitionParamSource(
workload.Workload(name="unit-test"),
test_param_source_params_1, {
"index": self.DEFAULT_INDEX_NAME,
"request-params": {},
"body": {
"size": 100,
}
}
)
# This line won't be executed if exception is raised during initialization
query_param_source.partition(0, 1)

test_param_source_params_2 = {
"field": self.DEFAULT_FIELD_NAME,
"data_set_format": self.DEFAULT_TYPE,
"data_set_path": data_set_path,
"neighbors_data_set_path": neighbors_data_set_path,
"k": 10,
"max_distance": 100.0,
}

with self.assertRaisesRegex(ValueError, "Only one of k, max_distance, or min_score can be specified in vector search."):
query_param_source = VectorSearchPartitionParamSource(
workload.Workload(name="unit-test"),
test_param_source_params_2, {
"index": self.DEFAULT_INDEX_NAME,
"request-params": {},
"body": {
"size": 100,
}
}
)
# This line won't be executed if exception is raised during initialization
query_param_source.partition(0, 1)

test_param_source_params_3 = {
"field": self.DEFAULT_FIELD_NAME,
"data_set_format": self.DEFAULT_TYPE,
"data_set_path": data_set_path,
"neighbors_data_set_path": neighbors_data_set_path,
"min_score": 0.5,
"max_distance": 100.0,
"k": 10,
}

with self.assertRaisesRegex(ValueError, "Only one of k, max_distance, or min_score can be specified in vector search."):
query_param_source = VectorSearchPartitionParamSource(
workload.Workload(name="unit-test"),
test_param_source_params_3, {
"index": self.DEFAULT_INDEX_NAME,
"request-params": {},
"body": {
"size": 100,
}
}
)
# This line won't be executed if exception is raised during initialization
query_param_source.partition(0, 1)

def _check_params(
self,
actual_params: dict,