diff --git a/README.md b/README.md index 69e4679..87c411e 100644 --- a/README.md +++ b/README.md @@ -1,35 +1,42 @@ -# Async database transactions and FastAPI +# DBHub -This repo aims to provide working code and reproducible setups for asynchronous data ingestion and querying from numerous databases via Python. Wherever possible, data is ingested to a database via their supported async Python drivers, and the data is also queried in async fashion on top of FastAPI endpoints. +## Boilerplate for async ingestion and querying of DBs -Example code is provided for numerous databases, along with FastAPI docker deployments that allow you to easily supply complex query results to downstream applications. +This repo aims to provide working code and reproducible setups for bulk data ingestion and querying from numerous databases via their Python clients. Wherever possible, async database client APIs are utilized for data ingestion. The query interface to the data is exposed via async FastAPI endpoints. To enable reproducibility across environments, Dockerfiles are provided as well. -#### Currently implemented +The `docker-compose.yml` does the following: +1. Set up a local DB server in a container +2. Set up local volume mounts to persist the data +3. Set up a FastAPI server in another container +4. Set up a network bridge such that the DB server can be accessed from the FastAPI server +5. Tear down all the containers once development and testing are complete + +### Currently implemented * Neo4j * Elasticsearch * Meilisearch * Qdrant * Weaviate -#### 🚧 Coming soon +### 🚧 In the pipeline * LanceDB * SurrealDB +* MongoDB + ## Goals -The primary aim is to compare the data ingestion and query performance of various databases that can be used for a host of downstream use cases. Two use cases are of particular interest: +The main goals of this repo are as follows. -1. We may want to expose (potentially sensitive) data to downstream client applications, so building an API on top of the database can be a very useful tool to share the data in a controlled manner +1. **Ease of setup**: There are tons of databases and client APIs out there, so it's useful to have a clean, efficient and reproducible workflow for experimenting with a range of datasets and with the databases best suited to the problem at hand. -2. Databases or data stores in general can be important "sources of truth" for contextual querying via LLMs like ChatGPT, allowing us to ground our model's results with factual data. APIs allow us to add another layer to simplify querying a host of backends in a way that doesn't rely on the LLM learning a specific query language. +2. **Ease of distribution**: We may want to expose (potentially sensitive) data to downstream client applications, so building an API on top of the database can be a very useful way to share the data in a controlled manner. -In general, it's useful to have a clean, efficient and reproducible workflow to experiment with each database in question. +3. **Ease of testing advanced use cases**: Search databases (either full-text keyword search or vector DBs) can be important "sources of truth" for contextual querying via LLMs like ChatGPT, allowing us to ground our model's results with factual data. ## Pre-requisites -Install Docker and the latest version of Python (3.11+), as recent syntactic improvements in Python are extensively utilized in the code provided. 
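The "API on top of a database" pattern in goals (2) and (3) reduces to a small amount of code: one async driver opened for the app's lifetime, and FastAPI endpoints that expose controlled query results. The sketch below shows that shape for Neo4j, the first implemented DB; the URI, credentials and the `Wine` node label are illustrative placeholders, not a drop-in file from this repo.

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI
from neo4j import AsyncGraphDatabase


@asynccontextmanager
async def lifespan(app: FastAPI):
    # One async driver for the app's lifetime; URI and credentials are placeholders
    app.state.driver = AsyncGraphDatabase.driver(
        "bolt://localhost:7687", auth=("neo4j", "password")
    )
    yield
    await app.state.driver.close()


app = FastAPI(lifespan=lifespan)


@app.get("/count")
async def count_wines() -> dict[str, int]:
    # Return a minimal, controlled payload rather than raw database records
    async with app.state.driver.session(database="neo4j") as session:
        result = await session.run("MATCH (w:Wine) RETURN count(w) AS n")
        record = await result.single()
    return {"count": record["n"]}
```

Each implementation under `dbs/` follows this shape, swapping in the relevant async client and serving the app via `uvicorn api.main:app` inside its container.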
- -## About the dataset - -The [dataset provided](https://github.com/prrao87/async-db-fastapi/tree/main/data) in this repo is a formatted version of the version obtained from Kaggle datasets. Full credit is due to [the original author](https://www.kaggle.com/zynicide) via Kaggle for curating this dataset. +* Python 3.10+ +* Docker +* A passion to learn more about and experiment with databases! diff --git a/dbs/elasticsearch/.env.example b/dbs/elasticsearch/.env.example index d457b50..5a11bce 100644 --- a/dbs/elasticsearch/.env.example +++ b/dbs/elasticsearch/.env.example @@ -10,7 +10,7 @@ ELASTIC_SERVICE = "elasticsearch" API_PORT = 8002 # Container image tag -TAG = "0.1.0" +TAG = "0.2.0" # Docker project namespace (defaults to the current folder name if not set) COMPOSE_PROJECT_NAME = elastic_wine \ No newline at end of file diff --git a/dbs/elasticsearch/Dockerfile b/dbs/elasticsearch/Dockerfile index 002a9a1..96be2d7 100644 --- a/dbs/elasticsearch/Dockerfile +++ b/dbs/elasticsearch/Dockerfile @@ -7,6 +7,5 @@ COPY ./requirements.txt /wine/requirements.txt RUN pip install --no-cache-dir --upgrade -r /wine/requirements.txt COPY ./api /wine/api -COPY ./schemas /wine/schemas EXPOSE 8000 \ No newline at end of file diff --git a/dbs/elasticsearch/api/config.py b/dbs/elasticsearch/api/config.py index 78fd3e8..d40bc03 100644 --- a/dbs/elasticsearch/api/config.py +++ b/dbs/elasticsearch/api/config.py @@ -1,7 +1,12 @@ -from pydantic import BaseSettings +from pydantic_settings import BaseSettings, SettingsConfigDict class Settings(BaseSettings): + model_config = SettingsConfigDict( + env_file=".env", + extra="allow", + ) + elastic_service: str elastic_user: str elastic_password: str @@ -9,6 +14,3 @@ class Settings(BaseSettings): elastic_port: int elastic_index_alias: str tag: str - - class Config: - env_file = ".env" diff --git a/dbs/elasticsearch/api/routers/rest.py b/dbs/elasticsearch/api/routers/rest.py index fcc35d1..97e8d6b 100644 --- a/dbs/elasticsearch/api/routers/rest.py +++ b/dbs/elasticsearch/api/routers/rest.py @@ -1,6 +1,7 @@ from elasticsearch import AsyncElasticsearch from fastapi import APIRouter, HTTPException, Query, Request -from schemas.retriever import ( + +from api.schemas.rest import ( CountByCountry, FullTextSearch, TopWinesByCountry, diff --git a/dbs/elasticsearch/schemas/__init__.py b/dbs/elasticsearch/api/schemas/__init__.py similarity index 100% rename from dbs/elasticsearch/schemas/__init__.py rename to dbs/elasticsearch/api/schemas/__init__.py diff --git a/dbs/elasticsearch/schemas/retriever.py b/dbs/elasticsearch/api/schemas/rest.py similarity index 88% rename from dbs/elasticsearch/schemas/retriever.py rename to dbs/elasticsearch/api/schemas/rest.py index 2b12ef4..ad4f6ef 100644 --- a/dbs/elasticsearch/schemas/retriever.py +++ b/dbs/elasticsearch/api/schemas/rest.py @@ -1,18 +1,9 @@ -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict class FullTextSearch(BaseModel): - id: int - country: str - title: str - description: str | None - points: int - price: float | str | None - variety: str | None - winery: str | None - - class Config: - schema_extra = { + model_config = ConfigDict( + json_schema_extra={ "example": { "id": 3845, "country": "Italy", @@ -24,6 +15,16 @@ class Config: "winery": "Castellinuzza e Piuca", } } + ) + + id: int + country: str + title: str + description: str | None + points: int + price: float | str | None + variety: str | None + winery: str | None class TopWinesByCountry(BaseModel): @@ -36,9 +37,6 @@ class 
TopWinesByCountry(BaseModel): variety: str | None winery: str | None - class Config: - validate_assignment = True - class TopWinesByProvince(BaseModel): id: int @@ -51,9 +49,6 @@ class TopWinesByProvince(BaseModel): variety: str | None winery: str | None - class Config: - validate_assignment = True - class MostWinesByVariety(BaseModel): country: str diff --git a/dbs/elasticsearch/requirements.txt b/dbs/elasticsearch/requirements.txt index 1c31bed..d9c251b 100644 --- a/dbs/elasticsearch/requirements.txt +++ b/dbs/elasticsearch/requirements.txt @@ -1,6 +1,8 @@ -elasticsearch>=8.7.0 -pydantic[dotenv]>=1.10.7, <2.0.0 -fastapi>=0.95.0, <1.0.0 +elasticsearch~=8.7.0 +pydantic~=2.0.0 +pydantic-settings~=2.0.0 +python-dotenv>=1.0.0 +fastapi~=0.100.0 httpx>=0.24.0 aiohttp>=3.8.4 uvicorn>=0.21.0, <1.0.0 diff --git a/dbs/elasticsearch/scripts/bulk_index.py b/dbs/elasticsearch/scripts/bulk_index.py index 8f7fb75..7e8d31c 100644 --- a/dbs/elasticsearch/scripts/bulk_index.py +++ b/dbs/elasticsearch/scripts/bulk_index.py @@ -11,11 +11,10 @@ import srsly from dotenv import load_dotenv from elasticsearch import AsyncElasticsearch, helpers -from pydantic.main import ModelMetaclass +from schemas.wine import Wine sys.path.insert(1, os.path.realpath(Path(__file__).resolve().parents[1])) from api.config import Settings -from schemas.wine import Wine load_dotenv() # Custom types @@ -59,15 +58,14 @@ def get_json_data(data_dir: Path, filename: str) -> list[JsonBlob]: def validate( data: tuple[JsonBlob], - model: ModelMetaclass, exclude_none: bool = False, ) -> list[JsonBlob]: - validated_data = [model(**item).dict(exclude_none=exclude_none) for item in data] + validated_data = [Wine(**item).model_dump(exclude_none=exclude_none) for item in data] return validated_data def process_chunks(data: list[JsonBlob]) -> tuple[list[JsonBlob], str]: - validated_data = validate(data, Wine, exclude_none=True) + validated_data = validate(data, exclude_none=True) return validated_data diff --git a/dbs/neo4j/schemas/__init__.py b/dbs/elasticsearch/scripts/schemas/__init__.py similarity index 100% rename from dbs/neo4j/schemas/__init__.py rename to dbs/elasticsearch/scripts/schemas/__init__.py diff --git a/dbs/elasticsearch/scripts/schemas/wine.py b/dbs/elasticsearch/scripts/schemas/wine.py new file mode 100644 index 0000000..29c4dd0 --- /dev/null +++ b/dbs/elasticsearch/scripts/schemas/wine.py @@ -0,0 +1,80 @@ +from pydantic import BaseModel, ConfigDict, Field, model_validator + + +class Wine(BaseModel): + model_config = ConfigDict( + populate_by_name=True, + validate_assignment=True, + extra="allow", + str_strip_whitespace=True, + json_schema_extra={ + "example": { + "id": 45100, + "points": 85, + "title": "Balduzzi 2012 Reserva Merlot (Maule Valley)", + "description": "Ripe in color and aromas, this chunky wine delivers heavy baked-berry and raisin aromas in front of a jammy, extracted palate. 
Raisin and cooked berry flavors finish plump, with earthy notes.", + "price": 10.0, + "variety": "Merlot", + "winery": "Balduzzi", + "vineyard": "Reserva", + "country": "Chile", + "province": "Maule Valley", + "region_1": "null", + "region_2": "null", + "taster_name": "Michael Schachner", + "taster_twitter_handle": "@wineschach", + } + }, + ) + + id: int + points: int + title: str + description: str | None + price: float | None + variety: str | None + winery: str | None + vineyard: str | None = Field(..., alias="designation") + country: str | None + province: str | None + region_1: str | None + region_2: str | None + taster_name: str | None + taster_twitter_handle: str | None + + @model_validator(mode="before") + def _fill_country_unknowns(cls, values): + "Fill in missing country values with 'Unknown', as we always want this field to be queryable" + country = values.get("country") + if country is None or country == "null": + values["country"] = "Unknown" + return values + + @model_validator(mode="before") + def _create_id(cls, values): + "Create an _id field because Elastic needs this to store as primary key" + values["_id"] = values["id"] + return values + + +if __name__ == "__main__": + data = { + "id": 45100, + "points": 85, + "title": "Balduzzi 2012 Reserva Merlot (Maule Valley)", + "description": "Ripe in color and aromas, this chunky wine delivers heavy baked-berry and raisin aromas in front of a jammy, extracted palate. Raisin and cooked berry flavors finish plump, with earthy notes.", + "price": 10, # Test if field is cast to float + "variety": "Merlot", + "winery": "Balduzzi", + "designation": "Reserva", # Test if field is renamed + "country": "null", # Test unknown country + "province": " Maule Valley ", # Test if field is stripped + "region_1": "null", + "region_2": "null", + "taster_name": "Michael Schachner", + "taster_twitter_handle": "@wineschach", + } + from pprint import pprint + + wine = Wine(**data) + pprint(wine.model_dump(), sort_dicts=False) diff --git a/dbs/meilisearch/.env.example b/dbs/meilisearch/.env.example index 3d90dc5..fcc5ec6 100644 --- a/dbs/meilisearch/.env.example +++ b/dbs/meilisearch/.env.example @@ -1,13 +1,13 @@ # Master key must be at least 16 bytes, composed of valid UTF-8 characters MEILI_MASTER_KEY = "" -MEILI_VERSION = "v1.1.1" +MEILI_VERSION = "v1.2.0" MEILI_PORT = 7700 MEILI_URL = "localhost" MEILI_SERVICE = "meilisearch" API_PORT = 8003 # Container image tag -TAG = "0.1.0" +TAG = "0.2.0" # Docker project namespace (defaults to the current folder name if not set) COMPOSE_PROJECT_NAME = meili_wine \ No newline at end of file diff --git a/dbs/meilisearch/Dockerfile b/dbs/meilisearch/Dockerfile index 002a9a1..96be2d7 100644 --- a/dbs/meilisearch/Dockerfile +++ b/dbs/meilisearch/Dockerfile @@ -7,6 +7,5 @@ COPY ./requirements.txt /wine/requirements.txt RUN pip install --no-cache-dir --upgrade -r /wine/requirements.txt COPY ./api /wine/api -COPY ./schemas /wine/schemas EXPOSE 8000 \ No newline at end of file diff --git a/dbs/meilisearch/api/config.py b/dbs/meilisearch/api/config.py index 4fede23..ed574ec 100644 --- a/dbs/meilisearch/api/config.py +++ b/dbs/meilisearch/api/config.py @@ -1,12 +1,14 @@ -from pydantic import BaseSettings +from pydantic_settings import BaseSettings, SettingsConfigDict class Settings(BaseSettings): + model_config = SettingsConfigDict( + env_file=".env", + extra="allow", + ) + meili_service: str meili_master_key: str meili_port: int meili_url: str tag: str - - class Config: - env_file = ".env" diff --git 
a/dbs/meilisearch/api/main.py b/dbs/meilisearch/api/main.py index 66b81d7..1a6ee2d 100644 --- a/dbs/meilisearch/api/main.py +++ b/dbs/meilisearch/api/main.py @@ -29,6 +29,7 @@ async def get_search_api_key(settings) -> str: async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # Search for wines by keyword phrase using Meilisearch settings = get_settings() + print(settings) search_key = await get_search_api_key(settings) URI = f"http://{settings.meili_service}:{settings.meili_port}" async with Client(URI, search_key) as client: diff --git a/dbs/meilisearch/api/routers/rest.py b/dbs/meilisearch/api/routers/rest.py index 2a566d4..168f0e2 100644 --- a/dbs/meilisearch/api/routers/rest.py +++ b/dbs/meilisearch/api/routers/rest.py @@ -1,6 +1,7 @@ -from meilisearch_python_async import Client from fastapi import APIRouter, HTTPException, Query, Request -from schemas.retriever import ( +from meilisearch_python_async import Client + +from api.schemas.rest import ( FullTextSearch, TopWinesByCountry, TopWinesByProvince, @@ -102,6 +103,7 @@ async def _top_by_country( sort=["points:desc", "price:asc"], ) if response: + print(response.hits) return response.hits return None diff --git a/dbs/meilisearch/api/schemas/__init__.py b/dbs/meilisearch/api/schemas/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dbs/meilisearch/schemas/retriever.py b/dbs/meilisearch/api/schemas/rest.py similarity index 84% rename from dbs/meilisearch/schemas/retriever.py rename to dbs/meilisearch/api/schemas/rest.py index 9244a1a..94e0496 100644 --- a/dbs/meilisearch/schemas/retriever.py +++ b/dbs/meilisearch/api/schemas/rest.py @@ -1,18 +1,9 @@ -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict class FullTextSearch(BaseModel): - id: int - country: str - title: str - description: str | None - points: int - price: float | str | None - variety: str | None - winery: str | None - - class Config: - schema_extra = { + model_config = ConfigDict( + json_schema_extra={ "example": { "id": 3845, "country": "Italy", @@ -24,9 +15,23 @@ class Config: "winery": "Castellinuzza e Piuca", } } + ) + + id: int + country: str + title: str + description: str | None + points: int + price: float | str | None + variety: str | None + winery: str | None class TopWinesByCountry(BaseModel): + model_config = ConfigDict( + validate_assignment=True, + ) + id: int country: str title: str @@ -36,11 +41,12 @@ class TopWinesByCountry(BaseModel): variety: str | None winery: str | None - class Config: - validate_assignment = True - class TopWinesByProvince(BaseModel): + model_config = ConfigDict( + validate_assignment=True, + ) + id: int country: str province: str @@ -50,6 +56,3 @@ class TopWinesByProvince(BaseModel): price: float | str | None = "Not available" variety: str | None winery: str | None - - class Config: - validate_assignment = True diff --git a/dbs/meilisearch/requirements.txt b/dbs/meilisearch/requirements.txt index 832bb52..491d474 100644 --- a/dbs/meilisearch/requirements.txt +++ b/dbs/meilisearch/requirements.txt @@ -1,6 +1,9 @@ -meilisearch-python-async==1.2.0 -pydantic[dotenv]>=1.10.7, <2.0.0 -fastapi>=0.95.0, <1.0.0 +meilisearch-python-async~=1.4.0 +meilisearch~=0.28.0 +pydantic~=2.0.0 +pydantic-settings~=2.0.0 +python-dotenv>=1.0.0 +fastapi~=0.100.0 httpx>=0.24.0 aiohttp>=3.8.4 uvicorn>=0.21.0, <1.0.0 diff --git a/dbs/meilisearch/schemas/wine.py b/dbs/meilisearch/schemas/wine.py index 92924df..708363c 100644 --- a/dbs/meilisearch/schemas/wine.py +++ b/dbs/meilisearch/schemas/wine.py @@ -1,26 
+1,13 @@ -from pydantic import BaseModel, root_validator +from pydantic import BaseModel, ConfigDict, Field, model_validator class Wine(BaseModel): - id: int - points: int - title: str - description: str | None - price: float | None - variety: str | None - winery: str | None - vineyard: str | None - country: str | None - province: str | None - region_1: str | None - region_2: str | None - taster_name: str | None - taster_twitter_handle: str | None - - class Config: - allow_population_by_field_name = True - validate_assignment = True - schema_extra = { + model_config = ConfigDict( + populate_by_name=True, + validate_assignment=True, + extra="allow", + str_strip_whitespace=True, + json_schema_extra={ "example": { "id": 45100, "points": 85, @@ -37,20 +24,51 @@ class Config: "taster_name": "Michael Schachner", "taster_twitter_handle": "@wineschach", } - } + }, + ) - @root_validator(pre=True) - def _get_vineyard(cls, values): - "Rename designation to vineyard" - vineyard = values.pop("designation", None) - if vineyard: - values["vineyard"] = vineyard.strip() - return values + id: int + points: int + title: str + description: str | None + price: float | None + variety: str | None + winery: str | None + vineyard: str | None = Field(..., alias="designation") + country: str | None + province: str | None + region_1: str | None + region_2: str | None + taster_name: str | None + taster_twitter_handle: str | None - @root_validator + @model_validator(mode="before") def _fill_country_unknowns(cls, values): "Fill in missing country values with 'Unknown', as we always want this field to be queryable" country = values.get("country") - if not country: + if country is None or country == "null": values["country"] = "Unknown" return values + + +if __name__ == "__main__": + data = { + "id": 45100, + "points": 85, + "title": "Balduzzi 2012 Reserva Merlot (Maule Valley)", + "description": "Ripe in color and aromas, this chunky wine delivers heavy baked-berry and raisin aromas in front of a jammy, extracted palate. 
Raisin and cooked berry flavors finish plump, with earthy notes.", + "price": 10, # Test if field is cast to float + "variety": "Merlot", + "winery": "Balduzzi", + "designation": "Reserva", # Test if field is renamed + "country": "null", # Test unknown country + "province": " Maule Valley ", # Test if field is stripped + "region_1": "null", + "region_2": "null", + "taster_name": "Michael Schachner", + "taster_twitter_handle": "@wineschach", + } + from pprint import pprint + + wine = Wine(**data) + pprint(wine.model_dump(), sort_dicts=False) diff --git a/dbs/meilisearch/scripts/bulk_index.py b/dbs/meilisearch/scripts/bulk_index.py deleted file mode 100644 index 31b8b3f..0000000 --- a/dbs/meilisearch/scripts/bulk_index.py +++ /dev/null @@ -1,146 +0,0 @@ -from __future__ import annotations - -import argparse -import asyncio -import os -import sys -from concurrent.futures import ProcessPoolExecutor -from functools import lru_cache, partial -from pathlib import Path -from typing import Any, Iterator - -import srsly -from dotenv import load_dotenv -from meilisearch_python_async import Client -from meilisearch_python_async.index import Index -from meilisearch_python_async.models.settings import MeilisearchSettings -from pydantic.main import ModelMetaclass - -sys.path.insert(1, os.path.realpath(Path(__file__).resolve().parents[1])) -from api.config import Settings -from schemas.wine import Wine - -load_dotenv() -# Custom types -JsonBlob = dict[str, Any] - - -class FileNotFoundError(Exception): - pass - - -# --- Blocking functions --- - - -@lru_cache() -def get_settings(): - # Use lru_cache to avoid loading .env file for every request - return Settings() - - -def chunk_iterable(item_list: list[JsonBlob], chunksize: int) -> Iterator[tuple[JsonBlob, ...]]: - """ - Break a large iterable into an iterable of smaller iterables of size `chunksize` - """ - for i in range(0, len(item_list), chunksize): - yield tuple(item_list[i : i + chunksize]) - - -def get_json_data(data_dir: Path, filename: str) -> list[JsonBlob]: - """Get all line-delimited json files (.jsonl) from a directory with a given prefix""" - file_path = data_dir / filename - if not file_path.is_file(): - # File may not have been uncompressed yet so try to do that first - data = srsly.read_gzip_jsonl(file_path) - # This time if it isn't there it really doesn't exist - if not file_path.is_file(): - raise FileNotFoundError(f"No valid .jsonl file found in `{data_dir}`") - else: - data = srsly.read_gzip_jsonl(file_path) - return data - - -def validate( - data: list[JsonBlob], - model: ModelMetaclass, - exclude_none: bool = False, -) -> list[JsonBlob]: - validated_data = [model(**item).dict(exclude_none=exclude_none) for item in data] - return validated_data - - -def process_chunks(data: list[JsonBlob]) -> tuple[list[JsonBlob], str]: - validated_data = validate(data, Wine, exclude_none=True) - return validated_data - - -def get_meili_settings(filename: str) -> MeilisearchSettings: - settings = dict(srsly.read_json(filename)) - # Convert to MeilisearchSettings pydantic model object - settings = MeilisearchSettings(**settings) - return settings - - -# --- Async functions --- - - -async def update_documents_to_index(index: Index, primary_key: str, data: list[JsonBlob]) -> None: - ids = [item[primary_key] for item in data] - await index.update_documents(data, primary_key) - print(f"Processed ids in range {min(ids)}-{max(ids)}") - - -async def main(data: list[JsonBlob], meili_settings: MeilisearchSettings) -> None: - settings = Settings() - URI = 
f"http://{settings.meili_url}:{settings.meili_port}" - MASTER_KEY = settings.meili_master_key - index_name = "wines" - primary_key = "id" - async with Client(URI, MASTER_KEY) as client: - # Create index - index = client.index(index_name) - # Update settings - await client.index(index_name).update_settings(meili_settings) - print("Finished updating database index settings") - - # Process multiple chunks of data in a process pool to avoid blocking the event loop - print("Processing chunks") - chunked_data = chunk_iterable(data, CHUNKSIZE) - - with ProcessPoolExecutor() as pool: - loop = asyncio.get_running_loop() - executor_tasks = [partial(process_chunks, chunk) for chunk in chunked_data] - awaitables = [loop.run_in_executor(pool, call) for call in executor_tasks] - # Attach process pool to running event loop so that we can process multiple chunks in parallel - validated_data = await asyncio.gather(*awaitables) - tasks = [update_documents_to_index(index, primary_key, data) for data in validated_data] - try: - await asyncio.gather(*tasks) - print("Finished execution!") - except Exception as e: - print(f"{e}: Error while indexing to db") - - -if __name__ == "__main__": - # fmt: off - parser = argparse.ArgumentParser("Bulk index database from the wine reviews JSONL data") - parser.add_argument("--limit", type=int, default=0, help="Limit the size of the dataset to load for testing purposes") - parser.add_argument("--chunksize", type=int, default=10_000, help="Size of each chunk to break the dataset into before processing") - parser.add_argument("--filename", type=str, default="winemag-data-130k-v2.jsonl.gz", help="Name of the JSONL zip file to use") - args = vars(parser.parse_args()) - # fmt: on - - LIMIT = args["limit"] - DATA_DIR = Path(__file__).parents[3] / "data" - FILENAME = args["filename"] - CHUNKSIZE = args["chunksize"] - - data = list(get_json_data(DATA_DIR, FILENAME)) - if LIMIT > 0: - data = data[:LIMIT] - - meili_settings = get_meili_settings(filename="settings/settings.json") - - # Run main async event loop - if data: - asyncio.run(main(data, meili_settings)) diff --git a/dbs/meilisearch/scripts/bulk_index_async.py b/dbs/meilisearch/scripts/bulk_index_async.py new file mode 100644 index 0000000..0388d14 --- /dev/null +++ b/dbs/meilisearch/scripts/bulk_index_async.py @@ -0,0 +1,158 @@ +from __future__ import annotations + +import argparse +import asyncio +import os +import sys +from functools import lru_cache +from pathlib import Path +from typing import Any, Iterator + +import srsly +from codetiming import Timer +from dotenv import load_dotenv +from meilisearch_python_async import Client +from meilisearch_python_async.index import Index +from meilisearch_python_async.models.settings import MeilisearchSettings +from tqdm import tqdm +from tqdm.asyncio import tqdm_asyncio + +sys.path.insert(1, os.path.realpath(Path(__file__).resolve().parents[1])) +from api.config import Settings +from schemas.wine import Wine + +load_dotenv() +# Custom types +JsonBlob = dict[str, Any] + + +class FileNotFoundError(Exception): + pass + + +# --- Blocking functions --- + + +@lru_cache() +def get_settings(): + # Use lru_cache to avoid loading .env file for every request + return Settings() + + +def chunk_files(item_list: list[Any], file_chunksize: int) -> Iterator[tuple[JsonBlob, ...]]: + """ + Break a large list of files into a list of lists of files, where each inner list is of size `file_chunksize` + """ + for i in range(0, len(item_list), file_chunksize): + yield tuple(item_list[i : i + 
file_chunksize]) + + +def get_json_data(file_path: Path) -> list[JsonBlob]: + """Get all line-delimited json files (.jsonl) from a directory with a given prefix""" + if not file_path.is_file(): + # File may not have been uncompressed yet so try to do that first + data = srsly.read_gzip_jsonl(file_path) + # This time if it isn't there it really doesn't exist + if not file_path.is_file(): + raise FileNotFoundError( + f"`{file_path}` doesn't contain a valid `.jsonl.gz` file - check and try again." + ) + else: + data = srsly.read_gzip_jsonl(file_path) + return data + + +def validate( + data: list[JsonBlob], + exclude_none: bool = True, +) -> list[JsonBlob]: + validated_data = [Wine(**item).model_dump(exclude_none=exclude_none) for item in data] + return validated_data + + +def get_meili_settings(filename: str) -> MeilisearchSettings: + settings = dict(srsly.read_json(filename)) + # Convert to MeilisearchSettings pydantic model object + settings = MeilisearchSettings(**settings) + return settings + + +# --- Async functions --- + + +async def update_documents(filepath: Path, index: Index, primary_key: str, batch_size: int): + data = list(get_json_data(filepath)) + if LIMIT > 0: + data = data[:LIMIT] + validated_data = validate(data) + await index.update_documents_in_batches( + validated_data, + batch_size=batch_size, + primary_key=primary_key, + ) + + +async def main(data_files: list[Path]) -> None: + meili_settings = get_meili_settings(filename="settings/settings.json") + config = Settings() + URI = f"http://{config.meili_url}:{config.meili_port}" + MASTER_KEY = config.meili_master_key + index_name = "wines" + primary_key = "id" + async with Client(URI, MASTER_KEY) as client: + with Timer(name="Bulk Index", text="Bulk index took {:.4f} seconds"): + # Create index + index = client.index(index_name) + # Update settings + await client.index(index_name).update_settings(meili_settings) + print("Finished updating database index settings") + file_chunks = chunk_files(data_files, file_chunksize=FILE_CHUNKSIZE) + for chunk in tqdm( + file_chunks, desc="Handling file chunks", total=len(data_files) // FILE_CHUNKSIZE + ): + try: + tasks = [ + # Update index + update_documents( + filepath, + index, + primary_key=primary_key, + batch_size=BATCHSIZE, + ) + # In a real case we'd be iterating through a list of files + # For this example, it's just looping through the same file N times + for filepath in chunk + ] + await tqdm_asyncio.gather(*tasks) + except Exception as e: + print(f"{e}: Error while indexing to db") + print(f"Finished running benchmarks") + + +if __name__ == "__main__": + # fmt: off + parser = argparse.ArgumentParser("Bulk index database from the wine reviews JSONL data") + parser.add_argument("--limit", type=int, default=0, help="Limit the size of the dataset to load for testing purposes") + parser.add_argument("--batchsize", "-b", type=int, default=10_000, help="Size of each batch to break the dataset into before ingesting") + parser.add_argument("--file_chunksize", "-c", type=int, default=5, help="Size of file chunk that will be concurrently processed and passed to the client in batches") + parser.add_argument("--filename", type=str, default="winemag-data-130k-v2.jsonl.gz", help="Name of the JSONL zip file to use") + parser.add_argument("--benchmark_num", "-n", type=int, default=1, help="Run a benchmark of the script N times") + args = vars(parser.parse_args()) + # fmt: on + + LIMIT = args["limit"] + DATA_DIR = Path(__file__).parents[3] / "data" + FILENAME = args["filename"] + BATCHSIZE = 
args["batchsize"] + BENCHMARK_NUM = args["benchmark_num"] + FILE_CHUNKSIZE = args["file_chunksize"] + + # Get a list of all files in the data directory + data_files = [f for f in DATA_DIR.glob("*.jsonl.gz") if f.is_file()] + # For benchmarking, we want to run on the same data multiple times (in the real world this would be many different files) + benchmark_data_files = data_files * BENCHMARK_NUM + + meili_settings = get_meili_settings(filename="settings/settings.json") + + # Run main async event loop + asyncio.run(main(benchmark_data_files)) diff --git a/dbs/meilisearch/scripts/bulk_index_sync.py b/dbs/meilisearch/scripts/bulk_index_sync.py new file mode 100644 index 0000000..c5f8eea --- /dev/null +++ b/dbs/meilisearch/scripts/bulk_index_sync.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +import argparse +import os +import sys +from functools import lru_cache +from pathlib import Path +from typing import Any + +import srsly +from codetiming import Timer +from dotenv import load_dotenv +from meilisearch import Client +from meilisearch.index import Index +from schemas.wine import Wine +from tqdm import tqdm + +sys.path.insert(1, os.path.realpath(Path(__file__).resolve().parents[1])) +from api.config import Settings + +load_dotenv() +# Custom types +JsonBlob = dict[str, Any] + + +class FileNotFoundError(Exception): + pass + + +# --- Blocking functions --- + + +@lru_cache() +def get_settings(): + # Use lru_cache to avoid loading .env file for every request + return Settings() + + +def get_json_data(file_path: Path) -> list[JsonBlob]: + """Get all line-delimited json files (.jsonl) from a directory with a given prefix""" + if not file_path.is_file(): + # File may not have been uncompressed yet so try to do that first + data = srsly.read_gzip_jsonl(file_path) + # This time if it isn't there it really doesn't exist + if not file_path.is_file(): + raise FileNotFoundError( + f"`{file_path}` doesn't contain a valid `.jsonl.gz` file - check and try again." 
+ ) + else: + data = srsly.read_gzip_jsonl(file_path) + return data + + +def validate( + data: list[JsonBlob], + exclude_none: bool = True, +) -> list[JsonBlob]: + validated_data = [Wine(**item).model_dump(exclude_none=exclude_none) for item in data] + return validated_data + + +def get_meili_settings(filename: str) -> dict[str, Any]: + settings = dict(srsly.read_json(filename)) + return settings + + +def update_documents(filepath: Path, index: Index, primary_key: str, batch_size: int): + data = list(get_json_data(filepath)) + if LIMIT > 0: + data = data[:LIMIT] + validated_data = validate(data) + index.update_documents_in_batches( + validated_data, + batch_size=batch_size, + primary_key=primary_key, + ) + + +def main(data_files: list[Path]) -> None: + meili_settings = get_meili_settings(filename="settings/settings.json") + config = Settings() + URI = f"http://{config.meili_url}:{config.meili_port}" + MASTER_KEY = config.meili_master_key + index_name = "wines" + primary_key = "id" + + client = Client(URI, MASTER_KEY) + with Timer(name="Bulk Index", text="Bulk index took {:.4f} seconds"): + # Create index + index = client.index(index_name) + # Update settings + client.index(index_name).update_settings(meili_settings) + print("Finished updating database index settings") + try: + # In a real case we'd be iterating through a list of files + # For this example, it's just looping through the same file N times + for filepath in tqdm(data_files): + # Update index + update_documents(filepath, index, primary_key=primary_key, batch_size=BATCHSIZE) + except Exception as e: + print(f"{e}: Error while indexing to db") + + +if __name__ == "__main__": + # fmt: off + parser = argparse.ArgumentParser("Bulk index database from the wine reviews JSONL data") + parser.add_argument("--limit", type=int, default=0, help="Limit the size of the dataset to load for testing purposes") + parser.add_argument("--batchsize", "-b", type=int, default=10_000, help="Size of each chunk to break the dataset into before processing") + parser.add_argument("--filename", type=str, default="winemag-data-130k-v2.jsonl.gz", help="Name of the JSONL zip file to use") + parser.add_argument("--benchmark_num", "-n", type=int, default=1, help="Run a benchmark of the script N times") + args = vars(parser.parse_args()) + # fmt: on + + LIMIT = args["limit"] + DATA_DIR = Path(__file__).parents[3] / "data" + FILENAME = args["filename"] + BATCHSIZE = args["batchsize"] + BENCHMARK_NUM = args["benchmark_num"] + + # Get a list of all files in the data directory + data_files = [f for f in DATA_DIR.glob("*.jsonl.gz") if f.is_file()] + # For benchmarking, we want to run on the same data multiple times (in the real world this would be many different files) + benchmark_data_files = data_files * BENCHMARK_NUM + + meili_settings = get_meili_settings(filename="settings/settings.json") + + # Run main function + main(benchmark_data_files) diff --git a/dbs/meilisearch/scripts/settings/settings.json b/dbs/meilisearch/scripts/settings/settings.json index 258526a..8911898 100644 --- a/dbs/meilisearch/scripts/settings/settings.json +++ b/dbs/meilisearch/scripts/settings/settings.json @@ -26,10 +26,14 @@ "variety" ], "displayedAttributes": [ + "id", + "points", + "price", "title", "country", "province", "variety", + "winery", "taster_name", "description" ], diff --git a/dbs/neo4j/Dockerfile b/dbs/neo4j/Dockerfile index 002a9a1..96be2d7 100644 --- a/dbs/neo4j/Dockerfile +++ b/dbs/neo4j/Dockerfile @@ -7,6 +7,5 @@ COPY ./requirements.txt /wine/requirements.txt RUN pip 
install --no-cache-dir --upgrade -r /wine/requirements.txt COPY ./api /wine/api -COPY ./schemas /wine/schemas EXPOSE 8000 \ No newline at end of file diff --git a/dbs/neo4j/api/config.py b/dbs/neo4j/api/config.py index 0538151..8bb4194 100644 --- a/dbs/neo4j/api/config.py +++ b/dbs/neo4j/api/config.py @@ -1,12 +1,14 @@ -from pydantic import BaseSettings +from pydantic_settings import BaseSettings, SettingsConfigDict class Settings(BaseSettings): + model_config = SettingsConfigDict( + env_file=".env", + extra="allow", + ) + neo4j_service: str neo4j_url: str neo4j_user: str neo4j_password: str tag: str - - class Config: - env_file = ".env" diff --git a/dbs/neo4j/api/routers/rest.py b/dbs/neo4j/api/routers/rest.py index 3865fe9..be80ba5 100644 --- a/dbs/neo4j/api/routers/rest.py +++ b/dbs/neo4j/api/routers/rest.py @@ -1,6 +1,7 @@ from fastapi import APIRouter, HTTPException, Query, Request from neo4j import AsyncManagedTransaction -from schemas.retriever import ( + +from api.schemas.rest import ( FullTextSearch, MostWinesByVariety, TopWinesByCountry, diff --git a/dbs/neo4j/api/schemas/__init__.py b/dbs/neo4j/api/schemas/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dbs/neo4j/schemas/retriever.py b/dbs/neo4j/api/schemas/rest.py similarity index 92% rename from dbs/neo4j/schemas/retriever.py rename to dbs/neo4j/api/schemas/rest.py index 544a134..123cba7 100644 --- a/dbs/neo4j/schemas/retriever.py +++ b/dbs/neo4j/api/schemas/rest.py @@ -1,18 +1,9 @@ -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict class FullTextSearch(BaseModel): - wineID: int - country: str - title: str - description: str | None - points: int - price: float | str - variety: str | None - winery: str | None - - class Config: - schema_extra = { + model_config = ConfigDict( + json_schema_extra={ "example": { "wineID": 3845, "country": "Italy", @@ -24,6 +15,16 @@ class Config: "winery": "Castellinuzza e Piuca", } } + ) + + wineID: int + country: str + title: str + description: str | None + points: int + price: float | str + variety: str | None + winery: str | None class TopWinesByCountry(BaseModel): diff --git a/dbs/neo4j/requirements.txt b/dbs/neo4j/requirements.txt index 251571b..6285713 100644 --- a/dbs/neo4j/requirements.txt +++ b/dbs/neo4j/requirements.txt @@ -1,6 +1,8 @@ -neo4j>=5.8.0 -pydantic[dotenv]>=1.10.7, <2.0.0 -fastapi>=0.95.0, <1.0.0 +neo4j~=5.9.0 +pydantic~=2.0.0 +pydantic-settings~=2.0.0 +python-dotenv>=1.0.0 +fastapi~=0.100.0 httpx>=0.24.0 aiohttp>=3.8.4 uvloop>=0.17.0 diff --git a/dbs/neo4j/schemas/wine.py b/dbs/neo4j/schemas/wine.py deleted file mode 100644 index 92924df..0000000 --- a/dbs/neo4j/schemas/wine.py +++ /dev/null @@ -1,56 +0,0 @@ -from pydantic import BaseModel, root_validator - - -class Wine(BaseModel): - id: int - points: int - title: str - description: str | None - price: float | None - variety: str | None - winery: str | None - vineyard: str | None - country: str | None - province: str | None - region_1: str | None - region_2: str | None - taster_name: str | None - taster_twitter_handle: str | None - - class Config: - allow_population_by_field_name = True - validate_assignment = True - schema_extra = { - "example": { - "id": 45100, - "points": 85, - "title": "Balduzzi 2012 Reserva Merlot (Maule Valley)", - "description": "Ripe in color and aromas, this chunky wine delivers heavy baked-berry and raisin aromas in front of a jammy, extracted palate. 
Raisin and cooked berry flavors finish plump, with earthy notes.", - "price": 10.0, - "variety": "Merlot", - "winery": "Balduzzi", - "vineyard": "Reserva", - "country": "Chile", - "province": "Maule Valley", - "region_1": "null", - "region_2": "null", - "taster_name": "Michael Schachner", - "taster_twitter_handle": "@wineschach", - } - } - - @root_validator(pre=True) - def _get_vineyard(cls, values): - "Rename designation to vineyard" - vineyard = values.pop("designation", None) - if vineyard: - values["vineyard"] = vineyard.strip() - return values - - @root_validator - def _fill_country_unknowns(cls, values): - "Fill in missing country values with 'Unknown', as we always want this field to be queryable" - country = values.get("country") - if not country: - values["country"] = "Unknown" - return values diff --git a/dbs/neo4j/scripts/build_graph.py b/dbs/neo4j/scripts/build_graph.py index c0e9970..dd33c90 100644 --- a/dbs/neo4j/scripts/build_graph.py +++ b/dbs/neo4j/scripts/build_graph.py @@ -3,18 +3,17 @@ import os import sys import time -from concurrent.futures import ProcessPoolExecutor -from functools import lru_cache, partial +from functools import lru_cache from pathlib import Path from typing import Any, Iterator import srsly from dotenv import load_dotenv from neo4j import AsyncGraphDatabase, AsyncManagedTransaction, AsyncSession -from pydantic.main import ModelMetaclass sys.path.insert(1, os.path.realpath(Path(__file__).resolve().parents[1])) from api.config import Settings + from schemas.wine import Wine # Custom types @@ -59,15 +58,14 @@ def get_json_data(data_dir: Path, filename: str) -> list[JsonBlob]: def validate( data: list[JsonBlob], - model: ModelMetaclass, exclude_none: bool = False, ) -> list[JsonBlob]: - validated_data = [model(**item).dict(exclude_none=exclude_none) for item in data] + validated_data = [Wine(**item).model_dump(exclude_none=exclude_none) for item in data] return validated_data def process_chunks(data: list[JsonBlob]) -> list[JsonBlob]: - validated_data = validate(data, Wine, exclude_none=True) + validated_data = validate(data, exclude_none=True) return validated_data @@ -127,31 +125,25 @@ async def main(data: list[JsonBlob]) -> None: async with driver.session(database="neo4j") as session: # Create indexes and constraints await create_indexes_and_constraints(session) - + # Validate validation_start_time = time.time() - print("Processing chunks") - chunked_data = chunk_iterable(data, CHUNKSIZE) - - with ProcessPoolExecutor() as pool: - loop = asyncio.get_running_loop() - # Validate multiple chunks of data using pydantic in a process pool for faster performance - executor_tasks = [partial(process_chunks, chunk) for chunk in chunked_data] - awaitables = [loop.run_in_executor(pool, call) for call in executor_tasks] - # Attach process pool to running event loop so that we can process multiple chunks in parallel - validated_data = await asyncio.gather(*awaitables) + print("Validating data...") + validated_data = validate(data, exclude_none=True) + chunked_data = chunk_iterable(validated_data, CHUNKSIZE) print( - f"Finished validating data in pydantic in {(time.time() - validation_start_time):.4f} sec." 
+ f"Finished validating data in pydantic in {(time.time() - validation_start_time):.4f} sec" ) - + # Bulk ingest + ingestion_time = time.time() # Ingest the data into Neo4j - # For now, we cannot attach this to the running event loop because uvloop complains - for data in validated_data: - ids = [item["id"] for item in data] + for chunk in chunked_data: + ids = [item["id"] for item in chunk] try: - await session.execute_write(build_query, data) + await session.execute_write(build_query, chunk) print(f"Processed ids in range {min(ids)}-{max(ids)}") except Exception as e: print(f"{e}: Failed to ingest IDs in range {min(ids)}-{max(ids)}") + print(f"Finished ingesting data in {(time.time() - ingestion_time):.4f} sec") if __name__ == "__main__": diff --git a/dbs/neo4j/scripts/schemas/__init__.py b/dbs/neo4j/scripts/schemas/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dbs/elasticsearch/schemas/wine.py b/dbs/neo4j/scripts/schemas/wine.py similarity index 51% rename from dbs/elasticsearch/schemas/wine.py rename to dbs/neo4j/scripts/schemas/wine.py index a25e1be..91b85eb 100644 --- a/dbs/elasticsearch/schemas/wine.py +++ b/dbs/neo4j/scripts/schemas/wine.py @@ -1,26 +1,13 @@ -from pydantic import BaseModel, root_validator +from pydantic import BaseModel, ConfigDict, Field, model_validator class Wine(BaseModel): - id: int - points: int - title: str - description: str | None - price: float | None - variety: str | None - winery: str | None - vineyard: str | None - country: str | None - province: str | None - region_1: str | None - region_2: str | None - taster_name: str | None - taster_twitter_handle: str | None - - class Config: - allow_population_by_field_name = True - validate_assignment = True - schema_extra = { + model_config = ConfigDict( + populate_by_name=True, + validate_assignment=True, + extra="allow", + str_strip_whitespace=True, + json_schema_extra={ "example": { "id": 45100, "points": 85, @@ -37,26 +24,51 @@ class Config: "taster_name": "Michael Schachner", "taster_twitter_handle": "@wineschach", } - } - - @root_validator - def _create_id_field(cls, values): - "Elastic needs an _id field to create unique documents, so we just use the existing id field" - values["_id"] = values["id"] - return values + }, + ) - @root_validator(pre=True) - def _get_vineyard(cls, values): - "Rename designation to vineyard" - vineyard = values.pop("designation", None) - if vineyard: - values["vineyard"] = vineyard.strip() - return values + id: int + points: int + title: str + description: str | None + price: float | None + variety: str | None + winery: str | None + vineyard: str | None = Field(..., alias="designation") + country: str | None + province: str | None + region_1: str | None + region_2: str | None + taster_name: str | None + taster_twitter_handle: str | None - @root_validator + @model_validator(mode="before") def _fill_country_unknowns(cls, values): "Fill in missing country values with 'Unknown', as we always want this field to be queryable" country = values.get("country") - if not country: + if country is None or country == "null": values["country"] = "Unknown" return values + + +if __name__ == "__main__": + data = { + "id": 45100, + "points": 85, + "title": "Balduzzi 2012 Reserva Merlot (Maule Valley)", + "description": "Ripe in color and aromas, this chunky wine delivers heavy baked-berry and raisin aromas in front of a jammy, extracted palate. 
Raisin and cooked berry flavors finish plump, with earthy notes.", + "price": 10, # Test if field is cast to float + "variety": "Merlot", + "winery": "Balduzzi", + "designation": "Reserva", # Test if field is renamed + "country": "null", # Test unknown country + "province": " Maule Valley ", # Test if field is stripped + "region_1": "null", + "region_2": "null", + "taster_name": "Michael Schachner", + "taster_twitter_handle": "@wineschach", + } + from pprint import pprint + + wine = Wine(**data) + pprint(wine.model_dump()) diff --git a/dbs/qdrant/Dockerfile.onnxruntime b/dbs/qdrant/Dockerfile.onnxruntime deleted file mode 100644 index f62d37c..0000000 --- a/dbs/qdrant/Dockerfile.onnxruntime +++ /dev/null @@ -1,14 +0,0 @@ -FROM python:3.10-slim-bullseye - -WORKDIR /wine - -COPY ./requirements-onnx.txt /wine/requirements-onnx.txt - -RUN pip install --no-cache-dir -U pip wheel setuptools -RUN pip install --no-cache-dir -r /wine/requirements-onnx.txt - -COPY ./api /wine/api -COPY ./schemas /wine/schemas -COPY ./onnx_model /wine/onnx_model - -EXPOSE 8000 \ No newline at end of file diff --git a/dbs/qdrant/README.md b/dbs/qdrant/README.md index 8617967..15994b4 100644 --- a/dbs/qdrant/README.md +++ b/dbs/qdrant/README.md @@ -32,11 +32,11 @@ The database and API services can be restarted at any time for maintenance and u **💡 Note:** The setup shown here would not be ideal in production, as there are other details related to security and scalability that are not addressed via simple docker, but, this is a good starting point to begin experimenting! -### Option 1: Use `sbert` model +### Use `sbert` model If using the `sbert` model [from the sentence-transformers repo](https://www.sbert.net/) directly, use the provided `docker-compose.yml` to initiate separate containers, one that runs Qdrant, and another one that serves as an API on top of the database. -**⚠️ Note**: This approach will attempt to run `sbert` on a GPU if available, and if not, on CPU (while utilizing all CPU cores). This approach may not yield the fastest vectorization if using CPU-only -- a more optimized version is provided [below](#option-2-use-onnxruntime-model-highly-optimized-for-cpu). +**⚠️ Note**: This approach will attempt to run `sbert` on a GPU if available, and if not, on CPU (while utilizing all CPU cores). ``` docker compose -f docker-compose.yml up -d @@ -47,24 +47,6 @@ Tear down the services using the following command. docker compose -f docker-compose.yml down ``` -### Option 2: Use `onnxruntime` model - -An approach to make the sentence embedding vector generation process more efficient is to optimize and quantize the original `sbert` model via [ONNX (Open Neural Network Exchange)](https://huggingface.co/docs/transformers/serialization). This framework provides a standard interface for optimizing deep learning models and their computational graphs to be executed much faster and with lower resources on specialized runtimes and hardware. - -To deploy the services with the optimized `sbert` model, use the provided `docker-compose.yml` to initiate separate containers, one that runs Qdrant, and another one that serves as an API on top of the database. - -**⚠️ Note**: This approach requires some more additional packages from Hugging Face, on top of the `sbert` modules. **Currently (as of early 2023), they only work on Python 3.10**. For this section, make sure to only use Python 3.10 if ONNX complains about module installations via `pip`. 
- -``` -docker compose -f docker-compose-onnx.yml up -d -``` -Tear down the services using the following command. - -``` -docker compose -f docker-compose-onnx.yml down -``` - - ## Step 2: Ingest the data We ingest both the JSON data for full-text search and filtering, as well as the sentence embedding vectors (for similarity search) into Qdrant. For this dataset, it's reasonable to expect that a simple concatenation of fields like `title`, `variety` and `description` would result in a useful sentence embedding that can be compared against a search query which is also converted to a vector during query time. @@ -93,32 +75,12 @@ Although larger and more powerful text embedding models exist (such as [OpenAI e For this work, it makes sense to use among the fastest models in this list, which is the `multi-qa-MiniLM-L6-cos-v1` **uncased** model. As per the docs, it was tuned for semantic search and question answering, and generates sentence embeddings for single sentences or paragraphs up to a maximum sequence length of 512. It was trained on 215M question answer pairs from various sources. Compared to the more general-purpose `all-MiniLM-L6-v2` model, it shows slightly improved performance on semantic search tasks while offering a similar level of performance. [See the sbert docs](https://www.sbert.net/docs/pretrained_models.html) for more details on performance comparisons between the various pretrained models. -### Build ONNX optimized model files - -A key step, if using ONNX runtime to speed up vectorization, is to build optimized and quantized models from the base `sbert` model. This is done by running the script `onnx_optimizer.py` in the `onnx_model/` directory. - -The optimization/quantization are done using a modified version of [the methods in this blog post](https://www.philschmid.de/optimize-sentence-transformers). We ony perform dynamic quantization for now as static quantization requires a very hardware and OS-specific set of instructions that don't generalize -- it only makes sense to do this in a production environment that is expected to serve thousands of requests in short time. As further reading, a detailed explanation of the difference between static and dynamic quantization [is available in the Hugging Face docs](https://huggingface.co/docs/optimum/concept_guides/quantization). - -```sh -cd onnx_model -python onnx_optimizer.py # python -> python 3.10 -``` - -Running this script generates a new directory `onnx_models/onnx` with the optimized and quantized models, along with their associated model config files. - -* `model_optimized.onnx` -* `model_optimized_quantized.onnx` - -The `model_optimized_quantized.onnx` is a dynamically-quantized model file that is ~26% smaller in size than the original model in this case, and generates sentence embeddings roughly 1.8x faster than the original sentence transformers model, due to the optimized ONNX runtime. A more detailed blog post benchmarking these numbers will be published shortly! - ### Run data loader Data is ingested into the Qdrant database through the scripts in the `scripts` directly. The scripts validate the input JSON data via [Pydantic](https://docs.pydantic.dev), and then index both the JSON data and the vectors to Qdrant using the [Qdrant Python client](https://github.com/qdrant/qdrant-client). Prior to indexing and vectorizing, we simply concatenate the key fields that contain useful information about each wine and vectorize this instead. 
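Concretely, the concatenate-and-vectorize step described above amounts to only a few lines with `sentence-transformers`. A minimal sketch, assuming the `multi-qa-MiniLM-L6-cos-v1` checkpoint named above and the wine fields used in this repo (the sample record is trimmed for brevity):

```python
from sentence_transformers import SentenceTransformer

# Uncased model tuned for semantic search (see the model choice discussion above)
model = SentenceTransformer("multi-qa-MiniLM-L6-cos-v1")


def to_search_text(wine: dict) -> str:
    # Concatenate the fields that carry the most descriptive signal
    fields = (wine.get("title"), wine.get("variety"), wine.get("description"))
    return " ".join(field for field in fields if field)


wines = [
    {
        "id": 3845,
        "title": "Castellinuzza e Piuca 2010 Chianti Classico",
        "variety": "Red Blend",
        "description": "This gorgeous Chianti Classico boasts lively cherry, strawberry and violet aromas.",
    }
]
vectors = model.encode([to_search_text(w) for w in wines], batch_size=64).tolist()
# Each vector is indexed to Qdrant alongside its JSON payload
```

At query time, the same `encode` call converts the search phrase into a vector, which is exactly what the `_search_by_similarity` helpers in `api/routers/rest.py` do before calling `client.search` with `limit=5`.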
-#### Option 1: Use `sbert` - If running on a Macbook or other development machine, it's possible to generate sentence embeddings using the original `sbert` model as per the `EMBEDDING_MODEL_CHECKPOINT` variable in the `.env` file. ```sh @@ -126,15 +88,7 @@ cd scripts python bulk_index_sbert.py ``` -#### Option 2: Use `onnx` quantized model - -If running on a Linux server on a large dataset, it is highly recommended to use the ONNX quantized model for the `EMBEDDING_MODEL_CHECKPOINT` model specified in `.env`. If using the appropriate hardware on modern Intel chips, it can vastly outperform the original `sbert` model on CPU, allowing for lower-cost and higher-throughput indexing for much larger datasets. - -```sh -cd scripts -python bulk_index_onnx.py -``` - +Depending on the CPU on your machine, this may take a while. On a 2022 M2 Macbook Pro, vectorizing and bulk-indexing ~130k records took about 25 minutes. When tested on an AWS EC2 T2 medium instance, the same process took just over an hour. ## Step 3: Test API diff --git a/dbs/qdrant/api/config.py b/dbs/qdrant/api/config.py index de6c498..a9f7188 100644 --- a/dbs/qdrant/api/config.py +++ b/dbs/qdrant/api/config.py @@ -1,15 +1,17 @@ -from pydantic import BaseSettings +from pydantic_settings import BaseSettings, SettingsConfigDict class Settings(BaseSettings): + model_config = SettingsConfigDict( + env_file=".env", + extra="allow", + ) + qdrant_service: str qdrant_port: str qdrant_host: str qdrant_service: str - api_port = str + api_port: str embedding_model_checkpoint: str onnx_model_filename: str tag: str - - class Config: - env_file = ".env" diff --git a/dbs/qdrant/api/main.py b/dbs/qdrant/api/main.py index d24794f..17463ad 100644 --- a/dbs/qdrant/api/main.py +++ b/dbs/qdrant/api/main.py @@ -4,20 +4,12 @@ from fastapi import FastAPI from qdrant_client import QdrantClient +from sentence_transformers import SentenceTransformer from api.config import Settings from api.routers import rest -try: - from optimum.onnxruntime import ORTModelForCustomTasks - from optimum.pipelines import pipeline - from transformers import AutoTokenizer - - model_type = "onnx" -except ModuleNotFoundError: - from sentence_transformers import SentenceTransformer - - model_type = "sbert" +model_type = "sbert" @lru_cache() @@ -26,30 +18,13 @@ def get_settings(): return Settings() -def get_embedding_pipeline(onnx_path, model_filename: str): - """ - Create a sentence embedding pipeline using the optimized ONNX model, if available in the environment - """ - # Reload tokenizer - tokenizer = AutoTokenizer.from_pretrained(onnx_path) - optimized_model = ORTModelForCustomTasks.from_pretrained(onnx_path, file_name=model_filename) - embedding_pipeline = pipeline("feature-extraction", model=optimized_model, tokenizer=tokenizer) - return embedding_pipeline - - @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: """Async context manager for Qdrant database connection.""" settings = get_settings() model_checkpoint = settings.embedding_model_checkpoint - if model_type == "sbert": - app.model = SentenceTransformer(model_checkpoint) - app.model_type = "sbert" - elif model_type == "onnx": - app.model = get_embedding_pipeline( - "onnx_model/onnx", model_filename=settings.onnx_model_filename - ) - app.model_type = "onnx" + app.model = SentenceTransformer(model_checkpoint) + app.model_type = "sbert" # Define Qdrant client app.client = QdrantClient(host=settings.qdrant_service, port=settings.qdrant_port) print("Successfully connected to Qdrant") diff 
--git a/dbs/qdrant/api/routers/rest.py b/dbs/qdrant/api/routers/rest.py index 77877cc..98e37b0 100644 --- a/dbs/qdrant/api/routers/rest.py +++ b/dbs/qdrant/api/routers/rest.py @@ -1,7 +1,7 @@ from fastapi import APIRouter, HTTPException, Query, Request from qdrant_client.http import models -from schemas.retriever import CountByCountry, SimilaritySearch +from api.schemas.rest import CountByCountry, SimilaritySearch router = APIRouter() @@ -124,14 +124,10 @@ def _search_by_similarity( collection: str, terms: str, ) -> list[SimilaritySearch] | None: - if request.app.model_type == "sbert": - vector = request.app.model.encode(terms, batch_size=64).tolist() - elif request.app.model_type == "onnx": - vector = request.app.model(terms, truncate=True)[0][0] - + vector = request.app.model.encode(terms, batch_size=64).tolist() # Use `vector` for similarity search on the closest vectors in the collection search_result = request.app.client.search( - collection_name=collection, query_vector=vector, top=5 + collection_name=collection, query_vector=vector, limit=5 ) # `search_result` contains found vector ids with similarity scores along with the stored payload # For now we are interested in payload only @@ -144,11 +140,7 @@ def _search_by_similarity( def _search_by_similarity_and_country( request: Request, collection: str, terms: str, country: str ) -> list[SimilaritySearch] | None: - if request.app.model_type == "sbert": - vector = request.app.model.encode(terms, batch_size=64).tolist() - elif request.app.model_type == "onnx": - vector = request.app.model(terms, truncate=True)[0][0] - + vector = request.app.model.encode(terms, batch_size=64).tolist() filter = models.Filter( **{ "must": [ @@ -162,7 +154,7 @@ def _search_by_similarity_and_country( } ) search_result = request.app.client.search( - collection_name=collection, query_vector=vector, query_filter=filter, top=5 + collection_name=collection, query_vector=vector, query_filter=filter, limit=5 ) payloads = [hit.payload for hit in search_result] if not payloads: @@ -178,11 +170,7 @@ def _search_by_similarity_and_filters( points: int, price: float, ) -> list[SimilaritySearch] | None: - if request.app.model_type == "sbert": - vector = request.app.model.encode(terms, batch_size=64).tolist() - elif request.app.model_type == "onnx": - vector = request.app.model(terms, truncate=True)[0][0] - + vector = request.app.model.encode(terms, batch_size=64).tolist() filter = models.Filter( **{ "must": [ @@ -208,7 +196,7 @@ def _search_by_similarity_and_filters( } ) search_result = request.app.client.search( - collection_name=collection, query_vector=vector, query_filter=filter, top=5 + collection_name=collection, query_vector=vector, query_filter=filter, limit=5 ) payloads = [hit.payload for hit in search_result] if not payloads: diff --git a/dbs/qdrant/api/schemas/__init__.py b/dbs/qdrant/api/schemas/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dbs/qdrant/schemas/retriever.py b/dbs/qdrant/api/schemas/rest.py similarity index 84% rename from dbs/qdrant/schemas/retriever.py rename to dbs/qdrant/api/schemas/rest.py index 42903fb..906b33c 100644 --- a/dbs/qdrant/schemas/retriever.py +++ b/dbs/qdrant/api/schemas/rest.py @@ -1,22 +1,12 @@ -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict class SimilaritySearch(BaseModel): - id: int - country: str - province: str | None - title: str - description: str | None - points: int - price: float | str | None - variety: str | None - winery: str | None - - class Config: - extra = 
"ignore" - schema_extra = { + model_config = ConfigDict( + extra="ignore", + json_schema_extra={ "example": { - "id": 3845, + "wineID": 3845, "country": "Italy", "title": "Castellinuzza e Piuca 2010 Chianti Classico", "description": "This gorgeous Chianti Classico boasts lively cherry, strawberry and violet aromas. The mouthwatering palate shows concentrated wild-cherry flavor layered with mint, white pepper and clove. It has fresh acidity and firm tannins that will develop complexity with more bottle age. A textbook Chianti Classico.", @@ -25,7 +15,18 @@ class Config: "variety": "Red Blend", "winery": "Castellinuzza e Piuca", } - } + }, + ) + + id: int + country: str + province: str | None + title: str + description: str | None + points: int + price: float | str | None + variety: str | None + winery: str | None class CountByCountry(BaseModel): diff --git a/dbs/qdrant/docker-compose-onnx.yml b/dbs/qdrant/docker-compose-onnx.yml deleted file mode 100644 index 1ea3334..0000000 --- a/dbs/qdrant/docker-compose-onnx.yml +++ /dev/null @@ -1,39 +0,0 @@ -version: "3.9" - -services: - qdrant: - image: qdrant/qdrant:${QDRANT_VERSION} - restart: unless-stopped - environment: - - QDRANT_HOST=${QDRANT_HOST} - ports: - - ${QDRANT_PORT}:6333 - volumes: - - qdrant_storage:/qdrant/storage - networks: - - wine - - fastapi: - image: qdrant_wine_fastapi:${TAG} - build: - context: . - dockerfile: Dockerfile.onnxruntime - restart: unless-stopped - env_file: - - .env - ports: - - ${API_PORT}:8000 - depends_on: - - qdrant - volumes: - - ./:/wine - networks: - - wine - command: uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload - -volumes: - qdrant_storage: - -networks: - wine: - driver: bridge \ No newline at end of file diff --git a/dbs/qdrant/onnx_model/onnx_optimizer.py b/dbs/qdrant/onnx_model/onnx_optimizer.py deleted file mode 100644 index 4ee37ca..0000000 --- a/dbs/qdrant/onnx_model/onnx_optimizer.py +++ /dev/null @@ -1,135 +0,0 @@ -""" -This script is a modified version of the method shown in this blog post: -https://www.philschmid.de/optimize-sentence-transformers - -It uses the ONNX Runtime to dynamically optimize and quantize a sentence transformers model for better CPU performance. 
diff --git a/dbs/qdrant/onnx_model/onnx_optimizer.py b/dbs/qdrant/onnx_model/onnx_optimizer.py
deleted file mode 100644
index 4ee37ca..0000000
--- a/dbs/qdrant/onnx_model/onnx_optimizer.py
+++ /dev/null
@@ -1,135 +0,0 @@
-"""
-This script is a modified version of the method shown in this blog post:
-https://www.philschmid.de/optimize-sentence-transformers
-
-It uses the ONNX Runtime to dynamically optimize and quantize a sentence transformers model for better CPU performance.
-
-Using the quantized version of `sentence-transformers/multi-qa-MiniLM-L6-cos-v1` allows us to:
-  * Generate similar quality sentence embeddings as the original model, but with a roughly 1.8x speedup in vectorization time
-  * Reduce the model size from 86 MB to around 63 MB, a roughly 26% reduction in file size
-"""
-from pathlib import Path
-
-import torch
-import torch.nn.functional as F
-from optimum.onnxruntime import ORTModelForCustomTasks, ORTOptimizer, ORTQuantizer
-from optimum.onnxruntime.configuration import AutoQuantizationConfig, OptimizationConfig
-from sklearn.metrics.pairwise import cosine_similarity
-from transformers import AutoModel, AutoTokenizer, Pipeline
-
-
-def mean_pooling(model_output, attention_mask):
-    token_embeddings = model_output[
-        0
-    ]  # First element of model_output contains all token embeddings
-    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
-    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
-        input_mask_expanded.sum(1), min=1e-9
-    )
-
-
-class SentenceEmbeddingPipeline(Pipeline):
-    def _sanitize_parameters(self, **kwargs):
-        # We don't have any hyperparameters to sanitize
-        preprocess_kwargs = {}
-        return preprocess_kwargs, {}, {}
-
-    def preprocess(self, inputs):
-        encoded_inputs = self.tokenizer(inputs, padding=True, truncation=True, return_tensors="pt")
-        return encoded_inputs
-
-    def _forward(self, model_inputs):
-        outputs = self.model(**model_inputs)
-        return {"outputs": outputs, "attention_mask": model_inputs["attention_mask"]}
-
-    def postprocess(self, model_outputs):
-        # Perform mean pooling
-        sentence_embeddings = mean_pooling(
-            model_outputs["outputs"], model_outputs["attention_mask"]
-        )
-        # Normalize embeddings
-        sentence_embeddings = F.normalize(sentence_embeddings, p=2, dim=1)
-        return sentence_embeddings
-
-
-def optimize_model(model_id: str, onnx_path: Path) -> None:
-    """
-    Optimize ONNX model for CPU performance
-    """
-    model = ORTModelForCustomTasks.from_pretrained(model_id, export=True)
-    # Create ORTOptimizer and define optimization configuration
-    optimizer = ORTOptimizer.from_pretrained(model)
-    # Save models to local disk
-    model.save_pretrained(onnx_path)
-    tokenizer.save_pretrained(onnx_path)
-    # Set optimization_level = 99 -> enable all optimizations
-    optimization_config = OptimizationConfig(optimization_level=99)
-    # Apply the optimization configuration to the model
-    optimizer.optimize(
-        optimization_config=optimization_config,
-        save_dir=onnx_path,
-    )
-
-
-def quantize_optimized_model(onnx_path: Path) -> None:
-    """
-    Quantize an already optimized ONNX model for even better CPU performance
-    """
-    # Create ORTQuantizer and define quantization configuration
-    quantizer = ORTQuantizer.from_pretrained(onnx_path, file_name="model_optimized.onnx")
-    quantization_config = AutoQuantizationConfig.avx512_vnni(is_static=False, per_channel=True)
-    # Apply the quantization configuration to the model
-    quantizer.quantize(
-        quantization_config=quantization_config,
-        save_dir=onnx_path,
-    )
-
-
-def generate_similarities(source_sentence: str, sentences: list[str], pipeline: Pipeline) -> None:
-    source_sentence_embedding = pipeline(source_sentence).tolist()[0]
-
-    for sentence in sentences:
-        sentence_embedding = pipeline(sentence).tolist()[0]
-        similarity = cosine_similarity([source_sentence_embedding], [sentence_embedding])[0]
-        print(f"Similarity between '{source_sentence}' and '{sentence}': {similarity}")
-
-
-def main() -> None:
-    """
-    Generate optimized and quantized ONNX models from a vanilla sentence transformer model
-    """
-    # Init vanilla sentence transformer pipeline
-    print("---\nLoading vanilla sentence transformer model\n---")
-    vanilla_pipeline = SentenceEmbeddingPipeline(model=vanilla_model, tokenizer=tokenizer)
-    # Print out pairwise similarities
-    generate_similarities(source_sentence, sentences, vanilla_pipeline)
-
-    # Save model to ONNX
-    Path("onnx").mkdir(exist_ok=True)
-    onnx_path = Path("onnx")
-
-    # First, dynamically optimize an existing sentence transformer model
-    optimize_model(model_id, onnx_path)
-    # Next, dynamically quantize the optimized model
-    quantize_optimized_model(onnx_path)
-
-    # Init quantized ONNX pipeline
-    print("---\nLoading quantized ONNX model\n---")
-    model_filename = "model_optimized_quantized.onnx"
-    quantized_model = ORTModelForCustomTasks.from_pretrained(onnx_path, file_name=model_filename)
-    quantized_pipeline = SentenceEmbeddingPipeline(model=quantized_model, tokenizer=tokenizer)
-    # Print out pairwise similarities
-    generate_similarities(source_sentence, sentences, quantized_pipeline)
-
-
-if __name__ == "__main__":
-    # Example sentences we want sentence embeddings for
-    source_sentence = "I'm very happy"
-    sentences = ["I am so glad", "I'm so sad", "My dog is missing", "The universe is so vast!"]
-
-    model_id = "sentence-transformers/multi-qa-MiniLM-L6-cos-v1"
-    # Load AutoModel from huggingface model repository
-    tokenizer = AutoTokenizer.from_pretrained(model_id)
-    vanilla_model = AutoModel.from_pretrained(model_id)
-
-    main()

diff --git a/dbs/qdrant/requirements-onnx.txt b/dbs/qdrant/requirements-onnx.txt
deleted file mode 100644
index e86aa2b..0000000
--- a/dbs/qdrant/requirements-onnx.txt
+++ /dev/null
@@ -1,79 +0,0 @@
-aiohttp==3.8.4
-aiosignal==1.3.1
-anyio==3.6.2
-async-timeout==4.0.2
-attrs==23.1.0
-catalogue==2.0.8
-certifi==2022.12.7
-charset-normalizer==3.1.0
-click==8.1.3
-cmake==3.26.3
-coloredlogs==15.0.1
-datasets==2.11.0
-dill==0.3.6
-evaluate==0.4.0
-fastapi==0.95.1
-filelock==3.12.0
-flatbuffers==23.3.3
-frozenlist==1.3.3
-fsspec==2023.4.0
-grpcio==1.54.0
-grpcio-tools==1.48.2
-h11==0.14.0
-h2==4.1.0
-hpack==4.0.0
-httpcore==0.17.0
-httpx==0.24.0
-huggingface-hub==0.13.4
-humanfriendly==10.0
-hyperframe==6.0.1
-idna==3.4
-Jinja2==3.1.2
-joblib==1.2.0
-lit==16.0.1
-MarkupSafe==2.1.2
-mpmath==1.3.0
-multidict==6.0.4
-multiprocess==0.70.14
-networkx==3.1
-nltk==3.8.1
-numpy==1.24.2
-onnx==1.13.1
-onnxruntime==1.14.1
-optimum==1.8.2
-packaging==23.1
-pandas==2.0.0
-Pillow==9.5.0
-protobuf==3.20.2
-pyarrow==11.0.0
-pydantic==1.10.7
-python-dateutil==2.8.2
-python-dotenv==1.0.0
-pytz==2023.3
-PyYAML==6.0
-qdrant-client==1.1.5
-regex==2023.3.23
-requests==2.28.2
-responses==0.18.0
-scikit-learn==1.2.2
-scipy==1.10.1
-sentence-transformers==2.2.2
-sentencepiece==0.1.98
-six==1.16.0
-sniffio==1.3.0
-srsly==2.4.6
-starlette==0.26.1
-sympy==1.11.1
-threadpoolctl==3.1.0
-tokenizers==0.13.3
-torch==2.0.0
-torchvision==0.15.1
-tqdm==4.65.0
-transformers==4.28.1
-triton==2.0.0
-typing_extensions==4.5.0
-tzdata==2023.3
-urllib3==1.26.15
-uvicorn==0.21.1
-xxhash==3.2.0
-yarl==1.9.1
diff --git a/dbs/qdrant/requirements.txt b/dbs/qdrant/requirements.txt
index 248f78b..399eda5 100644
--- a/dbs/qdrant/requirements.txt
+++ b/dbs/qdrant/requirements.txt
@@ -1,9 +1,12 @@
-qdrant-client>=1.1.4
-transformers==4.28.1
-sentence-transformers==2.2.2
-pydantic[dotenv]>=1.10.7, <2.0.0
-fastapi>=0.95.0, <1.0.0
+qdrant-client~=1.3.0
+transformers~=4.28.0
+sentence-transformers~=2.2.0
+pydantic~=2.0.0
+pydantic-settings>=2.0.0
+python-dotenv>=1.0.0
+fastapi~=0.100.0
 httpx>=0.24.0
 aiohttp>=3.8.4
+uvloop>=0.17.0
 uvicorn>=0.21.0, <1.0.0
-srsly>=2.4.6
\ No newline at end of file
+srsly>=2.4.6

diff --git a/dbs/qdrant/schemas/wine.py b/dbs/qdrant/schemas/wine.py
index bff0913..1dd4a0e 100644
--- a/dbs/qdrant/schemas/wine.py
+++ b/dbs/qdrant/schemas/wine.py
@@ -1,27 +1,13 @@
-from pydantic import BaseModel, root_validator
+from pydantic import BaseModel, ConfigDict, Field, model_validator


 class Wine(BaseModel):
-    id: int
-    points: int
-    title: str
-    description: str | None
-    price: float | None
-    variety: str | None
-    winery: str | None
-    vineyard: str | None
-    country: str | None
-    province: str | None
-    region_1: str | None
-    region_2: str | None
-    taster_name: str | None
-    taster_twitter_handle: str | None
-
-    class Config:
-        extra = "allow"
-        allow_population_by_field_name = True
-        validate_assignment = True
-        schema_extra = {
+    model_config = ConfigDict(
+        populate_by_name=True,
+        validate_assignment=True,
+        extra="allow",
+        str_strip_whitespace=True,
+        json_schema_extra={
             "example": {
                 "id": 45100,
                 "points": 85,
@@ -38,17 +24,25 @@ class Config:
                 "taster_name": "Michael Schachner",
                 "taster_twitter_handle": "@wineschach",
             }
-        }
+        },
+    )

-    @root_validator(pre=True)
-    def _get_vineyard(cls, values):
-        "Rename designation to vineyard"
-        vineyard = values.pop("designation", None)
-        if vineyard:
-            values["vineyard"] = vineyard.strip()
-        return values
+    id: int
+    points: int
+    title: str
+    description: str | None
+    price: float | None
+    variety: str | None
+    winery: str | None
+    vineyard: str | None = Field(..., alias="designation")
+    country: str | None
+    province: str | None
+    region_1: str | None
+    region_2: str | None
+    taster_name: str | None
+    taster_twitter_handle: str | None

-    @root_validator
+    @model_validator(mode="before")
     def _fill_country_unknowns(cls, values):
         "Fill in missing country values with 'Unknown', as we always want this field to be queryable"
         country = values.get("country")
@@ -56,7 +50,7 @@ def _fill_country_unknowns(cls, values):
         values["country"] = "Unknown"
         return values

-    @root_validator
+    @model_validator(mode="before")
     def _add_to_vectorize_fields(cls, values):
         "Add a field to_vectorize that will be used to create sentence embeddings"
         variety = values.get("variety", "")
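The `Wine` schema above leans on two pydantic v2 idioms at once: a field alias (`designation` in the raw JSON becomes `vineyard` on the model) and a `model_validator(mode="before")` hook that massages the raw dict before field validation. A tiny, self-contained sketch of the same pattern, trimmed to three fields for brevity (the field subset and sample values are illustrative):

```python
# Minimal sketch of the pydantic v2 validation pattern used in schemas/wine.py.
# Trimmed to three fields; the alias maps the raw "designation" key to `vineyard`.
from pydantic import BaseModel, ConfigDict, Field, model_validator


class Wine(BaseModel):
    model_config = ConfigDict(populate_by_name=True, str_strip_whitespace=True)

    id: int
    country: str | None = None
    vineyard: str | None = Field(None, alias="designation")

    @model_validator(mode="before")
    @classmethod
    def _fill_country_unknowns(cls, values):
        # Runs on the raw input dict, before field validation
        if not values.get("country"):
            values["country"] = "Unknown"
        return values


wine = Wine(**{"id": 45100, "designation": "Vanita", "country": None})
print(wine.model_dump())  # {'id': 45100, 'country': 'Unknown', 'vineyard': 'Vanita'}
```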
diff --git a/dbs/qdrant/scripts/bulk_index_onnx.py b/dbs/qdrant/scripts/bulk_index_onnx.py
deleted file mode 100644
index ca2061e..0000000
--- a/dbs/qdrant/scripts/bulk_index_onnx.py
+++ /dev/null
@@ -1,177 +0,0 @@
-import argparse
-import os
-import sys
-from concurrent.futures import ProcessPoolExecutor
-from functools import lru_cache
-from pathlib import Path
-from typing import Any, Iterator
-
-import srsly
-from dotenv import load_dotenv
-from optimum.onnxruntime import ORTModelForCustomTasks
-from optimum.pipelines import pipeline
-from pydantic.main import ModelMetaclass
-from qdrant_client import QdrantClient
-from qdrant_client.http import models
-from transformers import AutoTokenizer
-
-sys.path.insert(1, os.path.realpath(Path(__file__).resolve().parents[1]))
-from api.config import Settings
-from schemas.wine import Wine
-
-load_dotenv()
-# Custom types
-JsonBlob = dict[str, Any]
-
-
-class FileNotFoundError(Exception):
-    pass
-
-
-@lru_cache()
-def get_settings():
-    # Use lru_cache to avoid loading .env file for every request
-    return Settings()
-
-
-def chunk_iterable(item_list: list[JsonBlob], chunksize: int) -> Iterator[tuple[JsonBlob, ...]]:
-    """
-    Break a large iterable into an iterable of smaller iterables of size `chunksize`
-    """
-    for i in range(0, len(item_list), chunksize):
-        yield tuple(item_list[i : i + chunksize])
-
-
-def get_json_data(data_dir: Path, filename: str) -> list[JsonBlob]:
-    """Get all line-delimited json files (.jsonl) from a directory with a given prefix"""
-    file_path = data_dir / filename
-    if not file_path.is_file():
-        # File may not have been uncompressed yet so try to do that first
-        data = srsly.read_gzip_jsonl(file_path)
-        # This time if it isn't there it really doesn't exist
-        if not file_path.is_file():
-            raise FileNotFoundError(f"No valid .jsonl file found in `{data_dir}`")
-    else:
-        data = srsly.read_gzip_jsonl(file_path)
-    return data
-
-
-def validate(
-    data: list[JsonBlob],
-    model: ModelMetaclass,
-    exclude_none: bool = False,
-) -> list[JsonBlob]:
-    validated_data = [model(**item).dict(exclude_none=exclude_none) for item in data]
-    return validated_data
-
-
-def create_index(
-    client: QdrantClient,
-    collection_name: str,
-) -> None:
-    # Field that will be vectorized requires special treatment for tokenization
-    client.create_payload_index(
-        collection_name=collection_name,
-        field_name="description",
-        field_schema=models.TextIndexParams(
-            type="text",
-            tokenizer=models.TokenizerType.WORD,
-            min_token_len=3,
-            max_token_len=15,
-            lowercase=True,
-        ),
-    )
-    # Lowercase fields that will be filtered on
-    for field_name in ["country", "province", "region_1", "region_2", "variety"]:
-        client.create_payload_index(
-            collection_name=collection_name,
-            field_name=field_name,
-            field_schema="keyword",
-        )
-
-
-def get_embedding_pipeline(onnx_path, model_filename: str) -> pipeline:
-    """
-    Create a sentence embedding pipeline using the optimized ONNX model
-    """
-    # Reload tokenizer
-    tokenizer = AutoTokenizer.from_pretrained(onnx_path)
-    optimized_model = ORTModelForCustomTasks.from_pretrained(onnx_path, file_name=model_filename)
-    embedding_pipeline = pipeline("feature-extraction", model=optimized_model, tokenizer=tokenizer)
-    return embedding_pipeline
-
-
-def add_vectors_to_index(data_chunk: tuple[JsonBlob, ...]) -> None:
-    settings = get_settings()
-    collection = "wines"
-    client = QdrantClient(host=settings.qdrant_host, port=settings.qdrant_port, timeout=None)
-    data = validate(data_chunk, Wine, exclude_none=True)
-
-    # Preload optimized, quantized ONNX sentence transformers model
-    # NOTE: This requires that the script ../onnx_model/onnx_optimizer.py has been run beforehand
-    pipeline = get_embedding_pipeline(ONNX_PATH, model_filename="model_optimized_quantized.onnx")
-
-    ids = [item["id"] for item in data]
-    to_vectorize = [text.pop("to_vectorize") for text in data]
-    sentence_embeddings = [pipeline(text.lower(), truncate=True)[0][0] for text in to_vectorize]
-    print(f"Finished vectorizing data in the ID range {min(ids)}-{max(ids)}")
-    try:
-        # Upsert payload
-        client.upsert(
-            collection_name=collection,
-            wait=True,
-            points=models.Batch(
-                ids=ids,
-                payloads=data,
-                vectors=sentence_embeddings,
-            ),
-        )
-        print(f"Indexed ID range {min(ids)}-{max(ids)} to db")
-    except Exception as e:
-        print(f"{e}: Failed to index ID range {min(ids)}-{max(ids)} to db")
-    return ids
-
-
-def main(data: list[JsonBlob]) -> None:
-    settings = get_settings()
-    COLLECTION = "wines"
-    client = QdrantClient(host=settings.qdrant_host, port=settings.qdrant_port, timeout=None)
-    # Create or recreate collection
-    client.recreate_collection(
-        collection_name=COLLECTION,
-        vectors_config=models.VectorParams(size=384, distance=models.Distance.COSINE),
-    )
-    # Create payload with text field whose sentence embeddings will be added to the index
-    create_index(client, COLLECTION)
-    print("Created index")
-
-    print("Processing chunks")
-    with ProcessPoolExecutor(max_workers=WORKERS) as executor:
-        chunked_data = chunk_iterable(data, CHUNKSIZE)
-        for _ in executor.map(add_vectors_to_index, chunked_data):
-            pass
-
-
-if __name__ == "__main__":
-    # fmt: off
-    parser = argparse.ArgumentParser("Bulk index database from the wine reviews JSONL data")
-    parser.add_argument("--limit", type=int, default=0, help="Limit the size of the dataset to load for testing purposes")
-    parser.add_argument("--chunksize", type=int, default=512, help="Size of each chunk to break the dataset into before processing")
-    parser.add_argument("--filename", type=str, default="winemag-data-130k-v2.jsonl.gz", help="Name of the JSONL zip file to use")
-    parser.add_argument("--workers", type=int, default=3, help="Number of workers to use for vectorization")
-    args = vars(parser.parse_args())
-    # fmt: on
-
-    LIMIT = args["limit"]
-    DATA_DIR = Path(__file__).parents[3] / "data"
-    FILENAME = args["filename"]
-    CHUNKSIZE = args["chunksize"]
-    WORKERS = args["workers"]
-    ONNX_PATH = Path(__file__).parents[1] / ("onnx_model") / "onnx"
-
-    data = list(get_json_data(DATA_DIR, FILENAME))
-
-    if data:
-        data = data[:LIMIT] if LIMIT > 0 else data
-        main(data)
-        print("Finished execution!")

diff --git a/dbs/qdrant/scripts/bulk_index_sbert.py b/dbs/qdrant/scripts/bulk_index_sbert.py
index 4ca51e8..61725dd 100644
--- a/dbs/qdrant/scripts/bulk_index_sbert.py
+++ b/dbs/qdrant/scripts/bulk_index_sbert.py
@@ -8,14 +8,14 @@

 import srsly
 from dotenv import load_dotenv
-from pydantic.main import ModelMetaclass
 from qdrant_client import QdrantClient
 from qdrant_client.http import models

 sys.path.insert(1, os.path.realpath(Path(__file__).resolve().parents[1]))
+from sentence_transformers import SentenceTransformer
+
 from api.config import Settings
 from schemas.wine import Wine
-from sentence_transformers import SentenceTransformer

 load_dotenv()
 # Custom types
@@ -56,10 +56,9 @@ def get_json_data(data_dir: Path, filename: str) -> list[JsonBlob]:

 def validate(
     data: list[JsonBlob],
-    model: ModelMetaclass,
     exclude_none: bool = False,
 ) -> list[JsonBlob]:
-    validated_data = [model(**item).dict(exclude_none=exclude_none) for item in data]
+    validated_data = [Wine(**item).model_dump(exclude_none=exclude_none) for item in data]
     return validated_data


@@ -92,7 +91,7 @@ def add_vectors_to_index(data_chunk: tuple[JsonBlob, ...]) -> None:
     settings = get_settings()
     collection = "wines"
     client = QdrantClient(host=settings.qdrant_host, port=settings.qdrant_port, timeout=None)
-    data = validate(data_chunk, Wine, exclude_none=True)
+    data = validate(data_chunk, exclude_none=True)

     # Load a sentence transformer model for semantic similarity from a specified checkpoint
     model_id = get_settings().embedding_model_checkpoint
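All of the loader scripts in this diff share the same chunk-and-fan-out shape: break the validated records into chunks, then vectorize and upsert each chunk from a worker process. A stripped-down sketch of that control flow, with `process_chunk` standing in as a placeholder for the vectorize-and-upsert work:

```python
# Stripped-down sketch of the chunked, multi-process ingestion pattern
# used by the bulk_index_* scripts. `process_chunk` is a placeholder.
from concurrent.futures import ProcessPoolExecutor
from typing import Any, Iterator

JsonBlob = dict[str, Any]


def chunk_iterable(items: list[JsonBlob], chunksize: int) -> Iterator[tuple[JsonBlob, ...]]:
    # Break a large list into tuples of at most `chunksize` records
    for i in range(0, len(items), chunksize):
        yield tuple(items[i : i + chunksize])


def process_chunk(chunk: tuple[JsonBlob, ...]) -> int:
    # Placeholder: validate, vectorize and upsert the chunk here
    return len(chunk)


if __name__ == "__main__":
    data = [{"id": i} for i in range(2000)]
    with ProcessPoolExecutor(max_workers=3) as executor:
        for n in executor.map(process_chunk, chunk_iterable(data, 512)):
            pass
```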
diff --git a/dbs/weaviate/.env.example b/dbs/weaviate/.env.example
index b64ef4e..93502d2 100644
--- a/dbs/weaviate/.env.example
+++ b/dbs/weaviate/.env.example
@@ -1,4 +1,4 @@
-WEAVIATE_VERSION = "1.18.4"
+WEAVIATE_VERSION = "1.20.2"
 WEAVIATE_PORT = 8080
 WEAVIATE_HOST = "localhost"
 WEAVIATE_SERVICE = "weaviate"
@@ -7,7 +7,7 @@
 EMBEDDING_MODEL_CHECKPOINT = "sentence-transformers/multi-qa-MiniLM-L6-cos-v1"
 ONNX_MODEL_FILENAME = "model_optimized_quantized.onnx"

 # Container image tag
-TAG = "0.1.0"
+TAG = "0.2.0"

 # Docker project namespace (defaults to the current folder name if not set)
 COMPOSE_PROJECT_NAME = weaviate_wine
\ No newline at end of file

diff --git a/dbs/weaviate/Dockerfile b/dbs/weaviate/Dockerfile
index 2a47f56..7e9dd8b 100644
--- a/dbs/weaviate/Dockerfile
+++ b/dbs/weaviate/Dockerfile
@@ -8,6 +8,5 @@
 RUN pip install --no-cache-dir -U pip wheel setuptools
 RUN pip install --no-cache-dir -r /wine/requirements.txt

 COPY ./api /wine/api
-COPY ./schemas /wine/schemas

 EXPOSE 8000
\ No newline at end of file

diff --git a/dbs/weaviate/Dockerfile.onnxruntime b/dbs/weaviate/Dockerfile.onnxruntime
deleted file mode 100644
index f62d37c..0000000
--- a/dbs/weaviate/Dockerfile.onnxruntime
+++ /dev/null
@@ -1,14 +0,0 @@
-FROM python:3.10-slim-bullseye
-
-WORKDIR /wine
-
-COPY ./requirements-onnx.txt /wine/requirements-onnx.txt
-
-RUN pip install --no-cache-dir -U pip wheel setuptools
-RUN pip install --no-cache-dir -r /wine/requirements-onnx.txt
-
-COPY ./api /wine/api
-COPY ./schemas /wine/schemas
-COPY ./onnx_model /wine/onnx_model
-
-EXPOSE 8000
\ No newline at end of file

diff --git a/dbs/weaviate/README.md b/dbs/weaviate/README.md
index 9982b88..826990e 100644
--- a/dbs/weaviate/README.md
+++ b/dbs/weaviate/README.md
@@ -33,11 +33,11 @@ The database and API services can be restarted at any time for maintenance and u

 **💡 Note:** The setup shown here would not be ideal in production, as there are other details related to security and scalability that are not addressed via simple Docker, but this is a good starting point to begin experimenting!

-### Option 1: Use `sbert` model
+### Use `sbert` model

 If using the `sbert` model [from the sentence-transformers repo](https://www.sbert.net/) directly, use the provided `docker-compose.yml` to initiate separate containers, one that runs Weaviate, and another one that serves as an API on top of the database.

-**⚠️ Note**: This approach will attempt to run `sbert` on a GPU if available, and if not, on CPU (while utilizing all CPU cores). This approach may not yield the fastest vectorization if using CPU-only -- a more optimized version is provided [below](#option-2-use-onnxruntime-model-highly-optimized-for-cpu).
+**⚠️ Note**: This approach will attempt to run `sbert` on a GPU if available, and if not, on CPU (while utilizing all CPU cores).

 ```
 docker compose -f docker-compose.yml up -d
 ```
@@ -48,24 +48,6 @@
 Tear down the services using the following command.

 ```
 docker compose -f docker-compose.yml down
 ```

-### Option 2: Use `onnxruntime` model
-
-An approach to make the sentence embedding vector generation process more efficient is to optimize and quantize the original `sbert` model via [ONNX (Open Neural Network Exchange)](https://huggingface.co/docs/transformers/serialization). This framework provides a standard interface for optimizing deep learning models and their computational graphs to be executed much faster and with lower resources on specialized runtimes and hardware.
-
-To deploy the services with the optimized `sbert` model, use the provided `docker-compose.yml` to initiate separate containers, one that runs Weaviate, and another one that serves as an API on top of the database.
-
-**⚠️ Note**: This approach requires some more additional packages from Hugging Face, on top of the `sbert` modules. **Currently (as of early 2023), they only work on Python 3.10**. For this section, make sure to only use Python 3.10 if ONNX complains about module installations via `pip`.
-
-```
-docker compose -f docker-compose-onnx.yml up -d
-```
-Tear down the services using the following command.
-
-```
-docker compose -f docker-compose-onnx.yml down
-```
-
-
 ## Step 2: Ingest the data

 We ingest both the JSON data for full-text search and filtering, as well as the sentence embedding vectors for similarity search into Weaviate. For this dataset, it's reasonable to expect that a simple concatenation of fields like `title`, `country`, `province`, `variety` and `description` would result in a useful vector that can be compared against a search query, also vectorized in the same embedding space.

@@ -93,32 +75,12 @@ Although larger and more powerful text embedding models exist (such as [OpenAI e
 For this work, it makes sense to use among the fastest models in this list, which is the `multi-qa-MiniLM-L6-cos-v1` **uncased** model. As per the docs, it was tuned for semantic search and question answering, and generates sentence embeddings for single sentences or paragraphs up to a maximum sequence length of 512. It was trained on 215M question answer pairs from various sources. Compared to the more general-purpose `all-MiniLM-L6-v2` model, it shows slightly improved performance on semantic search tasks at a similar encoding speed. [See the sbert docs](https://www.sbert.net/docs/pretrained_models.html) for more details on performance comparisons between the various pretrained models.

-### Build ONNX optimized model files
-
-A key step, if using ONNX runtime to speed up vectorization, is to build optimized and quantized models from the base `sbert` model. This is done by running the script `onnx_optimizer.py` in the `onnx_model/` directory.
-
-The optimization/quantization are done using a modified version of [the methods in this blog post](https://www.philschmid.de/optimize-sentence-transformers). We only perform dynamic quantization for now as static quantization requires a very hardware and OS-specific set of instructions that don't generalize -- it only makes sense to do this in a production environment that is expected to serve thousands of requests in short time. As further reading, a detailed explanation of the difference between static and dynamic quantization [is available in the Hugging Face docs](https://huggingface.co/docs/optimum/concept_guides/quantization).
-
-```sh
-cd onnx_model
-python onnx_optimizer.py  # python -> python 3.10
-```
-
-Running this script generates a new directory `onnx_models/onnx` with the optimized and quantized models, along with their associated model config files.
-
-* `model_optimized.onnx`
-* `model_optimized_quantized.onnx`
-
-The `model_optimized_quantized.onnx` is a dynamically-quantized model file that is ~26% smaller in size than the original model in this case, and generates sentence embeddings roughly 1.8x faster than the original sentence transformers model, due to the optimized ONNX runtime. A more detailed blog post benchmarking these numbers will be published shortly!
-
 ### Run data loader

 Data is ingested into the Weaviate database through the scripts in the `scripts` directory. The scripts validate the input JSON data via [Pydantic](https://docs.pydantic.dev), and then index both the JSON data and the vectors to Weaviate using the [Weaviate Python client](https://github.com/weaviate/weaviate-python-client). As mentioned before, the fields `variety`, `country`, `province`, `title` and `description` are concatenated, vectorized, and then indexed to Weaviate.
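To make the concatenate-then-vectorize flow concrete, here is a minimal sketch of indexing one record with its vector via the Weaviate Python client's v3 API. It is not the repo's loader: the `Wine` class name, the localhost URL, and the sample record are illustrative.

```python
# Minimal sketch of concatenating fields, vectorizing, and indexing to Weaviate.
# Illustrative only: assumes a local Weaviate at :8080 and a "Wine" class in the schema.
import weaviate
from sentence_transformers import SentenceTransformer

client = weaviate.Client("http://localhost:8080")
model = SentenceTransformer("sentence-transformers/multi-qa-MiniLM-L6-cos-v1")

record = {
    "title": "Castellinuzza e Piuca 2010 Chianti Classico",
    "country": "Italy",
    "province": "Tuscany",
    "variety": "Red Blend",
    "description": "Lively cherry, strawberry and violet aromas.",
}
# Concatenate the searchable fields into one string and embed it
to_vectorize = " ".join(str(v) for v in record.values())
vector = model.encode(to_vectorize, show_progress_bar=False).tolist()

client.data_object.create(data_object=record, class_name="Wine", vector=vector)
```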
-#### Option 1: Use `sbert`
-
 If running on a MacBook or a machine without a GPU, it's possible to generate sentence embeddings using the original `sbert` model as per the `EMBEDDING_MODEL_CHECKPOINT` variable in the `.env` file.

 ```sh
@@ -126,25 +88,7 @@ cd scripts
 python bulk_index_sbert.py
 ```

-#### Option 2: Use `onnx` quantized model
-
-If running on a remote Linux CPU instance, it is highly recommended to use the ONNX quantized model for the `EMBEDDING_MODEL_CHECKPOINT` model specified in `.env`. Using the appropriate hardware on modern Intel chips can vastly outperform the original `sbert` model on a conventional CPU, allowing for lower-cost and higher-throughput indexing for much larger datasets, all with very low memory consumption (under 2 GB).
-
-```sh
-cd scripts
-python bulk_index_onnx.py
-```
-
-### Time to index dataset
-
-Because vectorizing a large dataset can be an expensive step, part of the goal of this exercise is to see whether we can do so on CPU, with the fewest resources possible.
-
-In short, we are able to index all 129,971 wine reviews from the dataset in **28 min 30 sec**. The conditions under which this indexing time was achieved are listed below.
-
-* Ubuntu 22.04 EC2 `T2.xlarge` instance on AWS (1 CPU with 4 cores, 16 GB of RAM)
-* Python `3.10.10` (Did not use Python 3.11 because ONNX doesn't support it yet)
-* Quantized ONNX version of the `sentence-transformers/multi-qa-MiniLM-L6-cos-v1` sentence transformer
-* Weaviate version `1.18.4`
+Depending on the CPU on your machine, this may take a while. On a 2022 M2 MacBook Pro, vectorizing and bulk-indexing ~130k records took about 25 minutes. When tested on an AWS EC2 `t2.medium` instance, the same process took just over an hour.

 ## Step 3: Test API

diff --git a/dbs/weaviate/api/config.py b/dbs/weaviate/api/config.py
index 7740296..24aebc4 100644
--- a/dbs/weaviate/api/config.py
+++ b/dbs/weaviate/api/config.py
@@ -1,15 +1,17 @@
-from pydantic import BaseSettings
+from pydantic_settings import BaseSettings, SettingsConfigDict


 class Settings(BaseSettings):
+    model_config = SettingsConfigDict(
+        env_file=".env",
+        extra="allow",
+    )
+
     weaviate_service: str
     weaviate_port: str
     weaviate_host: str
     weaviate_service: str
-    api_port = str
+    api_port: int
     embedding_model_checkpoint: str
     onnx_model_filename: str
     tag: str
-
-    class Config:
-        env_file = ".env"
diff --git a/dbs/weaviate/api/main.py b/dbs/weaviate/api/main.py
index 0a4499a..6b216f0 100644
--- a/dbs/weaviate/api/main.py
+++ b/dbs/weaviate/api/main.py
@@ -8,16 +8,9 @@

 from api.config import Settings
 from api.routers import rest

-try:
-    from optimum.onnxruntime import ORTModelForCustomTasks
-    from optimum.pipelines import pipeline
-    from transformers import AutoTokenizer
+from sentence_transformers import SentenceTransformer

-    model_type = "onnx"
-except ModuleNotFoundError:
-    from sentence_transformers import SentenceTransformer
-
-    model_type = "sbert"
+model_type = "sbert"


 @lru_cache()
@@ -26,30 +19,13 @@ def get_settings():
     return Settings()


-def get_embedding_pipeline(onnx_path, model_filename: str):
-    """
-    Create a sentence embedding pipeline using the optimized ONNX model, if available in the environment
-    """
-    # Reload tokenizer
-    tokenizer = AutoTokenizer.from_pretrained(onnx_path)
-    optimized_model = ORTModelForCustomTasks.from_pretrained(onnx_path, file_name=model_filename)
-    embedding_pipeline = pipeline("feature-extraction", model=optimized_model, tokenizer=tokenizer)
-    return embedding_pipeline
-
-
 @asynccontextmanager
 async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
     """Async context manager for Weaviate database connection."""
     settings = get_settings()
     model_checkpoint = settings.embedding_model_checkpoint
-    if model_type == "sbert":
-        app.model = SentenceTransformer(model_checkpoint)
-        app.model_type = "sbert"
-    elif model_type == "onnx":
-        app.model = get_embedding_pipeline(
-            "onnx_model/onnx", model_filename=settings.onnx_model_filename
-        )
-        app.model_type = "onnx"
+    app.model = SentenceTransformer(model_checkpoint)
+    app.model_type = "sbert"
     # Create Weaviate client
     HOST = settings.weaviate_service
     PORT = settings.weaviate_port

diff --git a/dbs/weaviate/api/routers/rest.py b/dbs/weaviate/api/routers/rest.py
index abe2aea..bc27c80 100644
--- a/dbs/weaviate/api/routers/rest.py
+++ b/dbs/weaviate/api/routers/rest.py
@@ -1,5 +1,6 @@
 from fastapi import APIRouter, HTTPException, Query, Request
-from schemas.retriever import CountByCountry, SimilaritySearch
+
+from api.schemas.rest import CountByCountry, SimilaritySearch

 router = APIRouter()

@@ -121,11 +122,7 @@ def _search_by_similarity(
     request: Request, class_name: str, terms: str
 ) -> list[SimilaritySearch] | None:
     # Convert input text query into a vector for lookup in the db
-    if request.app.model_type == "sbert":
-        vector = request.app.model.encode(terms, show_progress_bar=False, batch_size=128).tolist()
-    elif request.app.model_type == "onnx":
-        vector = request.app.model(terms)[0][0]
-
+    vector = request.app.model.encode(terms, show_progress_bar=False, batch_size=128).tolist()
     near_vec = {"vector": vector}
     response = (
         request.app.client.query.get(
@@ -162,11 +159,7 @@ def _search_by_similarity_and_country(
     country: str,
 ) -> list[SimilaritySearch] | None:
     # Convert input text query into a vector for lookup in the db
-    if request.app.model_type == "sbert":
-        vector = request.app.model.encode(terms, show_progress_bar=False, batch_size=128).tolist()
-    elif request.app.model_type == "onnx":
-        vector = request.app.model(terms)[0][0]
-
+    vector = request.app.model.encode(terms, show_progress_bar=False, batch_size=128).tolist()
     near_vec = {"vector": vector}
     where_filter = {
         "path": "country",
@@ -211,11 +204,7 @@ def _search_by_similarity_and_filters(
     price: float,
 ) -> list[SimilaritySearch] | None:
     # Convert input text query into a vector for lookup in the db
-    if request.app.model_type == "sbert":
-        vector = request.app.model.encode(terms, show_progress_bar=False, batch_size=128).tolist()
-    elif request.app.model_type == "onnx":
-        vector = request.app.model(terms)[0][0]
-
+    vector = request.app.model.encode(terms, show_progress_bar=False, batch_size=128).tolist()
     near_vec = {"vector": vector}
     where_filter = {
         "operator": "And",

diff --git a/dbs/weaviate/api/schemas/__init__.py b/dbs/weaviate/api/schemas/__init__.py
new file mode 100644
index 0000000..e69de29
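For reference, the `near_vec` dict built above feeds Weaviate's `nearVector` operator. A standalone sketch of the same query shape with the v3 Python client (the class name, properties, and query text follow this repo's conventions but are illustrative):

```python
# Minimal sketch of a nearVector similarity query with the Weaviate v3 Python client.
# Assumes a "Wine" class with these properties and a local instance at :8080.
import weaviate
from sentence_transformers import SentenceTransformer

client = weaviate.Client("http://localhost:8080")
model = SentenceTransformer("sentence-transformers/multi-qa-MiniLM-L6-cos-v1")

vector = model.encode("earthy Tuscan red with firm tannins", show_progress_bar=False).tolist()
near_vec = {"vector": vector}

response = (
    client.query.get("Wine", ["title", "country", "variety"])
    .with_near_vector(near_vec)
    .with_limit(5)
    .do()
)
print(response["data"]["Get"]["Wine"])
```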
diff --git a/dbs/weaviate/schemas/retriever.py b/dbs/weaviate/api/schemas/rest.py
similarity index 84%
rename from dbs/weaviate/schemas/retriever.py
rename to dbs/weaviate/api/schemas/rest.py
index 0521979..32c1117 100644
--- a/dbs/weaviate/schemas/retriever.py
+++ b/dbs/weaviate/api/schemas/rest.py
@@ -1,22 +1,12 @@
-from pydantic import BaseModel
+from pydantic import BaseModel, ConfigDict


 class SimilaritySearch(BaseModel):
-    wineID: int
-    country: str
-    province: str | None
-    title: str
-    description: str | None
-    points: int
-    price: float | str | None
-    variety: str | None
-    winery: str | None
-
-    class Config:
-        extra = "ignore"
-        schema_extra = {
+    model_config = ConfigDict(
+        extra="ignore",
+        json_schema_extra={
             "example": {
-                "id": 3845,
+                "wineID": 3845,
                 "country": "Italy",
                 "title": "Castellinuzza e Piuca 2010 Chianti Classico",
                 "description": "This gorgeous Chianti Classico boasts lively cherry, strawberry and violet aromas. The mouthwatering palate shows concentrated wild-cherry flavor layered with mint, white pepper and clove. It has fresh acidity and firm tannins that will develop complexity with more bottle age. A textbook Chianti Classico.",
@@ -25,7 +15,18 @@ class Config:
                 "variety": "Red Blend",
                 "winery": "Castellinuzza e Piuca",
             }
-        }
+        },
+    )
+
+    wineID: int
+    country: str
+    province: str | None
+    title: str
+    description: str | None
+    points: int
+    price: float | str | None
+    variety: str | None
+    winery: str | None


 class CountByCountry(BaseModel):

diff --git a/dbs/weaviate/docker-compose-onnx.yml b/dbs/weaviate/docker-compose-onnx.yml
deleted file mode 100644
index 877451e..0000000
--- a/dbs/weaviate/docker-compose-onnx.yml
+++ /dev/null
@@ -1,43 +0,0 @@
-version: "3.9"
-
-services:
-  weaviate:
-    image: semitechnologies/weaviate:${WEAVIATE_VERSION}
-    ports:
-      - ${WEAVIATE_PORT}:8080
-    restart: on-failure:0
-    environment:
-      QUERY_DEFAULTS_LIMIT: 25
-      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
-      PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
-      DEFAULT_VECTORIZER_MODULE: 'none'
-      CLUSTER_HOSTNAME: 'node1'
-    volumes:
-      - weaviate_data:/var/lib/weaviate
-    networks:
-      - wine
-
-  fastapi:
-    image: weaviate_wine_fastapi:${TAG}
-    build:
-      context: .
-      dockerfile: Dockerfile.onnxruntime
-    restart: unless-stopped
-    env_file:
-      - .env
-    ports:
-      - ${API_PORT}:8000
-    depends_on:
-      - weaviate
-    volumes:
-      - ./:/wine
-    networks:
-      - wine
-    command: uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload
-
-volumes:
-  weaviate_data:
-
-networks:
-  wine:
-    driver: bridge
\ No newline at end of file
diff --git a/dbs/weaviate/onnx_model/onnx_optimizer.py b/dbs/weaviate/onnx_model/onnx_optimizer.py
deleted file mode 100644
index 4ee37ca..0000000
--- a/dbs/weaviate/onnx_model/onnx_optimizer.py
+++ /dev/null
@@ -1,135 +0,0 @@
-"""
-This script is a modified version of the method shown in this blog post:
-https://www.philschmid.de/optimize-sentence-transformers
-
-It uses the ONNX Runtime to dynamically optimize and quantize a sentence transformers model for better CPU performance.
-
-Using the quantized version of `sentence-transformers/multi-qa-MiniLM-L6-cos-v1` allows us to:
-  * Generate similar quality sentence embeddings as the original model, but with a roughly 1.8x speedup in vectorization time
-  * Reduce the model size from 86 MB to around 63 MB, a roughly 26% reduction in file size
-"""
-from pathlib import Path
-
-import torch
-import torch.nn.functional as F
-from optimum.onnxruntime import ORTModelForCustomTasks, ORTOptimizer, ORTQuantizer
-from optimum.onnxruntime.configuration import AutoQuantizationConfig, OptimizationConfig
-from sklearn.metrics.pairwise import cosine_similarity
-from transformers import AutoModel, AutoTokenizer, Pipeline
-
-
-def mean_pooling(model_output, attention_mask):
-    token_embeddings = model_output[
-        0
-    ]  # First element of model_output contains all token embeddings
-    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
-    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
-        input_mask_expanded.sum(1), min=1e-9
-    )
-
-
-class SentenceEmbeddingPipeline(Pipeline):
-    def _sanitize_parameters(self, **kwargs):
-        # We don't have any hyperparameters to sanitize
-        preprocess_kwargs = {}
-        return preprocess_kwargs, {}, {}
-
-    def preprocess(self, inputs):
-        encoded_inputs = self.tokenizer(inputs, padding=True, truncation=True, return_tensors="pt")
-        return encoded_inputs
-
-    def _forward(self, model_inputs):
-        outputs = self.model(**model_inputs)
-        return {"outputs": outputs, "attention_mask": model_inputs["attention_mask"]}
-
-    def postprocess(self, model_outputs):
-        # Perform mean pooling
-        sentence_embeddings = mean_pooling(
-            model_outputs["outputs"], model_outputs["attention_mask"]
-        )
-        # Normalize embeddings
-        sentence_embeddings = F.normalize(sentence_embeddings, p=2, dim=1)
-        return sentence_embeddings
-
-
-def optimize_model(model_id: str, onnx_path: Path) -> None:
-    """
-    Optimize ONNX model for CPU performance
-    """
-    model = ORTModelForCustomTasks.from_pretrained(model_id, export=True)
-    # Create ORTOptimizer and define optimization configuration
-    optimizer = ORTOptimizer.from_pretrained(model)
-    # Save models to local disk
-    model.save_pretrained(onnx_path)
-    tokenizer.save_pretrained(onnx_path)
-    # Set optimization_level = 99 -> enable all optimizations
-    optimization_config = OptimizationConfig(optimization_level=99)
-    # Apply the optimization configuration to the model
-    optimizer.optimize(
-        optimization_config=optimization_config,
-        save_dir=onnx_path,
-    )
-
-
-def quantize_optimized_model(onnx_path: Path) -> None:
-    """
-    Quantize an already optimized ONNX model for even better CPU performance
-    """
-    # Create ORTQuantizer and define quantization configuration
-    quantizer = ORTQuantizer.from_pretrained(onnx_path, file_name="model_optimized.onnx")
-    quantization_config = AutoQuantizationConfig.avx512_vnni(is_static=False, per_channel=True)
-    # Apply the quantization configuration to the model
-    quantizer.quantize(
-        quantization_config=quantization_config,
-        save_dir=onnx_path,
-    )
-
-
-def generate_similarities(source_sentence: str, sentences: list[str], pipeline: Pipeline) -> None:
-    source_sentence_embedding = pipeline(source_sentence).tolist()[0]
-
-    for sentence in sentences:
-        sentence_embedding = pipeline(sentence).tolist()[0]
-        similarity = cosine_similarity([source_sentence_embedding], [sentence_embedding])[0]
-        print(f"Similarity between '{source_sentence}' and '{sentence}': {similarity}")
-
-
-def main() -> None:
-    """
-    Generate optimized and quantized ONNX models from a vanilla sentence transformer model
-    """
-    # Init vanilla sentence transformer pipeline
-    print("---\nLoading vanilla sentence transformer model\n---")
-    vanilla_pipeline = SentenceEmbeddingPipeline(model=vanilla_model, tokenizer=tokenizer)
-    # Print out pairwise similarities
-    generate_similarities(source_sentence, sentences, vanilla_pipeline)
-
-    # Save model to ONNX
-    Path("onnx").mkdir(exist_ok=True)
-    onnx_path = Path("onnx")
-
-    # First, dynamically optimize an existing sentence transformer model
-    optimize_model(model_id, onnx_path)
-    # Next, dynamically quantize the optimized model
-    quantize_optimized_model(onnx_path)
-
-    # Init quantized ONNX pipeline
-    print("---\nLoading quantized ONNX model\n---")
-    model_filename = "model_optimized_quantized.onnx"
-    quantized_model = ORTModelForCustomTasks.from_pretrained(onnx_path, file_name=model_filename)
-    quantized_pipeline = SentenceEmbeddingPipeline(model=quantized_model, tokenizer=tokenizer)
-    # Print out pairwise similarities
-    generate_similarities(source_sentence, sentences, quantized_pipeline)
-
-
-if __name__ == "__main__":
-    # Example sentences we want sentence embeddings for
-    source_sentence = "I'm very happy"
-    sentences = ["I am so glad", "I'm so sad", "My dog is missing", "The universe is so vast!"]
-
-    model_id = "sentence-transformers/multi-qa-MiniLM-L6-cos-v1"
-    # Load AutoModel from huggingface model repository
-    tokenizer = AutoTokenizer.from_pretrained(model_id)
-    vanilla_model = AutoModel.from_pretrained(model_id)
-
-    main()

diff --git a/dbs/weaviate/requirements-onnx.txt b/dbs/weaviate/requirements-onnx.txt
deleted file mode 100644
index 7563cb7..0000000
--- a/dbs/weaviate/requirements-onnx.txt
+++ /dev/null
@@ -1,86 +0,0 @@
-aiohttp==3.8.4
-aiosignal==1.3.1
-anyio==3.6.2
-async-timeout==4.0.2
-attrs==23.1.0
-Authlib==1.2.0
-catalogue==2.0.8
-certifi==2022.12.7
-cffi==1.15.1
-charset-normalizer==3.1.0
-click==8.1.3
-cmake==3.26.3
-coloredlogs==15.0.1
-cryptography==40.0.2
-datasets==2.11.0
-decorator==5.1.1
-dill==0.3.6
-evaluate==0.4.0
-fastapi==0.95.1
-filelock==3.12.0
-flatbuffers==23.3.3
-frozenlist==1.3.3
-fsspec==2023.4.0
-grpcio==1.54.0
-grpcio-tools==1.48.2
-h11==0.14.0
-h2==4.1.0
-hpack==4.0.0
-httpcore==0.17.0
-httpx==0.24.0
-huggingface-hub==0.13.4
-humanfriendly==10.0
-hyperframe==6.0.1
-idna==3.4
-Jinja2==3.1.2
-joblib==1.2.0
-lit==16.0.1
-MarkupSafe==2.1.2
-mpmath==1.3.0
-multidict==6.0.4
-multiprocess==0.70.14
-networkx==3.1
-nltk==3.8.1
-numpy==1.24.2
-onnx==1.13.1
-onnxruntime==1.14.1
-optimum==1.8.2
-packaging==23.1
-pandas==2.0.0
-Pillow==9.5.0
-protobuf==3.20.2
-pyarrow==11.0.0
-pycparser==2.21
-pydantic==1.10.7
-python-dateutil==2.8.2
-python-dotenv==1.0.0
-pytz==2023.3
-PyYAML==6.0
-qdrant-client==1.1.5
-regex==2023.3.23
-requests==2.28.2
-responses==0.18.0
-scikit-learn==1.2.2
-scipy==1.10.1
-sentence-transformers==2.2.2
-sentencepiece==0.1.98
-six==1.16.0
-sniffio==1.3.0
-srsly==2.4.6
-starlette==0.26.1
-sympy==1.11.1
-threadpoolctl==3.1.0
-tokenizers==0.13.3
-torch==2.0.0
-torchvision==0.15.1
-tqdm==4.65.0
-transformers==4.28.1
-triton==2.0.0
-typing_extensions==4.5.0
-tzdata==2023.3
-urllib3==1.26.15
-uvicorn==0.21.1
-validators==0.20.0
-weaviate-client==3.16.2
-xxhash==3.2.0
-yarl==1.9.1
diff --git a/dbs/weaviate/requirements.txt b/dbs/weaviate/requirements.txt
index 7d53e6d..84a5277 100644
--- a/dbs/weaviate/requirements.txt
+++ b/dbs/weaviate/requirements.txt
@@ -1,9 +1,12 @@
-weaviate-client>=3.16.2
-transformers==4.28.1
-sentence-transformers==2.2.2
-pydantic[dotenv]>=1.10.7, <2.0.0
-fastapi>=0.95.0, <1.0.0
+weaviate-client~=3.22.0
+transformers~=4.28.0
+sentence-transformers~=2.2.0
+pydantic~=2.0.0
+pydantic-settings~=2.0.0
+python-dotenv>=1.0.0
+fastapi~=0.100.0
 httpx>=0.24.0
 aiohttp>=3.8.4
+uvloop>=0.17.0
 uvicorn>=0.21.0, <1.0.0
 srsly>=2.4.6
\ No newline at end of file

diff --git a/dbs/weaviate/schemas/wine.py b/dbs/weaviate/schemas/wine.py
index 0aaf930..3b321b2 100644
--- a/dbs/weaviate/schemas/wine.py
+++ b/dbs/weaviate/schemas/wine.py
@@ -1,27 +1,13 @@
-from pydantic import BaseModel, root_validator
+from pydantic import BaseModel, ConfigDict, Field, model_validator


 class Wine(BaseModel):
-    id: int
-    points: int
-    title: str
-    description: str | None
-    price: float | None
-    variety: str | None
-    winery: str | None
-    vineyard: str | None
-    country: str | None
-    province: str | None
-    region_1: str | None
-    region_2: str | None
-    taster_name: str | None
-    taster_twitter_handle: str | None
-
-    class Config:
-        extra = "allow"
-        allow_population_by_field_name = True
-        validate_assignment = True
-        schema_extra = {
+    model_config = ConfigDict(
+        populate_by_name=True,
+        validate_assignment=True,
+        extra="allow",
+        str_strip_whitespace=True,
+        json_schema_extra={
             "example": {
                 "id": 45100,
                 "points": 85,
@@ -38,17 +24,25 @@ class Config:
                 "taster_name": "Michael Schachner",
                 "taster_twitter_handle": "@wineschach",
             }
-        }
+        },
+    )

-    @root_validator(pre=True)
-    def _get_vineyard(cls, values):
-        "Rename designation to vineyard"
-        vineyard = values.pop("designation", None)
-        if vineyard:
-            values["vineyard"] = vineyard.strip()
-        return values
+    id: int
+    points: int
+    title: str
+    description: str | None
+    price: float | None
+    variety: str | None
+    winery: str | None
+    vineyard: str | None = Field(..., alias="designation")
+    country: str | None
+    province: str | None
+    region_1: str | None
+    region_2: str | None
+    taster_name: str | None
+    taster_twitter_handle: str | None

-    @root_validator
+    @model_validator(mode="before")
     def _fill_country_unknowns(cls, values):
         "Fill in missing country values with 'Unknown', as we always want this field to be queryable"
         country = values.get("country")
@@ -56,7 +50,7 @@ def _fill_country_unknowns(cls, values):
         values["country"] = "Unknown"
         return values

-    @root_validator
+    @model_validator(mode="before")
     def _add_to_vectorize_fields(cls, values):
         "Add a field to_vectorize that will be used to create sentence embeddings"
         variety = values.get("variety", "")

diff --git a/dbs/weaviate/scripts/bulk_index_onnx.py b/dbs/weaviate/scripts/bulk_index_onnx.py
index 8f6ce42..49df478 100644
--- a/dbs/weaviate/scripts/bulk_index_onnx.py
+++ b/dbs/weaviate/scripts/bulk_index_onnx.py
@@ -12,7 +12,6 @@
 from dotenv import load_dotenv
 from optimum.onnxruntime import ORTModelForCustomTasks
 from optimum.pipelines import pipeline
-from pydantic.main import ModelMetaclass
 from tqdm import tqdm
 from transformers import AutoTokenizer
 from weaviate.client import Client
@@ -60,10 +59,9 @@ def get_json_data(data_dir: Path, filename: str) -> list[JsonBlob]:

 def validate(
     data: list[JsonBlob],
-    model: ModelMetaclass,
     exclude_none: bool = False,
 ) -> list[JsonBlob]:
-    validated_data = [model(**item).dict(exclude_none=exclude_none) for item in data]
+    validated_data = [Wine(**item).model_dump(exclude_none=exclude_none) for item in data]
     return validated_data


@@ -99,7 +97,7 @@ def add_vectors_to_index(data_chunk: tuple[JsonBlob, ...]) -> None:
     HOST = settings.weaviate_host
     PORT = settings.weaviate_port
     client = weaviate.Client(f"http://{HOST}:{PORT}")
-    data = validate(data_chunk, Wine, exclude_none=True)
+    data = validate(data_chunk, exclude_none=True)

     # Preload optimized, quantized ONNX sentence transformers model
     # NOTE: This requires that the script ../onnx_model/onnx_optimizer.py has been run beforehand
diff --git a/dbs/weaviate/scripts/bulk_index_sbert.py b/dbs/weaviate/scripts/bulk_index_sbert.py
index 94a663b..12d8a34 100644
--- a/dbs/weaviate/scripts/bulk_index_sbert.py
+++ b/dbs/weaviate/scripts/bulk_index_sbert.py
@@ -9,7 +9,6 @@
 import srsly
 import weaviate
 from dotenv import load_dotenv
-from pydantic.main import ModelMetaclass
 from sentence_transformers import SentenceTransformer
 from weaviate.client import Client

@@ -56,10 +55,9 @@ def get_json_data(data_dir: Path, filename: str) -> list[JsonBlob]:

 def validate(
     data: list[JsonBlob],
-    model: ModelMetaclass,
     exclude_none: bool = False,
 ) -> list[JsonBlob]:
-    validated_data = [model(**item).dict(exclude_none=exclude_none) for item in data]
+    validated_data = [Wine(**item).model_dump(exclude_none=exclude_none) for item in data]
     return validated_data


@@ -84,7 +82,7 @@ def add_vectors_to_index(data_chunk: tuple[JsonBlob, ...]) -> None:
     HOST = settings.weaviate_host
     PORT = settings.weaviate_port
     client = weaviate.Client(f"http://{HOST}:{PORT}")
-    data = validate(data_chunk, Wine, exclude_none=True)
+    data = validate(data_chunk, exclude_none=True)

     # Load a sentence transformer model for semantic similarity from a specified checkpoint
     model_id = get_settings().embedding_model_checkpoint
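The `validate()` change above is the same pydantic v1 to v2 move repeated across all four loader scripts: `.dict()` becomes `.model_dump()`, and the removed `ModelMetaclass` annotation is dropped in favor of referencing the concrete model directly. A minimal sketch of the resulting pattern, where the `Wine` stub stands in for the repo's full schema:

```python
# Minimal sketch of the v2 validate() helper shared by the loader scripts.
# `Wine` here is a stub standing in for the repo's full schema.
from typing import Any

from pydantic import BaseModel

JsonBlob = dict[str, Any]


class Wine(BaseModel):
    id: int
    title: str
    country: str | None = None


def validate(data: list[JsonBlob], exclude_none: bool = False) -> list[JsonBlob]:
    # pydantic v2: model_dump() replaces the v1 .dict() method
    return [Wine(**item).model_dump(exclude_none=exclude_none) for item in data]


print(validate([{"id": 1, "title": "Test Wine", "country": None}], exclude_none=True))
# -> [{'id': 1, 'title': 'Test Wine'}]
```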