diff --git a/README.md b/README.md new file mode 100644 index 0000000..451fd13 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# OpenAgents - search + +A node for batched similarity search on in-memory FAISS indexes. \ No newline at end of file diff --git a/src/main.py b/src/main.py index 15ba199..7df6a22 100644 --- a/src/main.py +++ b/src/main.py @@ -11,6 +11,7 @@ import time import os import gc + class Runner (JobRunner): INDEXES={} SEARCH_QUEUE = [] @@ -21,23 +22,6 @@ def __init__(self, filters, meta, template, sockets): self.MAX_MEMORY_CACHE_GB = float(os.getenv('MAX_MEMORY_CACHE_GB', self.MAX_MEMORY_CACHE_GB)) - async def loadEmbeddingsFromBlobstore(self, i, blobStorage, f, out_vectors, out_content): - # Binary read - sentence_bytes=await blobStorage.readBytes(f) - vectors_bytes=await blobStorage.readBytes(f+".vectors") - shape_bytes=await blobStorage.readBytes(f+".shape") - dtype_bytes=await blobStorage.readBytes(f+".dtype") - # sentence_marker_bytes=blobStorage.readBytes(f+".kind") - # Decode - sentence = sentence_bytes.decode("utf-8") - dtype = dtype_bytes.decode("utf-8") - shape = json.loads(shape_bytes.decode("utf-8")) - embeddings = np.frombuffer(vectors_bytes, dtype=dtype).reshape(shape) - out_vectors[i] = embeddings - out_content[i] = sentence - - - async def deserializeFromBlob(self, url, out_vectors , out_content): blobDisk = await self.openStorage( url) self.log("Reading embeddings from "+url) @@ -46,10 +30,6 @@ async def deserializeFromBlob(self, url, out_vectors , out_content): sentencesIn = await blobDisk.openReadStream("sentences.bin") embeddingsIn = await blobDisk.openReadStream("embeddings.bin") - # embeddings_files = [f for f in files if f.endswith(".embeddings")] - # self.log("Found "+str(len(embeddings_files))+" embeddings files") - # sentences = [] - # vectors = [] dtype = None shape = None @@ -89,18 +69,12 @@ async def deserializeFromJSON( self, data, out_vectors ,out_content): embeddings_b64 = part[1] _dtype = part[2] _shape = part[3] - - - # part_marker=part[4] if len(part)>4 else None - if dtype is None: dtype = _dtype elif dtype != _dtype: raise Exception("Data type mismatch") if shape is None: shape = _shape elif shape != _shape: raise Exception("Shape mismatch") - # Decode embeddings_bytes = base64.b64decode(embeddings_b64) embeddings = np.frombuffer(embeddings_bytes, dtype=dtype).reshape(shape) - # Append out_vectors.append(embeddings) out_content.append(text) return [dtype,shape] @@ -117,8 +91,8 @@ async def deserialize( self, jin,out_vectors ,out_content): [dtype,shape] = await self.deserializeFromJSON(data, out_vectors, out_content) return [dtype,shape] - async def loop(self ): - + + async def loop(self ): if len(self.SEARCH_QUEUE) == 0: await asyncio.sleep(10.0/1000.0) return @@ -223,7 +197,6 @@ def getParamValue(key,default=None): searches_content = [] [dtype,shape] = await self.deserialize(jin, searches_vectors, searches_content) searches_vectors = np.array(searches_vectors) - if normalize and dtype == "float32": faiss.normalize_L2(searches_vectors) queries=searches_vectors