diff --git a/changelog.d/17219.feature b/changelog.d/17219.feature new file mode 100644 index 00000000000..f8277a89d85 --- /dev/null +++ b/changelog.d/17219.feature @@ -0,0 +1 @@ +Add logging to tasks managed by the task scheduler, showing CPU and database usage. \ No newline at end of file diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 01d05c9ed60..448960b2978 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -24,7 +24,12 @@ from twisted.python.failure import Failure -from synapse.logging.context import nested_logging_context +from synapse.logging.context import ( + ContextResourceUsage, + LoggingContext, + nested_logging_context, + set_current_context, +) from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import ( run_as_background_process, @@ -81,6 +86,8 @@ class TaskScheduler: MAX_CONCURRENT_RUNNING_TASKS = 5 # Time from the last task update after which we will log a warning LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs + # Report a running task's status and usage every so often. + OCCASIONAL_REPORT_INTERVAL_MS = 5 * 60 * 1000 # 5 minutes def __init__(self, hs: "HomeServer"): self._hs = hs @@ -346,6 +353,33 @@ async def _clean_scheduled_tasks(self) -> None: assert task.id not in self._running_tasks await self._store.delete_scheduled_task(task.id) + @staticmethod + def _log_task_usage( + state: str, task: ScheduledTask, usage: ContextResourceUsage, active_time: float + ) -> None: + """ + Log a line describing the state and usage of a task. + The log line is inspired by / a copy of the request log line format, + but with irrelevant fields removed. + + active_time: Time that the task has been running for, in seconds. + """ + + logger.info( + "Task %s: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" + " [%d dbevts] %r, %r", + state, + active_time, + usage.ru_utime, + usage.ru_stime, + usage.db_sched_duration_sec, + usage.db_txn_duration_sec, + int(usage.db_txn_count), + usage.evt_db_fetch_count, + task.resource_id, + task.params, + ) + async def _launch_task(self, task: ScheduledTask) -> None: """Launch a scheduled task now. @@ -360,8 +394,32 @@ async def _launch_task(self, task: ScheduledTask) -> None: ) function = self._actions[task.action] + def _occasional_report( + task_log_context: LoggingContext, start_time: float + ) -> None: + """ + Helper to log a 'Task continuing' line every so often. + """ + + current_time = self._clock.time() + calling_context = set_current_context(task_log_context) + try: + usage = task_log_context.get_resource_usage() + TaskScheduler._log_task_usage( + "continuing", task, usage, current_time - start_time + ) + finally: + set_current_context(calling_context) + async def wrapper() -> None: - with nested_logging_context(task.id): + with nested_logging_context(task.id) as log_context: + start_time = self._clock.time() + occasional_status_call = self._clock.looping_call( + _occasional_report, + TaskScheduler.OCCASIONAL_REPORT_INTERVAL_MS, + log_context, + start_time, + ) try: (status, result, error) = await function(task) except Exception: @@ -383,6 +441,13 @@ async def wrapper() -> None: ) self._running_tasks.remove(task.id) + current_time = self._clock.time() + usage = log_context.get_resource_usage() + TaskScheduler._log_task_usage( + status.value, task, usage, current_time - start_time + ) + occasional_status_call.stop() + # Try launch a new task since we've finished with this one. self._clock.call_later(0.1, self._launch_scheduled_tasks)