Merge branch 'main' into lancedb
prrao87 committed Sep 23, 2023
2 parents 1383714 + d36089f commit bea9ed3
Showing 64 changed files with 749 additions and 1,442 deletions.
35 changes: 21 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,35 +1,42 @@
-# Async database transactions and FastAPI
+# DBHub

-This repo aims to provide working code and reproducible setups for asynchronous data ingestion and querying from numerous databases via Python. Wherever possible, data is ingested to a database via their supported async Python drivers, and the data is also queried in async fashion on top of FastAPI endpoints.
+## Boilerplate for async ingestion and querying of DBs

-Example code is provided for numerous databases, along with FastAPI docker deployments that allow you to easily supply complex query results to downstream applications.
+This repo aims to provide working code and reproducible setups for bulk data ingestion and querying from numerous databases via their Python clients. Wherever possible, async database client APIs are utilized for data ingestion. The query interface to the data is exposed via async FastAPI endpoints. To enable reproducibility across environments, Dockerfiles are provided as well.

-#### Currently implemented
+The `docker-compose.yml` does the following:
+1. Set up a local DB server in a container
+2. Set up local volume mounts to persist the data
+3. Set up a FastAPI server in another container
+4. Set up a network bridge such that the DB server can be accessed from the FastAPI server
+5. Tear down all the containers once development and testing is complete
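For readers unfamiliar with Compose, the five steps above might map to a file along these lines. This is an illustrative sketch only: the service names, image, volume, and ports are assumptions, not the repo's actual `docker-compose.yml`.

```yaml
version: "3.9"

services:
  db:
    # 1. Local DB server in a container (Elasticsearch shown as an example)
    image: docker.elastic.co/elasticsearch/elasticsearch:8.7.0
    volumes:
      # 2. Local volume mount to persist the data
      - db_data:/usr/share/elasticsearch/data
  fastapi:
    # 3. FastAPI server in another container
    build: .
    ports:
      - "8000:8000"
    # 4. Compose's default network lets this service reach `db` by service name
    depends_on:
      - db

volumes:
  db_data:
```

Step 5 then corresponds to running `docker compose down` once development and testing are complete.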

+### Currently implemented
 * Neo4j
 * Elasticsearch
 * Meilisearch
 * Qdrant
 * Weaviate

-#### 🚧 Coming soon
+### 🚧 In the pipeline
 * LanceDB
 * SurrealDB
 * MongoDB


 ## Goals

-The primary aim is to compare the data ingestion and query performance of various databases that can be used for a host of downstream use cases. Two use cases are of particular interest:
+The main goals of this repo are explained as follows.

-1. We may want to expose (potentially sensitive) data to downstream client applications, so building an API on top of the database can be a very useful tool to share the data in a controlled manner
+1. **Ease of setup**: There are tons of databases and client APIs out there, so it's useful to have a clean, efficient and reproducible workflow to experiment with a range of datasets, as well as databases for the problem at hand.

-2. Databases or data stores in general can be important "sources of truth" for contextual querying via LLMs like ChatGPT, allowing us to ground our model's results with factual data. APIs allow us to add another layer to simplify querying a host of backends in a way that doesn't rely on the LLM learning a specific query language.
+2. **Ease of distribution**: We may want to expose (potentially sensitive) data to downstream client applications, so building an API on top of the database can be a very useful tool to share the data in a controlled manner

-In general, it's useful to have a clean, efficient and reproducible workflow to experiment with each database in question.
+3. **Ease of testing advanced use cases**: Search databases (either full-text keyword search or vector DBs) can be important "sources of truth" for contextual querying via LLMs like ChatGPT, allowing us to ground our model's results with factual data.


 ## Pre-requisites

-Install Docker and the latest version of Python (3.11+), as recent syntactic improvements in Python are extensively utilized in the code provided.
-
-## About the dataset
-
-The [dataset provided](https://github.com/prrao87/async-db-fastapi/tree/main/data) in this repo is a formatted version of the version obtained from Kaggle datasets. Full credit is due to [the original author](https://www.kaggle.com/zynicide) via Kaggle for curating this dataset.
+* Python 3.10+
+* Docker
+* A passion to learn more about and experiment with databases!
2 changes: 1 addition & 1 deletion dbs/elasticsearch/.env.example
@@ -10,7 +10,7 @@ ELASTIC_SERVICE = "elasticsearch"
API_PORT = 8002

# Container image tag
-TAG = "0.1.0"
+TAG = "0.2.0"

# Docker project namespace (defaults to the current folder name if not set)
COMPOSE_PROJECT_NAME = elastic_wine
1 change: 0 additions & 1 deletion dbs/elasticsearch/Dockerfile
@@ -7,6 +7,5 @@ COPY ./requirements.txt /wine/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /wine/requirements.txt

COPY ./api /wine/api
-COPY ./schemas /wine/schemas

EXPOSE 8000
10 changes: 6 additions & 4 deletions dbs/elasticsearch/api/config.py
@@ -1,14 +1,16 @@
-from pydantic import BaseSettings
+from pydantic_settings import BaseSettings, SettingsConfigDict


 class Settings(BaseSettings):
+    model_config = SettingsConfigDict(
+        env_file=".env",
+        extra="allow",
+    )
+
     elastic_service: str
     elastic_user: str
     elastic_password: str
     elastic_url: str
     elastic_port: int
     elastic_index_alias: str
     tag: str
-
-    class Config:
-        env_file = ".env"
3 changes: 2 additions & 1 deletion dbs/elasticsearch/api/routers/rest.py
@@ -1,6 +1,7 @@
 from elasticsearch import AsyncElasticsearch
 from fastapi import APIRouter, HTTPException, Query, Request
-from schemas.retriever import (
+
+from api.schemas.rest import (
     CountByCountry,
     FullTextSearch,
     TopWinesByCountry,
File renamed without changes.
@@ -1,18 +1,9 @@
-from pydantic import BaseModel
+from pydantic import BaseModel, ConfigDict


 class FullTextSearch(BaseModel):
-    id: int
-    country: str
-    title: str
-    description: str | None
-    points: int
-    price: float | str | None
-    variety: str | None
-    winery: str | None
-
-    class Config:
-        schema_extra = {
+    model_config = ConfigDict(
+        json_schema_extra={
             "example": {
                 "id": 3845,
                 "country": "Italy",
@@ -24,6 +15,16 @@ class Config:
                 "winery": "Castellinuzza e Piuca",
             }
         }
+    )
+
+    id: int
+    country: str
+    title: str
+    description: str | None
+    points: int
+    price: float | str | None
+    variety: str | None
+    winery: str | None


 class TopWinesByCountry(BaseModel):
@@ -36,9 +37,6 @@ class TopWinesByCountry(BaseModel):
     variety: str | None
     winery: str | None

-    class Config:
-        validate_assignment = True


 class TopWinesByProvince(BaseModel):
     id: int
@@ -51,9 +49,6 @@ class TopWinesByProvince(BaseModel):
     variety: str | None
     winery: str | None

-    class Config:
-        validate_assignment = True


 class MostWinesByVariety(BaseModel):
     country: str
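The same v1 → v2 migration applies to the plain models in this file: v1's `class Config` with `schema_extra` becomes a `model_config = ConfigDict(...)` assignment. A minimal sketch with a trimmed-down, illustrative field set:

```python
from pydantic import BaseModel, ConfigDict


class FullTextSearch(BaseModel):
    # Pydantic v2: json_schema_extra replaces v1's Config.schema_extra
    model_config = ConfigDict(
        json_schema_extra={"example": {"id": 3845, "country": "Italy"}}
    )

    id: int
    country: str


# The extra keys are merged into the model's JSON schema,
# which is what FastAPI surfaces in its interactive docs
schema = FullTextSearch.model_json_schema()
print(schema["example"])  # {'id': 3845, 'country': 'Italy'}
```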
8 changes: 5 additions & 3 deletions dbs/elasticsearch/requirements.txt
@@ -1,6 +1,8 @@
-elasticsearch>=8.7.0
-pydantic[dotenv]>=1.10.7, <2.0.0
-fastapi>=0.95.0, <1.0.0
+elasticsearch~=8.7.0
+pydantic~=2.0.0
+pydantic-settings~=2.0.0
+python-dotenv>=1.0.0
+fastapi~=0.100.0
 httpx>=0.24.0
 aiohttp>=3.8.4
 uvicorn>=0.21.0, <1.0.0
8 changes: 3 additions & 5 deletions dbs/elasticsearch/scripts/bulk_index.py
@@ -11,11 +11,10 @@
 import srsly
 from dotenv import load_dotenv
 from elasticsearch import AsyncElasticsearch, helpers
-from pydantic.main import ModelMetaclass
-from schemas.wine import Wine

 sys.path.insert(1, os.path.realpath(Path(__file__).resolve().parents[1]))
 from api.config import Settings
+from schemas.wine import Wine

 load_dotenv()
 # Custom types
@@ -59,15 +58,14 @@ def get_json_data(data_dir: Path, filename: str) -> list[JsonBlob]:

 def validate(
     data: tuple[JsonBlob],
-    model: ModelMetaclass,
     exclude_none: bool = False,
 ) -> list[JsonBlob]:
-    validated_data = [model(**item).dict(exclude_none=exclude_none) for item in data]
+    validated_data = [Wine(**item).dict(exclude_none=exclude_none) for item in data]
     return validated_data


 def process_chunks(data: list[JsonBlob]) -> tuple[list[JsonBlob], str]:
-    validated_data = validate(data, Wine, exclude_none=True)
+    validated_data = validate(data, exclude_none=True)
     return validated_data


File renamed without changes.
80 changes: 80 additions & 0 deletions dbs/elasticsearch/scripts/schemas/wine.py
@@ -0,0 +1,80 @@
+from pydantic import BaseModel, ConfigDict, Field, model_validator
+
+
+class Wine(BaseModel):
+    model_config = ConfigDict(
+        populate_by_name=True,
+        validate_assignment=True,
+        extra="allow",
+        str_strip_whitespace=True,
+        json_schema_extra={
+            "example": {
+                "id": 45100,
+                "points": 85,
+                "title": "Balduzzi 2012 Reserva Merlot (Maule Valley)",
+                "description": "Ripe in color and aromas, this chunky wine delivers heavy baked-berry and raisin aromas in front of a jammy, extracted palate. Raisin and cooked berry flavors finish plump, with earthy notes.",
+                "price": 10.0,
+                "variety": "Merlot",
+                "winery": "Balduzzi",
+                "vineyard": "Reserva",
+                "country": "Chile",
+                "province": "Maule Valley",
+                "region_1": "null",
+                "region_2": "null",
+                "taster_name": "Michael Schachner",
+                "taster_twitter_handle": "@wineschach",
+            }
+        },
+    )
+
+    id: int
+    points: int
+    title: str
+    description: str | None
+    price: float | None
+    variety: str | None
+    winery: str | None
+    vineyard: str | None = Field(..., alias="designation")
+    country: str | None
+    province: str | None
+    region_1: str | None
+    region_2: str | None
+    taster_name: str | None
+    taster_twitter_handle: str | None
+
+    @model_validator(mode="before")
+    def _fill_country_unknowns(cls, values):
+        "Fill in missing country values with 'Unknown', as we always want this field to be queryable"
+        country = values.get("country")
+        if country is None or country == "null":
+            values["country"] = "Unknown"
+        return values
+
+    @model_validator(mode="before")
+    def _create_id(cls, values):
+        "Create an _id field because Elastic needs this to store as primary key"
+        values["_id"] = values["id"]
+        return values
+
+
+if __name__ == "__main__":
+    data = {
+        "id": 45100,
+        "points": 85,
+        "title": "Balduzzi 2012 Reserva Merlot (Maule Valley)",
+        "description": "Ripe in color and aromas, this chunky wine delivers heavy baked-berry and raisin aromas in front of a jammy, extracted palate. Raisin and cooked berry flavors finish plump, with earthy notes.",
+        "price": 10,  # Test if field is cast to float
+        "variety": "Merlot",
+        "winery": "Balduzzi",
+        "designation": "Reserva",  # Test if field is renamed
+        "country": "null",  # Test unknown country
+        "province": " Maule Valley ",  # Test if field is stripped
+        "region_1": "null",
+        "region_2": "null",
+        "taster_name": "Michael Schachner",
+        "taster_twitter_handle": "@wineschach",
+    }
+    from pprint import pprint
+
+    wine = Wine(**data)
+    pprint(wine.model_dump(), sort_dicts=False)
4 changes: 2 additions & 2 deletions dbs/meilisearch/.env.example
@@ -1,13 +1,13 @@
# Master key must be at least 16 bytes, composed of valid UTF-8 characters
MEILI_MASTER_KEY = ""
-MEILI_VERSION = "v1.1.1"
+MEILI_VERSION = "v1.2.0"
MEILI_PORT = 7700
MEILI_URL = "localhost"
MEILI_SERVICE = "meilisearch"
API_PORT = 8003

# Container image tag
-TAG = "0.1.0"
+TAG = "0.2.0"

# Docker project namespace (defaults to the current folder name if not set)
COMPOSE_PROJECT_NAME = meili_wine
1 change: 0 additions & 1 deletion dbs/meilisearch/Dockerfile
@@ -7,6 +7,5 @@ COPY ./requirements.txt /wine/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /wine/requirements.txt

COPY ./api /wine/api
-COPY ./schemas /wine/schemas

EXPOSE 8000
10 changes: 6 additions & 4 deletions dbs/meilisearch/api/config.py
@@ -1,12 +1,14 @@
-from pydantic import BaseSettings
+from pydantic_settings import BaseSettings, SettingsConfigDict


 class Settings(BaseSettings):
+    model_config = SettingsConfigDict(
+        env_file=".env",
+        extra="allow",
+    )
+
     meili_service: str
     meili_master_key: str
     meili_port: int
     meili_url: str
     tag: str
-
-    class Config:
-        env_file = ".env"
1 change: 1 addition & 0 deletions dbs/meilisearch/api/main.py
@@ -29,6 +29,7 @@ async def get_search_api_key(settings) -> str:
 async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
     # Search for wines by keyword phrase using Meilisearch
     settings = get_settings()
+    print(settings)
     search_key = await get_search_api_key(settings)
     URI = f"http://{settings.meili_service}:{settings.meili_port}"
     async with Client(URI, search_key) as client:
6 changes: 4 additions & 2 deletions dbs/meilisearch/api/routers/rest.py
@@ -1,6 +1,7 @@
-from meilisearch_python_async import Client
 from fastapi import APIRouter, HTTPException, Query, Request
-from schemas.retriever import (
+from meilisearch_python_async import Client
+
+from api.schemas.rest import (
     FullTextSearch,
     TopWinesByCountry,
     TopWinesByProvince,
@@ -102,6 +103,7 @@ async def _top_by_country(
         sort=["points:desc", "price:asc"],
     )
     if response:
+        print(response.hits)
         return response.hits
     return None
Empty file.
