Skip to content

Commit

Permalink
re-add scheduled tasks with apscheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
zaironjacobs committed Aug 20, 2024
1 parent 25a3634 commit 7d91350
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 36 deletions.
10 changes: 5 additions & 5 deletions harmony_api/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ class Settings(BaseSettings):
# General harmony_api config
VERSION: str = "2.0"
APP_TITLE: str = "Harmony API"
TIKA_ENDPOINT: str = os.getenv("TIKA_ENDPOINT", "")
OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY")
AZURE_OPENAI_API_KEY: str = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT: str = os.getenv("AZURE_OPENAI_ENDPOINT")
TIKA_ENDPOINT: str = os.getenv("TIKA_ENDPOINT", "http://tika:9998")
OPENAI_API_KEY: str | None = os.getenv("OPENAI_API_KEY")
AZURE_OPENAI_API_KEY: str | None = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_ENDPOINT: str | None = os.getenv("AZURE_OPENAI_ENDPOINT")
GOOGLE_APPLICATION_CREDENTIALS: dict = GOOGLE_APPLICATION_CREDENTIALS


Expand Down Expand Up @@ -79,7 +79,7 @@ class ProdSettings(Settings):
}


def get_settings():
def get_settings() -> Union[DevSettings | ProdSettings]:
env = os.getenv("STAGE", "dev")
settings_type = {
"dev": DevSettings(),
Expand Down
32 changes: 22 additions & 10 deletions harmony_api/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,44 @@
SOFTWARE.
"""

from fastapi.concurrency import run_in_threadpool
# from rocketry import Rocketry
# from rocketry.conds import cron
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

from harmony_api.services.instruments_cache import InstrumentsCache
from harmony_api.services.vectors_cache import VectorsCache

# app = Rocketry(executation="async")
# Jobstores
jobstores = {
"default": MemoryJobStore()
}

# Executors
executors = {
"default": ThreadPoolExecutor(),
}

# @app.task(cron("0 */12 * * *"))
async def do_every_12th_hour():
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, timezone="UTC")


@scheduler.scheduled_job(
"cron", year="*", month="*", day="*", hour="*/12", minute="0", second="0"
)
def do_every_12th_hour():
"""
Save the caches to disk every 12th hour.
Save the caches to disk.
Runs at minute 0 past every 12th hour
Runs at minute 0 past every 12th hour.
"""

# Save instruments cache to disk
try:
await run_in_threadpool(InstrumentsCache().save)
InstrumentsCache().save()
except (Exception,) as e:
print(f"Could not save instruments cache: {str(e)}.")

# Save vectors cache to disk
try:
await run_in_threadpool(VectorsCache().save)
VectorsCache().save()
except (Exception,) as e:
print(f"Could not save vectors cache: {str(e)}.")
4 changes: 2 additions & 2 deletions harmony_api/services/instruments_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def __load(self):
# Dict to instruments
cache_parsed: dict[str, List[Instrument]] = {}
for key, value in cache.items():
instruments = [Instrument.parse_obj(x) for x in value]
instruments = [Instrument.model_validate(x) for x in value]
cache_parsed[key] = instruments

self.__cache = cache_parsed
Expand Down Expand Up @@ -113,7 +113,7 @@ def save(self):
# Instruments to dict
cache_parsed: dict[str, List] = {}
for key, value in self.__cache.items():
instruments = [x.model_dump(mode="json")() for x in value]
instruments = [x.model_dump(mode="json") for x in value]
cache_parsed[key] = instruments

with open(cache_file_path, "w", encoding="utf8") as file:
Expand Down
33 changes: 14 additions & 19 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
sys.path.append("./harmony/src")

import asyncio
from contextlib import asynccontextmanager

import uvicorn
from fastapi import FastAPI
Expand All @@ -40,7 +41,7 @@
from harmony_api.routers.info_router import router as info_router
from harmony_api.routers.text_router import router as text_router
from harmony_api.services.instruments_cache import InstrumentsCache
# from harmony_api.scheduler import app as app_rocketry
from harmony_api.scheduler import scheduler
from harmony_api.services.vectors_cache import VectorsCache

description = """
Expand All @@ -50,10 +51,18 @@
You can try Harmony at <a href="https://harmonydata.ac.uk/app">harmonydata.ac.uk/harmony_api</a> and you can read our blog at <a href="https://harmonydata.ac.uk">harmonydata.ac.uk</a>.
"""


@asynccontextmanager
async def lifespan(_: FastAPI):
scheduler.start()

yield

app_fastapi = FastAPI(
title=settings.APP_TITLE,
description=description,
version=settings.VERSION,
lifespan=lifespan,
docs_url="/docs",
contact={
"name": "Thomas Wood",
Expand Down Expand Up @@ -82,25 +91,13 @@
app_fastapi.include_router(info_router, tags=["Info"])


class Server(uvicorn.Server):
"""
Custom uvicorn.Server
Override signals and include Rocketry
"""

def handle_exit(self, sig: int, frame):
# app_rocketry.session.shut_down()

return super().handle_exit(sig, frame)


async def main():
# Load cache
print("INFO:\t Loading cache...")
InstrumentsCache()
VectorsCache()

server = Server(
server = uvicorn.Server(
config=uvicorn.Config(
app=app_fastapi,
host=settings.SERVER_HOST,
Expand All @@ -112,12 +109,10 @@ async def main():
)

api = asyncio.create_task(server.serve())
# scheduler = asyncio.create_task(app_rocketry.serve())

# Start both applications (FastAPI & Rocketry)
print("INFO:\t Starting applications...")
# await asyncio.wait([api, scheduler])
await asyncio.wait(api)
# Start FastAPI
print("INFO:\t Starting application...")
await asyncio.wait([api])


if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ vertexai==1.49.0
numpy==1.26.4
sklearn-crfsuite==0.5.0
scikit-learn==1.5.0
APScheduler==3.10.4

0 comments on commit 7d91350

Please sign in to comment.