Skip to content

Commit

Permalink
Merge pull request #7 from zzstoatzz/improve-ingest
Browse files Browse the repository at this point in the history
improve ingest
  • Loading branch information
zzstoatzz authored Nov 4, 2024
2 parents 9ca4ba1 + 6e90adf commit e2b8c0b
Show file tree
Hide file tree
Showing 3 changed files with 3,445 additions and 20 deletions.
89 changes: 69 additions & 20 deletions examples/refresh_tpuf/refresh_namespace.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# /// script
# dependencies = [
# "prefect",
# "raggy[tpuf]@git+https://github.com/zzstoatzz/raggy@improve-ingest",
# "trafilatura",
# ]
# ///

from datetime import timedelta

from bs4 import BeautifulSoup
Expand Down Expand Up @@ -29,20 +37,44 @@ def html_parser(html: str) -> str:
# Install the custom html_parser defined above as raggy's global HTML parser
# (presumably replacing raggy's default parser — confirm against raggy.settings docs).
raggy.settings.html_parser = html_parser


# NOTE(review): this is the REMOVED side of the diff — the flat list of loaders
# that the commit replaces with the per-namespace `loaders` mapping. Retained
# here only because this page renders both sides of the change.
prefect_loaders = [
SitemapLoader(
# Rewrites docs.prefect.io URLs to the Prefect 2.x docs host docs-2.prefect.io.
url_processor=lambda x: x.replace("docs.", "docs-2."),
urls=[
"https://docs-2.prefect.io/sitemap.xml",
"https://prefect.io/sitemap.xml",
],
exclude=["api-ref", "www.prefect.io/events"],
),
GitHubRepoLoader(
repo="PrefectHQ/prefect",
include_globs=["flows/"],
),
]
# Mapping of turbopuffer namespace -> the loaders that source its documents.
# Each namespace combines one or more sitemap crawls with a GitHub repo crawl.
loaders = {
    "prefect-2": [
        # Prefect 2.x docs live on docs-2.prefect.io; rewrite doc URLs to match.
        SitemapLoader(
            urls=[
                "https://docs-2.prefect.io/sitemap.xml",
                "https://prefect.io/sitemap.xml",
            ],
            url_processor=lambda x: x.replace("docs.", "docs-2."),
            exclude=["api-ref", "www.prefect.io/events"],
        ),
        GitHubRepoLoader(repo="PrefectHQ/prefect", include_globs=["flows/"]),
    ],
    "prefect-3": [
        SitemapLoader(
            urls=[
                "https://docs.prefect.io/sitemap.xml",
                "https://prefect.io/sitemap.xml",
            ],
            exclude=["api-ref", "www.prefect.io/events"],
        ),
        GitHubRepoLoader(repo="PrefectHQ/prefect", include_globs=["flows/"]),
    ],
    "controlflow": [
        SitemapLoader(urls=["https://controlflow.ai/sitemap.xml"]),
        GitHubRepoLoader(
            repo="PrefectHQ/controlflow", include_globs=["examples/"]
        ),
    ],
}


@task(
Expand All @@ -58,28 +90,45 @@ async def run_loader(loader: Loader) -> list[Document]:
return await loader.load()


@flow(name="Update Knowledge", log_prints=True)
async def refresh_tpuf_namespace(namespace: str = "testing", reset: bool = False):
@flow(
    name="Update Namespace",
    flow_run_name="Refreshing {namespace}",
    log_prints=True,
)
async def refresh_tpuf_namespace(
    namespace: str,
    namespace_loaders: list[Loader],
    reset: bool = False,
    batch_size: int = 100,
):
    """Refresh a single turbopuffer namespace from its configured loaders.

    Args:
        namespace: turbopuffer namespace to update.
        namespace_loaders: loaders that produce this namespace's documents.
        reset: if True, delete all existing documents in the namespace first.
        batch_size: maximum number of documents sent per upsert request.
    """
    # Fan out one mapped task per loader, then flatten the loaded documents.
    documents: list[Document] = [
        doc
        for future in run_loader.map(quote(namespace_loaders))  # type: ignore
        for doc in future.result()
    ]

    # BUG FIX: the message previously said documents came "from the Prefect
    # community", which is wrong for non-Prefect namespaces (e.g. controlflow).
    print(f"Loaded {len(documents)} documents for namespace {namespace!r}.")

    async with TurboPuffer(namespace=namespace) as tpuf:
        if reset:
            # Wrapped in `task` so the reset is tracked in the flow run.
            await task(tpuf.reset)()
            print(f"RESETTING: Deleted all documents from tpuf ns {namespace!r}.")

        # Batched upsert keeps each individual request bounded in size.
        await task(tpuf.upsert_batched)(documents=documents, batch_size=batch_size)

    print(f"Updated tpuf ns {namespace!r} with {len(documents)} documents.")


@flow(name="Refresh Namespaces", log_prints=True)
async def refresh_tpuf(reset: bool = False, batch_size: int = 100):
    """Sequentially refresh every namespace configured in `loaders`.

    Args:
        reset: forwarded to each namespace refresh; wipes the namespace first.
        batch_size: forwarded to each namespace refresh; documents per upsert.
    """
    for ns_name, ns_loaders in loaders.items():
        await refresh_tpuf_namespace(
            ns_name,
            ns_loaders,
            reset=reset,
            batch_size=batch_size,
        )


if __name__ == "__main__":
import asyncio

# NOTE(review): both invocations below appear because this is a diff view —
# the first line is the removed pre-change entry point (single hard-coded
# namespace) and the second is its replacement (refresh all namespaces).
asyncio.run(refresh_tpuf_namespace(namespace="prefect-2", reset=True)) # type: ignore
asyncio.run(refresh_tpuf(reset=True))
19 changes: 19 additions & 0 deletions src/raggy/vectorstores/tpuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,25 @@ async def ok(self) -> bool:
return False
raise

async def upsert_batched(
    self,
    documents: Iterable[Document],
    batch_size: int = 100,
):
    """Upsert documents in fixed-size batches so each request stays bounded.

    Consumes `documents` lazily instead of materializing the whole iterable
    up front, so large or generator-backed inputs don't spike memory (the
    previous implementation's `list(documents)` defeated its own stated goal).

    Args:
        documents: Iterable of documents to upsert.
        batch_size: Maximum number of documents to upsert in each batch.

    Raises:
        ValueError: if `batch_size` is less than 1.
    """
    from itertools import islice

    if batch_size < 1:
        # Previously batch_size == 0 raised an opaque error from range() and
        # a negative value silently upserted nothing; fail loudly instead.
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    iterator = iter(documents)
    batch_number = 0
    # islice pulls at most batch_size items per pass; an empty batch means
    # the iterable is exhausted.
    while batch := list(islice(iterator, batch_size)):
        batch_number += 1
        await self.upsert(documents=batch)
        print(f"Upserted batch {batch_number} ({len(batch)} documents)")


async def query_namespace(
query_text: str,
Expand Down
Loading

0 comments on commit e2b8c0b

Please sign in to comment.