Skip to content

Commit

Permalink
Use thread ancestry hierarchy to support nested threads
Browse files Browse the repository at this point in the history
Lint
  • Loading branch information
krassowski committed Dec 27, 2023
1 parent 5956899 commit e1258de
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 19 deletions.
11 changes: 9 additions & 2 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,8 @@ def __init__(
"parent_header"
)
self._parent_header.set({})
self._thread_parents = {}
self._thread_to_parent = {}
self._thread_to_parent_header = {}
self._parent_header_global = {}
self._master_pid = os.getpid()
self._flush_pending = False
Expand Down Expand Up @@ -509,7 +510,13 @@ def parent_header(self):
except LookupError:
try:
# thread-specific
return self._thread_parents[threading.current_thread().ident]
identity = threading.current_thread().ident
# retrieve the outermost (oldest ancestor,
# discounting the kernel thread) thread identity
while identity in self._thread_to_parent:
identity = self._thread_to_parent[identity]
# use the header of the oldest ancestor
return self._thread_to_parent_header[identity]
except KeyError:
# global (fallback)
return self._parent_header_global
Expand Down
61 changes: 44 additions & 17 deletions ipykernel/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ def _get_comm_manager(*args, **kwargs):

import threading

threading_start = threading.Thread.start
_threading_Thread_run = threading.Thread.run
_threading_Thread__init__ = threading.Thread.__init__


class IPythonKernel(KernelBase):
Expand Down Expand Up @@ -158,6 +159,9 @@ def __init__(self, **kwargs):

appnope.nope()

self._new_threads_parent_header = {}
self._initialize_thread_hooks()

if hasattr(gc, "callbacks"):
# while `gc.callbacks` exists since Python 3.3, pypy does not
# implement it even as of 3.9.
Expand Down Expand Up @@ -356,7 +360,7 @@ def set_sigint_result():
async def execute_request(self, stream, ident, parent):
"""Override for cell output - cell reconciliation."""
parent_header = extract_header(parent)
self._associate_identity_of_new_threads_with(parent_header)
self._associate_new_top_level_threads_with(parent_header)
await super().execute_request(stream, ident, parent)

async def do_execute(
Expand Down Expand Up @@ -724,31 +728,47 @@ def do_clear(self):
self.shell.reset(False)
return dict(status="ok")

def _associate_identity_of_new_threads_with(self, parent_header):
"""Intercept the identity of any thread started after this method finished,
and associate the thread's output with the parent header frame, which allows
to direct the outputs to the cell which started the thread.
def _associate_new_top_level_threads_with(self, parent_header):
"""Store the parent header to associate it with new top-level threads"""
self._new_threads_parent_header = parent_header

This is a no-op if the `self._stdout` and `self._stderr` are not
sub-classes of `OutStream`.
"""
def _initialize_thread_hooks(self):
"""Store thread hierarchy and thread-parent_header associations."""
stdout = self._stdout
stderr = self._stderr
kernel_thread_ident = threading.get_ident()
kernel = self

def start_closure(self: threading.Thread):
def run_closure(self: threading.Thread):
"""Wrap the `threading.Thread.start` to intercept thread identity.
This is needed because there is no "start" hook yet, but there
might be one in the future: https://bugs.python.org/issue14073
This is a no-op if the `self._stdout` and `self._stderr` are not
sub-classes of `OutStream`.
"""

threading_start(self)
try:
parent = self._ipykernel_parent_thread_ident # type:ignore[attr-defined]
except AttributeError:
return
for stream in [stdout, stderr]:
if isinstance(stream, OutStream):
stream._thread_parents[self.ident] = parent_header
if parent == kernel_thread_ident:
stream._thread_to_parent_header[
self.ident
] = kernel._new_threads_parent_header
else:
stream._thread_to_parent[self.ident] = parent
_threading_Thread_run(self)

def init_closure(self: threading.Thread, *args, **kwargs):
_threading_Thread__init__(self, *args, **kwargs)
self._ipykernel_parent_thread_ident = threading.get_ident() # type:ignore[attr-defined]

threading.Thread.start = start_closure # type:ignore[method-assign]
threading.Thread.__init__ = init_closure # type:ignore[method-assign]
threading.Thread.run = run_closure # type:ignore[method-assign]

def _clean_thread_parent_frames(
self, phase: t.Literal["start", "stop"], info: t.Dict[str, t.Any]
Expand All @@ -768,11 +788,18 @@ def _clean_thread_parent_frames(
active_threads = {thread.ident for thread in threading.enumerate()}
for stream in [self._stdout, self._stderr]:
if isinstance(stream, OutStream):
thread_parents = stream._thread_parents
for identity in list(thread_parents.keys()):
thread_to_parent_header = stream._thread_to_parent_header
for identity in list(thread_to_parent_header.keys()):
if identity not in active_threads:
try:
del thread_to_parent_header[identity]
except KeyError:
pass
thread_to_parent = stream._thread_to_parent
for identity in list(thread_to_parent.keys()):
if identity not in active_threads:
try:
del thread_parents[identity]
del thread_to_parent[identity]
except KeyError:
pass

Expand Down

0 comments on commit e1258de

Please sign in to comment.