Ensure garbage collection of distributed.scheduler.TaskState instances #6364

Closed
distributed/scheduler.py (5 changes: 4 additions & 1 deletion)
@@ -1062,7 +1062,9 @@ class TaskState:
     #: Cached hash of :attr:`~TaskState.client_key`
     _hash: int

-    __slots__ = tuple(__annotations__)  # type: ignore
+    __slots__ = tuple(__annotations__) + ("__weakref__",)  # type: ignore
+
+    _instances: weakref.WeakSet[TaskState] = weakref.WeakSet()

     def __init__(self, key: str, run_spec: object):
         self.key = key
@@ -1098,6 +1100,7 @@ def __init__(self, key: str, run_spec: object):
         self.metadata = {}
         self.annotations = {}
         self.erred_on = set()
+        TaskState._instances.add(self)

     def __hash__(self) -> int:
         return self._hash
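
Background note on the __slots__ change (editorial, not part of the diff): a slotted class only supports weak references when "__weakref__" is listed in __slots__, so registering every instance in a class-level WeakSet requires exactly the addition shown above. A minimal standalone sketch of the pattern, with illustrative names rather than the real scheduler code:

import weakref

class Tracked:
    # Without "__weakref__" here, WeakSet.add() would raise
    # "TypeError: cannot create weak reference to 'Tracked' object".
    __slots__ = ("name", "__weakref__")

    # Class-level registry; entries vanish as soon as an instance is collected.
    _instances: "weakref.WeakSet[Tracked]" = weakref.WeakSet()

    def __init__(self, name: str) -> None:
        self.name = name
        Tracked._instances.add(self)

t = Tracked("x")
assert t in Tracked._instances

del t                          # no cycles involved, so CPython frees it right away
assert not Tracked._instances  # the WeakSet dropped the dead instance
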
distributed/utils_test.py (7 changes: 7 additions & 0 deletions)
@@ -56,7 +56,9 @@
 from distributed.nanny import Nanny
 from distributed.node import ServerNode
 from distributed.proctitle import enable_proctitle_on_children
+from distributed.profile import wait_profiler
 from distributed.protocol import deserialize
+from distributed.scheduler import TaskState
 from distributed.security import Security
 from distributed.utils import (
     DequeHandler,
@@ -1793,6 +1795,11 @@ def check_instances():

     _global_clients.clear()

+    wait_profiler()
+    gc.collect()
Comment on lines +1798 to +1799

Member:
I have used this myself already, but it is actually not safe. wait_profiler only polls for the profiler thread to not be running. For objects freed by ordinary reference counting that is sufficient: as soon as the profiler thread pauses, the last references are dropped and we're good.

Most of the TaskState instances, however, are part of a densely connected, self-referencing data structure. These reference cycles are not necessarily a problem, since gc.collect() can detect them, break them, and clean up. But because we are only polling and never actually stop the thread, the profiler may already be running again by the time wait_profiler returns, so it can still be holding references while gc.collect() runs.
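
(Editorial aside, not part of the review thread: the cycle point can be reproduced in isolation. Objects that only reference each other are invisible to pure reference counting and are reclaimed only by the cyclic collector, which is why the check needs gc.collect() and why a profiler frame that still holds one of the objects at that moment keeps the whole cycle alive.)

import gc
import weakref

class Node:
    # Stand-in for a self-referencing scheduler object.
    def __init__(self) -> None:
        self.other: "Node | None" = None

instances: "weakref.WeakSet[Node]" = weakref.WeakSet()

gc.disable()                 # keep the demonstration deterministic
a, b = Node(), Node()
a.other, b.other = b, a      # build a reference cycle
instances.add(a)
instances.add(b)

del a, b                     # drop the last strong references
assert len(instances) == 2   # reference counting alone cannot free the cycle

gc.collect()                 # the cyclic collector can
gc.enable()
assert len(instances) == 0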

Member:
We would instead need something like this:

with no_profiler():
    # Some magic (e.g. a lock) ensures that the profile thread cannot watch while this ctx manager is held
    gc.collect()

Member:
Maybe this?

from distributed import profile

with profile.lock:
    gc.collect()

# profile.py

lock = threading.Lock()

def _watch(...):
    ...
    with lock:
        frame = sys._current_frames()[thread_id]

Member:
(it seems simpler to reference a lock directly rather than make a context manager)

Member:
Yes, I was typing the context manager first and, while typing it, realized this is not occult magic but simply a lock :)
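
(Editorial sketch of the approach the thread converges on, under the assumption that a module-level threading.Lock is added to distributed/profile.py and acquired by the watch loop; the names and structure below are illustrative, not the merged code.)

import gc
import sys
import threading
import time

# Shared lock: the profiler's sampling loop and the test teardown both acquire it.
lock = threading.Lock()

def watch(thread_id: int, interval: float, stop: threading.Event) -> None:
    # Simplified stand-in for the profiler's watch loop.
    while not stop.is_set():
        with lock:
            # Sampling the frame keeps whatever it references alive for a moment;
            # holding the lock means this cannot overlap with the gc.collect() below.
            frame = sys._current_frames().get(thread_id)
            del frame
        time.sleep(interval)

def collect_without_profiler() -> None:
    # Teardown side: no frame can be sampled while cyclic garbage is collected.
    with lock:
        gc.collect()

if __name__ == "__main__":
    stop = threading.Event()
    sampler = threading.Thread(
        target=watch, args=(threading.main_thread().ident, 0.01, stop), daemon=True
    )
    sampler.start()
    collect_without_profiler()  # safe even while the sampler is running
    stop.set()
    sampler.join()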


+    assert not TaskState._instances
+
     for w in Worker._instances:
         with suppress(RuntimeError):  # closed IOLoop
             w.loop.add_callback(w.close, report=False, executor_wait=False)