diff --git a/.github/workflows/publish-docs.yml b/.github/workflows/publish-docs.yml index 21205cc..207c037 100644 --- a/.github/workflows/publish-docs.yml +++ b/.github/workflows/publish-docs.yml @@ -4,6 +4,10 @@ on: push: tags: - v* + branches: + - main + paths: + - "docs/**" workflow_dispatch: permissions: @@ -31,4 +35,4 @@ jobs: cairosvg - name: Publish docs - run: mkdocs gh-deploy --force \ No newline at end of file + run: mkdocs gh-deploy --force diff --git a/README.md b/README.md index b02c9f7..4ac843f 100644 --- a/README.md +++ b/README.md @@ -6,10 +6,16 @@ pip install raggy Read the [docs](https://zzstoatzz.github.io/raggy/) -### examples +### What is it? + +A Python library for: - scraping the web to produce rich documents - putting these documents in vectorstores - querying the vectorstores to find documents similar to a query -see this [example](https://github.com/zzstoatzz/raggy/blob/main/examples/refresh_vectorstore/refresh_tpuf.py) I use to refresh a chatbot that knows about `prefect`. +See this [example](https://github.com/zzstoatzz/raggy/blob/main/examples/chat_with_X/website.py) to chat with any website, or this [example](https://github.com/zzstoatzz/raggy/blob/main/examples/chat_with_X/repo.py) to chat with any GitHub repo. + +### Contributing + +We welcome contributions! See our [contributing guide](https://zzstoatzz.github.io/raggy/contributing) for details. diff --git a/docs/contributing.md b/docs/contributing.md new file mode 100644 index 0000000..f36a819 --- /dev/null +++ b/docs/contributing.md @@ -0,0 +1,67 @@ +# Contributing to Raggy + +We love your input! We want to make contributing to Raggy as easy and transparent as possible. 
+ +## Development Setup + +We recommend using [uv](https://github.com/astral-sh/uv) for Python environment management and package installation: + +```bash +# Install uv +curl -LsSf https://astral.sh/uv/install.sh | sh + +# Clone the repo +git clone https://github.com/zzstoatzz/raggy.git +cd raggy + +# Create and activate a virtual environment +uv venv + +# Install in editable mode with dev dependencies +uv pip install -e ".[dev]" +``` + +## Running Tests + +```bash +# Install test dependencies +uv pip install -e ".[test]" + +# Run tests +pytest +``` + +## Building Documentation + +```bash +# Install docs dependencies +uv pip install -e ".[docs]" + +# Serve docs locally +mkdocs serve +``` + +## Code Style + +``` +pre-commit install +pre-commit run --all-files # happens automatically on commit +``` + +## Running Examples + +All examples can be run using uv: + +!!! question "where are the dependencies?" + `uv` will run the example in an isolated environment using [inline script dependencies](https://docs.astral.sh/uv/guides/scripts/#declaring-script-dependencies). + +```bash +# Run example +uv run examples/chat_with_X/website.py +``` + +See our [example gallery](examples/index.md) for more details. + +## Versioning + +We use [Semantic Versioning](http://semver.org/). For the versions available, see the [tags on this repository](https://github.com/zzstoatzz/raggy/tags). diff --git a/docs/examples/index.md b/docs/examples/index.md new file mode 100644 index 0000000..18e450e --- /dev/null +++ b/docs/examples/index.md @@ -0,0 +1,35 @@ +# Example Gallery + +Here are some practical examples of using `raggy` in real-world scenarios. + +## Chat with Content + +Ye olde "chat with your data" examples. 
+ +#### Chat with a Website + +```bash +uv run examples/chat_with_X/website.py "let's chat about docs.astral.sh/uv" +``` + +#### Chat with a GitHub Repo + +```bash +uv run examples/chat_with_X/repo.py "let's chat about astral-sh/uv" +``` + +## Refresh Vectorstores + +A `prefect` flow to gather documents from sources of knowledge, embed them and put them in a vectorstore. + +#### Refresh TurboPuffer + +```bash +uv run examples/refresh_vectorstore/tpuf_namespace.py +``` + +#### Refresh Chroma + +```bash +uv run examples/refresh_vectorstore/chroma_collection.py +``` diff --git a/docs/hooks.py b/docs/hooks.py index 9ec138e..cb20052 100644 --- a/docs/hooks.py +++ b/docs/hooks.py @@ -1,21 +1,8 @@ import logging -import subprocess log = logging.getLogger("mkdocs") def on_pre_build(config, **kwargs): - """Add a custom route to the server.""" - try: - subprocess.run( - [ - "npx", - "tailwindcss", - "-i", - "./docs/overrides/tailwind.css", - "-o", - "./docs/static/css/tailwind.css", - ] - ) - except Exception: - log.error("You need to install tailwindcss using npx install tailwindcss") + """Add any pre-build hooks here.""" + pass diff --git a/docs/ingest_strategy.md b/docs/ingest_strategy.md index 1762d0c..1c08488 100644 --- a/docs/ingest_strategy.md +++ b/docs/ingest_strategy.md @@ -1 +1,87 @@ -# Coming soon! \ No newline at end of file +# Ingest Strategy + +When building RAG applications, you often need to load and refresh content from multiple sources. This can involve: + +- Expensive API calls +- Large document processing +- Concurrent embedding operations + +We use [Prefect](https://docs.prefect.io) to handle these challenges, giving us: + +- Automatic caching of expensive operations +- Concurrent processing with backpressure +- Observability and retries + +Let's look at a real example that demonstrates these concepts. 
+ +## Building a Knowledge Base + +```python +from datetime import timedelta +import httpx +from prefect import flow, task +from prefect.tasks import task_input_hash + +from raggy.loaders.github import GitHubRepoLoader +from raggy.loaders.web import SitemapLoader +from raggy.vectorstores.tpuf import TurboPuffer + +# Cache based on content changes +def get_last_modified(context, parameters): + """Only reload if the content has changed.""" + try: + return httpx.head(parameters["urls"][0]).headers.get("Last-Modified", "") + except Exception: + return None + +@task( + cache_key_fn=get_last_modified, + cache_expiration=timedelta(hours=24), + retries=2, +) +async def gather_documents(urls: list[str]): + return await SitemapLoader(urls=urls).load() + +@flow +async def refresh_knowledge(): + # Load from multiple sources + documents = [] + for loader in [ + SitemapLoader(urls=["https://docs.prefect.io/sitemap.xml"]), + GitHubRepoLoader(repo="PrefectHQ/prefect", include_globs=["README.md"]), + ]: + documents.extend(await gather_documents(loader)) + + # Store efficiently with concurrent embedding + with TurboPuffer(namespace="knowledge") as tpuf: + await tpuf.upsert_batched( + documents, + batch_size=100, # tune based on document size + max_concurrent=8 # tune based on rate limits + ) +``` + +This example shows key patterns: + +1. Content-aware caching (`Last-Modified` headers, commit SHAs, etc) +2. Automatic retries for resilience +3. Concurrent processing with backpressure +4. Efficient batching of embedding operations + +See the [refresh examples](https://github.com/zzstoatzz/raggy/tree/main/examples/refresh_vectorstore) for complete implementations using both Chroma and TurboPuffer. 
+ +## Performance Tips + +For production workloads: +```python +@task( + retries=2, + retry_delay_seconds=[3, 60], # wait 3s before the first retry, 60s before the second + cache_expiration=timedelta(days=1), + persist_result=True, # save results to storage +) +async def gather_documents(loader): + return await loader.load() +``` + +See [Prefect's documentation](https://docs.prefect.io/latest/concepts/tasks/) for more on task configuration and caching strategies. \ No newline at end of file diff --git a/docs/overrides/main.html b/docs/overrides/main.html new file mode 100644 index 0000000..d081af8 --- /dev/null +++ b/docs/overrides/main.html @@ -0,0 +1,20 @@ +{% extends "base.html" %} + +{% block announce %} + + + {{ config.extra.announcement.text }} + +{% endblock %} \ No newline at end of file diff --git a/docs/welcome/tutorial.md b/docs/welcome/tutorial.md index 4a84eeb..d309685 100644 --- a/docs/welcome/tutorial.md +++ b/docs/welcome/tutorial.md @@ -16,21 +16,44 @@ print(documents[0]) ## Adding documents to a vectorstore -```python -from raggy.vectorstores.tpuf import Turbopuffer +!!! note "New in 0.2.0" +Vectorstore operations are now synchronous by default, with async batching available via `upsert_batched`. 
-async with Turbopuffer() as vectorstore: # uses default `raggy` namespace - await vectorstore.upsert(documents) +```python +from raggy.vectorstores.tpuf import TurboPuffer + +with TurboPuffer(namespace="my_documents") as vectorstore: + # Synchronous operation + vectorstore.upsert(documents) + + # Async batched usage for large document sets + await vectorstore.upsert_batched( + documents, + batch_size=100, + max_concurrent=8 + ) ``` ## Querying the vectorstore ```python -from raggy.vectorstores.tpuf import query_namespace - -print(await query_namespace("how do I get started with raggy?")) +from raggy.vectorstores.tpuf import query_namespace, multi_query_tpuf + +# Single query +result = query_namespace("how do I get started with raggy?") +print(result) + +# Multiple related queries for better coverage +result = multi_query_tpuf([ + "how to install raggy", + "basic raggy usage", + "raggy getting started" +]) +print(result) ``` -## Real-world example +## Real-world examples -See [this example](https://github.com/zzstoatzz/raggy/blob/main/examples/refresh_vectorstore/refresh_tpuf.py) I use to refresh a chatbot that knows about `prefect`. 
+- [Chat with a GitHub repo](https://github.com/zzstoatzz/raggy/blob/main/examples/chat_with_X/repo.py) +- [Chat with a website](https://github.com/zzstoatzz/raggy/blob/main/examples/chat_with_X/website.py) +- [Refresh a vectorstore](https://github.com/zzstoatzz/raggy/blob/main/examples/refresh_vectorstore/tpuf_namespace.py) diff --git a/examples/chat_with_X/repo.py b/examples/chat_with_X/repo.py index 0771512..f11454b 100644 --- a/examples/chat_with_X/repo.py +++ b/examples/chat_with_X/repo.py @@ -1,41 +1,73 @@ +# /// script +# requires-python = ">=3.10" +# dependencies = [ +# "marvin", +# "prefect", +# "raggy[tpuf]", +# "trafilatura" +# ] +# /// + +import asyncio import warnings import httpx -import prefect.runtime.flow_run as run from marvin.beta.assistants import Assistant -from marvin.utilities.tools import custom_partial from prefect import flow, task -from prefect.utilities.asyncutils import run_coro_as_sync from rich.status import Status from raggy.documents import Document from raggy.loaders.github import GitHubRepoLoader -from raggy.vectorstores.tpuf import TurboPuffer, query_namespace +from raggy.vectorstores.tpuf import TurboPuffer, multi_query_tpuf TPUF_NS = "demo" +def get_last_commit_sha(context, parameters) -> str | None: + """Cache based on the SHA of the latest commit on the repo's main branch.""" + try: + return httpx.get( + f"https://api.github.com/repos/{parameters['repo']}/commits/main" + ).json()["sha"] + except Exception: + return None + + @task( task_run_name="load documents from {repo}", - cache_key_fn=lambda *_, **__: httpx.get( # update embeddings on changes to the repo - f"https://api.github.com/repos/{run.parameters['repo']}/commits/main" - ).json()["sha"], + cache_key_fn=get_last_commit_sha, ) async def gather_documents(repo: str) -> list[Document]: return await GitHubRepoLoader(repo=repo).load() -@task -async def upsert_documents(documents: list[Document]): - async with TurboPuffer(namespace=TPUF_NS) as tpuf: - 
print(f"Upserting {len(documents)} 
documents into {TPUF_NS}") - await tpuf.upsert(documents) - - @flow(flow_run_name="{repo}") async def ingest_repo(repo: str): + """Ingest a GitHub repository into the vector database. + + Args: + repo: The repository to ingest (format: "owner/repo"). + """ documents = await gather_documents(repo) - await upsert_documents(documents) + with TurboPuffer(namespace=TPUF_NS) as tpuf: + print(f"Upserting {len(documents)} documents into {TPUF_NS}") + await task(tpuf.upsert_batched)(documents) + + +@task(task_run_name="querying: {query_texts}") +def do_research(query_texts: list[str]): + """Query the vector database. + + Args: + query_texts: The queries to search for. + + Examples: + ```python + >>> "user says: what does this repo use for packaging?" + >>> "assistant: do_research(['packaging', 'dependencies', 'setuptools'])" + ``` + """ + return multi_query_tpuf(queries=query_texts, namespace=TPUF_NS) @flow(log_prints=True) @@ -43,7 +75,7 @@ async def chat_with_repo(initial_message: str | None = None, clean_up: bool = Tr try: with Assistant( model="gpt-4o", - name="Raggy Expert", + name="Repo Researcher", instructions=( "Use your tools to ingest and chat about a GitHub repo. " "Let the user choose questions, query as needed to get good answers. 
" @@ -51,23 +83,26 @@ async def chat_with_repo(initial_message: str | None = None, clean_up: bool = Tr ), tools=[ ingest_repo, - custom_partial( - task(task_run_name="Q: {query_text}")(query_namespace), - namespace=TPUF_NS, - ), + do_research, ], ) as assistant: assistant.chat(initial_message=initial_message) # type: ignore finally: if clean_up: - async with TurboPuffer(namespace=TPUF_NS) as tpuf: + with TurboPuffer(namespace=TPUF_NS) as tpuf: with Status(f"Cleaning up namespace {TPUF_NS}"): - await task(tpuf.reset)() + task(tpuf.reset)() if __name__ == "__main__": + import sys + warnings.filterwarnings("ignore", category=UserWarning) - run_coro_as_sync( - chat_with_repo("lets chat about zzstoatzz/prefect-bot - please ingest it") - ) + + if len(sys.argv) > 1: + initial_message = sys.argv[1] + else: + initial_message = "lets chat about zzstoatzz/raggy - please ingest it and tell me how to contribute" + + asyncio.run(chat_with_repo(initial_message)) diff --git a/examples/chat_with_X/requirements.txt b/examples/chat_with_X/requirements.txt deleted file mode 100644 index b9d59c3..0000000 --- a/examples/chat_with_X/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -marvin -raggy -turbopuffer \ No newline at end of file diff --git a/examples/chat_with_X/website.py b/examples/chat_with_X/website.py index e69de29..d381724 100644 --- a/examples/chat_with_X/website.py +++ b/examples/chat_with_X/website.py @@ -0,0 +1,130 @@ +# /// script +# requires-python = ">=3.10" +# dependencies = [ +# "marvin", +# "prefect", +# "raggy[tpuf]", +# "trafilatura" +# ] +# /// + +import asyncio +import re +import warnings +from datetime import timedelta + +import httpx +import trafilatura +from bs4 import BeautifulSoup +from marvin.beta.assistants import Assistant +from prefect import flow, task +from rich.status import Status + +import raggy +from raggy.documents import Document +from raggy.loaders.web import SitemapLoader +from raggy.vectorstores.tpuf import TurboPuffer, multi_query_tpuf + 
+TPUF_NS = "demo" + + +def html_parser(html: str) -> str: + return trafilatura.extract(html) or BeautifulSoup(html, "html.parser").get_text() + + +raggy.settings.html_parser = html_parser + + +def get_last_modified(context, parameters): + """Cache based on Last-Modified header of the first URL.""" + try: + with httpx.Client() as client: + response = client.head(parameters["urls"][0]) + return response.headers.get("Last-Modified", "") + except Exception: + return None + + +@task( + task_run_name="load documents from {urls}", + cache_key_fn=get_last_modified, + cache_expiration=timedelta(hours=24), +) +async def gather_documents( + urls: list[str], exclude: list[str | re.Pattern] | None = None +) -> list[Document]: + return await SitemapLoader(urls=urls, exclude=exclude or []).load() + + +@flow(flow_run_name="{urls}") +async def ingest_website( + urls: list[str], exclude: list[str | re.Pattern] | None = None +): + """Ingest a website into the vector database. + + Args: + urls: The URLs to ingest (exact or glob patterns). + exclude: The URLs to exclude (exact or glob patterns). + """ + documents = await gather_documents(urls, exclude) + with TurboPuffer(namespace=TPUF_NS) as tpuf: + print(f"Upserting {len(documents)} documents into {TPUF_NS}") + await tpuf.upsert_batched(documents) + + +@task(task_run_name="querying: {query_texts}") +def do_research(query_texts: list[str]): + """Query the vector database. + + Args: + query_texts: The queries to search for. + + Examples: + ```python + >>> "user says: how to create a flow in Prefect?" + >>> "assistant: do_research(['create flows', 'prefect overview'])" + ``` + """ + return multi_query_tpuf(queries=query_texts, namespace=TPUF_NS) + + +@flow(log_prints=True) +async def chat_with_website(initial_message: str | None = None, clean_up: bool = True): + try: + with Assistant( + model="gpt-4o", + name="Website Expert", + instructions=( + "Use your tools to ingest and chat about website content. 
" + "Let the user choose questions, query as needed to get good answers. " + "You must find documented content on the website before making claims." + ), + tools=[ + ingest_website, + do_research, + ], + ) as assistant: + assistant.chat(initial_message=initial_message) # type: ignore + + finally: + if clean_up: + with TurboPuffer(namespace=TPUF_NS) as tpuf: + with Status(f"Cleaning up namespace {TPUF_NS}"): + task(tpuf.reset)() + + +if __name__ == "__main__": + import sys + + warnings.filterwarnings("ignore", category=UserWarning) + + if len(sys.argv) > 1: + initial_message = sys.argv[1] + else: + initial_message = ( + "let's chat about this project - " + "please ingest the docs @ https://zzstoatzz.github.io/raggy/sitemap.xml " + "and then tell me how it works" + ) + + asyncio.run(chat_with_website(initial_message)) diff --git a/examples/reddit_thread.py b/examples/reddit_thread.py index 508454c..8806c54 100644 --- a/examples/reddit_thread.py +++ b/examples/reddit_thread.py @@ -1,7 +1,15 @@ +# /// script +# dependencies = [ +# "marvin", +# "praw", +# "raggy[tpuf]", +# ] +# /// + from functools import lru_cache -import marvin # pip install marvin -import praw # pip install praw +import marvin +import praw from marvin.utilities.logging import get_logger from pydantic_settings import BaseSettings, SettingsConfigDict @@ -43,35 +51,30 @@ def read_thread(submission_id: str): return text_buffer -async def save_thread(thread_text: str): - logger.info("Saving thread") - chunked_documents = await document_to_excerpts(Document(text=thread_text)) - - async with TurboPuffer(namespace="reddit_thread") as tpuf: - await tpuf.upsert(chunked_documents) - - return "Thread saved!" 
- - @marvin.fn def summarize_results(relevant_excerpts: str) -> str: # type: ignore[empty-body] """give a summary of the relevant excerpts""" -async def main(): +async def main(thread_id: str): logger.info("Starting Reddit thread example") - thread_text = read_thread("1bpf4lr") # r/Chicago thread - await save_thread(thread_text) + thread_text = read_thread(thread_id) + chunked_documents = await document_to_excerpts(Document(text=thread_text)) + + with TurboPuffer(namespace="reddit_thread") as tpuf: + tpuf.upsert(chunked_documents) + + logger.info("Thread saved!") query = "how do people feel about the return of the water taxis?" - results = await query_namespace(query, namespace="reddit_thread") + results = query_namespace(query, namespace="reddit_thread") print(summarize_results(results)) if __name__ == "__main__": import asyncio - asyncio.run(main()) + asyncio.run(main(thread_id="1bpf4lr")) # r/Chicago thread """ The consensus among several comments is a positive reaction to the Chicago Water Taxi resuming 7-day service, which hadn't occurred since 2019. People express that this marks the city's recovery and share their enthusiasm for the convenient transportation it provides. Some commenters also discuss potential improvements and expansions, such as increased service locations and eco-friendly options like electric hydrofoils. 
diff --git a/examples/refresh_vectorstore/refresh_chroma.py b/examples/refresh_vectorstore/chroma_collection.py similarity index 82% rename from examples/refresh_vectorstore/refresh_chroma.py rename to examples/refresh_vectorstore/chroma_collection.py index 4c8d2da..ef9f117 100644 --- a/examples/refresh_vectorstore/refresh_chroma.py +++ b/examples/refresh_vectorstore/chroma_collection.py @@ -1,8 +1,10 @@ # /// script # dependencies = [ -# "raggy[chroma]", -# "trafilatura", +# "prefect", +# "raggy[chroma]", +# "trafilatura", # ] +# /// from datetime import timedelta from typing import Literal @@ -51,26 +53,26 @@ def html_parser(html: str) -> str: cache_expiration=timedelta(days=1), task_run_name="Run {loader.__class__.__name__}", persist_result=True, - refresh_cache=True, + # refresh_cache=True, ) async def run_loader(loader: Loader) -> list[Document]: return await loader.load() @task -async def add_documents( +def add_documents( chroma: Chroma, documents: list[Document], mode: Literal["upsert", "reset"] ) -> list[ChromaDocument]: if mode == "reset": - await chroma.reset_collection() - docs = await chroma.add(documents) + chroma.reset_collection() + docs = chroma.add(documents) elif mode == "upsert": - docs = await chroma.upsert(documents) + docs = chroma.upsert(documents) return docs @flow(name="Update Knowledge", log_prints=True) -async def refresh_chroma( +def refresh_chroma( collection_name: str = "default", chroma_client_type: ChromaClientType = "base", mode: Literal["upsert", "reset"] = "upsert", @@ -84,17 +86,13 @@ async def refresh_chroma( print(f"Loaded {len(documents)} documents from the Prefect community.") - async with Chroma( + with Chroma( collection_name=collection_name, client_type=chroma_client_type ) as chroma: - docs = await add_documents(chroma, documents, mode) + docs = add_documents(chroma, documents, mode) print(f"Added {len(docs)} documents to the {collection_name} collection.") # type: ignore if __name__ == "__main__": - import asyncio - - 
asyncio.run( - refresh_chroma(collection_name="test", chroma_client_type="cloud", mode="reset") # type: ignore - ) + refresh_chroma(collection_name="test", chroma_client_type="cloud", mode="reset") diff --git a/examples/refresh_vectorstore/refresh_tpuf.py b/examples/refresh_vectorstore/tpuf_namespace.py similarity index 88% rename from examples/refresh_vectorstore/refresh_tpuf.py rename to examples/refresh_vectorstore/tpuf_namespace.py index 0acb067..627904f 100644 --- a/examples/refresh_vectorstore/refresh_tpuf.py +++ b/examples/refresh_vectorstore/tpuf_namespace.py @@ -1,7 +1,7 @@ # /// script # dependencies = [ # "prefect", -# "raggy[tpuf]@git+https://github.com/zzstoatzz/raggy", +# "raggy[tpuf]", # "trafilatura", # ] # /// @@ -95,7 +95,7 @@ async def run_loader(loader: Loader) -> list[Document]: flow_run_name="Refreshing {namespace}", log_prints=True, ) -async def refresh_tpuf_namespace( +def refresh_tpuf_namespace( namespace: str, namespace_loaders: list[Loader], reset: bool = False, @@ -111,32 +111,29 @@ async def refresh_tpuf_namespace( print(f"Loaded {len(documents)} documents from the Prefect community.") - async with TurboPuffer(namespace=namespace) as tpuf: + with TurboPuffer(namespace=namespace) as tpuf: if reset: - await task(tpuf.reset)() + task(tpuf.reset)() print(f"RESETTING: Deleted all documents from tpuf ns {namespace!r}.") - await task(tpuf.upsert_batched)( + task(tpuf.upsert_batched).submit( documents=documents, batch_size=batch_size, max_concurrent=max_concurrent - ) + ).wait() print(f"Updated tpuf ns {namespace!r} with {len(documents)} documents.") @flow(name="Refresh Namespaces", log_prints=True) -async def refresh_tpuf( - reset: bool = False, batch_size: int = 100, test_mode: bool = False -): +def refresh_tpuf(reset: bool = False, batch_size: int = 100, test_mode: bool = False): for namespace, namespace_loaders in loaders.items(): if test_mode: namespace = f"TESTING-{namespace}" - await refresh_tpuf_namespace( + refresh_tpuf_namespace( 
namespace, namespace_loaders, reset=reset, batch_size=batch_size ) if __name__ == "__main__": - import asyncio import sys if len(sys.argv) > 1: @@ -144,4 +141,4 @@ async def refresh_tpuf( else: test_mode = True - asyncio.run(refresh_tpuf(reset=True, test_mode=test_mode)) + refresh_tpuf(reset=True, test_mode=test_mode) diff --git a/mkdocs.yml b/mkdocs.yml index 23c238b..7582b1f 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -1,40 +1,37 @@ site_name: raggy -site_description: 'A R.A.G toolkit' +site_description: "A R.A.G toolkit" site_url: https://zzstoatzz.github.io/raggy/ docs_dir: docs repo_url: https://github.com/zzstoatzz/raggy nav: - - Docs: - - Getting started: + - Home: index.md + - Getting Started: - Installation: welcome/installation.md - Tutorial: welcome/tutorial.md - - Ingest: - - Strategy: ingest_strategy.md - - - API reference: - - Home: - - raggy: api_reference/index.md - - Loaders: - - raggy.loaders.base: api_reference/loaders/base.md - - raggy.loaders.github: api_reference/loaders/github.md - - raggy.loaders.pdf: api_reference/loaders/pdf.md - - raggy.loaders.web: api_reference/loaders/web.md - - Vectorstores: - - raggy.vectorstores.base: api_reference/vectorstores/base.md - - raggy.vectorstores.chroma: api_reference/vectorstores/chroma.md - - raggy.vectorstores.tpuf: api_reference/vectorstores/tpuf.md - - Settings: - - raggy.settings: api_reference/settings.md - - Utilities: - - raggy.utilities.asyncutils: api_reference/utilities/asyncutils.md - - raggy.utilities.collections: api_reference/utilities/collections.md - - raggy.utilities.embeddings: api_reference/utilities/embeddings.md - - raggy.utilities.filesystem: api_reference/utilities/filesystem.md - - raggy.utilities.ids: api_reference/utilities/ids.md - - raggy.utilities.logging: api_reference/utilities/logging.md - - raggy.utilities.text: api_reference/utilities/text.md - + - Examples: examples/index.md + - Contributing: contributing.md + - Ingest Strategy: ingest_strategy.md + - API Reference: + 
- Overview: api_reference/index.md + - Loaders: + - Base: api_reference/loaders/base.md + - GitHub: api_reference/loaders/github.md + - PDF: api_reference/loaders/pdf.md + - Web: api_reference/loaders/web.md + - Vectorstores: + - Base: api_reference/vectorstores/base.md + - Chroma: api_reference/vectorstores/chroma.md + - TurboPuffer: api_reference/vectorstores/tpuf.md + - Settings: api_reference/settings.md + - Utilities: + - Async: api_reference/utilities/asyncutils.md + - Collections: api_reference/utilities/collections.md + - Embeddings: api_reference/utilities/embeddings.md + - Filesystem: api_reference/utilities/filesystem.md + - IDs: api_reference/utilities/ids.md + - Logging: api_reference/utilities/logging.md + - Text: api_reference/utilities/text.md theme: features: @@ -48,12 +45,17 @@ theme: - content.code.copy - content.code.select - content.code.annotate + - announce.dismiss palette: primary: green accent: teal name: material logo: assets/logos/raggy.png favicon: assets/logos/raggy.png + custom_dir: docs/overrides + font: + text: Roboto + code: Roboto Mono plugins: - search @@ -61,10 +63,9 @@ plugins: - social: cards_layout_options: font_family: Inter - # background_color: "#2d6df6" background_color: "#181544" - awesome-pages - - autolinks + - autolinks - mkdocstrings: handlers: python: @@ -84,7 +85,8 @@ plugins: heading_level: 2 filters: ["!^_"] import: - - https://docs.python-requests.org/en/master/objects.inv + - https://docs.python-requests.org/en/master/objects.inv + watch: - src - docs @@ -112,18 +114,20 @@ markdown_extensions: - toc: permalink: true title: On this page - extra: - get_started: welcome/what_is_raggy - hero: - title: 'raggy' - description: 'a R.A.G toolkit' - announcement: - title: "so you want to do R.A.G?" 
- url: https://github.com/zzstoatzz/raggy/releases/tag/v0.2.0 + analytics: + provider: google + property: !ENV GOOGLE_ANALYTICS_KEY social: - icon: fontawesome/brands/github - link: https://github.com/prefecthq/raggy + link: https://github.com/zzstoatzz/raggy - icon: fontawesome/brands/twitter - link: https://twitter.com/Nathan_Nowack \ No newline at end of file + link: https://twitter.com/Nathan_Nowack + version: + provider: mike + generator: false + homepage: https://zzstoatzz.github.io/raggy + announcement: + text: "New in 0.2.0: Simplified vectorstore operations" + link: https://github.com/zzstoatzz/raggy/releases/tag/v0.2.0 diff --git a/src/raggy/loaders/web.py b/src/raggy/loaders/web.py index 74d0ed8..8d17e9d 100644 --- a/src/raggy/loaders/web.py +++ b/src/raggy/loaders/web.py @@ -159,7 +159,8 @@ async def _get_loader(self: Self) -> MultiLoader: loaders=[ type(self.url_loader)(urls=url_batch, headers=await self.get_headers()) # type: ignore for url_batch in batched( - [self.url_processor(u) for url_list in urls for u in url_list], 10 + [self.url_processor(u) for url_list in urls for u in url_list], # type: ignore + 10, ) ] ) diff --git a/src/raggy/utilities/asyncutils.py b/src/raggy/utilities/asyncutils.py index cecce0e..f33b384 100644 --- a/src/raggy/utilities/asyncutils.py +++ b/src/raggy/utilities/asyncutils.py @@ -1,11 +1,12 @@ from functools import partial -from typing import Any, Callable, TypeVar +from typing import Any, Callable, ParamSpec, TypeVar import anyio from anyio import create_task_group, to_thread from raggy import settings +P = ParamSpec("P") T = TypeVar("T") RAGGY_THREAD_LIMITER: anyio.CapacityLimiter | None = None @@ -32,9 +33,9 @@ async def run_sync_in_worker_thread( async def run_concurrent_tasks( - tasks: list[Callable], + tasks: list[Callable[P, T]], max_concurrent: int = settings.max_concurrent_tasks, -): +) -> list[T]: """Run multiple tasks concurrently with a limit on concurrent execution. 
Args: diff --git a/src/raggy/utilities/embeddings.py b/src/raggy/utilities/embeddings.py index 54855c6..cac4181 100644 --- a/src/raggy/utilities/embeddings.py +++ b/src/raggy/utilities/embeddings.py @@ -1,4 +1,4 @@ -from typing import overload +from typing import Any, TypeAlias, overload from openai import APIConnectionError, AsyncOpenAI from openai.types import CreateEmbeddingResponse @@ -6,13 +6,15 @@ import raggy +Embedding: TypeAlias = Any + @overload async def create_openai_embeddings( input_: str, timeout: int = 60, model: str = raggy.settings.openai_embeddings_model, -) -> list[float]: +) -> Embedding: ... @@ -21,7 +23,7 @@ async def create_openai_embeddings( input_: list[str], timeout: int = 60, model: str = raggy.settings.openai_embeddings_model, -) -> list[list[float]]: +) -> list[Embedding]: ... @@ -34,7 +36,7 @@ async def create_openai_embeddings( input_: str | list[str], timeout: int = 60, model: str = raggy.settings.openai_embeddings_model, -) -> list[float] | list[list[float]]: +) -> Embedding | list[Embedding]: """Create OpenAI embeddings for a list of texts. Args: diff --git a/src/raggy/vectorstores/base.py b/src/raggy/vectorstores/base.py index 07ccd26..91bcbac 100644 --- a/src/raggy/vectorstores/base.py +++ b/src/raggy/vectorstores/base.py @@ -6,10 +6,10 @@ class Vectorstore(BaseModel): """Base class for vectorstores. - Allows for easy logging and async context management. + Allows for easy logging and context management. Attributes: - _in_context: Whether the vectorstore is currently in an async context. + _in_context: Whether the vectorstore is currently in a context. Example: Basic Usage of `Vectorstore` @@ -19,7 +19,7 @@ class Vectorstore(BaseModel): class MyVectorstore(Vectorstore): pass - async with MyVectorstore() as vectorstore: + with MyVectorstore() as vectorstore: ... 
``` """ @@ -32,9 +32,9 @@ class MyVectorstore(Vectorstore): def logger(self) -> RaggyLogger: return get_logger(self.__class__.__name__) - async def __aenter__(self): + def __enter__(self): self._in_context = True return self - async def __aexit__(self, exc_type, exc_value, traceback): + def __exit__(self, exc_type, exc_value, traceback): self._in_context = False diff --git a/src/raggy/vectorstores/chroma.py b/src/raggy/vectorstores/chroma.py index a5f82e0..988ab54 100644 --- a/src/raggy/vectorstores/chroma.py +++ b/src/raggy/vectorstores/chroma.py @@ -1,26 +1,17 @@ -import asyncio -import re -from typing import Literal - -from raggy.utilities.collections import distinct - -try: - from chromadb import Client, CloudClient, HttpClient - from chromadb.api import ClientAPI - from chromadb.api.models.Collection import Collection - from chromadb.api.models.Collection import Document as ChromaDocument - from chromadb.api.types import QueryResult - from chromadb.utils.batch_utils import create_batches -except ImportError: - raise ImportError( - "You must have `chromadb` installed to use the Chroma vector store. " - "Install it with `pip install 'raggy[chroma]'`." 
- ) +from typing import Literal, Sequence + +from chromadb import Client, CloudClient, HttpClient, Include +from chromadb.api import ClientAPI +from chromadb.api.models.Collection import Collection +from chromadb.api.models.Collection import Document as ChromaDocument +from chromadb.api.types import Embedding, OneOrMany, PyEmbedding, QueryResult +from chromadb.utils.batch_utils import create_batches +from prefect.utilities.asyncutils import run_coro_as_sync from raggy.documents import Document as RaggyDocument from raggy.documents import DocumentMetadata from raggy.settings import settings -from raggy.utilities.asyncutils import run_sync_in_worker_thread +from raggy.utilities.asyncutils import run_concurrent_tasks from raggy.utilities.embeddings import create_openai_embeddings from raggy.utilities.text import slice_tokens from raggy.vectorstores.base import Vectorstore @@ -45,23 +36,7 @@ def get_client(client_type: ChromaClientType) -> ClientAPI: class Chroma(Vectorstore): - """A wrapper for chromadb.Client - used as an async context manager. - - Attributes: - client_type: The type of client to use. Must be one of "base" or "http". - collection_name: The name of the collection to use. 
- - Example: - Query a collection: - ```python - from raggy.vectorstores.chroma import Chroma - - async with Chroma(collection_name="my-collection") as chroma: - result = await chroma.query(query_texts=["Hello, world!"]) - print(result) - ``` - - """ + """A wrapper for chromadb.Client.""" client_type: ChromaClientType = "base" collection_name: str = "raggy" @@ -72,32 +47,29 @@ def collection(self) -> Collection: name=self.collection_name ) - async def delete( + def delete( self, ids: list[str] | None = None, where: dict | None = None, where_document: ChromaDocument | None = None, ): - await run_sync_in_worker_thread( - self.collection.delete, + self.collection.delete( ids=ids, where=where, where_document=where_document, ) - async def add(self, documents: list[RaggyDocument]) -> list[ChromaDocument]: - unique_documents = list(distinct(documents, key=lambda doc: doc.text)) - - ids = [doc.id for doc in unique_documents] - texts = [doc.text for doc in unique_documents] + def add(self, documents: Sequence[RaggyDocument]) -> list[ChromaDocument]: + ids = [doc.id for doc in documents] + texts = [doc.text for doc in documents] metadatas = [ doc.metadata.model_dump(exclude_none=True) if isinstance(doc.metadata, DocumentMetadata) else None - for doc in unique_documents + for doc in documents ] - embeddings = await create_openai_embeddings(texts) + embeddings = run_coro_as_sync(create_openai_embeddings(texts)) data = { "ids": ids, @@ -111,27 +83,28 @@ async def add(self, documents: list[RaggyDocument]) -> list[ChromaDocument]: **data, ) - await asyncio.gather( - *(asyncio.to_thread(self.collection.add, *batch) for batch in batched_data) - ) - - get_result = await asyncio.to_thread(self.collection.get, ids=ids) + for batch in batched_data: + self.collection.add(*batch) + get_result = self.collection.get(ids=ids) return get_result.get("documents") or [] - async def query( + def query( self, - query_embeddings: list[list[float]] | None = None, + query_embeddings: 
OneOrMany[Embedding] | OneOrMany[PyEmbedding] | None = None, query_texts: list[str] | None = None, n_results: int = 10, where: dict | None = None, where_document: dict | None = None, - include: list[str] = ["metadatas"], + include: Include = ["metadatas"], **kwargs, - ) -> "QueryResult": - return await run_sync_in_worker_thread( - self.collection.query, - query_embeddings=query_embeddings, + ) -> QueryResult: + return self.collection.query( + query_embeddings=( + run_coro_as_sync(create_openai_embeddings(query_texts)) + if query_texts and not query_embeddings + else query_embeddings + ), query_texts=query_texts, n_results=n_results, where=where, @@ -140,11 +113,10 @@ async def query( **kwargs, ) - async def count(self) -> int: - return await run_sync_in_worker_thread(self.collection.count) + def count(self) -> int: + return self.collection.count() - async def upsert(self, documents: list[RaggyDocument]) -> list[ChromaDocument]: - documents = list(distinct(documents, key=lambda doc: hash(doc.text))) + def upsert(self, documents: Sequence[RaggyDocument]) -> list[ChromaDocument]: kwargs = dict( ids=[document.id for document in documents], documents=[document.text for document in documents], @@ -154,45 +126,81 @@ async def upsert(self, documents: list[RaggyDocument]) -> list[ChromaDocument]: else None for document in documents ], - embeddings=await create_openai_embeddings( - [document.text for document in documents] + embeddings=run_coro_as_sync( + create_openai_embeddings([document.text for document in documents]) ), ) - await run_sync_in_worker_thread(self.collection.upsert, **kwargs) - - get_result = await run_sync_in_worker_thread( - self.collection.get, ids=kwargs["ids"] - ) + self.collection.upsert(**kwargs) + get_result = self.collection.get(ids=kwargs["ids"]) return get_result.get("documents") or [] - async def reset_collection(self): + def reset_collection(self): client = get_client(self.client_type) try: - await run_sync_in_worker_thread( - 
client.delete_collection, self.collection_name - ) + client.delete_collection(self.collection_name) except Exception: self.logger.warning_kv( "Collection not found", f"Creating a new collection {self.collection_name!r}", ) - await run_sync_in_worker_thread(client.create_collection, self.collection_name) + client.create_collection(self.collection_name) def ok(self) -> bool: try: version = get_client(self.client_type).get_version() - except Exception as e: - self.logger.error_kv("Connection error", f"Cannot connect to Chroma: {e}") - if re.match(r"^\d+\.\d+\.\d+$", version): self.logger.debug_kv("OK", f"Connected to Chroma {version!r}") return True - return False + except Exception as e: + self.logger.error_kv("Connection error", f"Cannot connect to Chroma: {e}") + return False + + async def upsert_batched( + self, + documents: Sequence[RaggyDocument], + batch_size: int = 100, + max_concurrent: int = 8, + ): + """Upsert documents in batches concurrently.""" + document_list = list(documents) + batches = [ + document_list[i : i + batch_size] + for i in range(0, len(document_list), batch_size) + ] + + # Create tasks that will run concurrently + tasks = [] + for i, batch in enumerate(batches): + async def _upsert(b=batch, n=i): + # Get embeddings for the entire batch at once + texts = [doc.text for doc in b] + embeddings = await create_openai_embeddings(texts) -async def query_collection( + # Prepare the batch data + kwargs = dict( + ids=[doc.id for doc in b], + documents=texts, + metadatas=[ + doc.metadata.model_dump(exclude_none=True) + if isinstance(doc.metadata, DocumentMetadata) + else None + for doc in b + ], + embeddings=embeddings, + ) + + # Do the upsert + self.collection.upsert(**kwargs) + print(f"Upserted batch {n + 1}/{len(batches)} ({len(b)} documents)") + + tasks.append(_upsert) + + await run_concurrent_tasks(tasks, max_concurrent=max_concurrent) + + +def query_collection( query_text: str, - query_embedding: list[float] | None = None, collection_name: str = 
"raggy", top_k: int = 10, where: dict | None = None, @@ -200,29 +208,10 @@ async def query_collection( max_tokens: int = 500, client_type: ChromaClientType = "base", ) -> str: - """Query a Chroma collection. - - Args: - query_text: The text to query for. - filters: Filters to apply to the query. - collection: The collection to query. - top_k: The number of results to return. - - Example: - Basic query of a collection: - ```python - from raggy.vectorstores.chroma import query_collection - - print(await query_collection("How to create a flow in Prefect?")) - ``` - """ - async with Chroma( - collection_name=collection_name, client_type=client_type - ) as chroma: - query_embedding = query_embedding or await create_openai_embeddings(query_text) - - query_result = await chroma.query( - query_embeddings=[query_embedding], + """Query a Chroma collection.""" + with Chroma(collection_name=collection_name, client_type=client_type) as chroma: + query_result = chroma.query( + query_texts=[query_text], n_results=top_k, where=where, where_document=where_document, @@ -232,5 +221,4 @@ async def query_collection( assert ( result := query_result.get("documents") ) is not None, "No documents found" - return slice_tokens("\n".join(result[0]), max_tokens) diff --git a/src/raggy/vectorstores/tpuf.py b/src/raggy/vectorstores/tpuf.py index 6160ec2..09c4b3f 100644 --- a/src/raggy/vectorstores/tpuf.py +++ b/src/raggy/vectorstores/tpuf.py @@ -1,17 +1,13 @@ -import asyncio -from typing import Iterable +from typing import Awaitable, Callable, Sequence import turbopuffer as tpuf -from pydantic import ( - Field, - SecretStr, - model_validator, -) +from prefect.utilities.asyncutils import run_coro_as_sync +from pydantic import Field, SecretStr, model_validator from pydantic_settings import BaseSettings, SettingsConfigDict from turbopuffer.vectors import VectorResult from raggy.documents import Document -from raggy.utilities.asyncutils import run_concurrent_tasks, run_sync_in_worker_thread +from 
raggy.utilities.asyncutils import run_concurrent_tasks from raggy.utilities.embeddings import create_openai_embeddings from raggy.utilities.text import slice_tokens from raggy.vectorstores.base import Vectorstore @@ -43,7 +39,7 @@ def set_api_key(self): class TurboPuffer(Vectorstore): - """Wrapper for turbopuffer.Namespace as an async context manager. + """Wrapper for turbopuffer.Namespace as a context manager. Attributes: namespace: The namespace to use for the TurboPuffer instance. @@ -54,16 +50,16 @@ class TurboPuffer(Vectorstore): from raggy.documents import Document from raggy.vectorstores.tpuf import TurboPuffer - async with TurboPuffer() as tpuf: # default namespace is "raggy" - await tpuf.upsert(documents=[Document(id="1", text="Hello, world!")]) + with TurboPuffer() as tpuf: # default namespace is "raggy" + tpuf.upsert(documents=[Document(id="1", text="Hello, world!")]) ``` Query a namespace: ```python from raggy.vectorstores.tpuf import TurboPuffer - async with TurboPuffer() as tpuf: - result = await tpuf.query(text="Hello, world!") + with TurboPuffer() as tpuf: + result = tpuf.query(text="Hello, world!") print(result) ``` """ @@ -74,35 +70,37 @@ class TurboPuffer(Vectorstore): def ns(self): return tpuf.Namespace(self.namespace) - async def upsert( + def upsert( self, - documents: Iterable[Document] | None = None, + documents: Sequence[Document] | None = None, ids: list[str] | list[int] | None = None, vectors: list[list[float]] | None = None, attributes: dict | None = None, ): attributes = attributes or {} + _vectors = vectors or [] if documents is None and vectors is None: raise ValueError("Either `documents` or `vectors` must be provided.") if documents: ids = [document.id for document in documents] - vectors = await create_openai_embeddings( - [document.text for document in documents] - ) # type: ignore - if not isinstance(vectors[0], list): # type: ignore - vectors = [vectors] # type: ignore + _vectors = run_coro_as_sync( + 
create_openai_embeddings([document.text for document in documents]) + ) + assert _vectors is not None + if not isinstance(_vectors[0], list): + _vectors = [_vectors] if attributes.get("text"): raise ValueError( "The `text` attribute is reserved and cannot be used as a custom attribute." ) attributes |= {"text": [document.text for document in documents]} - await run_sync_in_worker_thread( - self.ns.upsert, ids=ids, vectors=vectors, attributes=attributes - ) - async def query( + assert ids is not None, "ids cannot be none" + self.ns.upsert(ids=ids, vectors=_vectors, attributes=attributes) # type: ignore + + def query( self, text: str | None = None, vector: list[float] | None = None, @@ -113,13 +111,11 @@ async def query( include_vectors: bool = False, ) -> VectorResult: if text: - vector = await create_openai_embeddings(text) - else: - if vector is None: - raise ValueError("Either `text` or `vector` must be provided.") + vector = run_coro_as_sync(create_openai_embeddings(text)) + elif vector is None: + raise ValueError("Either `text` or `vector` must be provided.") - return await run_sync_in_worker_thread( - self.ns.query, + return self.ns.query( vector=vector, top_k=top_k, distance_metric=distance_metric, @@ -128,21 +124,21 @@ async def query( include_vectors=include_vectors, ) - async def delete(self, ids: str | int | list[str] | list[int]): - await run_sync_in_worker_thread(self.ns.delete, ids=ids) + def delete(self, ids: str | int | list[str] | list[int]): + self.ns.delete(ids=ids) - async def reset(self): + def reset(self): try: - await run_sync_in_worker_thread(self.ns.delete_all) + self.ns.delete_all() except tpuf.APIError as e: if e.status_code == 404: self.logger.debug_kv("404", "Namespace already empty.") else: raise - async def ok(self) -> bool: + def ok(self) -> bool: try: - return await run_sync_in_worker_thread(self.ns.exists) + return self.ns.exists() except tpuf.APIError as e: if e.status_code == 404: self.logger.debug_kv("404", "Namespace does not 
exist.") @@ -151,14 +147,14 @@ async def ok(self) -> bool: async def upsert_batched( self, - documents: Iterable[Document], + documents: Sequence[Document], batch_size: int = 100, - max_concurrent: int = 25, + max_concurrent: int = 8, ): """Upsert documents in batches concurrently. Args: - documents: Iterable of documents to upsert + documents: Sequence of documents to upsert batch_size: Maximum number of documents per batch max_concurrent: Maximum number of concurrent upsert operations """ @@ -168,83 +164,62 @@ async def upsert_batched( for i in range(0, len(document_list), batch_size) ] - async def process_batch(batch: list[Document], batch_num: int): - await self.upsert(documents=batch) - print( - f"Upserted batch {batch_num + 1}/{len(batches)} ({len(batch)} documents)" - ) + # Create tasks that will run concurrently + tasks: list[Callable[[], Awaitable[None]]] = [] + for i, batch in enumerate(batches): - tasks = [ - lambda b=batch, i=i: process_batch(b, i) for i, batch in enumerate(batches) - ] + async def _upsert(b=batch, n=i): + texts = [doc.text for doc in b] + embeddings = await create_openai_embeddings(texts) + + self.ns.upsert( + ids=[doc.id for doc in b], + vectors=embeddings, + attributes={"text": texts}, + ) + print(f"Upserted batch {n + 1}/{len(batches)} ({len(b)} documents)") + + tasks.append(_upsert) await run_concurrent_tasks(tasks, max_concurrent=max_concurrent) -async def query_namespace( +def query_namespace( query_text: str, filters: dict | None = None, namespace: str = "raggy", top_k: int = 10, max_tokens: int = 500, ) -> str: - """Query a TurboPuffer namespace. - - Args: - query_text: The text to query for. - filters: Filters to apply to the query. - namespace: The namespace to query. - top_k: The number of results to return. 
- - Examples: - Basic Usage of `query_namespace` - ```python - from raggy.vectorstores.tpuf import query_namespace - - print(await query_namespace("How to create a flow in Prefect?")) - ``` - - Using `filters` with `query_namespace` - ```python - from raggy.vectorstores.tpuf import query_namespace - - filters={ - 'id': [['In', [1, 2, 3]]], - 'key1': [['Eq', 'one']], - 'filename': [['Or', [['Glob', '**.md'], ['Glob', '**.py']]], ['NotGlob', '/migrations/**']] - } - - print(await query_namespace("How to create a flow in Prefect?", filters=filters)) - ``` - """ - async with TurboPuffer(namespace=namespace) as tpuf: - vector_result = await tpuf.query( + """Query a TurboPuffer namespace.""" + with TurboPuffer(namespace=namespace) as tpuf: + vector_result = tpuf.query( text=query_text, filters=filters, top_k=top_k, ) + assert vector_result.data is not None, "No data found" concatenated_result = "\n".join( - row.attributes["text"] for row in vector_result.data + row.attributes["text"] # type: ignore + for row in vector_result.data ) return slice_tokens(concatenated_result, max_tokens) -async def multi_query_tpuf( +def multi_query_tpuf( queries: list[str], n_results: int = 3, namespace: str = "raggy" ) -> str: """searches a Turbopuffer namespace for the given queries""" - results = await asyncio.gather( - *[ - query_namespace( - query, - namespace=namespace, - top_k=n_results, - max_tokens=800 // len(queries), - ) - for query in queries - ] - ) + results = [ + query_namespace( + query, + namespace=namespace, + top_k=n_results, + max_tokens=800 // len(queries), + ) + for query in queries + ] return "\n\n".join(results)