Skip to content

Commit

Permalink
Add benchmark support for vector radial search (#546)
Browse files Browse the repository at this point in the history
Signed-off-by: Junqiu Lei <junqiu@amazon.com>
  • Loading branch information
junqiu-lei authored Aug 2, 2024
1 parent fc9c147 commit 1eb5171
Show file tree
Hide file tree
Showing 5 changed files with 410 additions and 25 deletions.
8 changes: 8 additions & 0 deletions osbenchmark/utils/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class Context(Enum):
INDEX = 1
QUERY = 2
NEIGHBORS = 3
MAX_DISTANCE_NEIGHBORS = 4
MIN_SCORE_NEIGHBORS = 5


class DataSet(ABC):
Expand Down Expand Up @@ -141,6 +143,12 @@ def parse_context(context: Context) -> str:
if context == Context.QUERY:
return "test"

if context == Context.MAX_DISTANCE_NEIGHBORS:
return "max_distance_neighbors"

if context == Context.MIN_SCORE_NEIGHBORS:
return "min_score_neighbors"

raise Exception("Unsupported context")


Expand Down
96 changes: 82 additions & 14 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1232,13 +1232,6 @@ async def _vector_search_query_with_recall(opensearch, params):
Perform vector search and report recall@k , recall@r and time taken to perform recall in ms as
meta object.
"""
result = {
"weight": 1,
"unit": "ops",
"success": True,
"recall@k": 0,
"recall@1": 0,
}

def _is_empty_search_results(content):
if content is None:
Expand All @@ -1261,7 +1254,7 @@ def _get_field_value(content, field_name):
return _get_field_value(content["_source"], field_name)
return None

def calculate_recall(predictions, neighbors, top_k):
def calculate_topk_search_recall(predictions, neighbors, top_k):
"""
Calculates the recall by comparing top_k neighbors with predictions.
recall = Sum of matched neighbors from predictions / total number of neighbors from ground truth
Expand Down Expand Up @@ -1289,9 +1282,71 @@ def calculate_recall(predictions, neighbors, top_k):

return correct / min_num_of_results

def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=False):
    """
    Calculates the recall of a radial (max_distance/min_score threshold) search by
    comparing the ground truth neighbors with predictions.
    recall = Sum of matched neighbors from predictions / number of compared positions

    The ground truth is ``neighbors`` up to, but not including, its first '-1'
    entry; when there is no '-1' entry the whole list is used.
    Args:
        predictions: list containing ids of results returned by OpenSearch.
        neighbors: list containing ids of the actual neighbors for the query;
            everything from the first '-1' entry onwards is ignored
        enable_top_1_recall: when True, only the first prediction is compared
            against the ground truth (recall@1)
    Returns:
        Recall between predictions and the ground truth neighbors, or 1 when the
        ground truth is empty
    """
    try:
        # Keep only the entries in front of the first '-1' marker
        truth_set = neighbors[:neighbors.index('-1')]
    except ValueError:
        # If '-1' is not found in the list, use the entire list
        truth_set = neighbors

    min_num_of_results = len(truth_set)
    if min_num_of_results == 0:
        # NOTE: self is not a parameter here; it is resolved from the enclosing scope
        self.logger.info("No neighbors are provided for recall calculation")
        return 1

    if enable_top_1_recall:
        min_num_of_results = 1

    # Build the lookup once: radial searches can return thousands of ids, and a
    # list membership test per prediction would make this loop quadratic.
    truth_lookup = set(truth_set)

    # Only the first min_num_of_results predictions take part in the comparison
    compared = predictions[:min_num_of_results]
    if len(compared) < min_num_of_results:
        self.logger.info("No more neighbors in prediction to compare against ground truth.\n"
                         "Total neighbors in prediction: [%d].\n"
                         "Total neighbors in ground truth: [%d]", len(predictions), min_num_of_results)

    correct = float(sum(1 for prediction in compared if prediction in truth_lookup))
    return correct / min_num_of_results

result = {
"weight": 1,
"unit": "ops",
"success": True,
}
# Add recall@k and recall@1 to the initial result only if k is present in the params
if "k" in params:
result.update({
"recall@k": 0,
"recall@1": 0
})
# Add recall@max_distance and recall@max_distance_1 to the initial result only if max_distance is present in the params
elif "max_distance" in params:
result.update({
"recall@max_distance": 0,
"recall@max_distance_1": 0
})
# Add recall@min_score and recall@min_score_1 to the initial result only if min_score is present in the params
elif "min_score" in params:
result.update({
"recall@min_score": 0,
"recall@min_score_1": 0
})

doc_type = params.get("type")
response = await self._raw_search(opensearch, doc_type, index, body, request_params, headers=headers)
recall_processing_start = time.perf_counter()

if detailed_results:
props = parse(response, ["hits.total", "hits.total.value", "hits.total.relation", "timed_out", "took"])
hits_total = props.get("hits.total.value", props.get("hits.total", 0))
Expand All @@ -1305,6 +1360,8 @@ def calculate_recall(predictions, neighbors, top_k):
"timed_out": timed_out,
"took": took
})

recall_processing_start = time.perf_counter()
response_json = json.loads(response.getvalue())
if _is_empty_search_results(response_json):
self.logger.info("Vector search query returned no results.")
Expand All @@ -1318,12 +1375,23 @@ def calculate_recall(predictions, neighbors, top_k):
continue
candidates.append(field_value)
neighbors_dataset = params["neighbors"]
num_neighbors = params.get("k", 1)
recall_k = calculate_recall(candidates, neighbors_dataset, num_neighbors)
result.update({"recall@k": recall_k})

recall_1 = calculate_recall(candidates, neighbors_dataset, 1)
result.update({"recall@1": recall_1})
if "k" in params:
num_neighbors = params.get("k", 1)
recall_top_k = calculate_topk_search_recall(candidates, neighbors_dataset, num_neighbors)
recall_top_1 = calculate_topk_search_recall(candidates, neighbors_dataset, 1)
result.update({"recall@k": recall_top_k})
result.update({"recall@1": recall_top_1})

if "max_distance" in params or "min_score" in params:
recall_threshold = calculate_radial_search_recall(candidates, neighbors_dataset)
recall_top_1 = calculate_radial_search_recall(candidates, neighbors_dataset, True)
if "min_score" in params:
result.update({"recall@min_score": recall_threshold})
result.update({"recall@min_score_1": recall_top_1})
elif "max_distance" in params:
result.update({"recall@max_distance": recall_threshold})
result.update({"recall@max_distance_1": recall_top_1})

recall_processing_end = time.perf_counter()
recall_processing_time = convert.seconds_to_ms(recall_processing_end - recall_processing_start)
Expand Down
86 changes: 75 additions & 11 deletions osbenchmark/workload/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from osbenchmark import exceptions
from osbenchmark.utils import io
from osbenchmark.utils.dataset import DataSet, get_data_set, Context
from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter
from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter, parse_float_parameter
from osbenchmark.workload import workload

__PARAM_SOURCES_BY_OP = {}
Expand Down Expand Up @@ -1027,6 +1027,8 @@ class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource):
request-params: query parameters that can be passed to search request
"""
PARAMS_NAME_K = "k"
PARAMS_NAME_MAX_DISTANCE = "max_distance"
PARAMS_NAME_MIN_SCORE = "min_score"
PARAMS_NAME_BODY = "body"
PARAMS_NAME_SIZE = "size"
PARAMS_NAME_QUERY = "query"
Expand All @@ -1041,11 +1043,27 @@ class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource):
PARAMS_NAME_REQUEST_PARAMS = "request-params"
PARAMS_NAME_SOURCE = "_source"
PARAMS_NAME_ALLOW_PARTIAL_RESULTS = "allow_partial_search_results"
MIN_SCORE_QUERY_TYPE = "min_score"
MAX_DISTANCE_QUERY_TYPE = "max_distance"
KNN_QUERY_TYPE = "knn"
DEFAULT_RADIAL_SEARCH_QUERY_RESULT_SIZE = 10000

def __init__(self, workloads, params, query_params, **kwargs):
super().__init__(workloads, params, Context.QUERY, **kwargs)
self.logger = logging.getLogger(__name__)
self.k = parse_int_parameter(self.PARAMS_NAME_K, params)
self.k = None
self.distance = None
self.score = None
if self.PARAMS_NAME_K in params:
self.k = parse_int_parameter(self.PARAMS_NAME_K, params)
self.query_type = self.KNN_QUERY_TYPE
if self.PARAMS_NAME_MAX_DISTANCE in params:
self.distance = parse_float_parameter(self.PARAMS_NAME_MAX_DISTANCE, params)
self.query_type = self.MAX_DISTANCE_QUERY_TYPE
if self.PARAMS_NAME_MIN_SCORE in params:
self.score = parse_float_parameter(self.PARAMS_NAME_MIN_SCORE, params)
self.query_type = self.MIN_SCORE_QUERY_TYPE
self._validate_query_type_parameters()
self.repetitions = parse_int_parameter(self.PARAMS_NAME_REPETITIONS, params, 1)
self.current_rep = 1
self.neighbors_data_set_format = parse_string_parameter(
Expand All @@ -1058,10 +1076,21 @@ def __init__(self, workloads, params, query_params, **kwargs):
self.PARAMS_VALUE_VECTOR_SEARCH)
self.query_params = query_params
self.query_params.update({
self.PARAMS_NAME_K: self.k,
self.PARAMS_NAME_OPERATION_TYPE: operation_type,
self.PARAMS_NAME_ID_FIELD_NAME: params.get(self.PARAMS_NAME_ID_FIELD_NAME),
})
if self.PARAMS_NAME_K in params:
self.query_params.update({
self.PARAMS_NAME_K: self.k
})
if self.PARAMS_NAME_MAX_DISTANCE in params:
self.query_params.update({
self.PARAMS_NAME_MAX_DISTANCE: self.distance
})
if self.PARAMS_NAME_MIN_SCORE in params:
self.query_params.update({
self.PARAMS_NAME_MIN_SCORE: self.score
})
if self.PARAMS_NAME_FILTER in params:
self.query_params.update({
self.PARAMS_NAME_FILTER: params.get(self.PARAMS_NAME_FILTER)
Expand All @@ -1072,6 +1101,10 @@ def __init__(self, workloads, params, query_params, **kwargs):
neighbors_corpora = self.extract_corpora(self.neighbors_data_set_corpus, self.neighbors_data_set_format)
self.corpora.extend(corpora for corpora in neighbors_corpora if corpora not in self.corpora)

def _validate_query_type_parameters(self):
    """Ensures that at most one of k, max_distance and min_score was specified.

    A parameter counts as specified when its parsed value is not None, so
    legitimate falsy values such as a min_score or max_distance of 0.0 are
    still detected as conflicting with another parameter.

    Raises:
        ValueError: if more than one of k, max_distance or min_score is set
    """
    specified = [value for value in (self.k, self.distance, self.score) if value is not None]
    if len(specified) > 1:
        raise ValueError("Only one of k, max_distance, or min_score can be specified in vector search.")

@staticmethod
def _validate_neighbors_data_set(file_path, corpus):
if file_path and corpus:
Expand All @@ -1086,15 +1119,29 @@ def _update_request_params(self):
self.PARAMS_NAME_ALLOW_PARTIAL_RESULTS, "false")
self.query_params.update({self.PARAMS_NAME_REQUEST_PARAMS: request_params})

def _get_query_neighbors(self):
    """Returns the neighbors data set Context that matches the configured query type.

    Raises:
        Exception: if the query type is none of the knn, min_score or
            max_distance query types
    """
    context_by_query_type = {
        self.KNN_QUERY_TYPE: Context.NEIGHBORS,
        self.MIN_SCORE_QUERY_TYPE: Context.MIN_SCORE_NEIGHBORS,
        self.MAX_DISTANCE_QUERY_TYPE: Context.MAX_DISTANCE_NEIGHBORS,
    }
    selected_context = context_by_query_type.get(self.query_type)
    if selected_context is None:
        raise Exception("Unknown query type [%s]" % self.query_type)
    return selected_context

def _get_query_size(self):
    """Returns the result size to request for the configured query type.

    k-NN queries request k results; every other query type requests the
    default radial search result size.
    """
    is_knn_query = self.query_type == self.KNN_QUERY_TYPE
    return self.k if is_knn_query else self.DEFAULT_RADIAL_SEARCH_QUERY_RESULT_SIZE

def _update_body_params(self, vector):
# accept body params if passed from workload, else, create empty dictionary
body_params = self.query_params.get(self.PARAMS_NAME_BODY) or dict()
if self.PARAMS_NAME_SIZE not in body_params:
body_params[self.PARAMS_NAME_SIZE] = self.k
body_params[self.PARAMS_NAME_SIZE] = self._get_query_size()
if self.PARAMS_NAME_QUERY in body_params:
self.logger.warning(
"[%s] param from body will be replaced with vector search query.", self.PARAMS_NAME_QUERY)
efficient_filter=self.query_params.get(self.PARAMS_NAME_FILTER)
efficient_filter = self.query_params.get(self.PARAMS_NAME_FILTER)
# override query params with vector search query
body_params[self.PARAMS_NAME_QUERY] = self._build_vector_search_query_body(vector, efficient_filter)
self.query_params.update({self.PARAMS_NAME_BODY: body_params})
Expand All @@ -1110,7 +1157,7 @@ def partition(self, partition_index, total_partitions):
self.neighbors_data_set_path = self.data_set_path
# add neighbor instance to partition
partition.neighbors_data_set = get_data_set(
self.neighbors_data_set_format, self.neighbors_data_set_path, Context.NEIGHBORS)
self.neighbors_data_set_format, self.neighbors_data_set_path, self._get_query_neighbors())
partition.neighbors_data_set.seek(partition.offset)
return partition

Expand All @@ -1129,7 +1176,7 @@ def params(self):
raise StopIteration
vector = self.data_set.read(1)[0]
neighbor = self.neighbors_data_set.read(1)[0]
true_neighbors = list(map(str, neighbor[:self.k]))
true_neighbors = list(map(str, neighbor[:self.k] if self.k else neighbor))
self.query_params.update({
"neighbors": true_neighbors,
})
Expand All @@ -1140,17 +1187,34 @@ def params(self):
return self.query_params

def _build_vector_search_query_body(self, vector, efficient_filter=None) -> dict:
"""Builds a k-NN request that can be used to execute an approximate nearest
"""Builds a vector search request that can be used to execute an approximate nearest
neighbor search against a k-NN plugin index
Args:
vector: vector used for query
efficient_filter: efficient filter used for query
Returns:
A dictionary containing the body used for search query
"""
query = {
query = {}
if self.query_type == self.KNN_QUERY_TYPE:
query.update({
"k": self.k,
})
elif self.query_type == self.MIN_SCORE_QUERY_TYPE:
query.update({
"min_score": self.score,
})
elif self.query_type == self.MAX_DISTANCE_QUERY_TYPE:
query.update({
"max_distance": self.distance,
})
else:
raise Exception("Unknown query type [%s]" % self.query_type)

query.update({
"vector": vector,
"k": self.k,
}
})

if efficient_filter:
query.update({
"filter": efficient_filter,
Expand Down
Loading

0 comments on commit 1eb5171

Please sign in to comment.