-
Notifications
You must be signed in to change notification settings - Fork 2
/
async_gui_ipython_kernel.py
165 lines (139 loc) · 5.63 KB
/
async_gui_ipython_kernel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import logging
import sys
from typing import Any, Optional, Tuple

import zmq
from ipykernel.ipkernel import IPythonKernel
from tornado import gen
class AsyncGUIKernel(IPythonKernel):
    """IPython kernel that lets comm messages through while a cell is running.

    ``schedule_dispatch`` is overridden so that incoming messages are parsed
    up front: messages whose type is listed in ``comm_msg_types`` are handed
    straight to the IO loop, everything else is put on ``msg_queue``.
    Because parsing happens in ``schedule_dispatch``, ``dispatch_shell`` and
    ``dispatch_control`` here receive an already-deserialized message plus
    its routing identities.
    """

    implementation = 'Async GUI'
    banner = (
        'Async GUI - Allow Comm messages to be passed '
        'when other cells are running.'
    )

    # Since this is not explicitly defined in the parent class
    comm_msg_types = ['comm_msg']

    def __init__(self, *args, **kwargs):
        """Initialise the kernel and switch to a dedicated child logger."""
        super().__init__(*args, **kwargs)
        self.log = self.log.getChild('AsyncGUIKernel')
        self.log.setLevel(logging.INFO)
        # For debugging, a logging.StreamHandler(sys.__stderr__) can be
        # attached to self.log here.

    def _parse_message(self, msg) -> Optional[Tuple[Any, dict]]:
        """Split off the routing identities and deserialize a raw message.

        Args:
            msg: The raw multipart message as received from the stream.

        Returns:
            ``(idents, msg)`` with the deserialized message, or ``None`` when
            deserialization fails (the error is logged).
        """
        idents, msg = self.session.feed_identities(msg, copy=False)
        try:
            msg = self.session.deserialize(msg, content=True, copy=False)
        except Exception:
            self.log.error("Invalid Message", exc_info=True)
            return None
        return (idents, msg)

    @gen.coroutine
    def dispatch_shell(self, stream, msg: dict, idents):
        """Dispatch an already-parsed shell request to its handler.

        Args:
            stream: The stream the request arrived on; replies are flushed
                on it.
            msg: The deserialized message.
            idents: The routing identities that preceded the message.
        """
        # Set the parent message for side effects.
        self.set_parent(idents, msg)
        self._publish_status('busy')

        if self._aborting:
            self._send_abort_reply(stream, msg, idents)
            self._publish_status('idle')
            # flush to ensure reply is sent before
            # handling the next request
            stream.flush(zmq.POLLOUT)
            return

        msg_type = msg['header']['msg_type']

        # Print some info about this message and leave a '--->' marker, so it's
        # easier to trace visually the message chain when debugging. Each
        # handler prints its message at the end.
        self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
        self.log.debug(' Content: %s\n --->\n ', msg['content'])

        if not self.should_handle(stream, msg, idents):
            return

        handler = self.shell_handlers.get(msg_type, None)
        if handler is None:
            self.log.warning("Unknown message type: %r", msg_type)
        else:
            self.log.debug("%s: %s", msg_type, msg)
            try:
                self.pre_handler_hook()
            except Exception:
                self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True)
            try:
                yield gen.maybe_future(handler(stream, idents, msg))
            except Exception:
                self.log.error("Exception in message handler:", exc_info=True)
            finally:
                # Always run the post hook, even if the handler raised.
                try:
                    self.post_handler_hook()
                except Exception:
                    self.log.debug("Unable to signal in post_handler_hook:", exc_info=True)

        sys.stdout.flush()
        sys.stderr.flush()
        self._publish_status('idle')
        # flush to ensure reply is sent before
        # handling the next request
        stream.flush(zmq.POLLOUT)

    @gen.coroutine
    def dispatch_control(self, msg: dict, idents):
        """Dispatch an already-parsed control request to its handler.

        Args:
            msg: The deserialized message.
            idents: The routing identities that preceded the message.
        """
        self.log.debug("Control received: %s", msg)

        # Set the parent message for side effects.
        self.set_parent(idents, msg)
        self._publish_status('busy')

        if self._aborting:
            self._send_abort_reply(self.control_stream, msg, idents)
            self._publish_status('idle')
            return

        header = msg['header']
        msg_type = header['msg_type']

        handler = self.control_handlers.get(msg_type, None)
        if handler is None:
            self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
        else:
            try:
                yield gen.maybe_future(handler(self.control_stream, idents, msg))
            except Exception:
                self.log.error("Exception in control handler:", exc_info=True)

        sys.stdout.flush()
        sys.stderr.flush()
        self._publish_status('idle')
        # flush to ensure reply is sent
        self.control_stream.flush(zmq.POLLOUT)

    def schedule_dispatch(self, priority, dispatch, *args):
        """Schedule a message for dispatch, fast-tracking comm messages.

        The raw message (the last of one or two positional ``args``) is
        parsed here. Messages whose type is in ``comm_msg_types`` are passed
        directly to the IO loop; all others are put on ``msg_queue``.

        Args:
            priority: Queue priority stored with non-comm messages.
            dispatch: Callable invoked with the parsed arguments.
            *args: ``(stream, raw_msg)`` for shell dispatch, ``(raw_msg,)``
                for single-argument dispatch. Any other arity is queued
                unchanged without parsing.

        Returns:
            The result of ``io_loop.add_callback`` for comm messages,
            otherwise ``None``.
        """
        msg = None
        if len(args) in (1, 2):
            # Only dispatch_shell messages have two args (stream, raw_msg);
            # the raw message is always the last one.
            *leading, unparsed_msg = args
            parsed = self._parse_message(unparsed_msg)
            if parsed is None:
                # Invalid message: already logged by _parse_message, drop it
                # instead of failing on tuple unpacking.
                return None
            idents, msg = parsed
            new_args = (*leading, msg, idents)
        else:
            # Nothing to parse (or an unexpected arity): queue as given.
            new_args = args

        if msg is not None and msg['header']['msg_type'] in self.comm_msg_types:
            return self.io_loop.add_callback(dispatch, *new_args)

        idx = next(self._message_counter)
        self.msg_queue.put_nowait(
            (
                priority,
                idx,
                dispatch,
                new_args,
            )
        )
        # ensure the eventloop wakes up
        self.io_loop.add_callback(lambda: None)
        return None

    def set_parent(self, ident, parent):
        """Set the parent message by delegating to the base implementation.

        The last parent set decides which cell's output area is used. The
        intent noted by the original author is for an awaiting future to
        print into its own cell rather than the cell the comm is associated
        with, which would mean not changing the parent for messages whose
        type is in ``comm_msg_types``. That filter is currently disabled, so
        every message updates the parent.
        """
        super().set_parent(ident, parent)
if __name__ == "__main__":
    # Run as a script: start the IPython kernel application using the
    # AsyncGUIKernel class defined in this module.
    from ipykernel.kernelapp import IPKernelApp

    IPKernelApp.launch_instance(kernel_class=AsyncGUIKernel)