Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Some minor performance fixes for task scheduler #16313

Merged
merged 13 commits into from
Sep 14, 2023
22 changes: 13 additions & 9 deletions synapse/util/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
from twisted.python.failure import Failure

from synapse.logging.context import nested_logging_context
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.types import JsonMapping, ScheduledTask, TaskStatus
from synapse.util.stringutils import random_string

Expand Down Expand Up @@ -70,6 +73,8 @@ class TaskScheduler:
# Precision of the scheduler, evaluation of tasks to run will only happen
# every `SCHEDULE_INTERVAL_MS` ms
SCHEDULE_INTERVAL_MS = 1 * 60 * 1000 # 1mn
# How often to clean up old tasks.
CLEANUP_INTERVAL_MS = 30 * 60 * 1000
# Time before a complete or failed task is deleted from the DB
KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week
# Maximum number of tasks that can run at the same time
Expand All @@ -94,10 +99,12 @@ def __init__(self, hs: "HomeServer"):

if self._run_background_tasks:
self._clock.looping_call(
run_as_background_process,
self._launch_scheduled_tasks,
TaskScheduler.SCHEDULE_INTERVAL_MS,
)
self._clock.looping_call(
self._clean_scheduled_tasks,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also split this into two separate calls at some point, like you did, but I was not sure that no races exist between the two, so I ended up removing the split.
After more thought, I think we should be safe: we are iterating over two disjoint sets of tasks (active + scheduled vs. failed + complete), and I don't see any trouble if an active task becomes complete or failed in the middle.

TaskScheduler.SCHEDULE_INTERVAL_MS,
"handle_scheduled_tasks",
self._handle_scheduled_tasks,
)

def register_action(
Expand Down Expand Up @@ -276,11 +283,7 @@ async def delete_task(self, id: str) -> None:
raise Exception(f"Task {id} is currently ACTIVE and can't be deleted")
await self._store.delete_scheduled_task(id)

async def _handle_scheduled_tasks(self) -> None:
"""Main loop taking care of launching tasks and cleaning up old ones."""
await self._launch_scheduled_tasks()
await self._clean_scheduled_tasks()

@wrap_as_background_process("launch_scheduled_tasks")
async def _launch_scheduled_tasks(self) -> None:
"""Retrieve and launch scheduled tasks that should be running at that time."""
# Don't bother trying to launch new tasks if we're already at capacity.
Expand All @@ -300,6 +303,7 @@ async def _launch_scheduled_tasks(self) -> None:

running_tasks_gauge.set(len(self._running_tasks))

@wrap_as_background_process("clean_scheduled_tasks")
async def _clean_scheduled_tasks(self) -> None:
"""Clean old complete or failed jobs to avoid clutter the DB."""
now = self._clock.time_msec()
Expand Down