Skip to content
This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Commit

Permalink
Selective persistent termination logs (#9)
Browse files Browse the repository at this point in the history
- We should preserve the termination log of tasks that are supposed to
  be long-running and not terminated during the entire application
  lifecycle (e.g., server apps), when there are many volatile tasks
  continuously spawned and destroyed, regardless of the history limit.
  • Loading branch information
achimnol committed Oct 19, 2022
1 parent ed025c7 commit 18acda2
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 67 deletions.
42 changes: 33 additions & 9 deletions aiomonitor/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from terminaltables import AsciiTable

from . import console
from .task import TracedTask
from .task import TracedTask, persistent_coro
from .types import CancellationChain, TerminatedTaskInfo
from .utils import (
AliasGroupMixin,
Expand All @@ -58,6 +58,7 @@
_format_timedelta,
all_tasks,
cancel_task,
get_default_args,
task_by_id,
)

Expand Down Expand Up @@ -321,10 +322,12 @@ def _create_task(
parent_task = asyncio.current_task()
except RuntimeError:
parent_task = None
persistent = coro in persistent_coro
task = TracedTask(
self._coro_wrapper(coro), # type: ignore
termination_info_queue=self._termination_info_queue.sync_q,
cancellation_chain_queue=self._cancellation_chain_queue.sync_q,
persistent=persistent,
loop=self._monitored_loop,
)
task._orig_coro = coro
Expand Down Expand Up @@ -380,7 +383,8 @@ async def _ui_handle_termination_updates(self) -> None:
except asyncio.CancelledError:
return
self._terminated_tasks[update.id] = update
self._terminated_history.append(update.id)
if not update.persistent:
self._terminated_history.append(update.id)
# canceller stack is already put in _ui_handle_cancellation_updates()
if canceller_stack := self._canceller_stacks.pop(update.id, None):
update.canceller_stack = canceller_stack
Expand Down Expand Up @@ -650,9 +654,14 @@ async def _console(ctx: click.Context) -> None:

@monitor_cli.command(name="ps", aliases=["p"])
@click.option("-f", "--filter", "filter_", help="filter by coroutine or task name")
@click.option("-p", "--persistent", is_flag=True, help="show only persistent tasks")
@custom_help_option
@auto_command_done
def do_ps(ctx: click.Context, filter_: str) -> None:
def do_ps(
ctx: click.Context,
filter_: str,
persistent: bool,
) -> None:
"""Show task table"""
headers = (
"Task ID",
Expand All @@ -671,8 +680,13 @@ def do_ps(ctx: click.Context, filter_: str) -> None:
taskid = str(id(task))
if isinstance(task, TracedTask):
coro_repr = _format_coroutine(task._orig_coro).partition(" ")[0]
if persistent and task._orig_coro not in persistent_coro:
continue
else:
coro_repr = _format_coroutine(task.get_coro()).partition(" ")[0]
if persistent:
# untracked tasks should be skipped when showing persistent ones only
continue
if filter_ and (filter_ not in coro_repr and filter_ not in task.get_name()):
continue
creation_stack = self._created_tracebacks.get(task)
Expand Down Expand Up @@ -706,7 +720,7 @@ def do_ps(ctx: click.Context, filter_: str) -> None:
table = AsciiTable(table_data)
table.inner_row_border = False
table.inner_column_border = False
if filter_:
if filter_ or persistent:
stdout.write(
f"{len(all_running_tasks)} tasks running "
f"(showing {len(table_data) - 1} tasks)\n"
Expand All @@ -720,9 +734,14 @@ def do_ps(ctx: click.Context, filter_: str) -> None:

@monitor_cli.command(name="ps-terminated", aliases=["pt", "pst"])
@click.option("-f", "--filter", "filter_", help="filter by coroutine or task name")
@click.option("-p", "--persistent", is_flag=True, help="show only persistent tasks")
@custom_help_option
@auto_command_done
def do_ps_terminated(ctx: click.Context, filter_: str) -> None:
def do_ps_terminated(
ctx: click.Context,
filter_: str,
persistent: bool,
) -> None:
"""List recently terminated/cancelled tasks"""
headers = (
"Trace ID",
Expand All @@ -740,6 +759,8 @@ def do_ps_terminated(ctx: click.Context, filter_: str) -> None:
key=lambda info: info.terminated_at,
reverse=True,
):
if persistent and not item.persistent:
continue
if filter_ and (filter_ not in item.coro and filter_ not in item.name):
continue
run_since = _format_timedelta(
Expand All @@ -760,9 +781,9 @@ def do_ps_terminated(ctx: click.Context, filter_: str) -> None:
table = AsciiTable(table_data)
table.inner_row_border = False
table.inner_column_border = False
if filter_:
if filter_ or persistent:
stdout.write(
f"{len(terminated_tasks)} tasks running "
f"{len(terminated_tasks)} tasks terminated "
f"(showing {len(table_data) - 1} tasks)\n"
)
else:
Expand Down Expand Up @@ -880,21 +901,24 @@ def do_where_terminated(ctx: click.Context, trace_id: str) -> None:
def start_monitor(
loop: asyncio.AbstractEventLoop,
*,
monitor: Type[Monitor] = Monitor,
monitor_cls: Type[Monitor] = Monitor,
host: str = MONITOR_HOST,
port: int = MONITOR_PORT,
console_port: int = CONSOLE_PORT,
console_enabled: bool = True,
hook_task_factory: bool = False,
max_termination_history: Optional[int] = None,
locals: Optional[Dict[str, Any]] = None,
) -> Monitor:
m = monitor(
m = monitor_cls(
loop,
host=host,
port=port,
console_port=console_port,
console_enabled=console_enabled,
hook_task_factory=hook_task_factory,
max_termination_history=max_termination_history
or get_default_args(monitor_cls.__init__)["max_termination_history"],
locals=locals,
)
m.start()
Expand Down
36 changes: 35 additions & 1 deletion aiomonitor/task.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
import asyncio
import base64
import functools
import struct
import sys
import time
import traceback
import weakref
from asyncio.coroutines import _format_coroutine # type: ignore
from typing import Any, Generator, List, Optional
from typing import Any, Callable, Coroutine, Generator, List, Optional, TypeVar

import janus
from typing_extensions import ParamSpec

from .types import CancellationChain, TerminatedTaskInfo
from .utils import _extract_stack_from_frame

__all__ = (
"TracedTask",
"preserve_termination_log",
"persistent_coro",
)

persistent_coro: weakref.WeakSet[Coroutine] = weakref.WeakSet()

T = TypeVar("T")
P = ParamSpec("P")


class TracedTask(asyncio.Task):
_orig_coro: Generator[Any, Any, Any]
Expand All @@ -22,6 +36,7 @@ def __init__(
*args,
termination_info_queue: janus._SyncQueueProxy[TerminatedTaskInfo],
cancellation_chain_queue: janus._SyncQueueProxy[CancellationChain],
persistent: bool = False,
**kwargs,
) -> None:
super().__init__(*args, **kwargs)
Expand All @@ -30,6 +45,7 @@ def __init__(
self._started_at = time.perf_counter()
self._termination_stack = None
self.add_done_callback(self._trace_termination)
self._persistent = persistent

def get_trace_id(self) -> str:
h = hash(
Expand Down Expand Up @@ -58,6 +74,7 @@ def _trace_termination(self, _: asyncio.Task[Any]) -> None:
terminated_at=time.perf_counter(),
termination_stack=self._termination_stack,
canceller_stack=None,
persistent=self._persistent,
)
self._termination_info_queue.put_nowait(task_info)

Expand All @@ -75,3 +92,20 @@ def cancel(self, msg: Optional[str] = None) -> bool:
)
self._cancellation_chain_queue.put_nowait(cancellation_chain)
return super().cancel(msg)


def preserve_termination_log(
corofunc: Callable[P, Coroutine[Any, None, T]]
) -> Callable[P, Coroutine[Any, None, T]]:
"""
Guard the given coroutine function from being stripped out due to the max history
limit when created as TracedTask.
"""

@functools.wraps(corofunc)
def inner(*args: P.args, **kwargs: P.kwargs) -> Coroutine[Any, None, T]:
coro = corofunc(*args, **kwargs)
persistent_coro.add(coro)
return coro

return inner
1 change: 1 addition & 0 deletions aiomonitor/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class TerminatedTaskInfo:
termination_stack: Optional[List[traceback.FrameSummary]]
canceller_stack: Optional[List[traceback.FrameSummary]] = None
exc_repr: Optional[str] = None
persistent: bool = False


@define
Expand Down
10 changes: 10 additions & 0 deletions aiomonitor/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import contextlib
import inspect
import linecache
import sys
import traceback
Expand Down Expand Up @@ -149,6 +150,15 @@ def _extract_stack_from_exception(e: BaseException) -> List[traceback.FrameSumma
return stack


def get_default_args(func):
signature = inspect.signature(func)
return {
k: v.default
for k, v in signature.parameters.items()
if v.default is not inspect.Parameter.empty
}


class AliasGroupMixin(click.Group):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down
16 changes: 15 additions & 1 deletion examples/cancellation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import asyncio

import aiomonitor
from aiomonitor.task import preserve_termination_log


@preserve_termination_log
async def should_have_run_until_exit():
print("should_have_run_until_exit: begin")
await asyncio.sleep(5)
print("should_have_run_until_exit: cancel")
raise asyncio.CancelledError("cancelled-suspiciously")


async def chain3():
Expand Down Expand Up @@ -65,7 +74,12 @@ async def do_unhandled(tick):

async def main():
loop = asyncio.get_running_loop()
with aiomonitor.start_monitor(loop, hook_task_factory=True):
with aiomonitor.start_monitor(
loop,
hook_task_factory=True,
max_termination_history=10,
):
asyncio.create_task(should_have_run_until_exit())
chain_main_task = asyncio.create_task(chain_main())
self_cancel_main_task = asyncio.create_task(self_cancel_main())
unhandled_exc_main_task = asyncio.create_task(unhandled_exc_main())
Expand Down
56 changes: 0 additions & 56 deletions examples/web_srv_custom_monitor.py

This file was deleted.

1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def read(f):
"click>=8",
"janus>=1.0",
"terminaltables",
"typing-extensions>=4.1",
"prompt_toolkit>=3.0",
"aioconsole",
]
Expand Down

0 comments on commit 18acda2

Please sign in to comment.