diff --git a/.gitignore b/.gitignore index 68bc17f..bae21e5 100644 --- a/.gitignore +++ b/.gitignore @@ -158,3 +158,5 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ +*.c +*.cpp diff --git a/LICENSE b/LICENSE index 261eeb9..b2b9241 100644 --- a/LICENSE +++ b/LICENSE @@ -186,7 +186,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright 2024 Justin Joyce Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/draken/__version__.py b/draken/__version__.py index e4554ea..8e51970 100644 --- a/draken/__version__.py +++ b/draken/__version__.py @@ -1,4 +1,4 @@ -__build__ = 3 +__build__ = 4 # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/draken/compiled/__init__.py b/draken/compiled/__init__.py new file mode 100644 index 0000000..6e244a9 --- /dev/null +++ b/draken/compiled/__init__.py @@ -0,0 +1,2 @@ +from .binary_search import StringBinaryIndex +from .hadro import create_sstable diff --git a/draken/compiled/binary_search.pyx b/draken/compiled/binary_search.pyx new file mode 100644 index 0000000..e9e2e33 --- /dev/null +++ b/draken/compiled/binary_search.pyx @@ -0,0 +1,78 @@ +# cython: language_level=3 + +from typing import List, Tuple, Dict +from libc.stdint cimport int32_t, int64_t + +cdef class StringBinaryIndex: + cdef list key_store + cdef list value_store + cdef dict key_to_pointer + + def __init__(self): + self.key_store = [] + self.value_store = [] + self.key_to_pointer = {} + + def add_entry(self, key: str, filename: str, offset: int): + # Add individual (filename, offset) tuples for the given key + if key not in self.key_to_pointer: + self.key_to_pointer[key] = len(self.value_store) + self.key_store.append((len(key.encode('utf-8')), key.encode('utf-8'), self.key_to_pointer[key])) + self.value_store.append([]) # Initialize empty list for new key + + # Add value to the corresponding value list + pointer = self.key_to_pointer[key] + self.value_store[pointer].append((len(filename.encode('utf-8')), filename.encode('utf-8'), offset)) + + def finalize_index(self): + # Sort key_store by keys for binary search + self.key_store.sort() + + def lookup_eq(self, key: str) -> List[Tuple[str, int]]: + # Perform binary search on key_store + pointer = self.key_to_pointer.get(key) + if pointer is None: + return [] + + value_data = self.value_store[pointer] + return [(filename.decode('utf-8'), offset) for _, filename, offset in value_data] + + def lookup_in_list(self, keys: List[str]) -> Dict[str, List[Tuple[str, int]]]: + result = {} + for key in keys: + result[key] = self.lookup_eq(key) + return result + + def lookup_range(self, start_key: str, end_key: str) -> Dict[str, List[Tuple[str, int]]]: + result = {} + start_index = self._binary_search(start_key, find_start=True) + end_index = self._binary_search(end_key, find_start=False) + + for index in range(start_index, end_index + 1): + key_len, key_bytes, pointer = self.key_store[index] + key = key_bytes.decode('utf-8') + result[key] = self.lookup_eq(key) + + return result + + def _binary_search(self, key: str, find_start: bool) -> int: + # Implement binary search on the sorted key_store + key_bytes = 
key.encode('utf-8')
+        low, high = 0, len(self.key_store) - 1
+        while low <= high:
+            mid = (low + high) // 2
+            mid_key_bytes = self.key_store[mid][1]
+            if mid_key_bytes < key_bytes:
+                low = mid + 1
+            elif mid_key_bytes > key_bytes:
+                high = mid - 1
+            else:
+                if find_start:
+                    if mid == 0 or self.key_store[mid - 1][1] != key_bytes:
+                        return mid
+                    high = mid - 1
+                else:
+                    if mid == len(self.key_store) - 1 or self.key_store[mid + 1][1] != key_bytes:
+                        return mid
+                    low = mid + 1
+        return low if find_start else high
diff --git a/draken/compiled/bloom_filter.pyx b/draken/compiled/bloom_filter.pyx
new file mode 100644
index 0000000..725ca48
--- /dev/null
+++ b/draken/compiled/bloom_filter.pyx
@@ -0,0 +1,56 @@
+# cython: language_level=3
+# cython: boundscheck=False
+# cython: wraparound=False
+"""
+This is not a general purpose Bloom Filter; if used outside of Draken, it may not
+perform entirely as expected.
+"""
+
+from libc.stdlib cimport malloc, free
+from libc.string cimport memset, memcpy
+
+# Define constants for the fixed size
+BIT_ARRAY_SIZE = 64 * 1024  # 64 Kbits = 8 KB
+BYTE_ARRAY_SIZE = BIT_ARRAY_SIZE // 8
+
+cdef class BloomFilter:
+    cdef unsigned char* bit_array
+
+    def __cinit__(self):
+        # Allocate memory for the bit array and initialize to 0
+        self.bit_array = <unsigned char*> malloc(BYTE_ARRAY_SIZE)
+        if not self.bit_array:
+            raise MemoryError("Failed to allocate memory for the bit array.")
+        memset(self.bit_array, 0, BYTE_ARRAY_SIZE)
+
+    def __dealloc__(self):
+        if self.bit_array:
+            free(self.bit_array)
+
+    cpdef void add(self, long item):
+        """Add an item to the Bloom filter"""
+        h1 = item % BIT_ARRAY_SIZE
+        # Apply the golden ratio to the item, truncate to an integer, and wrap within the bit array
+        h2 = <long>(item * 1.618033988749895) % BIT_ARRAY_SIZE
+        # Set bits using bitwise OR
+        self.bit_array[h1 // 8] |= 1 << (h1 % 8)
+        self.bit_array[h2 // 8] |= 1 << (h2 % 8)
+
+    cpdef int possibly_contains(self, long item):
+        """Check if the item might be in the set"""
+        h1 = item % BIT_ARRAY_SIZE
+        # Apply the golden ratio to the item, truncate to an integer, and wrap within the bit array
+        h2 = <long>(item * 1.618033988749895) % BIT_ARRAY_SIZE
+        # Check bits using bitwise AND
+        return (self.bit_array[h1 // 8] & (1 << (h1 % 8))) and \
+               (self.bit_array[h2 // 8] & (1 << (h2 % 8)))
+
+    cpdef memoryview serialize(self):
+        """Serialize the Bloom filter to a memory view"""
+        return memoryview(self.bit_array[:BYTE_ARRAY_SIZE])
+
+cpdef BloomFilter deserialize(const unsigned char* data):
+    """Deserialize a memory view to a Bloom filter"""
+    bf = BloomFilter()
+    memcpy(bf.bit_array, data, BYTE_ARRAY_SIZE)
+    return bf
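# --- Illustrative sketch (not part of the diff) --------------------------------
# A minimal pure-Python model of the fixed-size, two-hash Bloom filter above,
# handy as a reference when testing the compiled module. The name
# `ReferenceBloomFilter` is hypothetical; the constants and hash derivations
# mirror draken/compiled/bloom_filter.pyx.

BIT_ARRAY_SIZE = 64 * 1024              # number of bits, as in the Cython module
BYTE_ARRAY_SIZE = BIT_ARRAY_SIZE // 8   # 8 KB backing buffer


class ReferenceBloomFilter:
    def __init__(self):
        self.bits = bytearray(BYTE_ARRAY_SIZE)

    def _slots(self, item: int):
        # same scheme as the Cython version: raw modulo plus a golden-ratio mix
        h1 = item % BIT_ARRAY_SIZE
        h2 = int(item * 1.618033988749895) % BIT_ARRAY_SIZE
        return h1, h2

    def add(self, item: int) -> None:
        for h in self._slots(item):
            self.bits[h // 8] |= 1 << (h % 8)

    def possibly_contains(self, item: int) -> bool:
        return all(self.bits[h // 8] & (1 << (h % 8)) for h in self._slots(item))


# No false negatives are expected; false positives are possible by design.
reference = ReferenceBloomFilter()
reference.add(hash("term1"))
assert reference.possibly_contains(hash("term1"))
# --------------------------------------------------------------------------------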
diff --git a/draken/compiled/hadro.pyx b/draken/compiled/hadro.pyx
new file mode 100644
index 0000000..e53775b
--- /dev/null
+++ b/draken/compiled/hadro.pyx
@@ -0,0 +1,276 @@
+# cython: language_level=3
+
+# Import necessary modules
+from libc.stdint cimport uint8_t
+from libc.stdint cimport uint32_t
+from libc.stdint cimport uint64_t
+
+from cpython.mem cimport PyMem_Malloc, PyMem_Free
+import struct
+import zlib
+import time
+import zstd
+import ormsgpack
+from hashlib import sha256
+from libc.stdlib cimport malloc, free
+from libc.string cimport memset
+
+# Define Header class
+cdef class Header:
+    cdef public uint32_t version
+    cdef public uint64_t creation_timestamp
+    cdef public uint32_t bloom_filter_offset
+    cdef public uint32_t term_dictionary_offset
+    cdef public uint32_t postings_list_offset
+    cdef public uint32_t data_block_offset
+    cdef public uint32_t num_terms
+    cdef public uint8_t endianness
+    cdef public uint32_t checksum
+
+    def __init__(self, version: uint32_t, creation_timestamp: uint64_t, bloom_filter_offset: uint32_t,
+                 term_dictionary_offset: uint32_t, postings_list_offset: uint32_t, data_block_offset: uint32_t,
+                 num_terms: uint32_t, endianness: uint8_t):
+        self.version = version
+        self.creation_timestamp = creation_timestamp
+        self.bloom_filter_offset = bloom_filter_offset
+        self.term_dictionary_offset = term_dictionary_offset
+        self.postings_list_offset = postings_list_offset
+        self.data_block_offset = data_block_offset
+        self.num_terms = num_terms
+        self.endianness = endianness
+        self.checksum = 0  # to be computed later
+
+    def serialize(self) -> bytes:
+        # little-endian layout matching the 37 bytes reported by size()
+        return struct.pack('<IQIIIIIBI', self.version, self.creation_timestamp,
+                           self.bloom_filter_offset, self.term_dictionary_offset,
+                           self.postings_list_offset, self.data_block_offset,
+                           self.num_terms, self.endianness, self.checksum)
+
+    def compute_checksum(self):
+        # assumed implementation: CRC32 over the serialized header fields
+        self.checksum = zlib.crc32(self.serialize()) & 0xFFFFFFFF
+
+    @staticmethod
+    def size() -> uint32_t:
+        return 37
+
+# Define BloomFilter class
+cdef class BloomFilter:
+    cdef uint32_t size
+    cdef uint8_t* bit_array
+    cdef list hash_functions
+
+    def __init__(self, size: int, hash_functions: list):
+        self.size = size
+        self.hash_functions = hash_functions
+        self.bit_array = <uint8_t*> malloc(size * sizeof(uint8_t))
+        memset(self.bit_array, 0, size * sizeof(uint8_t))
+
+    def add(self, key: bytes):
+        for func in self.hash_functions:
+            index = func(key) % self.size
+            self.bit_array[index] = 1
+
+    def might_contain(self, key: bytes) -> bool:
+        for func in self.hash_functions:
+            index = func(key) % self.size
+            if self.bit_array[index] == 0:
+                return False
+        return True
+
+    def serialize(self) -> bytes:
+        return bytes(self.bit_array[:self.size])
+
+    def __dealloc__(self):
+        free(self.bit_array)
+
+# Define TermEntry class
+cdef class TermEntry:
+    cdef uint32_t term_length
+    cdef bytes term
+    cdef uint32_t postings_offset
+
+    def __init__(self, term: bytes, postings_offset: uint32_t):
+        self.term_length = min(len(term), 64)  # cap term length to 64 characters
+        self.term = term[:64]
+        self.postings_offset = postings_offset
+
+    def serialize(self) -> bytes:
+        # fixed-width 72-byte entry: length, zero-padded term, postings offset
+        # (packing format reconstructed; fixed-width entries allow slot-based binary search)
+        return struct.pack('<I64sI', self.term_length, self.term, self.postings_offset)
+
+# Define PostingsEntry class
+cdef class PostingsEntry:
+    cdef public uint32_t doc_id
+    cdef public uint32_t frequency
+    cdef public list positions
+
+    def __init__(self, doc_id: uint32_t, frequency: uint32_t, positions: list):
+        self.doc_id = doc_id
+        self.frequency = frequency
+        self.positions = positions
+
+    def serialize(self) -> bytes:
+        pos_bytes = b''.join([struct.pack('<I', position) for position in self.positions])
+        return struct.pack('<II', self.doc_id, self.frequency) + pos_bytes
+
+# Define DataEntry class
+cdef class DataEntry:
+    cdef bytes value
+
+    def __init__(self, value: bytes):
+        self.value = value
+
+    def serialize(self) -> bytes:
+        # length-prefixed value block
+        return struct.pack('<I', len(self.value)) + self.value
+
+# Define the SSTable creation function
+def create_sstable(data: dict) -> bytes:
+    def hash_function_1(key: bytes) -> int:
+        return int.from_bytes(sha256(key).digest(), byteorder='little')
+
+    def hash_function_2(key: bytes) -> int:
+        return int.from_bytes(sha256(key[::-1]).digest(), byteorder='little')
+
+    # Define header
+    creation_timestamp = int(time.time())
+    header = Header(version=1, creation_timestamp=creation_timestamp, bloom_filter_offset=0,
+                    term_dictionary_offset=0, postings_list_offset=0, data_block_offset=0,
+                    num_terms=len(data), endianness=0)
+
+    # Create Bloom Filter
+    bloom_filter = BloomFilter(size=524288, hash_functions=[hash_function_1, hash_function_2])
+
+    # Collect serialized term dictionary and postings list
+    term_entries = []
+    postings_entries = []
+    current_postings_offset = 0
+    current_data_offset = 0
+    data_block_bytes = b''
+
+    for term, postings in data.items():
+        term_bytes = term.encode('utf-8')
+
+        # Add to Bloom filter
+        bloom_filter.add(term_bytes)
+
+        # Create Postings Entry
+        postings_entry = PostingsEntry(postings['doc_id'], postings['frequency'], postings['positions'])
+        postings_entries.append(postings_entry)
+
+        # Create Term Entry
+        term_entry = TermEntry(term_bytes, current_postings_offset)
+        term_entries.append(term_entry)
+
+        # Update postings offset
+        current_postings_offset += len(postings_entry.serialize())
+
+        # Serialize data block
+        value_bytes = ormsgpack.packb(postings)
+        data_entry = DataEntry(value_bytes)
+        data_block_bytes += data_entry.serialize()
+        current_data_offset += len(data_entry.serialize())
+
+    # Serialize blocks
+    bloom_filter_bytes = 
bloom_filter.serialize() + term_dictionary_bytes = b''.join([entry.serialize() for entry in term_entries]) + postings_list_bytes = b''.join([entry.serialize() for entry in postings_entries]) + + # Set offsets in header + header.bloom_filter_offset = len(header.serialize()) + header.term_dictionary_offset = header.bloom_filter_offset + len(bloom_filter_bytes) + header.postings_list_offset = header.term_dictionary_offset + len(term_dictionary_bytes) + header.data_block_offset = header.postings_list_offset + len(postings_list_bytes) + + # Compute checksum for the header + header.compute_checksum() + + # Construct SSTable + sstable_bytes = header.serialize() + bloom_filter_bytes + term_dictionary_bytes + postings_list_bytes + data_block_bytes + return sstable_bytes + +# Define the SSTable lookup functions +def lookup_eq(bytes sstable, str key) -> dict: + header = Header.deserialize(sstable[:Header.size()]) + bloom_filter = BloomFilter.deserialize(sstable[header.bloom_filter_offset:header.term_dictionary_offset]) + + key_bytes = key.encode('utf-8') + + if not bloom_filter.might_contain(key_bytes): + return None + + # Binary search in term dictionary block + term_dictionary = sstable[header.term_dictionary_offset:header.postings_list_offset] + low, high = 0, header.num_terms - 1 + + while low <= high: + mid = (low + high) // 2 + mid_term_entry = TermEntry.deserialize(term_dictionary[mid * TermEntry.size():(mid + 1) * TermEntry.size()]) + + if mid_term_entry.term == key_bytes: + postings_offset = mid_term_entry.postings_offset + postings_entry = PostingsEntry.deserialize(sstable[header.postings_list_offset + postings_offset:]) + return { + 'doc_id': postings_entry.doc_id, + 'frequency': postings_entry.frequency, + 'positions': postings_entry.positions + } + elif mid_term_entry.term < key_bytes: + low = mid + 1 + else: + high = mid - 1 + + return None + +def lookup_in_list(bytes sstable, list keys) -> dict: + result = {} + for key in keys: + value = lookup_eq(sstable, key) + if value is not None: + result[key] = value + return result + +def lookup_range(bytes sstable, str key, str comparison) -> list: + header = Header.deserialize(sstable[:Header.size()]) + term_dictionary = sstable[header.term_dictionary_offset:header.postings_list_offset] + + key_bytes = key.encode('utf-8') + result = [] + + if comparison == 'GT': + for i in range(header.num_terms): + entry = TermEntry.deserialize(term_dictionary[i * TermEntry.size():(i + 1) * TermEntry.size()]) + if entry.term > key_bytes: + postings_offset = entry.postings_offset + postings_entry = PostingsEntry.deserialize(sstable[header.postings_list_offset + postings_offset:]) + result.append({ + 'doc_id': postings_entry.doc_id, + 'frequency': postings_entry.frequency, + 'positions': postings_entry.positions + }) + elif comparison == 'LT': + for i in range(header.num_terms): + entry = TermEntry.deserialize(term_dictionary[i * TermEntry.size():(i + 1) * TermEntry.size()]) + if entry.term < key_bytes: + postings_offset = entry.postings_offset + postings_entry = PostingsEntry.deserialize(sstable[header.postings_list_offset + postings_offset:]) + result.append({ + 'doc_id': postings_entry.doc_id, + 'frequency': postings_entry.frequency, + 'positions': postings_entry.positions + }) + + return result diff --git a/draken/compiled/memtable.pyx b/draken/compiled/memtable.pyx new file mode 100644 index 0000000..393fd2e --- /dev/null +++ b/draken/compiled/memtable.pyx @@ -0,0 +1,30 @@ +# distutils: language=c++ +# cython: language_level=3 + +from libcpp.map cimport 
map
+from libcpp.string cimport string
+from libc.stdint cimport int32_t
+from libc.string cimport memcpy
+from cpython.bytes cimport PyBytes_AsString  # Import the helper to get C char* from Python bytes
+
+cdef class _MemTable:
+    cdef map[int32_t, string] cpp_map
+
+    def __init__(self):
+        self.cpp_map = map[int32_t, string]()
+
+    def add(self, int key, bytes value):
+        # Convert Python bytes to std::string
+        cdef string cpp_value = string()
+        cpp_value.resize(len(value))  # Ensure cpp_value is large enough to hold the bytes
+        # Use PyBytes_AsString to safely access the bytes buffer
+        memcpy(&cpp_value[0], PyBytes_AsString(value), len(value))
+        self.cpp_map[key] = cpp_value
+
+    def get(self, int key):
+        # Retrieve std::string as Python bytes
+        cdef string cpp_value = self.cpp_map[key]
+        return cpp_value.data()[:cpp_value.size()]
+
+    def remove(self, int key):
+        self.cpp_map.erase(key)
diff --git a/draken/compiled/murmur_hash3.pyx b/draken/compiled/murmur_hash3.pyx
new file mode 100644
index 0000000..6ccf273
--- /dev/null
+++ b/draken/compiled/murmur_hash3.pyx
@@ -0,0 +1,51 @@
+# MurmurHash3 implementation for better performance
+from libc.stdint cimport uint32_t
+
+cdef inline uint32_t murmurhash3_32(const void *key, int len, uint32_t seed):
+    cdef uint32_t c1 = 0xcc9e2d51
+    cdef uint32_t c2 = 0x1b873593
+    cdef uint32_t r1 = 15
+    cdef uint32_t r2 = 13
+    cdef uint32_t m = 5
+    cdef uint32_t n = 0xe6546b64
+
+    cdef const unsigned char *data = <const unsigned char *> key
+    cdef const int nblocks = len // 4
+    cdef uint32_t h1 = seed
+    cdef uint32_t k1 = 0
+
+    # body
+    cdef const uint32_t *blocks = <const uint32_t *> data
+    for i in range(nblocks):
+        k1 = blocks[i]
+
+        k1 *= c1
+        k1 = (k1 << r1) | (k1 >> (32 - r1))
+        k1 *= c2
+
+        h1 ^= k1
+        h1 = (h1 << r2) | (h1 >> (32 - r2))
+        h1 = h1 * m + n
+
+    # tail
+    cdef const unsigned char *tail = <const unsigned char *> (data + nblocks * 4)
+    cdef uint32_t k1_ = 0
+
+    if len & 3 == 3:
+        k1_ ^= tail[2] << 16
+    if len & 3 >= 2:
+        k1_ ^= tail[1] << 8
+    if len & 3 >= 1:
+        k1_ ^= tail[0]
+        k1_ *= c1
+        k1_ = (k1_ << r1) | (k1_ >> (32 - r1))
+        k1_ *= c2
+        h1 ^= k1_
+
+    # finalization
+    h1 ^= len
+    h1 ^= (h1 >> 16)
+    h1 *= 0x85ebca6b
+    h1 ^= (h1 >> 13)
+    h1 *= 0xc2b2ae35
+    h1 ^= (h1 >> 16)
+
+    return h1
\ No newline at end of file
diff --git a/draken/indexes/__init__.py b/draken/indexes/__init__.py
new file mode 100644
index 0000000..586e8a2
--- /dev/null
+++ b/draken/indexes/__init__.py
@@ -0,0 +1,7 @@
+"""
+A relatively simple LSM tree where the records take the form:
+
+(index_value, version, [(blob, row), (blob, row)])
+
+
+"""
diff --git a/draken/indexes/global_index/__init__.py b/draken/indexes/global_index/__init__.py
new file mode 100644
index 0000000..87e7ec9
--- /dev/null
+++ b/draken/indexes/global_index/__init__.py
@@ -0,0 +1,23 @@
+"""
+A global index lookup should return something like:
+[
+    {
+        "dataset": "dataset1",
+        "record": {
+            "id": 1,
+            "cve": "CVE-2021-34527",
+            "description": "Example vulnerability"
+        }
+    },
+    {
+        "dataset": "dataset2",
+        "record": {
+            "id": 1,
+            "cve_id": "CVE-2021-34527",
+            "info": "Some details about the CVE"
+        }
+    }
+]
+
+
+"""
diff --git a/draken/routes/__init__.py b/draken/indexes/key_index/__init__.py
similarity index 100%
rename from draken/routes/__init__.py
rename to draken/indexes/key_index/__init__.py
diff --git a/draken/lamport/__init__.py b/draken/lamport/__init__.py
new file mode 100644
index 0000000..1cb20cb
--- /dev/null
+++ b/draken/lamport/__init__.py
@@ -0,0 +1 @@
+from draken.lamport.simple_lamport_provider import SimpleLamportProvider
diff --git a/draken/lamport/firestore_lamport_provider.py 
b/draken/lamport/firestore_lamport_provider.py new file mode 100644 index 0000000..7bde277 --- /dev/null +++ b/draken/lamport/firestore_lamport_provider.py @@ -0,0 +1,47 @@ +from .simple_lamport_provider import SimpleLamportProvider + + +class FirestoreLamportProvider(SimpleLamportProvider): + def __init__(self, collection_id, document_id): + + try: + import firebase_admin + from firebase_admin import credentials + from firebase_admin import firestore + except ImportError as err: # pragma: no cover + from hadro.exceptions import MissingDependencyError + + raise MissingDependencyError(err.name) from err + + # Initialize Firebase admin SDK + cred = credentials.ApplicationDefault() + firebase_admin.initialize_app( + cred, + { + "projectId": "your-project-id", + }, + ) + + self.db = firestore.client() + self.collection_id = collection_id + self.document_id = document_id + + def get_and_increment_counter(self): + """Atomically retrieve and increment the Lamport counter stored in Firestore.""" + doc_ref = self.db.collection(self.collection_id).document(self.document_id) + + # Transaction to increment the counter in Firestore atomically + @firestore.transactional + def update_in_transaction(transaction): + snapshot = doc_ref.get(transaction=transaction) + current_value = snapshot.get("counter") if snapshot.exists else 0 + new_value = current_value + 1 + transaction.update(doc_ref, {"counter": new_value}) + return current_value + + return update_in_transaction(self.db.transaction()) + + +# Usage in your application (specifying Firestore collection and document) +lamport_clock = FirestoreLamportProvider("lamport_counters", "my_counter") +print(lamport_clock.get_and_increment_counter()) diff --git a/draken/lamport/simple_lamport_provider.py b/draken/lamport/simple_lamport_provider.py new file mode 100644 index 0000000..51ecb0a --- /dev/null +++ b/draken/lamport/simple_lamport_provider.py @@ -0,0 +1,25 @@ +class SimpleLamportProvider: + """ + A simple Lamport clock provider for managing a logical clock in a single-threaded or isolated + testing environment. This class simulates the basic behavior of a Lamport clock, which is + used to order events in a distributed system without relying on synchronized physical clocks. + + Methods: + get_and_increment_counter: Returns the current Lamport counter value and increments it for + the next use. + """ + + def __init__(self): + """Initialize the Lamport clock with a counter starting at zero.""" + self._counter = 0 + + def get_and_increment_counter(self): + """ + Retrieve the current Lamport counter value and increment it. + + Returns: + int: The current value of the Lamport counter before it is incremented. 
+ """ + pre_update = self._counter + self._counter += 1 + return pre_update diff --git a/draken/memtable/__init__.py b/draken/memtable/__init__.py new file mode 100644 index 0000000..b815ca5 --- /dev/null +++ b/draken/memtable/__init__.py @@ -0,0 +1 @@ +from .memtable import MemTable diff --git a/draken/memtable/memtable.py b/draken/memtable/memtable.py new file mode 100644 index 0000000..4d60346 --- /dev/null +++ b/draken/memtable/memtable.py @@ -0,0 +1,125 @@ +""" + + +Each record is stored in a hashtable, where each record is a tuple of (Primary Key, Version, Bytes) + + +""" + +import datetime +import time +from typing import Any +from typing import Dict +from typing import Optional +from typing import Tuple + +import numpy +import ormsgpack +from hadro.exceptions import MaximumRecordsExceeded +from orso.schema import RelationSchema +from orso.tools import monitor + + +class MemTable: + """ + In-memory table structure designed to store records with versioning, acting as a Write-Ahead Log (WAL). + This class manages an in-memory buffer of records, indexed by a primary key and supports automatic flushing + of the buffer based on a maximum record count. + + Attributes: + buffer (Dict[Any, Tuple[int, bytes]]): A dictionary that maps primary keys to tuples containing a timestamp + (in nanoseconds) and the serialized bytes of the record. + buffer_size (int): Total size of all records currently stored in the buffer in bytes. + max_records (int): Maximum number of records the buffer can hold before triggering an automatic flush. + pk_field_name (str): The name of the field that acts as the primary key within the records. + schema (RelationSchema): Schema definition including the primary key and columns. + column_names (Tuple[str]): Sorted tuple of column names derived from the schema for consistent record serialization. + """ + + def __init__(self, schema: RelationSchema, max_records: int = 50000): + """ + Initializes the MemoryTable with an empty buffer and specified configurations based on the provided schema. + + Parameters: + schema (RelationSchema): The schema defining the structure of records and the primary key. + max_records (int, optional): The maximum number of records to store before flushing to durable storage. + Defaults to 10,000. + """ + self.buffer = {} + self.buffer_size = 0 + self.max_records = max_records + self.pk_field_name = schema.primary_key + self.schema: RelationSchema = schema + self.column_names = tuple(sorted(schema.column_names)) + + def append(self, record: Dict): + """ + Appends a record to the MemoryTable. If the primary key of the record is already present, the existing + record is overwritten. The function automatically triggers a flush if the number of records exceeds + the configured maximum. + + Parameters: + record (Dict): A dictionary representing the record to append, must include the primary key. + + Raises: + ValueError: If the primary key is missing from the record. 
+ """ + + def serialize(value): + if isinstance(value, numpy.datetime64): + if numpy.isnat(value): + return None + return ("__datetime__", value.astype("datetime64[s]").astype("int")) + if isinstance(value, datetime.datetime): + return ("__datetime__", value.timestamp()) + if isinstance(value, numpy.ndarray): + return list(value) + return str(value) + + if hasattr(record, "as_dict"): + record = record.as_dict + primary_key = record.get(self.pk_field_name) + if primary_key is None: + raise ValueError("Primary Key cannot be missing or have None value") + + serialized_record = tuple(record.get(field) for field in self.column_names) + record_bytes = ormsgpack.packb( + serialized_record, option=ormsgpack.OPT_SERIALIZE_NUMPY, default=serialize + ) + + # Adjust buffer size for overwritten records + if primary_key in self.buffer: + self.buffer_size -= len(self.buffer[primary_key][1]) + self.buffer[primary_key] = (time.time_ns(), record_bytes) + self.buffer_size += len(record_bytes) + + # Trigger flush if needed + if len(self.buffer) >= self.max_records: + raise MaximumRecordsExceeded(self) + + @monitor() + def flush(self): + """ + Flushes the current in-memory buffer to durable storage. This operation clears the buffer and resets + the buffer size counter. + """ + from hadro.serde import commit_sstable + + commit_sstable(memory_table=self, location=f"data/{hex(time.time_ns())}.hadro") + self.buffer.clear() + self.buffer_size = 0 + + def _get(self, pk: Any) -> Optional[Tuple[int, bytes]]: + """ + Retrieves the record associated with the given primary key. + + Parameters: + pk (Any): The primary key of the record to retrieve. + + Returns: + Optional[Tuple[int, bytes]]: The tuple containing the timestamp and record bytes, or None if not found. + """ + return self.buffer.get(pk) + + def __repr__(self): + return f"" diff --git a/draken/v1/__init__.py b/draken/v1/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/draken/v1/models.py b/draken/v1/models.py new file mode 100644 index 0000000..e69de29 diff --git a/draken/routes/v1.0.py b/draken/v1/routes.py similarity index 59% rename from draken/routes/v1.0.py rename to draken/v1/routes.py index 6497660..199dde4 100644 --- a/draken/routes/v1.0.py +++ b/draken/v1/routes.py @@ -1,13 +1,19 @@ -from fastapi import FastAPI, HTTPException from dataclasses import dataclass -from typing import List, Dict +from typing import List + +from fastapi import FastAPI +from fastapi import HTTPException +from fastapi.encoders import jsonable_encoder +from fastapi.responses import JSONResponse app = FastAPI() + @dataclass -class FileList(): +class FileList: files: List[str] + # Example index manager class (simplified for illustration) class IndexManager: def __init__(self): @@ -39,48 +45,88 @@ def rebuild_index(self, table: str, column: str, files: List[str] = None): key = (table, column) if key not in self.indexes: raise ValueError("Index not found") - self.indexes[key] = files if files else [] # Simplified; replace with actual rebuilding logic + self.indexes[key] = ( + files if files else [] + ) # Simplified; replace with actual rebuilding logic + index_manager = IndexManager() -@app.post("/index/{table}/{column}") + +@app.post("/v1/indexes/{table}/{column}") async def create_index(table: str, column: str, files: FileList, hint: str = None): try: index_manager.create_index(table, column, files.files, hint) - return {"message": "Index created", "table": table, "column": column, "files": files.files} + return JSONResponse( + content={ + "message": "Index 
created", + "table": table, + "column": column, + "files": files.files, + } + ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) -@app.patch("/index/{table}/{column}") + +@app.patch("/v1/indexes/{table}/{column}/files") async def add_files_to_index(table: str, column: str, files: FileList): try: index_manager.add_files_to_index(table, column, files.files) - return {"message": "Files added to index", "table": table, "column": column, "files": files.files} + return JSONResponse( + content={ + "message": "Files added to index", + "table": table, + "column": column, + "files": files.files, + } + ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) -@app.delete("/index/{table}/{column}") + +@app.delete("/v1/indexes/{table}/{column}") async def delete_index(table: str, column: str): try: index_manager.delete_index(table, column) - return {"message": "Index deleted", "table": table, "column": column} + return JSONResponse(content={"message": "Index deleted", "table": table, "column": column}) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) -@app.get("/index/{table}") + +@app.get("/v1/indexes/{table}") async def get_indexes(table: str): indexes = index_manager.get_indexes(table) - return {"table": table, "indexes": indexes} + return JSONResponse(content={"table": table, "indexes": indexes}) + + +@app.get("/v1/indexes/{table}/{column}") +async def get_index(table: str, column: str): + indexes = index_manager.get_indexes(table) + index = indexes.get(column) + if not index: + raise HTTPException(status_code=404, detail="Index not found") + return JSONResponse(content={"table": table, "column": column, "index": index}) -@app.post("/index/{table}/{column}/rebuild") + +@app.post("/v1/indexes/{table}/{column}/rebuild") async def rebuild_index(table: str, column: str, files: FileList = None): try: index_manager.rebuild_index(table, column, files.files if files else None) - return {"message": "Index rebuilt", "table": table, "column": column, "files": files.files if files else []} + return JSONResponse( + content={ + "message": "Index rebuilt", + "table": table, + "column": column, + "files": files.files if files else [], + } + ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) + # Run the application if __name__ == "__main__": import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..4d22e5a --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,28 @@ +[tool.black] +line-length = 100 +target-version = ['py310'] +fast = true + +[tool.isort] +profile = "black" +extend_skip_glob = ["tests/**", "*.pyx", "testdata/**"] +skip_gitignore = true +line_length = 100 +multi_line_output = 9 +force_single_line = true +float_to_top = true +ensure_newline_before_comments = true + +[tool.pylint.messages_control] +disable = "C0103,C0415" + +[tool.mypy] +exclude = ["bench", "#"] + +[tool.bandit] +exclude_dirs = ["**/test_*.py",] +skips = ["B101", "B105", "B324", "B608"] +# B101 - Use of ASSERT +# B105 - Hardcoded passwords +# B324 - Use of weak crypto +# B608 - Hardcoded SQL diff --git a/requirements.txt b/requirements.txt index 170703d..3b3fbee 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ -fastapi \ No newline at end of file +fastapi +ormsgpack +orso \ No newline at end of file diff --git a/scratch/binary_search.py b/scratch/binary_search.py new file mode 100644 index 0000000..0108d66 --- /dev/null +++ 
b/scratch/binary_search.py @@ -0,0 +1,62 @@ +import os +import random +import sys +import time + +from orso.tools import random_string + +from draken.compiled import StringBinaryIndex + +sys.path.insert(1, os.path.join(sys.path[0], "..")) + + +# Performance test harness +def performance_test(): + index = StringBinaryIndex() + + # Generate 10 million items for about 50,000 keys + num_keys = 50000 + num_items = 10000000 + + keys = [random_string(10) for _ in range(num_keys)] + filenames = [random_string(10) for _ in range(num_items)] + + start_time = time.time() + for i in range(num_items): + key = random.choice(keys) + filename = filenames[i] + offset = random.randint(0, 1000000) + index.add_entry(key, filename, offset) + + print(f"Time to add entries: {time.time() - start_time:.4f} seconds") + + start_time = time.time() + index.finalize_index() + print(f"Time to finalize index: {time.time() - start_time:.4f} seconds") + + # Perform lookup_eq + lookup_key = random.choice(keys) + start_time = time.monotonic_ns() + result_eq = index.lookup_eq(lookup_key) + print(f"Time for lookup_eq: {(time.monotonic_ns() - start_time)/1e6:.4f} milliseconds") + print(f"Results for lookup_eq: {len(result_eq)} items") + + # Perform lookup_in_list + lookup_keys = random.sample(keys, 100) + start_time = time.monotonic_ns() + result_in_list = index.lookup_in_list(lookup_keys) + print(f"Time for lookup_in_list: {(time.monotonic_ns() - start_time)/1e6:.4f} milliseconds") + print(f"Results for lookup_in_list: {sum(len(v) for v in result_in_list.values())} items") + + # Perform lookup_range + start_key = random.choice(keys) + end_key = random.choice(keys) + start_key, end_key = min(start_key, end_key), max(start_key, end_key) + start_time = time.time() + result_range = index.lookup_range(start_key, end_key) + print(f"Time for lookup_range: {time.time() - start_time:.4f} seconds") + print(f"Results for lookup_range: {sum(len(v) for v in result_range.values())} items") + + +if __name__ == "__main__": + performance_test() diff --git a/scratch/bloom.py b/scratch/bloom.py new file mode 100644 index 0000000..69e1be9 --- /dev/null +++ b/scratch/bloom.py @@ -0,0 +1,28 @@ +import os +import sys + +from draken.compiled.bloom_filter import BloomFilter +from draken.compiled.bloom_filter import deserialize + +sys.path.insert(1, os.path.join(sys.path[0], "..")) + + +data = { + "term1": {"doc_id": 1, "frequency": 3, "positions": [1, 5, 9]}, + "term2": {"doc_id": 2, "frequency": 2, "positions": [3, 7]}, + "term3": {"doc_id": 1, "frequency": 1, "positions": [4]}, + "term4": {"doc_id": 3, "frequency": 4, "positions": [2, 6, 8, 10]}, + "term5": {"doc_id": 2, "frequency": 1, "positions": [11]}, +} + +bf = BloomFilter() + +for rec in data.keys(): + bf.add(hash(rec)) + +ser = bf.serialize() +print(bytes(ser)) +de = deserialize(ser) + +print(de.possibly_contains(hash("term2"))) +print(de.possibly_contains(hash("term9"))) diff --git a/scratch/had.py b/scratch/had.py new file mode 100644 index 0000000..4e23dfa --- /dev/null +++ b/scratch/had.py @@ -0,0 +1,46 @@ +import os +import sys + +import ormsgpack +import zstd + +from draken.compiled import create_sstable +from draken.compiled.hadro import lookup_eq +from draken.compiled.hadro import lookup_in_list +from draken.compiled.hadro import lookup_range + +sys.path.insert(1, os.path.join(sys.path[0], "..")) + + +def generate_sample_data(): + data = { + "term1": {"doc_id": 1, "frequency": 3, "positions": [1, 5, 9]}, + "term2": {"doc_id": 2, "frequency": 2, "positions": [3, 7]}, + "term3": 
{"doc_id": 1, "frequency": 1, "positions": [4]}, + "term4": {"doc_id": 3, "frequency": 4, "positions": [2, 6, 8, 10]}, + "term5": {"doc_id": 2, "frequency": 1, "positions": [11]}, + } + return data + + +sample_data = generate_sample_data() + + +sstable_bytes = create_sstable(sample_data) +print(f"SSTable created with size: {len(sstable_bytes)} bytes") + +term_to_lookup = "term2" +result = lookup_eq(sstable_bytes, term_to_lookup) +if result: + print(f"Lookup result for '{term_to_lookup}': {result}") +else: + print(f"'{term_to_lookup}' not found in SSTable") + +terms_to_lookup = ["term1", "term3", "term6"] # 'term6' does not exist +results = lookup_in_list(sstable_bytes, terms_to_lookup) +print(f"Lookup results for terms {terms_to_lookup}: {results}") + +key_to_lookup = "term3" +comparison = "GT" +results = lookup_range(sstable_bytes, key_to_lookup, comparison) +print(f"Lookup range '{comparison}' results for '{key_to_lookup}': {results}") diff --git a/scratch/skip_list.py b/scratch/skip_list.py new file mode 100644 index 0000000..f4dad8f --- /dev/null +++ b/scratch/skip_list.py @@ -0,0 +1,89 @@ +import random +from typing import Any +from typing import Optional +from typing import Tuple + + +class SkipNode: + def __init__(self, key: Any, value: Any, level: int): + self.key = key + self.value = value + self.forward = [None] * (level + 1) + + +class SkipList: + def __init__(self, max_level: int): + self.max_level = max_level + self.head = SkipNode(None, None, max_level) + self.level = 0 + + def random_level(self) -> int: + level = 0 + while random.random() < 0.5 and level < self.max_level: + level += 1 + return level + + def search(self, key: Any) -> Optional[Tuple[Any, Any]]: + current = self.head + for i in range(self.level, -1, -1): + while current.forward[i] and current.forward[i].key < key: + current = current.forward[i] + current = current.forward[0] + if current and current.key == key: + return current.key, current.value + return None + + def insert(self, key: Any, value: Any) -> None: + update = [None] * (self.max_level + 1) + current = self.head + + for i in range(self.level, -1, -1): + while current.forward[i] and current.forward[i].key < key: + current = current.forward[i] + update[i] = current + + level = self.random_level() + if level > self.level: + for i in range(self.level + 1, level + 1): + update[i] = self.head + self.level = level + + new_node = SkipNode(key, value, level) + for i in range(level + 1): + new_node.forward[i] = update[i].forward[i] + update[i].forward[i] = new_node + + def delete(self, key: Any) -> None: + update = [None] * (self.max_level + 1) + current = self.head + + for i in range(self.level, -1, -1): + while current.forward[i] and current.forward[i].key < key: + current = current.forward[i] + update[i] = current + + current = current.forward[0] + if current and current.key == key: + for i in range(self.level + 1): + if update[i].forward[i] != current: + break + update[i].forward[i] = current.forward[i] + + while self.level > 0 and not self.head.forward[self.level]: + self.level -= 1 + + +# Example usage +skip_list = SkipList(3) +skip_list.insert(3, "three") +skip_list.insert(6, "six") +skip_list.insert(7, "seven") +skip_list.insert(9, "nine") +skip_list.insert(12, "twelve") +skip_list.insert(19, "nineteen") + +print(skip_list.search(6)) # Output: (6, 'six') +print(skip_list.search(15)) # Output: None + +skip_list.delete(6) +print(skip_list.search(6)) # Output: None diff --git a/scratch/uwheel.py b/scratch/uwheel.py new file mode 100644 index 
0000000..b3cfc3d --- /dev/null +++ b/scratch/uwheel.py @@ -0,0 +1,228 @@ +from collections import defaultdict +from datetime import datetime +from datetime import timedelta +from typing import Dict +from typing import List +from typing import Tuple + +import opteryx + + +class MutablePartialAggregate: + def __init__( + self, + count: int = 0, + sum: float = 0.0, + min_: float = float("inf"), + max_: float = float("-inf"), + missing: int = 0, + ): + self.count = count + self.sum = sum + self.min = min_ + self.max = max_ + self.missing = missing + + +class PartialAggregate: + def __init__(self, count: int, sum: float, min_: float, max_: float, missing: int): + self.count = count + self.sum = sum + self.min = min_ + self.max = max_ + self.missing = missing + + +class Aggregate: + def __init__(self, count: int, sum: float, avg: float, min_: float, max_: float, missing: int): + self.count = count + self.sum = sum + self.avg = avg + self.min = min_ + self.max = max_ + self.missing = missing + + +class AggregationFramework: + def lift(self, input_data: float) -> MutablePartialAggregate: + return MutablePartialAggregate( + count=1, sum=input_data, min_=input_data, max_=input_data, missing=input_data + ) + + def combine_mutable(self, mutable: MutablePartialAggregate, input_data: float) -> None: + mutable.count += 1 + if input_data is None: + mutable.missing += 1 + else: + mutable.sum += input_data + mutable.min = min(mutable.min, input_data) + mutable.max = max(mutable.max, input_data) + + def freeze(self, mutable: MutablePartialAggregate) -> PartialAggregate: + return PartialAggregate( + count=mutable.count, + sum=mutable.sum, + min_=mutable.min, + max_=mutable.max, + missing=mutable.missing, + ) + + def combine(self, a: PartialAggregate, b: PartialAggregate) -> PartialAggregate: + return PartialAggregate( + count=a.count + b.count, + sum=a.sum + b.sum, + min_=min(a.min, b.min), + max_=max(a.max, b.max), + missing=a.missing + b.missing, + ) + + def lower(self, partial: PartialAggregate) -> Aggregate: + avg = partial.sum / partial.count if partial.count != 0 else 0 + return Aggregate( + count=partial.count, + sum=partial.sum, + avg=avg, + min_=partial.min, + max_=partial.max, + missing=partial.missing, + ) + + +class HierarchicalAggregateWheel: + def __init__(self): + self.seconds = defaultdict(MutablePartialAggregate) + self.minutes = defaultdict(MutablePartialAggregate) + self.hours = defaultdict(MutablePartialAggregate) + self.days = defaultdict(MutablePartialAggregate) + self.weeks = defaultdict(MutablePartialAggregate) + self.years = defaultdict(MutablePartialAggregate) + + def aggregate(self, timestamp: datetime, value: float, framework: AggregationFramework): + # Aggregate by second + second_key = timestamp.replace(microsecond=0) + if second_key not in self.seconds: + self.seconds[second_key] = MutablePartialAggregate() + framework.combine_mutable(self.seconds[second_key], value) + + # Aggregate by minute + minute_key = second_key.replace(second=0) + if minute_key not in self.minutes: + self.minutes[minute_key] = MutablePartialAggregate() + framework.combine_mutable(self.minutes[minute_key], value) + + # Aggregate by hour + hour_key = minute_key.replace(minute=0) + if hour_key not in self.hours: + self.hours[hour_key] = MutablePartialAggregate() + framework.combine_mutable(self.hours[hour_key], value) + + # Aggregate by day + day_key = hour_key.replace(hour=0) + if day_key not in self.days: + self.days[day_key] = MutablePartialAggregate() + framework.combine_mutable(self.days[day_key], 
value) + + # Aggregate by week + week_key = day_key - timedelta(days=day_key.weekday()) + if week_key not in self.weeks: + self.weeks[week_key] = MutablePartialAggregate() + framework.combine_mutable(self.weeks[week_key], value) + + # Aggregate by year + year_key = day_key.replace(month=1, day=1) + if year_key not in self.years: + self.years[year_key] = MutablePartialAggregate() + framework.combine_mutable(self.years[year_key], value) + + def freeze(self, framework: AggregationFramework): + self.seconds = {k: framework.freeze(v) for k, v in self.seconds.items()} + self.minutes = {k: framework.freeze(v) for k, v in self.minutes.items()} + self.hours = {k: framework.freeze(v) for k, v in self.hours.items()} + self.days = {k: framework.freeze(v) for k, v in self.days.items()} + self.weeks = {k: framework.freeze(v) for k, v in self.weeks.items()} + self.years = {k: framework.freeze(v) for k, v in self.years.items()} + + def combine_wheels(self, other, framework: AggregationFramework): + self.seconds = self._combine_dicts(self.seconds, other.seconds, framework) + self.minutes = self._combine_dicts(self.minutes, other.minutes, framework) + self.hours = self._combine_dicts(self.hours, other.hours, framework) + self.days = self._combine_dicts(self.days, other.days, framework) + self.weeks = self._combine_dicts(self.weeks, other.weeks, framework) + self.years = self._combine_dicts(self.years, other.years, framework) + + def _combine_dicts(self, dict_a, dict_b, framework: AggregationFramework): + combined = defaultdict(lambda: PartialAggregate(0, 0.0)) + for k, v in dict_a.items(): + combined[k] = v + for k, v in dict_b.items(): + if k in combined: + combined[k] = framework.combine(combined[k], v) + else: + combined[k] = v + return combined + + +class µWheelIndex: + def __init__(self, framework: AggregationFramework): + self.framework = framework + self.haw = HierarchicalAggregateWheel() + + def build_index_for_file(self, data: List[Tuple[datetime, float]]): + for timestamp, value in data: + self.haw.aggregate(timestamp, value, self.framework) + self.haw.freeze(self.framework) + return self.haw + + def combine_indices(self, haw_a: HierarchicalAggregateWheel, haw_b: HierarchicalAggregateWheel): + haw_a.combine_wheels(haw_b, self.framework) + return haw_a + + def query(self, granularity: str, start: datetime, end: datetime): + aggregates = [] + if granularity == "second": + aggregates = self._query_range(self.haw.seconds, start, end) + elif granularity == "minute": + aggregates = self._query_range(self.haw.minutes, start, end) + elif granularity == "hour": + aggregates = self._query_range(self.haw.hours, start, end) + elif granularity == "day": + aggregates = self._query_range(self.haw.days, start, end) + elif granularity == "week": + aggregates = self._query_range(self.haw.weeks, start, end) + elif granularity == "year": + aggregates = self._query_range(self.haw.years, start, end) + return aggregates + + def _query_range(self, wheel: Dict[datetime, PartialAggregate], start: datetime, end: datetime): + results = [] + for timestamp, aggregate in wheel.items(): + if start <= timestamp <= end: + results.append((timestamp, self.framework.lower(aggregate))) + return results + + +data_file1 = zip( + *opteryx.query( + "select Launched_at, Price from $missions WHERE Launched_at is not null" + ).collect([0, 1]) +) + + +# Example usage +framework = AggregationFramework() +index_service = µWheelIndex(framework) + + +haw1 = index_service.build_index_for_file(data_file1) + +# combined_haw = 
index_service.combine_indices(haw1, haw2) + +# Querying the combined HAW +start_time = datetime(1960, 5, 15, 12, 0, 0) +end_time = datetime(2024, 5, 15, 12, 59, 59) +results = index_service.query("hour", start_time, end_time) + +for timestamp, aggregate in results: + print( + f"Timestamp: {timestamp}, Count: {aggregate.count}, Sum: {aggregate.sum}, Avg: {aggregate.avg}, Min: {aggregate.min}, Max: {aggregate.max}, missing: {aggregate.missing}" + ) diff --git a/setup.py b/setup.py index 9ac4aaa..ff7c84b 100644 --- a/setup.py +++ b/setup.py @@ -1,12 +1,28 @@ -from setuptools import find_packages, setup +import platform + +from Cython.Build import cythonize +from setuptools import Extension +from setuptools import find_packages +from setuptools import setup LIBRARY = "draken" __author__ = "notset" __version__ = "0.0.0" -#with open(f"{LIBRARY}/__version__.py", mode="r") as v: -# vers = v.read() -#exec(vers) # nosec +with open(f"{LIBRARY}/__version__.py", mode="r") as v: + vers = v.read() +# xec(vers) # nosec + + +def is_mac(): # pragma: no cover + return platform.system().lower() == "darwin" + + +if is_mac(): + COMPILE_FLAGS = ["-O2"] +else: + COMPILE_FLAGS = ["-O2", "-march=native"] + with open("README.md", "r") as rm: long_description = rm.read() @@ -18,16 +34,42 @@ with open("draken.egg-info/requires.txt", "r") as f: required = f.read().splitlines() -setup( - name=LIBRARY, - version=__version__, - description="External Index", - long_description=long_description, - long_description_content_type="text/markdown", - maintainer="@joocer", - author=__author__, - author_email="justin.joyce@joocer.com", - packages=find_packages(include=[LIBRARY, f"{LIBRARY}.*"]), - url=f"https://github.com/mabel-dev/{LIBRARY}/", - install_requires=required, -) +extensions = [ + Extension( + name="draken.compiled.hadro", + sources=["draken/compiled/hadro.pyx"], + extra_compile_args=COMPILE_FLAGS, + ), + Extension( + name="draken.compiled.binary_search", + sources=["draken/compiled/binary_search.pyx"], + extra_compile_args=COMPILE_FLAGS, + ), + Extension( + name="draken.compiled.bloom_filter", + sources=["draken/compiled/bloom_filter.pyx"], + language="c++", + extra_compile_args=COMPILE_FLAGS, + ), +] + +setup_config = { + "name": LIBRARY, + "version": __version__, + "description": "Draken - External Indexes", + "long_description": long_description, + "long_description_content_type": "text/markdown", + "maintainer": "@joocer", + "author": __author__, + "author_email": "justin.joyce@joocer.com", + "packages": find_packages(include=[LIBRARY, f"{LIBRARY}.*"]), + "python_requires": ">=3.9", + "url": "https://github.com/mabel-dev/{LIBRARY}/", + "install_requires": required, + "ext_modules": cythonize(extensions), + "package_data": { + "": ["*.pyx", "*.pxd"], + }, +} + +setup(**setup_config)
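# --- Illustrative usage sketch (not part of the diff) --------------------------
# How the new pieces are expected to fit together once the extensions are built
# (for example with `pip install -e .`, which cythonizes draken/compiled/*.pyx
# through the setup.py above). The file name "blob-0001.parquet" and the offset
# are made-up illustration values; the imports and calls are those the diff adds.

from draken.compiled import StringBinaryIndex, create_sstable
from draken.compiled.hadro import lookup_eq

# string key -> (filename, offset) postings, as exercised in scratch/binary_search.py
index = StringBinaryIndex()
index.add_entry("CVE-2021-34527", "blob-0001.parquet", 128)
index.finalize_index()
print(index.lookup_eq("CVE-2021-34527"))   # [('blob-0001.parquet', 128)]

# term -> postings SSTable, as exercised in scratch/had.py
sstable = create_sstable({"term1": {"doc_id": 1, "frequency": 3, "positions": [1, 5, 9]}})
print(f"SSTable size: {len(sstable)} bytes")
print(lookup_eq(sstable, "term1"))
# --------------------------------------------------------------------------------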