diff --git a/liteindex/defined_index.py b/liteindex/defined_index.py index a978d58..6f4209b 100644 --- a/liteindex/defined_index.py +++ b/liteindex/defined_index.py @@ -37,8 +37,8 @@ def __init__( name, schema=None, example=None, - from_csv=None, - load_data_from_csv=True, + import_from_file=None, + file_type=None, db_path=":memory:", memory_limit=64, ): @@ -65,9 +65,6 @@ def __init__( else: schema[k] = "other" - if not schema and from_csv: - pass - self.schema = schema self.hashed_key_schema = {} self.meta_table_name = f"__{name}_meta" @@ -85,7 +82,6 @@ def __init__( "datetime": "NUMBER", "other": "BLOB", } - self.not_allowed_character_in_id = chr(31) if not db_path == ":memory:": db_dir = os.path.dirname(self.db_path).strip() @@ -101,9 +97,6 @@ def __init__( self._parse_schema() self._create_table_and_meta_table() - if from_csv and load_data_from_csv: - pass - def __del__(self): if self._connection: self._connection.close() @@ -216,15 +209,17 @@ def update(self, data): # Iterate through each item in the data for k, _data in data.items(): - if self.not_allowed_character_in_id in k: - raise ValueError( - f"Invalid character '{self.not_allowed_character_in_id}' in id: {k}" - ) # Create a new dictionary to store processed (hashed key) data processed_data = {h: None for h in all_columns} processed_data["id"] = k processed_data["updated_at"] = time.time() for key, value in _data.items(): + if key not in self.schema: + raise ValueError(f"Key not in schema: {key} for id: {k}") + + if value is None: + continue + # Get the hashed equivalent of the key key_hash = self.original_key_to_key_hash[key] @@ -384,7 +379,7 @@ def group(self, keys, query): ) return { - _[0]: _[1].split(self.not_allowed_character_in_id) + _[0]: _[1].split(chr(31)) for _ in self._connection.execute(sql_query, sql_params).fetchall() } @@ -482,98 +477,3 @@ def math(self, key, op, query={}): def trigger(self): pass - - -if __name__ == "__main__": - schema = { - "name": "string", - "age": "number", - "password": "string", - "verified": "boolean", - "nicknames": "json", - "address_details": "json", - "profile_picture": "blob", - "user_vector": "other", - } - - index = DefinedIndex(name="user_details", schema=schema) - - index.update( - { - "user1": { - "name": "John Doe", - "age": 25, - "password": "password123", - "verified": True, - "nicknames": ["John", "Johnny"], - "address_details": { - "city": "New York", - "state": "New York", - "country": "USA", - }, - "profile_picture": b"some binary data here", - }, - "user2": { - "name": "Jane Doe", - "age": 22, - }, - } - ) - - index.update( - { - "user3": { - "name": "John Doe", - "age": 25, - "password": "password123", - "verified": True, - "nicknames": ["John", "Johnny"], - "address_details": { - "city": "New York", - "state": "New York", - "country": "USA", - }, - "profile_picture": b"some binary data here aaaa bbb", - }, - "user4": { - "name": "Jane Doe", - "age": 22, - }, - } - ) - - print("-->", index.list_optimized_keys()) - index.optimize_key_for_querying("name") - print("-->", index.list_optimized_keys()) - - print( - "---->", - index.math("age", "sum", query={"age": {"$gt": 20}}), - index.math("age", "sum", query={"age": {"$gt": 60}}), - ) - - # print(index.get("user1", "user2", "user3")) - - print(index.search(query={"age": {"$gt": 20}})) - - print("Get:", index.get(["user1", "user2", "user3", "user4"])) - - print(index.distinct(key="name", query={"age": {"$gt": 20}})) - - print(index.group(keys="name", query={"age": {"$gt": 20}})) - - print(index.count(query={"age": {"$gt": 20}}), index.count()) - - index.delete(ids=["user1", "user2"]) - - print(index.group(keys="name", query={"age": {"$gt": 20}})) - - print(index.count(query={"age": {"$gt": 20}})) - - index.clear() - - print(index.count(query={"age": {"$gt": 20}}), index.count()) - - index.drop() - - print(index.count(query={"age": {"$gt": 20}})) diff --git a/tests/2_test_defined_index.py b/tests/2_test_defined_index.py deleted file mode 100644 index a217343..0000000 --- a/tests/2_test_defined_index.py +++ /dev/null @@ -1,77 +0,0 @@ -import unittest -from defined_index import DefinedIndex - -class TestDefinedIndex(unittest.TestCase): - def setUp(self): - self.schema = { - "name": "", - "age": 0, - "is_student": False, - "address": { - "street": "", - "city": "", - "state": "", - "country": "" - }, - "courses": [ - { - "name": "", - "credits": 0, - "instructor": { - "name": "", - "title": "" - } - } - ] - } - self.index = DefinedIndex("test_index", schema=self.schema) - self.item = { - "name": "John Doe", - "age": 30, - "is_student": True, - "address": { - "street": "123 Main St", - "city": "New York", - "state": "NY", - "country": "USA" - }, - "courses": [ - { - "name": "Computer Science", - "credits": 3, - "instructor": { - "name": "Dr. Smith", - "title": "Professor" - } - } - ] - } - self.index.set("1", self.item) - - def compare_items(self, item1, item2): - return {k: v for k, v in item1.items() if k != 'id'} == {k: v for k, v in item2.items() if k != 'id'} - - def test_set_and_get(self): - retrieved_item = self.index.get("1") - self.assertTrue(self.compare_items(retrieved_item, self.item)) - - def test_search(self): - query = {"address": {"state": "NY"}} - results = list(self.index.search(query)) - self.assertEqual(len(results), 1) - self.assertTrue(self.compare_items(results[0][1], self.item)) - - def test_count(self): - query = {"is_student": True} - count = self.index.count(query) - self.assertEqual(count, 1) - - def test_sum_and_average(self): - query = {"is_student": True} - total_age = self.index.sum("age", query) - average_age = self.index.average("age", query) - self.assertEqual(total_age, 30) - self.assertEqual(average_age, 30) - -if __name__ == '__main__': - unittest.main() diff --git a/tests/AnyIndex_test.py b/tests/AnyIndex_test.py deleted file mode 100644 index 48339b4..0000000 --- a/tests/AnyIndex_test.py +++ /dev/null @@ -1,145 +0,0 @@ -import uuid -import time -import random -import string -import unittest -from tabulate import tabulate - - -def generate_random_dicts(n): - keys = ["".join(random.choices(string.ascii_letters, k=10)) for i in range(1000)] - return keys, { - str(uuid.uuid4()): { - random.choice(keys): random.choice( - [ - None, - random.randint(0, 100), - random.uniform(0, 1), - "".join(random.choices(string.ascii_letters + string.digits, k=10)), - ] - ) - for i in range(random.randint(0, 20)) - } - for j in range(n) - } - - -import os -import sys - -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) - -from liteindex import AnyIndex -from diskcache import Index - - -class TestAnyIndexPerformance(unittest.TestCase): - def test_set_and_update_performance(self): - all_possible_first_level_keys, random_dicts_dict = generate_random_dicts(100000) - - results = [] - - def test_set_speed(module_name, test_name, data_structure, data_dict): - start_time = time.time() - for key, value in data_dict.items(): - data_structure[key] = value - end_time = time.time() - results.append( - { - "Module Name": module_name, - "Test Name": test_name, - "Items": len(data_dict), - "Time (s)": end_time - start_time, - } - ) - - def test_update_speed(module_name, test_name, data_structure, data_dict): - start_time = time.time() - data_structure.update(data_dict) - end_time = time.time() - results.append( - { - "Module Name": module_name, - "Test Name": test_name, - "Items": len(data_dict), - "Time (s)": end_time - start_time, - } - ) - - def test_random_access_speed(module_name, test_name, data_structure): - unique_ids = list(random_dicts_dict.keys()) - random.shuffle(unique_ids) - start_time = time.time() - for key in unique_ids: - _ = data_structure[key] - end_time = time.time() - results.append( - { - "Module Name": module_name, - "Test Name": test_name, - "Items": len(unique_ids), - "Time (s)": end_time - start_time, - } - ) - - def test_iteration_speed(module_name, test_name, data_structure): - start_time = time.time() - for key, value in data_structure.items(): - pass - end_time = time.time() - results.append( - { - "Module Name": module_name, - "Test Name": test_name, - "Items": len(data_structure), - "Time (s)": end_time - start_time, - } - ) - - index = AnyIndex("test_any_index_in_memory") - test_set_speed("AnyIndex", "set", index, random_dicts_dict) - - index_2 = AnyIndex("test_any_index_in_memory_2") - test_update_speed("AnyIndex", "update", index_2, random_dicts_dict) - - index_3 = AnyIndex("test_any_index_in_memory", "test.db") - test_set_speed("AnyIndex (Disk)", "set", index_3, random_dicts_dict) - - index_4 = AnyIndex("test_any_index_in_memory_2", "test.db") - test_update_speed("AnyIndex (Disk)", "update", index_4, random_dicts_dict) - - diskcache_index = Index("test_diskcache_index_1") - test_set_speed("DiskCache", "set", diskcache_index, random_dicts_dict) - - diskcache_index_2 = Index("test_diskcache_index_2") - test_update_speed("DiskCache", "update", diskcache_index_2, random_dicts_dict) - - d = {} - test_set_speed("Dict", "set", d, random_dicts_dict) - - d2 = {} - test_update_speed("Dict", "update", d2, random_dicts_dict) - - test_random_access_speed("AnyIndex", "random access", index) - test_random_access_speed("AnyIndex", "random access", index_2) - test_random_access_speed("AnyIndex (Disk)", "random access", index_3) - test_random_access_speed("AnyIndex (Disk)", "random access", index_4) - test_random_access_speed("DiskCache", "random access", diskcache_index) - test_random_access_speed("DiskCache", "random access", diskcache_index_2) - test_random_access_speed("Dict", "random access", d) - test_random_access_speed("Dict", "random access", d2) - - test_iteration_speed("AnyIndex", "iteration", index) - test_iteration_speed("AnyIndex", "iteration", index_2) - test_iteration_speed("AnyIndex (Disk)", "iteration", index_3) - test_iteration_speed("AnyIndex (Disk)", "iteration", index_4) - test_iteration_speed("DiskCache", "iteration", diskcache_index) - test_iteration_speed("DiskCache", "iteration", diskcache_index_2) - test_iteration_speed("Dict", "iteration", d) - test_iteration_speed("Dict", "iteration", d2) - - print(tabulate(results, headers="keys", tablefmt="pretty")) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/random_data.py b/tests/random_data.py new file mode 100644 index 0000000..81e48ef --- /dev/null +++ b/tests/random_data.py @@ -0,0 +1,13 @@ +import secrets + +def generate_random_bytes(min_size, max_size): + # Generate a random size between min_size and max_size (inclusive) + size = secrets.randbelow(max_size - min_size + 1) + min_size + + # Generate random bytes of the generated size + random_bytes = secrets.token_bytes(size) + + return random_bytes + +def gener + diff --git a/tests/test_any_index.py b/tests/test_any_index.py deleted file mode 100644 index f5f255a..0000000 --- a/tests/test_any_index.py +++ /dev/null @@ -1,223 +0,0 @@ -import logging - -logging.basicConfig(level=logging.DEBUG) - -import random -import string - - -def random_string(length): - return "".join(random.choices(string.ascii_letters, k=length)) - - -def random_key(max_string_length=50): - return random_string(random.randint(1, max_string_length)) - - -def generate_random_dict_or_list( - max_depth=5, max_list_length=10, max_dict_length=10, max_string_length=8, depth=1 -): - def random_value(depth): - if depth > max_depth: - return random.choice( - [ - random.randint(0, 100), - random_string(random.randint(1, max_string_length)), - ] - ) - - choice = random.choices( - population=[0, 1, 2, 3, 4], - weights=[ - 35, - 35, - 10 * (max_depth - depth), - 10 * (max_depth - depth), - 10 * (max_depth - depth), - ], - k=1, - )[0] - - if choice == 0: # Integer - return random.randint(0, 100) - elif choice == 1: # String - return random_string(random.randint(1, max_string_length)) - elif choice == 2: # List - return [ - random_value(depth + 1) - for _ in range(random.randint(1, max_list_length)) - ] - elif choice == 3: # Dictionary - result = {} - for _ in range(random.randint(1, max_dict_length)): - key = random_key() - value = random_value(depth + 1) - result[key] = value - return result - else: # Nested dictionary inside a list - return generate_random_dict_or_list( - max_depth=max_depth, - max_list_length=max_list_length, - max_dict_length=max_dict_length, - max_string_length=max_string_length, - depth=depth + 1, - ) - - if random.random() < 0.5: # 50% chance of generating a list - return [ - random_value(depth + 1) for _ in range(random.randint(1, max_list_length)) - ] - else: # 50% chance of generating a dictionary - result = {} - for _ in range(random.randint(1, max_dict_length)): - key = random_key() - value = random_value(depth + 1) - result[key] = value - return result - - -import unittest -from random import choice -from string import ascii_letters -from time import time -import os -import sys - -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) - -from liteindex import AnyIndex - - -class TestAnyIndex(unittest.TestCase): - def setUp(self): - self.index_name = "test_any_index" - self.db_path = "test_db" - self.index = AnyIndex(self.index_name, self.db_path) - - def tearDown(self): - del self.index - os.remove(self.db_path) - - if os.path.exists(f"{self.db_path}-wal"): - os.remove(path=f"{self.db_path}-wal") - - if os.path.exists(f"{self.db_path}-shm"): - os.remove(path=f"{self.db_path}-shm") - - def random_key(self, length=10): - return "".join(choice(ascii_letters) for _ in range(length)) - - def test_set_and_get_item(self): - test_dict = generate_random_dict_or_list() - test_key = self.random_key() - - self.index[test_key] = test_dict - self.assertEqual(test_dict, self.index[test_key].get_object()) - - def test_update_value(self): - test_key = self.random_key() - initial_dict = generate_random_dict_or_list() - updated_dict = generate_random_dict_or_list() - - self.index[test_key] = initial_dict - self.index[test_key] = updated_dict - - self.assertEqual(updated_dict, self.index[test_key].get_object()) - - def test_del_item(self): - test_key = self.random_key() - test_dict = generate_random_dict_or_list() - - self.index[test_key] = test_dict - del self.index[test_key] - - with self.assertRaises(KeyError): - _ = self.index[test_key] - - def test_len(self): - test_keys = [self.random_key() for _ in range(10)] - - for i, key in enumerate(test_keys): - self.index[key] = generate_random_dict_or_list() - self.assertEqual(len(self.index), i + 1) - - def test_contains(self): - test_key = self.random_key() - test_dict = generate_random_dict_or_list() - - self.index[test_key] = test_dict - self.assertIn(test_key, self.index) - - def test_iter(self): - initial_keys = set(self.index.keys()) - - test_keys = [self.random_key() for _ in range(10)] - test_dicts = [generate_random_dict_or_list() for _ in range(10)] - - for key, value in zip(test_keys, test_dicts): - self.index[key] = value - - iterated_keys = set(self.index.keys()) - - # Check if new test keys are present in the iterated keys - for key in test_keys: - self.assertIn(key, iterated_keys) - - # Check if initial keys are present in the iterated keys - for key in initial_keys: - self.assertIn(key, iterated_keys) - - def test_nested_dict(self): - test_key = self.random_key() - - test_key_2 = self.random_key() - - test_dict = generate_random_dict_or_list() - - test_key_3 = self.random_key(5) - - test_dict[test_key_2] = {test_key_3: "Initial value"} - - self.index[test_key] = test_dict - initial_value = self.index[test_key][test_key_2][test_key_3] - - updated_value = "Updated value" - self.index[test_key][test_key_2][test_key_3] = updated_value - - retrieved_value = self.index[test_key][test_key_2][test_key_3] - - self.assertEqual(updated_value, retrieved_value) - self.assertNotEqual(initial_value, retrieved_value) - - def test_performance(self): - n = 1000 - test_keys = [self.random_key() for _ in range(n)] - test_dicts = [generate_random_dict_or_list() for _ in range(n)] - - start_time = time() - for key, value in zip(test_keys, test_dicts): - self.index[key] = value - - elapsed_time = time() - start_time - print(f"Adding {n} items took {elapsed_time:.2f} seconds") - self.assertLess(elapsed_time, 1, "Adding items took too long") - - start_time = time() - for key in test_keys: - _ = self.index[key] - - elapsed_time = time() - start_time - print(f"Accessing {n} items took {elapsed_time:.2f} seconds") - self.assertLess(elapsed_time, 1, "Accessing items took too long") - - start_time = time() - for key in test_keys: - del self.index[key] - - elapsed_time = time() - start_time - print(f"Deleting {n} items took {elapsed_time:.2f} seconds") - self.assertLess(elapsed_time, 1, "Deleting items took too long") - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_defined_index.py b/tests/test_defined_index.py index 07a0a09..785bab4 100644 --- a/tests/test_defined_index.py +++ b/tests/test_defined_index.py @@ -1,192 +1,98 @@ -import unittest -import random -import string -from defined_index import DefinedIndex # Assuming the class is in the defined_index.py module - -# Helper functions for generating random data - -def random_string(length=10): - return ''.join(random.choice(string.ascii_letters) for _ in range(length)) - -def random_address(): - return { - "street": random_string(), - "city": random_string(), - "state": random_string(2), - "country": random_string() - } - -def random_skill(): - return { - "name": random_string(), - "experience": random.randint(1, 10) - } - -def random_education(): - return { - "degree": random_string(), - "major": random_string(), - "institution": { - "name": random_string(), - "location": random_address() +from liteindex import DefinedIndex + +# ---------- UNIT TESTS ----------- + +schema = { + "name": "string", + "age": "number", + "password": "string", + "email": "string", + "email_verified": "boolean", + "nicknames": "json", + "address_details": "json", + "profile_picture": "blob", + "description_vector": "other", +} + +index = DefinedIndex(name="user_details", schema=schema) + +index.update( + { + "user1": { + "name": "John Doe", + "age": 25, + "password": "password123", + "email_verified": True, + "nicknames": ["John", "Johnny"], + "address_details": { + "city": "New York", + "state": "New York", + "country": "USA", + }, + "profile_picture": b"some binary data here", + }, + "user2": { + "name": "Jane Doe", + "age": 22, }, - "year": random.randint(1990, 2023) } - -def random_person(): - return { - "name": random_string(), - "age": random.randint(18, 100), - "job": { - "title": random_string(), - "department": { - "name": random_string(), - "location": random_address() +) + +index.update( + { + "user3": { + "name": "John Doe", + "age": 25, + "password": "password123", + "email_verified": True, + "nicknames": ["John", "Johnny"], + "address_details": { + "city": "New York", + "state": "New York", + "country": "USA", }, - "skills": [random_skill() for _ in range(random.randint(1, 5))] + "profile_picture": b"some binary data here aaaa bbb", + }, + "user4": { + "name": "Jane Doe", + "age": 22, }, - "address": random_address(), - "education": [random_education() for _ in range(random.randint(1, 4))] } +) +print("-->", index.list_optimized_keys()) +index.optimize_key_for_querying("name") +print("-->", index.list_optimized_keys()) -class TestDefinedIndex(unittest.TestCase): - - def setUp(self): - self.schema = { - "name": "", - "age": 0, - "job": { - "title": "", - "department": { - "name": "", - "location": { - "city": "", - "country": "" - } - }, - "skills": [ - { - "name": "", - "experience": 0 - } - ] - }, - "address": { - "street": "", - "city": "", - "state": "", - "country": "" - }, - "education": [ - { - "degree": "", - "major": "", - "institution": { - "name": "", - "location": { - "city": "", - "country": "" - } - }, - "year": 0 - } - ] - } - - self.index = DefinedIndex("test_index", schema=self.schema) - - def test_set_get_data_integrity(self): - item = random_person() - item_id = "1" - self.index.set(item_id, item) - retrieved_item = self.index.get(item_id) - del retrieved_item["id"] - del item["id"] - self.assertEqual(item, retrieved_item) - - def test_partial_item_update(self): - item = random_person() - item_id = "2" - self.index.set(item_id, item) - - partial_update = {"job": {"title": "New Title"}} - self.index.set(item_id, partial_update) - retrieved_item = self.index.get(item_id) - del retrieved_item["id"] - - expected_item = {**item, **partial_update} - del expected_item["id"] - - self.assertEqual(expected_item, retrieved_item) - - def test_search_empty_query(self): - item = random_person() - item_id = "3" - self.index.set(item_id, item) - - results = list(self.index.search({})) - - del item["id"] - del results[0][1]["id"] - self.assertEqual(len(results), 1) - self.assertEqual(results[0][1], item) - - - def test_search_exact_value(self): - item1 = random_person() - item1["age"] = 25 - self.index.set("4", item1) - - item2 = random_person() - item2["age"] = 30 - self.index.set("5", item2) - - results = list(self.index.search({"age": 25})) - self.assertEqual(len(results), 1) - del item1["id"] - del results[0][1]["id"] - self.assertEqual(results[0][1], item1) - - def test_search_nested_fields(self): - item = random_person() - item["job"]["department"]["name"] = "Engineering" - self.index.set("6", item) - - results = list(self.index.search({"job": {"department": {"name": "Engineering"}}})) - self.assertEqual(len(results), 1) - del results[0][1]["id"] - del item["id"] - self.assertEqual(results[0][1], item) - - # def test_search_multiple_values(self): - # item1 = random_person() - # item1["age"] = 25 - # self.index.set("7", item1) - - # item2 = random_person() - # item2["age"] = 30 - # self.index.set("8", item2) - - # results = list(self.index.search({"age": [25, 30]})) - # self.assertEqual(len(results), 2) - - # def test_search_range_comparison(self): - # item1 = random_person() - # item1["age"] = 25 - # self.index.set("9", item1) - - # item2 = random_person() - # item2["age"] = 30 - # self.index.set("10", item2) - - # results = list(self.index.search({"age": (">", 25)})) - # self.assertEqual(len(results), 1) - # self.assertEqual(results[0][1], item2) - - def test_search_empty_input(self): - results = list(self.index.search(None)) - self.assertEqual(len(results), 0) - -if __name__ == '__main__': - unittest.main() +print( + "---->", + index.math("age", "sum", query={"age": {"$gt": 20}}), + index.math("age", "sum", query={"age": {"$gt": 60}}), +) + +# print(index.get("user1", "user2", "user3")) + +print(index.search(query={"age": {"$gt": 20}})) + +print("Get:", index.get(["user1", "user2", "user3", "user4"])) + +print(index.distinct(key="name", query={"age": {"$gt": 20}})) + +print(index.group(keys="name", query={"age": {"$gt": 20}})) + +print(index.count(query={"age": {"$gt": 20}}), index.count()) + +index.delete(ids=["user1", "user2"]) + +print(index.group(keys="name", query={"age": {"$gt": 20}})) + +print(index.count(query={"age": {"$gt": 20}})) + +index.clear() + +print(index.count(query={"age": {"$gt": 20}}), index.count()) + +index.drop() + +# should throw exception +print(index.count(query={"age": {"$gt": 20}})) diff --git a/tests/test_di.py b/tests/test_di.py deleted file mode 100644 index c7606fb..0000000 --- a/tests/test_di.py +++ /dev/null @@ -1,98 +0,0 @@ -import unittest -import random -import string -from typing import Optional, Dict, List, Any -from defined_index import DefinedIndex - -schema = { - "name": "str", - "age": 2, - "height": 2, - "is_active": True, - "address": { - "street": "str", - "city": "str", - "state": "str", - "zip_code": 2, - "country": "str", - }, - "skills": [{"name": "str", "level": 2, "tags": ["str"],}], - "metadata": { - "created_at": "str", - "updated_at": "str", - "notes": {"note1": "str", "note2": "str",}, - }, -} - - -class TestDefinedIndex(unittest.TestCase): - def random_string(self, length: int = 10): - return "".join(random.choices(string.ascii_letters, k=length)) - - def random_address(self): - return { - "street": self.random_string(), - "city": self.random_string(), - "state": self.random_string(2), - "zip_code": random.randint(10000, 99999), - "country": self.random_string(), - } - - def random_skill(self): - return { - "name": self.random_string(), - "level": random.randint(1, 10), - "tags": [self.random_string() for _ in range(3)], - } - - def random_item(self): - return { - "name": self.random_string(), - "age": random.randint(18, 99), - "height": random.uniform(4.0, 7.0), - "is_active": random.choice([True, False]), - "address": self.random_address(), - "skills": [self.random_skill() for _ in range(3)], - "metadata": { - "created_at": self.random_string(), - "updated_at": self.random_string(), - "notes": { - "note1": self.random_string(), - "note2": self.random_string(), - }, - }, - } - - def test_set_get(self): - index = DefinedIndex("test_index", schema=schema) - item = self.random_item() - item_id = self.random_string() - - index.set(item_id, item) - - retrieved_item = index.get(item_id) - self.assertEqual(item, retrieved_item) - - def test_search(self): - index = DefinedIndex("test_index", schema=schema) - - # Insert multiple items - for _ in range(10): - item_id = self.random_string() - item = self.random_item() - index.set(item_id, item) - - # Search items based on a query - query = {"is_active": True} - results = list(index.search(query)) - - # Verify that search results match the query - for result in results: - _, item = result - self.assertTrue(item["is_active"]) - - # Add tests for count, sum, and average methods - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_multiprocessing_script.py b/tests/test_multiprocessing_script.py deleted file mode 100644 index d659179..0000000 --- a/tests/test_multiprocessing_script.py +++ /dev/null @@ -1,32 +0,0 @@ -from number_index import NumberIndex -from multiprocessing import Pool - -test_index = NumberIndex("test_number_index", "test_database.sqlite") - -from diskcache import Index - -test_index_diskcache = Index("test_database_diskcache") - - -def f(x): - test_index[f"{x}"] = x - - -def diskcache_f(x): - test_index_diskcache[f"{x}"] = x - - -if __name__ == "__main__": - pool = Pool(16) - - from time import time - - start_time = time() - pool.map(f, range(10000)) - end_time = time() - print(f"Set 10000 items: {end_time - start_time:.2f} seconds") - - start_time = time() - pool.map(diskcache_f, range(10000)) - end_time = time() - print(f"DiskCache Set 10000 items: {end_time - start_time:.2f} seconds") diff --git a/tests/test_number_index.py b/tests/test_number_index.py deleted file mode 100644 index baa35d6..0000000 --- a/tests/test_number_index.py +++ /dev/null @@ -1,126 +0,0 @@ -import unittest -import os -from random import randint, choice -from string import ascii_lowercase -from time import time -from concurrent.futures import ThreadPoolExecutor -import diskcache -import shutil - -import sys - -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) - -from liteindex import NumberIndex - - -class TestNumberIndex(unittest.TestCase): - def setUp(self): - self.db_path = "test_database.sqlite" - self.index_name = "test_index" - self.index = NumberIndex(self.index_name, self.db_path) - - def tearDown(self): - os.remove(self.db_path) - os.remove(path=f"{self.db_path}-wal") - os.remove(path=f"{self.db_path}-shm") - - def test_basic_operations(self): - self.index["one"] = 1 - self.index["two"] = 2.0 - self.index["three"] = 3 - - self.assertEqual(self.index["one"], 1) - self.assertEqual(self.index["two"], 2.0) - self.assertEqual(self.index["three"], 3) - - self.index["one"] = 1.1 - self.assertEqual(self.index["one"], 1.1) - - del self.index["one"] - self.assertIsNone(self.index.get("one")) - - self.index.clear() - self.assertEqual(len(self.index), 0) - - def generate_random_key(self): - return "".join(choice(ascii_lowercase) for _ in range(10)) - - def test_performance(self): - num_items = 10000 - - # Test batch_set performance - items = { - self.generate_random_key(): randint(1, num_items) for _ in range(num_items) - } - - start_time = time() - self.index.update(items) - end_time = time() - print(f"Batch set {num_items} items: {end_time - start_time:.2f} seconds") - - # Test single set performance - items = { - self.generate_random_key(): randint(1, num_items) for _ in range(num_items) - } - start_time = time() - for key, value in items.items(): - self.index[key] = value - end_time = time() - print(f"Set {num_items} items: {end_time - start_time:.2f} seconds") - - # Test retrieval performance - start_time = time() - for key in items.keys(): - _ = self.index[key] - end_time = time() - print(f"Retrieve {num_items} items: {end_time - start_time:.2f} seconds") - - def concurrent_write(self, key, value): - index = NumberIndex(self.index_name, self.db_path) - index[key] = value - - def test_concurrency(self): - num_items = 1000 - items = { - self.generate_random_key(): randint(1, num_items) for _ in range(num_items) - } - - with ThreadPoolExecutor() as executor: - for key, value in items.items(): - executor.submit(self.concurrent_write, key, value) - - for key, value in items.items(): - self.assertEqual(self.index[key], value) - - def test_diskcache_performance(self): - cache_path = "test_diskcache" - cache = diskcache.Index(cache_path) - num_items = 10000 - - # Test set performance - start_time = time() - items = { - self.generate_random_key(): randint(1, num_items) for _ in range(num_items) - } - for key, value in items.items(): - cache[key] = value - end_time = time() - print( - f"Set {num_items} items in diskcache: {end_time - start_time:.2f} seconds" - ) - - # Test retrieval performance - start_time = time() - for key in items.keys(): - _ = cache[key] - end_time = time() - print( - f"Retrieve {num_items} items from diskcache: {end_time - start_time:.2f} seconds" - ) - - shutil.rmtree(cache_path) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_query_parser.py b/tests/test_query_parser.py deleted file mode 100644 index af91506..0000000 --- a/tests/test_query_parser.py +++ /dev/null @@ -1,63 +0,0 @@ -from defined_index import DefinedIndex - -m = DefinedIndex("a", {"a": 10}) -_process_query_conditions = m._process_query_conditions - -def test_process_query_conditions(): - # Test simple equality query - where_conditions, params = _process_query_conditions(query={"age": 30}) - assert where_conditions == ["age = ?"] - assert params == [30] - - # Test simple range query - where_conditions, params = _process_query_conditions(query={"age": (30, 40)}) - assert where_conditions == ["age >= ?", "age <= ?"] - assert params == [30, 40] - - # Test simple inclusion query - where_conditions, params = _process_query_conditions(query={"age": [11, 14, 16]}) - assert where_conditions == ["age IN (?, ?, ?)"] - assert params == [11, 14, 16] - - # Test nested JSON range query - where_conditions, params = _process_query_conditions(query={"birth_day": {"year": (None, 2019)}}) - assert where_conditions == ["json_extract(birth_day, '$.year') <= ?"] - assert params == [2019] - - # Test nested JSON inclusion query - where_conditions, params = _process_query_conditions(query={"birth_day": {"year": [2017, 2018, 2019]}}) - assert where_conditions == ["json_extract(birth_day, '$.year') IN (?, ?, ?)"] - assert params == [2017, 2018, 2019] - - # Test multiple top-level conditions - where_conditions, params = _process_query_conditions(query={"age": 30, "city": "New York"}) - assert set(where_conditions) == {"age = ?", "city = ?"} - assert set(params) == {30, "New York"} - - # Test deeply nested JSON range query - where_conditions, params = _process_query_conditions(query={"person": {"birth_day": {"year": (None, 2019)}}}) - assert where_conditions == ["json_extract(person, '$.birth_day.year') <= ?"] - assert params == [2019] - - # Test deeply nested JSON inclusion query - where_conditions, params = _process_query_conditions(query={"person": {"birth_day": {"year": [2017, 2018, 2019]}}}) - assert where_conditions == ["json_extract(person, '$.birth_day.year') IN (?, ?, ?)"] - assert params == [2017, 2018, 2019] - - # Test a mix of different query types - where_conditions, params = _process_query_conditions(query={ - "age": (30, 40), - "city": "New York", - "birth_day": {"year": (None, 2019)}, - "gender": ["male", "female"] - }) - assert set(where_conditions) == { - "age >= ?", - "age <= ?", - "city = ?", - "json_extract(birth_day, '$.year') <= ?", - "gender IN (?, ?)" - } - assert set(params) == {30, 40, "New York", 2019, "male", "female"} - -test_process_query_conditions()