Add logging to tasks managed by the task scheduler, showing CPU and database usage. #17219

Merged
merged 3 commits on May 22, 2024
1 change: 1 addition & 0 deletions changelog.d/17219.feature
@@ -0,0 +1 @@
Add logging to tasks managed by the task scheduler, showing CPU and database usage.
69 changes: 67 additions & 2 deletions synapse/util/task_scheduler.py
@@ -24,7 +24,12 @@

from twisted.python.failure import Failure

from synapse.logging.context import nested_logging_context
from synapse.logging.context import (
    ContextResourceUsage,
    LoggingContext,
    nested_logging_context,
    set_current_context,
)
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import (
    run_as_background_process,
@@ -81,6 +86,8 @@ class TaskScheduler:
    MAX_CONCURRENT_RUNNING_TASKS = 5
    # Time from the last task update after which we will log a warning
    LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000  # 24hrs
    # Report a running task's status and usage every so often.
    OCCASIONAL_REPORT_INTERVAL_MS = 5 * 60 * 1000  # 5 minutes

    def __init__(self, hs: "HomeServer"):
        self._hs = hs
@@ -346,6 +353,33 @@ async def _clean_scheduled_tasks(self) -> None:
            assert task.id not in self._running_tasks
            await self._store.delete_scheduled_task(task.id)

    @staticmethod
    def _log_task_usage(
        state: str, task: ScheduledTask, usage: ContextResourceUsage, active_time: float
    ) -> None:
        """
        Log a line describing the state and usage of a task.
        The log line is inspired by / a copy of the request log line format,
        but with irrelevant fields removed.

        active_time: Time that the task has been running for, in seconds.
        """

        logger.info(
            "Task %s: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"

[Review thread on the line above]

Member: We probably want to report whether it's ongoing or finished? Are these total values or deltas?

Contributor Author: The Task %s will say 'continuing', 'completed' or 'failed' depending on which one it is. These are totals, but only for this process lifetime of Synapse. (If you restart Synapse, it might start some of these tasks again, where they might essentially resume. Also, I think it's possible for a task to finish in such a way that it reschedules itself later, though we don't appear to do that right now.)

Member: Oh right, yes. Sorry, for some reason I thought it would be Task <name> 🤦

(A sketch of the resulting log line appears just after this method.)

" [%d dbevts] %r, %r",
state,
active_time,
usage.ru_utime,
usage.ru_stime,
usage.db_sched_duration_sec,
usage.db_txn_duration_sec,
int(usage.db_txn_count),
usage.evt_db_fetch_count,
task.resource_id,
task.params,
)
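
For illustration, a minimal sketch of what one of these log lines looks like; every value below is invented, and the resource id and params are hypothetical:

# Reproduces the format string above with made-up numbers, outside Synapse:
print(
    "Task %s: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
    " [%d dbevts] %r, %r"
    % (
        "completed",            # state: 'continuing', 'completed' or 'failed'
        12.345,                 # active_time: wall-clock seconds since launch
        1.234,                  # usage.ru_utime: user CPU seconds
        0.567,                  # usage.ru_stime: system CPU seconds
        0.100,                  # usage.db_sched_duration_sec
        0.250,                  # usage.db_txn_duration_sec
        7,                      # int(usage.db_txn_count)
        42,                     # usage.evt_db_fetch_count
        "!room:example.org",    # task.resource_id (hypothetical)
        {"reason": "example"},  # task.params (hypothetical)
    )
)
# Prints:
# Task completed: 12.345sec (1.234sec, 0.567sec) (0.100sec/0.250sec/7) [42 dbevts] '!room:example.org', {'reason': 'example'}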

    async def _launch_task(self, task: ScheduledTask) -> None:
        """Launch a scheduled task now.

@@ -360,8 +394,32 @@ async def _launch_task(self, task: ScheduledTask) -> None:
            )
        function = self._actions[task.action]

        def _occasional_report(
            task_log_context: LoggingContext, start_time: float
        ) -> None:
            """
            Helper to log a 'Task continuing' line every so often.
            """

            current_time = self._clock.time()
            calling_context = set_current_context(task_log_context)
            try:
                usage = task_log_context.get_resource_usage()
                TaskScheduler._log_task_usage(
                    "continuing", task, usage, current_time - start_time
                )
            finally:
                set_current_context(calling_context)

        async def wrapper() -> None:
            with nested_logging_context(task.id):
            with nested_logging_context(task.id) as log_context:
                start_time = self._clock.time()
                occasional_status_call = self._clock.looping_call(
                    _occasional_report,
                    TaskScheduler.OCCASIONAL_REPORT_INTERVAL_MS,
                    log_context,
                    start_time,
                )
                try:
                    (status, result, error) = await function(task)
                except Exception:
@@ -383,6 +441,13 @@ async def wrapper() -> None:
                )
                self._running_tasks.remove(task.id)

                current_time = self._clock.time()
                usage = log_context.get_resource_usage()
                TaskScheduler._log_task_usage(
                    status.value, task, usage, current_time - start_time
                )
                occasional_status_call.stop()

                # Try launch a new task since we've finished with this one.
                self._clock.call_later(0.1, self._launch_scheduled_tasks)