Skip to content

Commit

Permalink
chore(api): Indexing performance improvements (#799)
Browse files Browse the repository at this point in the history
* Adds batched inserts during indexing
* Adds await during upload file read to reduce blocking
  • Loading branch information
CollectiveUnicorn authored Jul 22, 2024
1 parent 56274f6 commit e679ad2
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 38 deletions.
74 changes: 37 additions & 37 deletions src/leapfrogai_api/backend/rag/index.py
Original file line number Diff line number Diff line change
async def aadd_documents(
    self,
    documents: list[Document],
    vector_store_id: str,
    file_id: str,
    batch_size: int = 100,
) -> list[str]:
    """Add documents to the vector store in batches.

    Embeds every document in a single embedding call, then inserts the
    resulting vectors in batches to balance the memory impact of large
    files against the performance gain from batched inserts.

    Args:
        documents: Langchain Document objects to be added.
        vector_store_id: ID of the vector store receiving the documents.
        file_id: ID of the file associated with the documents.
        batch_size: Number of vectors pushed to the db per insert.
            Defaults to 100 as a balance between memory use and batching
            performance.

    Returns:
        The IDs assigned to the added documents, in insertion order.

    Raises:
        Any exceptions that may occur during embedding or insertion.
    """
    ids: list[str] = []
    # One embedding request for the whole document set.
    embeddings = await self.embeddings.aembed_documents(
        texts=[document.page_content for document in documents]
    )

    # Pair each document with its embedding; zip keeps them aligned.
    vectors = [
        {
            "content": document.page_content,
            "metadata": document.metadata,
            "embedding": embedding,
        }
        for document, embedding in zip(documents, embeddings)
    ]

    # Insert in fixed-size batches; each response row carries the new id.
    for start in range(0, len(vectors), batch_size):
        batch = vectors[start : start + batch_size]
        response = await self._aadd_vectors(
            vector_store_id=vector_store_id, file_id=file_id, vectors=batch
        )
        ids.extend(item["id"] for item in response)

    return ids

Expand Down Expand Up @@ -418,39 +423,34 @@ async def _adelete_vector(
)
return response

async def _aadd_vector(
self,
vector_store_id: str,
file_id: str,
content: str,
metadata: str,
embedding: list[float],
async def _aadd_vectors(
self, vector_store_id: str, file_id: str, vectors: list[dict[str, any]]
) -> dict:
"""Add a vector to the vector store.
"""Add multiple vectors to the vector store in a batch.
Args:
vector_store_id (str): The ID of the vector store.
file_id (str): The ID of the file associated with the vector.
content (str): The content of the vector.
metadata (str): The metadata associated with the vector.
embedding (list[float]): The embedding of the vector.
file_id (str): The ID of the file associated with the vectors.
vectors (list[dict]): A list of dictionaries containing vector data.
Returns:
dict: The response from the database after inserting the vector.
dict: The response from the database after inserting the vectors.
"""

user_id: str = (await self.db.auth.get_user()).user.id

row: dict[str, any] = {
"user_id": user_id,
"vector_store_id": vector_store_id,
"file_id": file_id,
"content": content,
"metadata": metadata,
"embedding": embedding,
}
data, _count = await self.db.from_(self.table_name).insert(row).execute()
rows = []
for vector in vectors:
row = {
"user_id": user_id,
"vector_store_id": vector_store_id,
"file_id": file_id,
"content": vector["content"],
"metadata": vector["metadata"],
"embedding": vector["embedding"],
}
rows.append(row)

data, _count = await self.db.from_(self.table_name).insert(rows).execute()

_, response = data

Expand Down
2 changes: 1 addition & 1 deletion src/leapfrogai_api/data/crud_file_bucket.py
Original file line number Diff line number Diff line change
async def upload(self, file: UploadFile, id_: str):
    """Upload a file to the file bucket.

    Reads the upload with an awaited, non-blocking read (rather than the
    synchronous `file.file.read()`) so the event loop is not blocked
    while the contents are loaded, then stores the bytes under the ID.

    Args:
        file: The incoming upload whose contents will be stored.
        id_: The ID used as the object's path within the bucket.

    Returns:
        The storage client's response for the upload.
    """

    return await self.client.storage.from_("file_bucket").upload(
        file=await file.read(), path=f"{id_}"
    )

async def download(self, id_: str):
Expand Down

0 comments on commit e679ad2

Please sign in to comment.