From 6069a18b38e6c661e4fd3e9f734d4109f20306cd Mon Sep 17 00:00:00 2001 From: iameru Date: Fri, 1 Dec 2023 14:26:52 +0100 Subject: [PATCH] Add queue scheduler * add metric for queues * wrap the scheduled tasks to handle exceptions * build queue function * handle queue function * relevant queries for scheduler functionality --- server/dearmep/database/models.py | 4 +- server/dearmep/database/query.py | 66 ++++++++++++++++++++++++++-- server/dearmep/schedules/__init__.py | 32 +++++++++++++- server/dearmep/schedules/calls.py | 44 ++++++++++++++++++- 4 files changed, 136 insertions(+), 10 deletions(-) diff --git a/server/dearmep/database/models.py b/server/dearmep/database/models.py index 4d140fa8..5bac9c72 100644 --- a/server/dearmep/database/models.py +++ b/server/dearmep/database/models.py @@ -4,8 +4,8 @@ from uuid import uuid4 from pydantic import UUID4, BaseModel -from sqlmodel import Column, Enum, Field, JSON, Relationship, \ - SQLModel, String, TIMESTAMP, UniqueConstraint, and_, case, or_, func, text +from sqlmodel import Column, Enum, Field, JSON, Relationship, SQLModel, \ + String, TIMESTAMP, UniqueConstraint, and_, case, or_, func, text from ..config import Config, ConfigNotLoaded, Language from ..models import CountryCode, FeedbackConvinced, FeedbackText, \ diff --git a/server/dearmep/database/query.py b/server/dearmep/database/query.py index 6c450db7..5ea01961 100644 --- a/server/dearmep/database/query.py +++ b/server/dearmep/database/query.py @@ -1,4 +1,4 @@ -from datetime import datetime, timedelta +from datetime import date, datetime, timedelta from typing import Callable, Dict, List, NamedTuple, Optional, Union, cast from secrets import randbelow import re @@ -20,7 +20,7 @@ from .connection import Session, select from .models import Blob, BlobID, Destination, DestinationID, \ DestinationSelectionLog, DestinationSelectionLogEvent, MediaList, \ - NumberVerificationRequest, ScheduledCall, UserFeedback + NumberVerificationRequest, QueuedCall, ScheduledCall, UserFeedback _logger = logging.getLogger(__name__) @@ -726,11 +726,10 @@ def get_schedule( ) -> List[ScheduledCall]: """ Get all scheduled calls for a user""" - schedule = session.exec( + return session.exec( select(ScheduledCall) .where(ScheduledCall.user_id == user_id) ).all() - return schedule def set_schedule( @@ -751,3 +750,62 @@ def set_schedule( day=scheduled_call.day, start_time=scheduled_call.start_time, )) + + +def get_currently_scheduled_calls( + session: Session, + now: datetime, +) -> List[ScheduledCall]: + """ + Returns a list of ScheduledCall's that are + - scheduled for today + - scheduled for a time that is in the past but within our + call_schedule_interval + - have not been queued today + """ + timeframe = timedelta( + minutes=Config.get().telephony.office_hours.call_schedule_interval) + + return session.exec(select(ScheduledCall).filter( + ScheduledCall.day == now.isoweekday(), + and_( + ScheduledCall.start_time <= now.time(), + ScheduledCall.start_time >= (now - timeframe).time(), + ), + or_( + col(ScheduledCall.last_queued_at).is_(None), + cast(date, ScheduledCall.last_queued_at) < now.date(), + ), + )).all() + + +def mark_scheduled_calls_queued( + session: Session, + calls: List[ScheduledCall], + now: datetime, +): + """Timestamps 'last_queued_at' to 'now' for calls.""" + for call in calls: + call.last_queued_at = now + session.add_all(calls) + + +def get_next_queued_call( + session: Session, + now: datetime, +) -> Optional[QueuedCall]: + """ + Returns a QueuedCall object which was the first inserted + with priority to the postponed calls. + """ + + postponed = session.exec(select(QueuedCall).filter( + col(QueuedCall.postponed_to).is_not(None), + cast(datetime, QueuedCall.postponed_to) <= now, + ).order_by( + col(QueuedCall.postponed_to).desc(), + )).first() + if postponed: + return postponed + else: + return session.query(QueuedCall).first() diff --git a/server/dearmep/schedules/__init__.py b/server/dearmep/schedules/__init__.py index 3fd47e87..5e7b3d16 100644 --- a/server/dearmep/schedules/__init__.py +++ b/server/dearmep/schedules/__init__.py @@ -1,12 +1,39 @@ +from logging import getLogger from typing import Callable, List, Tuple from fastapi_restful.tasks import repeat_every +from prometheus_client import Counter from .calls import build_queue, handle_queue from ..config import Config, SchedulerTaskConfig SchedulerTask = Callable[[], None] +_logger = getLogger(__name__) +scheduler_exceptions_total = Counter( + name="scheduler_exceptions", + documentation="Number of exceptions in scheduler tasks", + labelnames=("task_name",), +) + + +def task_wrapper(func: SchedulerTask) -> SchedulerTask: + """ + Wraps a background task to handle any exceptions in case they appear. We + inform via logger and prometheus. We stop the background task in such a + case by raising the exception. + """ + def wrapped() -> None: + try: + func() + except Exception: + scheduler_exceptions_total.labels(func.__name__).inc() + _logger.critical(f"Error in scheduled task {func.__name__}", + exc_info=True, + ) + raise + return wrapped + def get_background_tasks(config: Config): """ @@ -21,15 +48,16 @@ def get_background_tasks(config: Config): # We add our tasks to the list of tasks to be run at startup if we find # their config. if (build_queue_cfg := config.scheduler.calls.build_queue): - tasks.append((build_queue_cfg, build_queue)) + tasks.append((build_queue_cfg, task_wrapper(build_queue))) if (handle_queue_cfg := config.scheduler.calls.handle_queue): - tasks.append((handle_queue_cfg, handle_queue)) + tasks.append((handle_queue_cfg, task_wrapper(handle_queue))) return [ repeat_every( seconds=cfg.interval, wait_first=cfg.wait_first, + raise_exceptions=True, )(func) for cfg, func in tasks] diff --git a/server/dearmep/schedules/calls.py b/server/dearmep/schedules/calls.py index f3d27430..1f6f2279 100644 --- a/server/dearmep/schedules/calls.py +++ b/server/dearmep/schedules/calls.py @@ -1,6 +1,46 @@ +from datetime import datetime +from prometheus_client import Counter + +from ..config import Config +from ..database import query +from ..database.models import QueuedCall +from ..database.connection import get_session + +queued_calls_total = Counter( + name="queued_calls_total", + documentation="Total number of calls queued", +) + + def build_queue() -> None: - pass + + now = datetime.now() + office_hours = Config.get().telephony.office_hours + if not office_hours.open(now): + return + + with get_session() as session: + calls = query.get_currently_scheduled_calls(session, now) + calls.sort(key=lambda call: call.start_time) + # we iterate this to preserve the order in db insertion + for call in calls: + session.add( + QueuedCall( + user_id=call.user_id, + language=call.language, + )) + queued_calls_total.inc(len(calls)) + query.mark_scheduled_calls_queued(session, calls, now) + session.commit() def handle_queue() -> None: - pass + + now = datetime.now() + with get_session() as session: + queued_call = query.get_next_queued_call(session, now) + if queued_call is None: + return + session.delete(queued_call) + session.commit() + # TODO Start call