From dd460574d6a78f40ef62adb92fec23a56927641d Mon Sep 17 00:00:00 2001 From: iameru Date: Wed, 22 Nov 2023 12:30:34 +0100 Subject: [PATCH 1/3] add fastapi_utils dependency --- server/poetry.lock | 18 +++++++++++++++++- server/pyproject.toml | 1 + 2 files changed, 18 insertions(+), 1 deletion(-) mode change 100644 => 100755 server/pyproject.toml diff --git a/server/poetry.lock b/server/poetry.lock index cdc57b66..c8a1949b 100644 --- a/server/poetry.lock +++ b/server/poetry.lock @@ -546,6 +546,22 @@ dev = ["pre-commit (>=2.17.0,<3.0.0)", "ruff (==0.0.138)", "uvicorn[standard] (> doc = ["mdx-include (>=1.4.1,<2.0.0)", "mkdocs (>=1.1.2,<2.0.0)", "mkdocs-markdownextradata-plugin (>=0.1.7,<0.3.0)", "mkdocs-material (>=8.1.4,<9.0.0)", "pyyaml (>=5.3.1,<7.0.0)", "typer-cli (>=0.0.13,<0.0.14)", "typer[all] (>=0.6.1,<0.8.0)"] test = ["anyio[trio] (>=3.2.1,<4.0.0)", "black (==23.1.0)", "coverage[toml] (>=6.5.0,<8.0)", "databases[sqlite] (>=0.3.2,<0.7.0)", "email-validator (>=1.1.1,<2.0.0)", "flask (>=1.1.2,<3.0.0)", "httpx (>=0.23.0,<0.24.0)", "isort (>=5.0.6,<6.0.0)", "mypy (==0.982)", "orjson (>=3.2.1,<4.0.0)", "passlib[bcrypt] (>=1.7.2,<2.0.0)", "peewee (>=3.13.3,<4.0.0)", "pytest (>=7.1.3,<8.0.0)", "python-jose[cryptography] (>=3.3.0,<4.0.0)", "python-multipart (>=0.0.5,<0.0.7)", "pyyaml (>=5.3.1,<7.0.0)", "ruff (==0.0.138)", "sqlalchemy (>=1.3.18,<1.4.43)", "types-orjson (==3.6.2)", "types-ujson (==5.7.0.1)", "ujson (>=4.0.1,!=4.0.2,!=4.1.0,!=4.2.0,!=4.3.0,!=5.0.0,!=5.1.0,<6.0.0)"] +[[package]] +name = "fastapi-utils" +version = "0.2.1" +description = "Reusable utilities for FastAPI" +optional = false +python-versions = ">=3.6,<4.0" +files = [ + {file = "fastapi-utils-0.2.1.tar.gz", hash = "sha256:0e6c7fc1870b80e681494957abf65d4f4f42f4c7f70005918e9181b22f1bd759"}, + {file = "fastapi_utils-0.2.1-py3-none-any.whl", hash = "sha256:dd0be7dc7f03fa681b25487a206651d99f2330d5a567fb8ab6cb5f8a06a29360"}, +] + +[package.dependencies] +fastapi = "*" +pydantic = ">=1.0,<2.0" +sqlalchemy = ">=1.3.12,<2.0.0" + [[package]] name = "flake8" version = "4.0.1" @@ -2493,4 +2509,4 @@ specs = ["eralchemy2"] [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "1a36f59acb9a756f829d2ec11305d160ea9faf674b8aec7a20fe7b36810907da" +content-hash = "8d5f3efa68c146a7f2ee5a73097f0656435dc9bedcc497a7a871436f4bcfa1b4" diff --git a/server/pyproject.toml b/server/pyproject.toml old mode 100644 new mode 100755 index e04ee867..32539d05 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -42,6 +42,7 @@ canonicaljson = "^2.0.0" phonenumbers = "^8.13.22" pytz = "^2023.3.post1" alembic = "^1.12.1" +fastapi-utils = "^0.2.1" [tool.poetry.group.dev.dependencies] flake8 = "^4.0.1" From f59dc271c5524c758c095cd5497a0ea77e5ef451 Mon Sep 17 00:00:00 2001 From: iameru Date: Wed, 22 Nov 2023 13:27:49 +0100 Subject: [PATCH 2/3] Basic infrastructure for scheduling tasks --- server/dearmep/main.py | 4 ++++ server/dearmep/schedules/__init__.py | 11 +++++++++++ server/dearmep/schedules/calls.py | 11 +++++++++++ 3 files changed, 26 insertions(+) create mode 100644 server/dearmep/schedules/__init__.py create mode 100644 server/dearmep/schedules/calls.py diff --git a/server/dearmep/main.py b/server/dearmep/main.py index 4af781c2..e53e4913 100644 --- a/server/dearmep/main.py +++ b/server/dearmep/main.py @@ -8,6 +8,7 @@ from starlette_exporter.optional_metrics import request_body_size, \ response_body_size +from . import schedules from . import __version__, markdown_files, static_files from .api import v1 as api_v1 from .phone import elks @@ -72,4 +73,7 @@ def create_app(config_dict: Optional[dict] = None) -> FastAPI: require_operation_id(app) + for task in schedules.tasks: + app.on_event("startup")(task) + return app diff --git a/server/dearmep/schedules/__init__.py b/server/dearmep/schedules/__init__.py new file mode 100644 index 00000000..a18b47ca --- /dev/null +++ b/server/dearmep/schedules/__init__.py @@ -0,0 +1,11 @@ +from . import calls + +# List of tasks to be initiated on app startup via scheduler. +tasks = [ + calls.build_queue, + calls.work_queue, +] + +__all__ = [ + "tasks", +] diff --git a/server/dearmep/schedules/calls.py b/server/dearmep/schedules/calls.py new file mode 100644 index 00000000..15e22c32 --- /dev/null +++ b/server/dearmep/schedules/calls.py @@ -0,0 +1,11 @@ +from fastapi_utils.tasks import repeat_every + + +@repeat_every(seconds=6.1, wait_first=True) +def build_queue(): + pass + + +@repeat_every(seconds=7.11, wait_first=False) +def work_queue(): + pass From fd50744d16cd7c2fa2a9caf7fd49227dcbc90e8d Mon Sep 17 00:00:00 2001 From: iameru Date: Thu, 23 Nov 2023 17:45:05 +0100 Subject: [PATCH 3/3] Scheduler loading at startup, configured with config.yml * uses `lifespan` now to handle starting up tasks --- doc/scheduler.md | 47 ++++++++++++++++++++++++++++ server/dearmep/config.py | 23 ++++++++++++++ server/dearmep/example-config.yaml | 18 ++++++++++- server/dearmep/main.py | 15 ++++++--- server/dearmep/schedules/__init__.py | 41 +++++++++++++++++++----- server/dearmep/schedules/calls.py | 9 ++---- 6 files changed, 133 insertions(+), 20 deletions(-) create mode 100644 doc/scheduler.md diff --git a/doc/scheduler.md b/doc/scheduler.md new file mode 100644 index 00000000..575eb3f4 --- /dev/null +++ b/doc/scheduler.md @@ -0,0 +1,47 @@ +# Scheduler + +The scheduler handles background tasks of the application. It is logically +seperated by "modules" with each module holding tasks. + +## Module + +Currently implemented `modules` are: + +* `calls` + +## Task Arguments + +Example: +```yml +interval: 20.2 # in seconds, Optional as Tasks have their own default values +wait_first: false # true(default) or false, Optional +``` + +### interval + +Time in seconds after which the task is executed again. + +### wait_first + +Optional. Defaults to `false`. If set to `true` the task's first execution is +after `seconds`, not immediately after startup. + +## Calls Module + +The `calls` module allows for scheduled, call relevant functions to be executed +regularly. + +Currently supports the tasks: + +* `build_queue` +* `handle_queue` + +### build_queue + +This task builds a queue of Users who have given the information that they +wanted to be called at the current time. + +### handle_queue + +This task checks the queue created by `build_queue` and makes a single phone +call to the first user in the queue. diff --git a/server/dearmep/config.py b/server/dearmep/config.py index 8b3696e8..a26a88b4 100644 --- a/server/dearmep/config.py +++ b/server/dearmep/config.py @@ -326,6 +326,28 @@ class RecommenderConfig(BaseModel): n_clear_feedback_threshold: int = Field(ge=0, default=8) +class SchedulerTaskConfig(BaseModel): + interval: float + wait_first: bool = True + + +class SchedulerCallBuildQueueConfig(SchedulerTaskConfig): + interval: float = 30.2 + + +class SchedulerCallHandleQueueConfig(SchedulerTaskConfig): + interval: float = 33.3 + + +class SchedulerCallConfig(BaseModel): + build_queue: Optional[SchedulerCallBuildQueueConfig] + handle_queue: Optional[SchedulerCallHandleQueueConfig] + + +class SchedulerConfig(BaseModel): + calls: Optional[SchedulerCallConfig] + + class Config(BaseModel): """The main application configuration supplied via the config file.""" api: APIConfig @@ -336,6 +358,7 @@ class Config(BaseModel): l10n: L10nConfig telephony: TelephonyConfig recommender: RecommenderConfig + scheduler: Optional[SchedulerConfig] _instance: ClassVar[Optional["Config"]] = None _patch: ClassVar[Optional[Dict]] = None diff --git a/server/dearmep/example-config.yaml b/server/dearmep/example-config.yaml index d29eb88b..6e23f094 100644 --- a/server/dearmep/example-config.yaml +++ b/server/dearmep/example-config.yaml @@ -1,4 +1,4 @@ -# Options ending in `_duration` or `_timeout` (or simply named `duration` or +# Options ending in `_duration`, `_interval` or `_timeout` (or simply named `duration`, `interval` or # `timeout`) should be specified in seconds, unless noted otherwise. # Options ending in `_limit` (or simply named `limit`) should contain a string @@ -408,6 +408,22 @@ recommender: # https://www.wolframalpha.com/input?i=plot+1%2F%281%28abs%28x%2F%28N*8%29%29*3%29%5E4+%2B1%29+for+-40%3C%3Dx%3C%3D40%2C+N%3D10 n_clear_feedback_threshold: 8 +# The Scheduler is responsible for scheduling background tasks of the +# application, for example to initiate calls or cleaning up of database. +# Check the `doc/scheduler.md` file for more information. +scheduler: + calls: + # This builds a queue of calls to be made automatically from the saved + # schedules of Users. + build_queue: + # time in seconds after which the task is executed again + interval: 30.2 # this is the default value for this task + # wait_first: false means that the task is executed immediately after + # startup. + wait_first: false + # This initiates the next call in the queue. + handle_queue: + interval: 33.3 # this is the default value for this task # Localization options. l10n: diff --git a/server/dearmep/main.py b/server/dearmep/main.py index e53e4913..012ac8c0 100644 --- a/server/dearmep/main.py +++ b/server/dearmep/main.py @@ -1,6 +1,7 @@ import logging from typing import Optional +from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.routing import APIRoute @@ -8,8 +9,7 @@ from starlette_exporter.optional_metrics import request_body_size, \ response_body_size -from . import schedules -from . import __version__, markdown_files, static_files +from . import __version__, markdown_files, schedules, static_files from .api import v1 as api_v1 from .phone import elks from .config import APP_NAME, Config @@ -51,9 +51,17 @@ def create_app(config_dict: Optional[dict] = None) -> FastAPI: else: config = Config.load_dict(config_dict) + @asynccontextmanager + async def lifespan(app: FastAPI): + for task in schedules.get_background_tasks(config): + _logger.info(f"Loading background Task: {task.__name__}") + await task() + yield + app = FastAPI( title=APP_NAME, version=__version__, + lifespan=lifespan, ) setup_cors(app, config) @@ -73,7 +81,4 @@ def create_app(config_dict: Optional[dict] = None) -> FastAPI: require_operation_id(app) - for task in schedules.tasks: - app.on_event("startup")(task) - return app diff --git a/server/dearmep/schedules/__init__.py b/server/dearmep/schedules/__init__.py index a18b47ca..69eb1620 100644 --- a/server/dearmep/schedules/__init__.py +++ b/server/dearmep/schedules/__init__.py @@ -1,11 +1,38 @@ -from . import calls +from typing import Callable, List, Tuple + +from fastapi_utils.tasks import repeat_every + +from .calls import build_queue, handle_queue +from ..config import Config, SchedulerTaskConfig + +SchedulerTask = Callable[[], None] + + +def get_background_tasks(config: Config): + """ + Returns a list of configured background tasks to be run at startup. + """ + tasks: List[Tuple[SchedulerTaskConfig, SchedulerTask]] = [] + + if not config.scheduler: + return [] + + if config.scheduler.calls: + # We add our tasks to the list of tasks to be run at startup if we find + # their config. + if (build_queue_cfg := config.scheduler.calls.build_queue): + tasks.append((build_queue_cfg, build_queue)) + + if (handle_queue_cfg := config.scheduler.calls.handle_queue): + tasks.append((handle_queue_cfg, handle_queue)) + + return [ + repeat_every( + seconds=cfg.interval, + wait_first=cfg.wait_first, + )(func) for cfg, func in tasks] -# List of tasks to be initiated on app startup via scheduler. -tasks = [ - calls.build_queue, - calls.work_queue, -] __all__ = [ - "tasks", + "get_background_tasks", ] diff --git a/server/dearmep/schedules/calls.py b/server/dearmep/schedules/calls.py index 15e22c32..f3d27430 100644 --- a/server/dearmep/schedules/calls.py +++ b/server/dearmep/schedules/calls.py @@ -1,11 +1,6 @@ -from fastapi_utils.tasks import repeat_every - - -@repeat_every(seconds=6.1, wait_first=True) -def build_queue(): +def build_queue() -> None: pass -@repeat_every(seconds=7.11, wait_first=False) -def work_queue(): +def handle_queue() -> None: pass