Skip to content

Commit

Permalink
Add queue scheduler
Browse files Browse the repository at this point in the history
* add metric for queues
* wrap the scheduled tasks to handle exceptions
* build queue function
* handle queue function
* relevant queries for scheduler functionality
  • Loading branch information
iameru committed Dec 6, 2023
1 parent fb76b74 commit 6069a18
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 10 deletions.
4 changes: 2 additions & 2 deletions server/dearmep/database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from uuid import uuid4

from pydantic import UUID4, BaseModel
from sqlmodel import Column, Enum, Field, JSON, Relationship, \
SQLModel, String, TIMESTAMP, UniqueConstraint, and_, case, or_, func, text
from sqlmodel import Column, Enum, Field, JSON, Relationship, SQLModel, \
String, TIMESTAMP, UniqueConstraint, and_, case, or_, func, text

from ..config import Config, ConfigNotLoaded, Language
from ..models import CountryCode, FeedbackConvinced, FeedbackText, \
Expand Down
66 changes: 62 additions & 4 deletions server/dearmep/database/query.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime, timedelta
from datetime import date, datetime, timedelta
from typing import Callable, Dict, List, NamedTuple, Optional, Union, cast
from secrets import randbelow
import re
Expand All @@ -20,7 +20,7 @@
from .connection import Session, select
from .models import Blob, BlobID, Destination, DestinationID, \
DestinationSelectionLog, DestinationSelectionLogEvent, MediaList, \
NumberVerificationRequest, ScheduledCall, UserFeedback
NumberVerificationRequest, QueuedCall, ScheduledCall, UserFeedback


_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -726,11 +726,10 @@ def get_schedule(
) -> List[ScheduledCall]:
""" Get all scheduled calls for a user"""

schedule = session.exec(
return session.exec(
select(ScheduledCall)
.where(ScheduledCall.user_id == user_id)
).all()
return schedule


def set_schedule(
Expand All @@ -751,3 +750,62 @@ def set_schedule(
day=scheduled_call.day,
start_time=scheduled_call.start_time,
))


def get_currently_scheduled_calls(
session: Session,
now: datetime,
) -> List[ScheduledCall]:
"""
Returns a list of ScheduledCall's that are
- scheduled for today
- scheduled for a time that is in the past but within our
call_schedule_interval
- have not been queued today
"""
timeframe = timedelta(
minutes=Config.get().telephony.office_hours.call_schedule_interval)

return session.exec(select(ScheduledCall).filter(
ScheduledCall.day == now.isoweekday(),
and_(
ScheduledCall.start_time <= now.time(),
ScheduledCall.start_time >= (now - timeframe).time(),
),
or_(
col(ScheduledCall.last_queued_at).is_(None),
cast(date, ScheduledCall.last_queued_at) < now.date(),
),
)).all()


def mark_scheduled_calls_queued(
session: Session,
calls: List[ScheduledCall],
now: datetime,
):
"""Timestamps 'last_queued_at' to 'now' for calls."""
for call in calls:
call.last_queued_at = now
session.add_all(calls)


def get_next_queued_call(
session: Session,
now: datetime,
) -> Optional[QueuedCall]:
"""
Returns a QueuedCall object which was the first inserted
with priority to the postponed calls.
"""

postponed = session.exec(select(QueuedCall).filter(
col(QueuedCall.postponed_to).is_not(None),
cast(datetime, QueuedCall.postponed_to) <= now,
).order_by(
col(QueuedCall.postponed_to).desc(),
)).first()
if postponed:
return postponed
else:
return session.query(QueuedCall).first()
32 changes: 30 additions & 2 deletions server/dearmep/schedules/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,39 @@
from logging import getLogger
from typing import Callable, List, Tuple

from fastapi_restful.tasks import repeat_every
from prometheus_client import Counter

from .calls import build_queue, handle_queue
from ..config import Config, SchedulerTaskConfig

SchedulerTask = Callable[[], None]

_logger = getLogger(__name__)
scheduler_exceptions_total = Counter(
name="scheduler_exceptions",
documentation="Number of exceptions in scheduler tasks",
labelnames=("task_name",),
)


def task_wrapper(func: SchedulerTask) -> SchedulerTask:
"""
Wraps a background task to handle any exceptions in case they appear. We
inform via logger and prometheus. We stop the background task in such a
case by raising the exception.
"""
def wrapped() -> None:
try:
func()
except Exception:
scheduler_exceptions_total.labels(func.__name__).inc()
_logger.critical(f"Error in scheduled task {func.__name__}",
exc_info=True,
)
raise
return wrapped


def get_background_tasks(config: Config):
"""
Expand All @@ -21,15 +48,16 @@ def get_background_tasks(config: Config):
# We add our tasks to the list of tasks to be run at startup if we find
# their config.
if (build_queue_cfg := config.scheduler.calls.build_queue):
tasks.append((build_queue_cfg, build_queue))
tasks.append((build_queue_cfg, task_wrapper(build_queue)))

if (handle_queue_cfg := config.scheduler.calls.handle_queue):
tasks.append((handle_queue_cfg, handle_queue))
tasks.append((handle_queue_cfg, task_wrapper(handle_queue)))

return [
repeat_every(
seconds=cfg.interval,
wait_first=cfg.wait_first,
raise_exceptions=True,
)(func) for cfg, func in tasks]


Expand Down
44 changes: 42 additions & 2 deletions server/dearmep/schedules/calls.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,46 @@
from datetime import datetime
from prometheus_client import Counter

from ..config import Config
from ..database import query
from ..database.models import QueuedCall
from ..database.connection import get_session

queued_calls_total = Counter(
name="queued_calls_total",
documentation="Total number of calls queued",
)


def build_queue() -> None:
pass

now = datetime.now()
office_hours = Config.get().telephony.office_hours
if not office_hours.open(now):
return

with get_session() as session:
calls = query.get_currently_scheduled_calls(session, now)
calls.sort(key=lambda call: call.start_time)
# we iterate this to preserve the order in db insertion
for call in calls:
session.add(
QueuedCall(
user_id=call.user_id,
language=call.language,
))
queued_calls_total.inc(len(calls))
query.mark_scheduled_calls_queued(session, calls, now)
session.commit()


def handle_queue() -> None:
pass

now = datetime.now()
with get_session() as session:
queued_call = query.get_next_queued_call(session, now)
if queued_call is None:
return
session.delete(queued_call)
session.commit()
# TODO Start call

0 comments on commit 6069a18

Please sign in to comment.