Skip to content

Commit

Permalink
Set maximum memory to be used for indexes, when the limit is reached …
Browse files Browse the repository at this point in the history
…the oldest used index is dropped
  • Loading branch information
riccardobl committed Apr 26, 2024
1 parent e4112e4 commit 15ddb8a
Showing 1 changed file with 17 additions and 2 deletions.
19 changes: 17 additions & 2 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@
import hashlib
import asyncio
import time
import gc
class Runner (JobRunner):
INDEXES={}
SEARCH_QUEUE = []
MAX_MEMORY_CACHE_GB = 1

def __init__(self, filters, meta, template, sockets):
super().__init__(filters, meta, template, sockets)
self.MAX_MEMORY_CACHE_GB = int(os.getenv('MAX_MEMORY_CACHE_GB', self.MAX_MEMORY_CACHE_GB))


async def deserializeFromBlob(self, url, out_vectors , out_content):
blobStorage = await self.openStorage( url)
Expand Down Expand Up @@ -162,10 +167,20 @@ def getParamValue(key,default=None):
self.log("Creating faiss index")
faiss_index = faiss.IndexFlatL2(shape[0])
faiss_index.add(index_vectors)
index = [faiss_index, time.time(), index_content]
indexSizeGB = faiss_index.ntotal * shape[0] * 4 / 1024 / 1024 / 1024
index = [faiss_index, time.time(), index_content, indexSizeGB]
self.INDEXES[indexId] = index

# drop oldest index if out of memory limit
totalSize = sum([x[3] for x in self.INDEXES.values()])
while totalSize > self.MAX_MEMORY_CACHE_GB:
oldest = min(self.INDEXES.values(), key=lambda x: x[1])
self.log("Max cache size reached. Dropping oldest index.")
del self.INDEXES[oldest]
totalSize -= oldest[3]
gc.collect()


# TODO: drop oldest index if out of memory limit

else:
self.log("Index already loaded")
Expand Down

0 comments on commit 15ddb8a

Please sign in to comment.