Support Vector Search Nested benchmark (#584)
Signed-off-by: Finn Roblin <finnrobl@amazon.com>
finnroblin authored Aug 22, 2024
1 parent f4ab3ab commit 2532d77
Showing 5 changed files with 491 additions and 29 deletions.
3 changes: 3 additions & 0 deletions osbenchmark/utils/dataset.py
@@ -26,6 +26,7 @@ class Context(Enum):
    NEIGHBORS = 3
    MAX_DISTANCE_NEIGHBORS = 4
    MIN_SCORE_NEIGHBORS = 5
    PARENTS = 6


class DataSet(ABC):
@@ -143,6 +144,8 @@ def parse_context(context: Context) -> str:
        if context == Context.QUERY:
            return "test"

        if context == Context.PARENTS:
            # used in nested benchmarks to get the parent document id associated with each vector
            return "parents"

        if context == Context.MAX_DISTANCE_NEIGHBORS:
            return "max_distance_neighbors"

192 changes: 164 additions & 28 deletions osbenchmark/workload/params.py
@@ -33,7 +33,7 @@
import time
from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Dict, Any, Optional, Tuple

import numpy as np

@@ -880,10 +880,12 @@ class VectorDataSetPartitionParamSource(ParamSource):
        offset: Offset into the data set to start at. Relevant when there are
        multiple partitions
    """
    NESTED_FIELD_SEPARATOR = "."

    def __init__(self, workload, params, context: Context, **kwargs):
        super().__init__(workload, params, **kwargs)
        self.field_name: str = parse_string_parameter("field", params)
        # defined in the base class because it is used for both bulk ingest and queries
        self.is_nested = self.NESTED_FIELD_SEPARATOR in self.field_name
        self.context = context
        self.data_set_format = parse_string_parameter("data_set_format", params)
        self.data_set_path = parse_string_parameter("data_set_path", params, "")
@@ -979,6 +981,18 @@ def partition(self, partition_index, total_partitions):
        partition_x.current = partition_x.offset
        return partition_x

    def get_split_fields(self) -> Tuple[str, str]:
        fields_as_array = self.field_name.split(self.NESTED_FIELD_SEPARATOR)

        # TODO: Add support for multiple levels of nesting if a future benchmark requires it.

        if len(fields_as_array) != 2:
            raise ValueError(
                f"Field name {self.field_name} is not a nested field name. Currently we support only 1 level of nesting."
            )
        return fields_as_array[0], fields_as_array[1]
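
For illustration, splitting behaves as follows (the field name below is a hypothetical example, not one shipped with a workload):

# Sketch: how a nested field name is split into its outer path and inner field.
field_name = "nested_vectors.vector"              # hypothetical nested field name
outer_field, inner_field = field_name.split(".")  # same separator as NESTED_FIELD_SEPARATOR
print(outer_field, inner_field)                   # -> nested_vectors vector
# A name without the separator, e.g. "target_field", fails the len(...) == 2 check
# in get_split_fields() and raises ValueError.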


    @abstractmethod
    def params(self):
        """
@@ -1219,12 +1233,24 @@ def _build_vector_search_query_body(self, vector, efficient_filter=None) -> dict
            query.update({
                "filter": efficient_filter,
            })

        knn_search_query = {
            "knn": {
                self.field_name: query,
            },
        }

        if self.is_nested:
            outer_field_name, _ = self.get_split_fields()
            return {
                "nested": {
                    "path": outer_field_name,
                    "query": knn_search_query
                }
            }

        return knn_search_query
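
As a rough sketch, for an assumed nested field named "nested_vectors.vector" (the field name, vector, and k are illustrative), the nested branch yields a search body shaped like:

# Hypothetical query body produced by the nested branch:
example_query_body = {
    "nested": {
        "path": "nested_vectors",
        "query": {
            "knn": {
                "nested_vectors.vector": {
                    "vector": [0.1, 0.2, 0.3],
                    "k": 10,
                }
            }
        },
    }
}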


class BulkVectorsFromDataSetParamSource(VectorDataSetPartitionParamSource):
    """ Create bulk index requests from a data set of vectors.
@@ -1241,13 +1267,74 @@ class BulkVectorsFromDataSetParamSource(VectorDataSetPartitionParamSource):
    def __init__(self, workload, params, **kwargs):
        super().__init__(workload, params, Context.INDEX, **kwargs)
        self.bulk_size: int = parse_int_parameter("bulk_size", params)
        self.retries: int = parse_int_parameter("retries", params, self.DEFAULT_RETRIES)
        self.index_name: str = parse_string_parameter("index", params)
        self.id_field_name: str = parse_string_parameter(
            self.PARAMS_NAME_ID_FIELD_NAME, params, self.DEFAULT_ID_FIELD_NAME
        )

        self.action_buffer = None
        self.num_nested_vectors = 10

        self.parent_data_set_path = parse_string_parameter(
            "parents_data_set_path", params, self.data_set_path
        )

        self.parent_data_set_format = self.data_set_format

        self.parent_data_set_corpus = self.data_set_corpus

        self.logger = logging.getLogger(__name__)

    def partition(self, partition_index, total_partitions):
        partition = super().partition(partition_index, total_partitions)
        if self.parent_data_set_corpus and not self.parent_data_set_path:
            parent_data_set_path = self._get_corpora_file_paths(
                self.parent_data_set_corpus, self.parent_data_set_format
            )
            self._validate_data_set_corpus(parent_data_set_path)
            self.parent_data_set_path = parent_data_set_path[0]
        if not self.parent_data_set_path:
            self.parent_data_set_path = self.data_set_path
        # attach the parent-id data set to the partition so nested ingestion can group vectors
        if self.is_nested:
            partition.parent_data_set = get_data_set(
                self.parent_data_set_format, self.parent_data_set_path, Context.PARENTS
            )
            partition.parent_data_set.seek(partition.offset)

        return partition
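
For reference, a nested bulk ingest might be configured with parameters along these lines (the values are illustrative assumptions; only the parameter names come from the code above):

# Hypothetical parameter block for a nested bulk ingest:
example_bulk_params = {
    "index": "target_index",
    "field": "nested_vectors.vector",               # outer path + inner vector field
    "data_set_format": "hdf5",
    "data_set_path": "/tmp/vectors.hdf5",
    "parents_data_set_path": "/tmp/vectors.hdf5",   # read with the PARENTS context ("parents")
    "bulk_size": 100,
}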

    def bulk_transform_non_nested(self, partition: np.ndarray, action) -> List[Dict[str, Any]]:
        """
        Create bulk ingest actions for data with a non-nested field.
        """
        actions = []

        _ = [
            actions.extend([action(self.id_field_name, i + self.current), None])
            for i in range(len(partition))
        ]
        bulk_contents = []

        add_id_field_to_body = self.id_field_name != self.DEFAULT_ID_FIELD_NAME
        for vec, identifier in zip(
            partition.tolist(), range(self.current, self.current + len(partition))
        ):
            row = {self.field_name: vec}
            if add_id_field_to_body:
                row.update({self.id_field_name: identifier})
            bulk_contents.append(row)

        actions[1::2] = bulk_contents

        self.logger.info("Actions: %s", actions)
        return actions
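
The returned list alternates action metadata and document bodies; a minimal sketch with two hypothetical 2-dimensional vectors and the default _id field:

# Illustrative output shape for a non-nested field (index and field names assumed):
example_actions = [
    {"index": {"_index": "target_index", "_id": 0}},
    {"target_field": [0.1, 0.2]},
    {"index": {"_index": "target_index", "_id": 1}},
    {"target_field": [0.3, 0.4]},
]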


    def bulk_transform(
        self, partition: np.ndarray, action, parents_ids: Optional[np.ndarray]
    ) -> List[Dict[str, Any]]:
        """Partitions and transforms a list of vectors into OpenSearch's bulk
        injection format.
        Args:
@@ -1257,19 +1344,63 @@ def bulk_transform(self, partition: np.ndarray, action) -> List[Dict[str, Any]]:
        Returns:
            An array of transformed vectors in bulk format.
        """

        if not self.is_nested:
            return self.bulk_transform_non_nested(partition, action)

        actions = []

        outer_field_name, inner_field_name = self.get_split_fields()

        add_id_field_to_body = self.id_field_name != self.DEFAULT_ID_FIELD_NAME

        if self.action_buffer is None:
            first_index_of_parent_ids = 0
            self.action_buffer = {outer_field_name: []}
            self.action_parent_id = parents_ids[first_index_of_parent_ids]
            if add_id_field_to_body:
                self.action_buffer.update({self.id_field_name: self.action_parent_id})

        part_list = partition.tolist()
        for i in range(len(partition)):
            nested = {inner_field_name: part_list[i]}
            current_parent_id = parents_ids[i]

            if self.action_parent_id == current_parent_id:
                self.action_buffer[outer_field_name].append(nested)
            else:
                # flush action buffer
                actions.extend(
                    [
                        action(self.id_field_name, self.action_parent_id),
                        self.action_buffer,
                    ]
                )

                self.current += len(self.action_buffer[outer_field_name])

                self.action_buffer = {outer_field_name: []}
                if add_id_field_to_body:
                    self.action_buffer.update({self.id_field_name: current_parent_id})

                self.action_buffer[outer_field_name].append(nested)

                self.action_parent_id = current_parent_id

        max_position = self.offset + self.num_vectors
        if (
            self.current + len(self.action_buffer[outer_field_name]) + self.bulk_size
            >= max_position
        ):
            # final flush of remaining vectors in the last partition (for the last client)
            self.current += len(self.action_buffer[outer_field_name])
            actions.extend(
                [action(self.id_field_name, self.action_parent_id), self.action_buffer]
            )

        return actions
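
To make the grouping concrete: with hypothetical parent ids [1, 1, 2], a field "nested_vectors.vector", and the default _id field, the flushed bulk body would look roughly like:

# Sketch (assumed names and vectors): vectors sharing a parent id collapse into one
# parent document whose outer field holds a list of nested vector objects.
example_nested_actions = [
    {"index": {"_index": "target_index", "_id": 1}},
    {"nested_vectors": [{"vector": [0.1, 0.2]}, {"vector": [0.3, 0.4]}]},
    {"index": {"_index": "target_index", "_id": 2}},
    {"nested_vectors": [{"vector": [0.5, 0.6]}]},
]
# The document for parent 2 is only emitted once its buffer is flushed (on a parent-id
# change or the final flush near the end of the partition).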

    def params(self):
@@ -1281,29 +1412,34 @@ def params(self):

        def action(id_field_name, doc_id):
            # support only index operation
            bulk_action = "index"
            metadata = {"_index": self.index_name}
            # Add id field to metadata only if it is _id
            if id_field_name == self.DEFAULT_ID_FIELD_NAME:
                metadata.update({id_field_name: doc_id})
            return {bulk_action: metadata}

        remaining_vectors_in_partition = self.num_vectors + self.offset - self.current
        # shrink the bulk size if fewer vectors remain to be read than the configured bulk size
        bulk_size = min(self.bulk_size, remaining_vectors_in_partition)

        partition = self.data_set.read(bulk_size)

        if self.is_nested:
            parent_ids = self.parent_data_set.read(bulk_size)
        else:
            parent_ids = None

        body = self.bulk_transform(partition, action, parent_ids)
        size = len(body) // 2

        if not self.is_nested:
            # in the nested case, an irregular number of vectors may be ingested per bulk,
            # so self.current is updated inside bulk_transform when self.is_nested
            self.current += size
        self.percent_completed = self.current / self.total

        return {"body": body, "retries": self.retries, "size": size}


def get_target(workload, params):
Binary file added small-nested-works.hdf5
Binary file not shown.
35 changes: 35 additions & 0 deletions tests/utils/dataset_helper.py
@@ -193,6 +193,12 @@ def _build_data_set(self, context: DataSetBuildContext):
            # file with distance.
            context.vectors.tofile(f)


def create_parent_ids(num_vectors: int, group_size: int = 10) -> np.ndarray:
    num_ids = (num_vectors + group_size - 1) // group_size  # total number of different IDs needed
    ids = np.arange(1, num_ids + 1)  # create an array of IDs starting from 1
    parent_ids = np.repeat(ids, group_size)[:num_vectors]  # repeat each ID 'group_size' times and trim to 'num_vectors'
    return parent_ids
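
A quick sanity check of the grouping with the default group_size of 10:

# 25 vectors produce ten 1s, ten 2s, and five 3s as parent ids:
ids = create_parent_ids(25)
print(ids)  # [1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3 3 3 3]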


def create_random_2d_array(num_vectors: int, dimension: int) -> np.ndarray:
    rng = np.random.default_rng()
@@ -239,6 +245,35 @@ def create_data_set(
    return data_set_path


def create_parent_data_set(
        num_vectors: int,
        dimension: int,
        extension: str,
        data_set_context: Context,
        data_set_dir,
        file_path: str = None
) -> str:
    if file_path:
        data_set_path = file_path
    else:
        file_name_base = ''.join(random.choice(string.ascii_letters) for _ in
                                 range(DEFAULT_RANDOM_STRING_LENGTH))
        data_set_file_name = "{}.{}".format(file_name_base, extension)
        data_set_path = os.path.join(data_set_dir, data_set_file_name)
    context = DataSetBuildContext(
        data_set_context,
        create_parent_ids(num_vectors),
        data_set_path)

    if extension == HDF5DataSet.FORMAT_NAME:
        HDF5Builder().add_data_set_build_context(context).build()
    else:
        BigANNVectorBuilder().add_data_set_build_context(context).build()

    return data_set_path
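
A hypothetical test usage (paths and sizes are illustrative), building a small parents data set for a nested ingestion test:

parent_path = create_parent_data_set(
    num_vectors=100,
    dimension=16,                       # not used when generating parent ids
    extension=HDF5DataSet.FORMAT_NAME,
    data_set_context=Context.PARENTS,
    data_set_dir="/tmp",
)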



def create_ground_truth(
        num_queries: int,
        k: int,