-
-
Notifications
You must be signed in to change notification settings - Fork 372
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kernel subshells (JEP91) implementation #1249
Conversation
651299f
to
9139669
Compare
The kernel subshells JEP has been accepted and as this is the reference implementation for that I am asking for someone to review this. |
ipykernel/subshell_manager.py
Outdated
while True: | ||
for socket, _ in self._poller.poll(0): | ||
msg = await socket.recv_multipart(copy=False) | ||
self._shell_socket.send_multipart(msg) | ||
|
||
# Yield to other tasks. | ||
await sleep(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This loop leads to 100% CPU usage, we need to find another way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting. On my macOS dev machine this is fine, but I've confirmed that it is a problem on Linux.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've changed the implementation in 97e3e91. I've removed the sleep(0)
and poll(0)
calls and now use an async zmq Poller
in an anyio
task. When a subshell is created or deleted this task is cancelled using an anyio.Event
and rescheduled with the updated list of subshells. With this I no longer see 100% CPU load on Linux or macOS.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've changed the implementation of this piece of code again. My use of await poller.poll()
turned out to cause problems on python < 3.10 and on Windows. Now I am avoiding use of zmq.Poller
and instead using a separate anyio task for each subshell to listen for reply messages and send them out to the client via the shell channel. This seems to be more robust on older python and Windows. I've also replaced the use of an anyio.Event
with a memory object stream instead (essentially an anyio async queue).
for more information, see https://pre-commit.ci
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your patience, I'm going to try reviewing this and pushing it forward.
I'm done a partial read – not in depth, and added a few comments – which are not mandatory request for changes but thoughts to myself, for when I'll come back to it.
I have to run some errands, so I'm going to post this as is for now but will come back to it later.
ipykernel/control.py
Outdated
|
||
def set_task(self, task): | ||
self._task = task | ||
super().__init__(name=CONTROL_THREAD_NAME, **kwargs) | ||
|
||
def run(self): | ||
"""Run the thread.""" | ||
self.name = CONTROL_THREAD_NAME |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is unnecessary now Thread have setter/getter and super().__init__
should set the private name directly.
Maybe try to put an assert ==, and if test are passing, just remove the override of the run method ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the name duplication in b644f9e7
and 9109cc8
without causing any problems.
|
||
self._context: zmq.asyncio.Context = context | ||
self._shell_socket = shell_socket | ||
self._cache: dict[str, Subshell] = {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Self curiosity:
check the difference between attribute typing in __init__
and at class level.
Is mypy smart enough, or does it say attribute may be unset
if type set in __init__
??
ipykernel/subshell_manager.py
Outdated
elif type == "list": | ||
reply["subshell_id"] = self.list_subshell() | ||
else: | ||
msg = f"Unrecognised message type {type}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
msg = f"Unrecognised message type {type}" | |
msg = f"Unrecognised message type {type!r}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in d17c77f.
I'm +1 for this, a couple of stylistic notes – if you agree I'm happy to push some changes updates but don't want to do so without your approval as I know it can get unwieldy to have another maintainer push changes on your branch on big PRs. I think we can merge this as "experimental" and then work on the related issues when using this features on various projects (IPython history etc...). |
@Carreau Thanks for the review and for the offer of pushing changes. I'd rather make the changes myself, and I'll try to group them sensibly to avoid to much CI churn. |
It seems that you use |
A ZMQ inproc pair is an area of memory shared between two threads and wrapped to look like ZMQ sockets.
Here is where they are created in the source code which includes some explanation: # anyio memory object stream for async queue-like communication between tasks.
# Used by _create_subshell to tell listen_from_subshells to spawn a new task.
self._send_stream, self._receive_stream = create_memory_object_stream[str]() The communication is between two different async tasks running in the same thread. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some notes for myself and history, but all good on my side. Let's get that in and iterate if necessary.
# socket=None is valid if kernel subshells are not supported. | ||
try: | ||
while True: | ||
await self.process_shell_message(socket=socket) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not for this PR:
- I don't like that breaking out of this while loop requires an exception that is hard to find in the implementation of
process_shell_message
; we should likely have an explicit exit point.
if inspect.isawaitable(result): | ||
await result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not for this PR:
- I think we should move to handler always returning an awaitable, or more likely handler always being a coroutine function.
"protocol_version": kernel_protocol_version, | ||
"implementation": self.implementation, | ||
"implementation_version": self.implementation_version, | ||
"language_info": self.language_info, | ||
"banner": self.banner, | ||
"help_links": self.help_links, | ||
"supported_features": [], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
return | ||
if not self._supports_kernel_subshells: | ||
self.log.error("Subshells are not supported by this kernel") | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And from the discussion bellow, maybe think in later maybe return something that indicate why the subshell was not created ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or maybe not as that would change the message spec.
I think I get that better now, that these handler return values likely go nowhere so why they do not raise exception. Let's maybe leave making this clearer to a subsequent refactor/maintenance.
except BaseException as err: | ||
reply = { | ||
"status": "error", | ||
"evalue": str(err), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not for this PR:
- Can this contain information that should not be send to the frontend ?
- I don't believe so as the user is anyway the one that runs the kernel, but we had case where traceback would contain sensitive values. In other apps webapp this would be a likely issue (like revealing paths and filename of a server), but I don't think it's an issue here.
I haven't merged something on this repo in quite some time, I think I would prefer a rebase-and-merge as you took time to craft the commits individually, but I think the policy here is squash-merged. Any objections to squash-merge ? |
A squash-merge is fine by me. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @ianthomas23 !
This is the implementation of the kernel subshells JEP (jupyter/enhancement-proposals#91). It follows the latest commit (1f1ad3d) with the addition of a
%subshell
magic command that is useful for debugging. To try this out I have a JupyterLab branch that talks to this branch and is most easily tried out using https://mybinder.org/v2/gh/ianthomas23/jupyterlab/jep91_demo?urlpath=lab; once the mybinder instance has started, open thesubshell_demo_notebook.ipynb
and follow the instructions therein.The idea is that this is mergeable as it is now, it is backward compatible in that it does not break any existing use of
ipykernel
(subject to CI confirmation). There are some ramifications of the protocol additions (outlined below) that will need addressing eventually, but I consider these future work that can be in separate PRs.Outline of changes
subshell_id
from the message and passes it on to the correct subshell.SubshellManager
in the shell channel thread is responsible for subshell lifetimes.Example scenario
Here is an example of the communication between threads when running a long task in the parent subshell (main thread) and whilst this is running a child subshell is created, used, and deleted.
Future work
ipykernel
the wholesome of the message to get thesubshell_id
. Ideally it would only deserialise the header. May need changes in Jupyter Client.threading.Event
following the existinganyio
implementation which requires an extra thread perEvent
. It would be nice if this could be changed so a subshell is a single thread not two.input()
on more than one subshell at the same time run but do not store correctly.JupyterLab
The JupyterLab branch I am using to demo this isn't really intended to be merged. But if it was, it needs:
kernel_info
to see if subshells are supported.ConsolePanel
.(Edited for clarity)