Skip to content

Commit

Permalink
Add logging to tasks managed by the task scheduler, showing CPU and d…
Browse files Browse the repository at this point in the history
…atabase usage. (element-hq#17219)

The log format is the same as the request log format, except:

- fields that are specific to HTTP requests have been removed
- the task's params are included at the end of the log line.

These log lines are emitted:
- when the task function finishes — both completion and failure (and I
suppose it is possible for a task to become schedulable again?)
- every 5 minutes whilst it is running

Closes element-hq#17217.

---------

Signed-off-by: Olivier 'reivilibre <oliverw@matrix.org>
  • Loading branch information
reivilibre authored and Mic92 committed Jun 14, 2024
1 parent 7d32075 commit a08b072
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 2 deletions.
1 change: 1 addition & 0 deletions changelog.d/17219.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add logging to tasks managed by the task scheduler, showing CPU and database usage.
69 changes: 67 additions & 2 deletions synapse/util/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@

from twisted.python.failure import Failure

from synapse.logging.context import nested_logging_context
from synapse.logging.context import (
ContextResourceUsage,
LoggingContext,
nested_logging_context,
set_current_context,
)
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import (
run_as_background_process,
Expand Down Expand Up @@ -81,6 +86,8 @@ class TaskScheduler:
MAX_CONCURRENT_RUNNING_TASKS = 5
# Time from the last task update after which we will log a warning
LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs
# Report a running task's status and usage every so often.
OCCASIONAL_REPORT_INTERVAL_MS = 5 * 60 * 1000 # 5 minutes

def __init__(self, hs: "HomeServer"):
self._hs = hs
Expand Down Expand Up @@ -346,6 +353,33 @@ async def _clean_scheduled_tasks(self) -> None:
assert task.id not in self._running_tasks
await self._store.delete_scheduled_task(task.id)

@staticmethod
def _log_task_usage(
state: str, task: ScheduledTask, usage: ContextResourceUsage, active_time: float
) -> None:
"""
Log a line describing the state and usage of a task.
The log line is inspired by / a copy of the request log line format,
but with irrelevant fields removed.
active_time: Time that the task has been running for, in seconds.
"""

logger.info(
"Task %s: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
" [%d dbevts] %r, %r",
state,
active_time,
usage.ru_utime,
usage.ru_stime,
usage.db_sched_duration_sec,
usage.db_txn_duration_sec,
int(usage.db_txn_count),
usage.evt_db_fetch_count,
task.resource_id,
task.params,
)

async def _launch_task(self, task: ScheduledTask) -> None:
"""Launch a scheduled task now.
Expand All @@ -360,8 +394,32 @@ async def _launch_task(self, task: ScheduledTask) -> None:
)
function = self._actions[task.action]

def _occasional_report(
task_log_context: LoggingContext, start_time: float
) -> None:
"""
Helper to log a 'Task continuing' line every so often.
"""

current_time = self._clock.time()
calling_context = set_current_context(task_log_context)
try:
usage = task_log_context.get_resource_usage()
TaskScheduler._log_task_usage(
"continuing", task, usage, current_time - start_time
)
finally:
set_current_context(calling_context)

async def wrapper() -> None:
with nested_logging_context(task.id):
with nested_logging_context(task.id) as log_context:
start_time = self._clock.time()
occasional_status_call = self._clock.looping_call(
_occasional_report,
TaskScheduler.OCCASIONAL_REPORT_INTERVAL_MS,
log_context,
start_time,
)
try:
(status, result, error) = await function(task)
except Exception:
Expand All @@ -383,6 +441,13 @@ async def wrapper() -> None:
)
self._running_tasks.remove(task.id)

current_time = self._clock.time()
usage = log_context.get_resource_usage()
TaskScheduler._log_task_usage(
status.value, task, usage, current_time - start_time
)
occasional_status_call.stop()

# Try launch a new task since we've finished with this one.
self._clock.call_later(0.1, self._launch_scheduled_tasks)

Expand Down

0 comments on commit a08b072

Please sign in to comment.