diff --git a/osbenchmark/utils/dataset.py b/osbenchmark/utils/dataset.py
index 41b303073..7e773d586 100644
--- a/osbenchmark/utils/dataset.py
+++ b/osbenchmark/utils/dataset.py
@@ -26,6 +26,7 @@ class Context(Enum):
     NEIGHBORS = 3
     MAX_DISTANCE_NEIGHBORS = 4
     MIN_SCORE_NEIGHBORS = 5
+    PARENTS = 6
 
 
 class DataSet(ABC):
@@ -143,6 +144,8 @@ def parse_context(context: Context) -> str:
     if context == Context.QUERY:
         return "test"
 
+    if context == Context.PARENTS:
+        return "parents"  # used in nested benchmarks to get the parent document id associated with each vector
     if context == Context.MAX_DISTANCE_NEIGHBORS:
         return "max_distance_neighbors"
 
diff --git a/osbenchmark/workload/params.py b/osbenchmark/workload/params.py
index d59ff557e..270fb6178 100644
--- a/osbenchmark/workload/params.py
+++ b/osbenchmark/workload/params.py
@@ -33,7 +33,7 @@ import time
 from abc import ABC, abstractmethod
 from enum import Enum
-from typing import List, Dict, Any
+from typing import List, Dict, Any, Optional, Tuple
 
 import numpy as np
 
@@ -880,10 +880,12 @@ class VectorDataSetPartitionParamSource(ParamSource):
         offset: Offset into the data set to start at. Relevant when there are multiple partitions
     """
+    NESTED_FIELD_SEPARATOR = "."
 
     def __init__(self, workload, params, context: Context, **kwargs):
         super().__init__(workload, params, **kwargs)
         self.field_name: str = parse_string_parameter("field", params)
+        self.is_nested = self.NESTED_FIELD_SEPARATOR in self.field_name  # in the base class because it is used for both bulk ingest and queries
         self.context = context
         self.data_set_format = parse_string_parameter("data_set_format", params)
         self.data_set_path = parse_string_parameter("data_set_path", params, "")
@@ -979,6 +981,18 @@ def partition(self, partition_index, total_partitions):
         partition_x.current = partition_x.offset
         return partition_x
 
+    def get_split_fields(self) -> Tuple[str, str]:
+        fields_as_array = self.field_name.split(self.NESTED_FIELD_SEPARATOR)
+
+        # TODO: Add support for multiple levels of nesting if a future benchmark requires it.
+
+        if len(fields_as_array) != 2:
+            raise ValueError(
+                f"Field name {self.field_name} is not a valid nested field name. Currently only one level of nesting is supported."
+            )
+        return fields_as_array[0], fields_as_array[1]
+
+
     @abstractmethod
     def params(self):
         """
@@ -1219,12 +1233,24 @@ def _build_vector_search_query_body(self, vector, efficient_filter=None) -> dict
             query.update({
                 "filter": efficient_filter,
             })
-        return {
+
+        knn_search_query = {
             "knn": {
                 self.field_name: query,
             },
         }
 
+        if self.is_nested:
+            outer_field_name, _ = self.get_split_fields()
+            return {
+                "nested": {
+                    "path": outer_field_name,
+                    "query": knn_search_query
+                }
+            }
+
+        return knn_search_query
+
 
 class BulkVectorsFromDataSetParamSource(VectorDataSetPartitionParamSource):
     """ Create bulk index requests from a data set of vectors.
@@ -1241,13 +1267,74 @@ class BulkVectorsFromDataSetParamSource(VectorDataSetPartitionParamSource):
     def __init__(self, workload, params, **kwargs):
         super().__init__(workload, params, Context.INDEX, **kwargs)
         self.bulk_size: int = parse_int_parameter("bulk_size", params)
-        self.retries: int = parse_int_parameter("retries", params,
-                                                self.DEFAULT_RETRIES)
+        self.retries: int = parse_int_parameter("retries", params, self.DEFAULT_RETRIES)
         self.index_name: str = parse_string_parameter("index", params)
         self.id_field_name: str = parse_string_parameter(
-            self.PARAMS_NAME_ID_FIELD_NAME, params, self.DEFAULT_ID_FIELD_NAME)
+            self.PARAMS_NAME_ID_FIELD_NAME, params, self.DEFAULT_ID_FIELD_NAME
+        )
 
-    def bulk_transform(self, partition: np.ndarray, action) -> List[Dict[str, Any]]:
+        self.action_buffer = None
+        self.num_nested_vectors = 10
+
+        self.parent_data_set_path = parse_string_parameter(
+            "parents_data_set_path", params, self.data_set_path
+        )
+
+        self.parent_data_set_format = self.data_set_format
+
+        self.parent_data_set_corpus = self.data_set_corpus
+
+        self.logger = logging.getLogger(__name__)
+
+    def partition(self, partition_index, total_partitions):
+        partition = super().partition(partition_index, total_partitions)
+        if self.parent_data_set_corpus and not self.parent_data_set_path:
+            parent_data_set_path = self._get_corpora_file_paths(
+                self.parent_data_set_corpus, self.parent_data_set_format
+            )
+            self._validate_data_set_corpus(parent_data_set_path)
+            self.parent_data_set_path = parent_data_set_path[0]
+        if not self.parent_data_set_path:
+            self.parent_data_set_path = self.data_set_path
+        # attach the parent-id data set to the partition
+        if self.is_nested:
+            partition.parent_data_set = get_data_set(
+                self.parent_data_set_format, self.parent_data_set_path, Context.PARENTS
+            )
+            partition.parent_data_set.seek(partition.offset)
+
+        return partition
+
+    def bulk_transform_non_nested(self, partition: np.ndarray, action) -> List[Dict[str, Any]]:
+        """
+        Create bulk ingest actions for data with a non-nested field.
+        """
+        actions = []
+
+        _ = [
+            actions.extend([action(self.id_field_name, i + self.current), None])
+            for i in range(len(partition))
+        ]
+        bulk_contents = []
+
+        add_id_field_to_body = self.id_field_name != self.DEFAULT_ID_FIELD_NAME
+        for vec, identifier in zip(
+            partition.tolist(), range(self.current, self.current + len(partition))
+        ):
+            row = {self.field_name: vec}
+            if add_id_field_to_body:
+                row.update({self.id_field_name: identifier})
+            bulk_contents.append(row)
+
+        actions[1::2] = bulk_contents
+
+        self.logger.debug("Actions: %s", actions)
+        return actions
+
+    def bulk_transform(
+        self, partition: np.ndarray, action, parents_ids: Optional[np.ndarray]
+    ) -> List[Dict[str, Any]]:
         """Partitions and transforms a list of vectors into OpenSearch's bulk injection format.
 
         Args:
@@ -1257,19 +1344,63 @@ def bulk_transform(self, partition: np.ndarray, action) -> List[Dict[str, Any]]:
         Returns:
             An array of transformed vectors in bulk format.
""" + + if not self.is_nested: + return self.bulk_transform_non_nested(partition, action) + actions = [] - _ = [ - actions.extend([action(self.id_field_name, i + self.current), None]) - for i in range(len(partition)) - ] - bulk_contents = [] + + outer_field_name, inner_field_name = self.get_split_fields() + add_id_field_to_body = self.id_field_name != self.DEFAULT_ID_FIELD_NAME - for vec, identifier in zip(partition.tolist(), range(self.current, self.current + len(partition))): - row = {self.field_name: vec} + + if self.action_buffer is None: + first_index_of_parent_ids = 0 + self.action_buffer = {outer_field_name: []} + self.action_parent_id = parents_ids[first_index_of_parent_ids] if add_id_field_to_body: - row.update({self.id_field_name: identifier}) - bulk_contents.append(row) - actions[1::2] = bulk_contents + self.action_buffer.update({self.id_field_name: self.action_parent_id}) + + part_list = partition.tolist() + for i in range(len(partition)): + + nested = {inner_field_name: part_list[i]} + + current_parent_id = parents_ids[i] + + if self.action_parent_id == current_parent_id: + self.action_buffer[outer_field_name].append(nested) + else: + # flush action buffer + actions.extend( + [ + action(self.id_field_name, self.action_parent_id), + self.action_buffer, + ] + ) + + self.current += len(self.action_buffer[outer_field_name]) + + self.action_buffer = {outer_field_name: []} + if add_id_field_to_body: + + self.action_buffer.update({self.id_field_name: current_parent_id}) + + self.action_buffer[outer_field_name].append(nested) + + self.action_parent_id = current_parent_id + + max_position = self.offset + self.num_vectors + if ( + self.current + len(self.action_buffer[outer_field_name]) + self.bulk_size + >= max_position + ): + # final flush of remaining vectors in the last partition (for the last client) + self.current += len(self.action_buffer[outer_field_name]) + actions.extend( + [action(self.id_field_name, self.action_parent_id), self.action_buffer] + ) + return actions def params(self): @@ -1281,29 +1412,34 @@ def params(self): def action(id_field_name, doc_id): # support only index operation - bulk_action = 'index' - metadata = { - '_index': self.index_name - } + bulk_action = "index" + metadata = {"_index": self.index_name} # Add id field to metadata only if it is _id if id_field_name == self.DEFAULT_ID_FIELD_NAME: metadata.update({id_field_name: doc_id}) return {bulk_action: metadata} remaining_vectors_in_partition = self.num_vectors + self.offset - self.current - # update bulk size if number of vectors to read is less than actual bulk size + bulk_size = min(self.bulk_size, remaining_vectors_in_partition) + partition = self.data_set.read(bulk_size) - body = self.bulk_transform(partition, action) + + if self.is_nested: + parent_ids = self.parent_data_set.read(bulk_size) + else: + parent_ids = None + + body = self.bulk_transform(partition, action, parent_ids) size = len(body) // 2 - self.current += size + + if not self.is_nested: + # in the nested case, we may have irregular number of vectors ingested, + # so we calculate self.current within bulk_transform method when self.is_nested. 
+            self.current += size
         self.percent_completed = self.current / self.total
 
-        return {
-            "body": body,
-            "retries": self.retries,
-            "size": size
-        }
+        return {"body": body, "retries": self.retries, "size": size}
 
 
 def get_target(workload, params):
diff --git a/small-nested-works.hdf5 b/small-nested-works.hdf5
new file mode 100644
index 000000000..00b07c0d0
Binary files /dev/null and b/small-nested-works.hdf5 differ
diff --git a/tests/utils/dataset_helper.py b/tests/utils/dataset_helper.py
index d8de935a9..c13a27b39 100644
--- a/tests/utils/dataset_helper.py
+++ b/tests/utils/dataset_helper.py
@@ -193,6 +193,12 @@ def _build_data_set(self, context: DataSetBuildContext):
             # file with distance.
             context.vectors.tofile(f)
 
+def create_parent_ids(num_vectors: int, group_size: int = 10) -> np.ndarray:
+    num_ids = (num_vectors + group_size - 1) // group_size  # total number of distinct parent ids needed
+    ids = np.arange(1, num_ids + 1)  # parent ids start from 1
+    parent_ids = np.repeat(ids, group_size)[:num_vectors]  # repeat each id 'group_size' times and trim to 'num_vectors'
+    return parent_ids
+
 
 def create_random_2d_array(num_vectors: int, dimension: int) -> np.ndarray:
     rng = np.random.default_rng()
@@ -239,6 +245,35 @@ def create_data_set(
     return data_set_path
 
 
+def create_parent_data_set(
+        num_vectors: int,
+        dimension: int,
+        extension: str,
+        data_set_context: Context,
+        data_set_dir,
+        file_path: str = None
+) -> str:
+    if file_path:
+        data_set_path = file_path
+    else:
+        file_name_base = ''.join(random.choice(string.ascii_letters) for _ in
+                                 range(DEFAULT_RANDOM_STRING_LENGTH))
+        data_set_file_name = "{}.{}".format(file_name_base, extension)
+        data_set_path = os.path.join(data_set_dir, data_set_file_name)
+    context = DataSetBuildContext(
+        data_set_context,
+        create_parent_ids(num_vectors),
+        data_set_path)
+
+    if extension == HDF5DataSet.FORMAT_NAME:
+        HDF5Builder().add_data_set_build_context(context).build()
+    else:
+        BigANNVectorBuilder().add_data_set_build_context(context).build()
+
+    return data_set_path
+
+
 def create_ground_truth(
         num_queries: int,
         k: int,
diff --git a/tests/workload/params_test.py b/tests/workload/params_test.py
index 7b3c61da0..17c904e3e 100644
--- a/tests/workload/params_test.py
+++ b/tests/workload/params_test.py
@@ -37,7 +37,7 @@
 from osbenchmark.workload import params, workload
 from osbenchmark.workload.params import VectorDataSetPartitionParamSource, VectorSearchPartitionParamSource, \
     BulkVectorsFromDataSetParamSource
-from tests.utils.dataset_helper import create_data_set
+from tests.utils.dataset_helper import create_data_set, create_parent_data_set
 from tests.utils.dataset_test import DEFAULT_NUM_VECTORS
 
 
@@ -3206,3 +3206,291 @@ def _check_params(
                 self.assertFalse(expected_id_field in req_body)
                 continue
             self.assertTrue(expected_id_field in req_body)
+
+
+class VectorsNestedCase(TestCase):
+    DEFAULT_INDEX_NAME = "test-partition-index"
+    DEFAULT_VECTOR_FIELD_NAME = "nested.test-vector-field"
+    DEFAULT_CONTEXT = Context.INDEX
+    DEFAULT_TYPE = HDF5DataSet.FORMAT_NAME
+    DEFAULT_NUM_VECTORS = 10
+    DEFAULT_DIMENSION = 10
+    DEFAULT_RANDOM_STRING_LENGTH = 8
+    DEFAULT_ID_FIELD_NAME = "_id"
+
+    def setUp(self) -> None:
+        self.data_set_dir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self.data_set_dir)
+
+    def test_invalid_nesting_scheme(self):
+        # Test field names with zero, two, and three separators; only exactly one "." ("outer.inner") is valid.
+        invalid_nesting_schemes = ["a", "a.b.c", "a.b.c.d"]
+        for nesting_scheme in invalid_nesting_schemes:
+            with self.subTest(nesting_scheme=nesting_scheme):
+                bulk_param_source = BulkVectorsFromDataSetParamSource(
+                    workload.Workload(name="unit-test"),
+                    {
+                        "index": self.DEFAULT_INDEX_NAME,
+                        "field": nesting_scheme,
+                        "data_set_format": self.DEFAULT_TYPE,
+                        "data_set_path": "path",
+                        "bulk_size": 10,
+                        "id-field-name": self.DEFAULT_ID_FIELD_NAME,
+                    },
+                )
+                with self.assertRaises(ValueError):
+                    bulk_param_source.get_split_fields()
+
+    def _test_params_default(
+        self, bulk_size, data_set_path, parent_data_set_path, num_vectors
+    ):
+        test_param_source_params = {
+            "index": self.DEFAULT_INDEX_NAME,
+            "field": self.DEFAULT_VECTOR_FIELD_NAME,
+            "data_set_format": self.DEFAULT_TYPE,
+            "data_set_path": data_set_path,
+            "bulk_size": bulk_size,
+            "id-field-name": self.DEFAULT_ID_FIELD_NAME,
+        }
+        bulk_param_source = BulkVectorsFromDataSetParamSource(
+            workload.Workload(name="unit-test"), test_param_source_params
+        )
+        bulk_param_source.parent_data_set_path = parent_data_set_path
+        bulk_param_source_partition = bulk_param_source.partition(0, 1)
+        # Check each payload returned
+        vectors_consumed = 0
+        while vectors_consumed < num_vectors:
+            expected_num_vectors = min(num_vectors - vectors_consumed, bulk_size)
+            actual_params = bulk_param_source_partition.params()
+            expected_num_docs = len(actual_params["body"]) // 2
+
+            self._check_params_nested(
+                actual_params,
+                self.DEFAULT_INDEX_NAME,
+                self.DEFAULT_VECTOR_FIELD_NAME,
+                self.DEFAULT_DIMENSION,
+                expected_num_vectors,
+                expected_num_docs,
+                self.DEFAULT_ID_FIELD_NAME,
+            )
+            vectors_consumed += expected_num_vectors
+
+        # Assert last call creates stop iteration
+        with self.assertRaises(StopIteration):
+            bulk_param_source_partition.params()
+
+    def test_params_default(self):
+        bulk_sizes = [1, 3, 4, 10, 50]
+        num_vectors = 49
+        data_set_path = create_data_set(
+            num_vectors,
+            self.DEFAULT_DIMENSION,
+            self.DEFAULT_TYPE,
+            Context.INDEX,
+            self.data_set_dir,
+        )
+        parent_data_set_path = create_parent_data_set(
+            num_vectors,
+            self.DEFAULT_DIMENSION,
+            self.DEFAULT_TYPE,
+            Context.PARENTS,
+            self.data_set_dir,
+        )
+
+        for bulk_size in bulk_sizes:
+            with self.subTest(bulk_size=bulk_size):
+                self._test_params_default(
+                    bulk_size, data_set_path, parent_data_set_path, num_vectors
+                )
+
+    def test_params_custom(self):
+        num_vectors = 49
+        bulk_size = 15
+        data_set_path = create_data_set(
+            num_vectors,
+            self.DEFAULT_DIMENSION,
+            self.DEFAULT_TYPE,
+            Context.INDEX,
+            self.data_set_dir,
+        )
+
+        parent_data_set_path = create_parent_data_set(
+            num_vectors,
+            self.DEFAULT_DIMENSION,
+            self.DEFAULT_TYPE,
+            Context.PARENTS,
+            self.data_set_dir,
+        )
+
+        test_param_source_params = {
+            "index": self.DEFAULT_INDEX_NAME,
+            "field": self.DEFAULT_VECTOR_FIELD_NAME,
+            "data_set_format": self.DEFAULT_TYPE,
+            "data_set_path": data_set_path,
+            "parents_data_set_path": parent_data_set_path,
+            "bulk_size": bulk_size,
+            "id-field-name": "id",
+        }
+
+        bulk_param_source = BulkVectorsFromDataSetParamSource(
+            workload.Workload(name="unit-test"), test_param_source_params
+        )
+        bulk_param_source.parent_data_set_path = parent_data_set_path
+        bulk_param_source_partition = bulk_param_source.partition(0, 1)
+        # Check each payload returned
+        vectors_consumed = 0
+        while vectors_consumed < num_vectors:
+            # with num_vectors = 49 and bulk_size = 15, the requests carry 15, 15, 15, and 4 vectors
+            expected_num_vectors = min(num_vectors - vectors_consumed, bulk_size)
+            actual_params = bulk_param_source_partition.params()
+            expected_num_docs = len(actual_params["body"]) // 2
+            self._check_params_nested(
+                actual_params,
+                self.DEFAULT_INDEX_NAME,
+                self.DEFAULT_VECTOR_FIELD_NAME,
+                self.DEFAULT_DIMENSION,
+                expected_num_vectors,
+                expected_num_docs,
+                "id",
+            )
+            vectors_consumed += expected_num_vectors
+
+        # Assert last call creates stop iteration
+        with self.assertRaises(StopIteration):
+            bulk_param_source_partition.params()
+
+    def test_build_vector_search_query_body(self):
+        k = 12
+        data_set_path = create_data_set(
+            self.DEFAULT_NUM_VECTORS,
+            self.DEFAULT_DIMENSION,
+            self.DEFAULT_TYPE,
+            Context.QUERY,
+            self.data_set_dir
+        )
+        create_data_set(
+            self.DEFAULT_NUM_VECTORS,
+            self.DEFAULT_DIMENSION,
+            self.DEFAULT_TYPE,
+            Context.NEIGHBORS,
+            self.data_set_dir,
+            data_set_path
+        )
+
+        # Create a VectorSearchPartitionParamSource with the relevant params
+        test_param_source_params = {
+            "field": self.DEFAULT_VECTOR_FIELD_NAME,
+            "data_set_format": self.DEFAULT_TYPE,
+            "data_set_path": data_set_path,
+            "k": k
+        }
+        query_param_source = VectorSearchPartitionParamSource(
+            workload.Workload(name="unit-test"),
+            test_param_source_params, {
+                "index": self.DEFAULT_INDEX_NAME,
+                "request-params": {},
+            }
+        )
+        query_param_source_partition = query_param_source.partition(0, 1)
+
+        # Check each query payload returned
+        for _ in range(DEFAULT_NUM_VECTORS):
+            self._check_query_params(
+                query_param_source_partition.params(),
+                self.DEFAULT_VECTOR_FIELD_NAME,
+                self.DEFAULT_DIMENSION,
+                k,
+            )
+
+        # Assert last call creates stop iteration
+        with self.assertRaises(StopIteration):
+            query_param_source_partition.params()
+
+    def _check_query_params(
+        self,
+        actual_params: dict,
+        expected_field: str,
+        expected_dimension: int,
+        expected_k: int,
+        expected_size=None,
+        expected_filter=None,
+    ):
+        body = actual_params.get("body")
+        self.assertIsInstance(body, dict)
+        query = body.get("query")
+        self.assertIsInstance(query, dict)
+        nested = query.get("nested")
+        self.assertIsInstance(nested, dict)
+
+        outer, _inner = expected_field.split(".")
+
+        path = nested.get("path")
+        self.assertEqual(path, outer)
+
+        query_knn = nested.get("query").get("knn")
+
+        field = query_knn.get(expected_field)
+        self.assertIsInstance(field, dict)
+        vector = field.get("vector")
+        self.assertIsInstance(vector, np.ndarray)
+        self.assertEqual(len(list(vector)), expected_dimension)
+        k = field.get("k")
+        self.assertEqual(k, expected_k)
+        neighbor = actual_params.get("neighbors")
+        self.assertIsInstance(neighbor, list)
+        self.assertEqual(len(neighbor), expected_dimension)
+        size = body.get("size")
+        self.assertEqual(size, expected_size if expected_size else expected_k)
+        self.assertEqual(field.get("filter"), expected_filter)
+
+    def _check_params_nested(
+        self,
+        actual_params: dict,
+        expected_index: str,
+        expected_vector_field: str,
+        expected_dimension: int,
+        _expected_num_vectors_in_payload: int,
+        expected_num_docs_in_payload: int,
+        expected_id_field: str,
+    ):
+        size = actual_params.get("size")
+        self.assertEqual(size, expected_num_docs_in_payload)
+        body = actual_params.get("body")
+        self.assertIsInstance(body, list)
+        self.assertEqual(len(body) // 2, expected_num_docs_in_payload)
+
+        # Bulk payload has 2 parts: the first one is the header and the second one
+        # is the body. The header carries the index name and the body carries the
+        # nested vectors.
+        for header, req_body in zip(*[iter(body)] * 2):
+            index = header.get("index")
+            self.assertIsInstance(index, dict)
+
+            index_name = index.get("_index")
+            self.assertEqual(index_name, expected_index)
+
+            # iterate over all of the nested vectors in the document body
+            outer, inner = expected_vector_field.split(".")
+            vector_list = req_body.get(outer)
+            self.assertIsInstance(vector_list, list)
+            for vec in vector_list:
+                actual_vec = vec.get(inner)
+                self.assertIsInstance(actual_vec, list)
+                self.assertEqual(len(actual_vec), expected_dimension)
+
+            if expected_id_field in index:
+                self.assertEqual(self.DEFAULT_ID_FIELD_NAME, expected_id_field)
+                self.assertFalse(expected_id_field in req_body)
+                continue
+            self.assertTrue(expected_id_field in req_body)
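
For illustration, a minimal sketch (not part of the change set) of the shapes the new nested code paths are expected to produce, assuming a hypothetical nested field name "nested_field.vector", a hypothetical index "target-index", the default "_id" id field, and parent ids [1, 1, 2]; all concrete values below are made up:

    # bulk_transform groups consecutive vectors that share a parent id into one document per parent,
    # producing alternating action-metadata / document pairs for the _bulk API:
    example_bulk_body = [
        {"index": {"_index": "target-index", "_id": 1}},
        {"nested_field": [{"vector": [0.1, 0.2]}, {"vector": [0.3, 0.4]}]},
        {"index": {"_index": "target-index", "_id": 2}},
        {"nested_field": [{"vector": [0.5, 0.6]}]},
    ]

    # _build_vector_search_query_body wraps the kNN clause in a "nested" query whose path is the outer field:
    example_query_body = {
        "nested": {
            "path": "nested_field",
            "query": {
                "knn": {
                    "nested_field.vector": {"vector": [0.1, 0.2], "k": 10}
                }
            }
        }
    }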