diff --git a/jupyter_client/channels.py b/jupyter_client/channels.py index 7be778054..c340c085e 100644 --- a/jupyter_client/channels.py +++ b/jupyter_client/channels.py @@ -101,6 +101,7 @@ def _create_socket(self) -> None: assert self.context is not None self.socket = self.context.socket(zmq.REQ) self.socket.linger = 1000 + assert self.address is not None self.socket.connect(self.address) self.poller.register(self.socket, zmq.POLLIN) @@ -192,9 +193,7 @@ def call_handlers(self, since_last_heartbeat: float) -> None: class ZMQSocketChannel(object): """A ZMQ socket in an async API""" - def __init__( - self, socket: zmq.sugar.socket.Socket, session: Session, loop: t.Any = None - ) -> None: + def __init__(self, socket: zmq.asyncio.Socket, session: Session, loop: t.Any = None) -> None: """Create a channel. Parameters @@ -208,7 +207,7 @@ def __init__( """ super().__init__() - self.socket: t.Optional[zmq.sugar.socket.Socket] = socket + self.socket: t.Optional[zmq.asyncio.Socket] = socket self.session = session async def _recv(self, **kwargs: t.Any) -> t.Dict[str, t.Any]: diff --git a/jupyter_client/threaded.py b/jupyter_client/threaded.py index af7d64540..dcca6c94e 100644 --- a/jupyter_client/threaded.py +++ b/jupyter_client/threaded.py @@ -8,11 +8,9 @@ from threading import Event from threading import Thread from typing import Any -from typing import Awaitable from typing import Dict from typing import List from typing import Optional -from typing import Union import zmq from traitlets import Instance @@ -30,10 +28,6 @@ # during garbage collection of threads at exit -async def get_msg(msg: Awaitable) -> Union[List[bytes], List[zmq.Message]]: - return await msg - - class ThreadedZMQSocketChannel(object): """A ZMQ socket invoking a callback in the ioloop""" @@ -68,6 +62,7 @@ def __init__( evt = Event() def setup_stream(): + assert self.socket is not None self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) self.stream.on_recv(self._handle_recv) evt.set() @@ -113,13 +108,11 @@ def thread_send(): assert self.ioloop is not None self.ioloop.add_callback(thread_send) - def _handle_recv(self, future_msg: Awaitable) -> None: + def _handle_recv(self, msg_list: List[bytes]) -> None: """Callback for stream.on_recv. Unpacks message, and calls handlers with it. """ - assert self.ioloop is not None - msg_list = self.ioloop._asyncio_event_loop.run_until_complete(get_msg(future_msg)) assert self.session is not None ident, smsg = self.session.feed_identities(msg_list) msg = self.session.deserialize(smsg) diff --git a/pyproject.toml b/pyproject.toml index 0ba47d9ea..a4bce3f92 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ dependencies = [ "jupyter_core>=4.9.2", "nest-asyncio>=1.5.4", "python-dateutil>=2.8.2", - "pyzmq>=22.3", + "pyzmq>=23.0", "tornado>=6.0", "traitlets", ]