diff --git a/zmq/_future.py b/zmq/_future.py index 1d22ba647..76aa82b9f 100644 --- a/zmq/_future.py +++ b/zmq/_future.py @@ -383,14 +383,14 @@ def poll(self, timeout=None, flags=_zmq.POLLIN) -> Awaitable[int]: # type: igno p = self._poller_class() p.register(self, flags) - f = cast(Future, p.poll(timeout)) + poll_future = cast(Future, p.poll(timeout)) future = self._Future() def unwrap_result(f): if future.done(): return - if f.cancelled(): + if poll_future.cancelled(): try: future.cancel() except RuntimeError: @@ -398,16 +398,28 @@ def unwrap_result(f): pass return if f.exception(): - future.set_exception(f.exception()) + future.set_exception(poll_future.exception()) else: - evts = dict(f.result()) + evts = dict(poll_future.result()) future.set_result(evts.get(self, 0)) - if f.done(): - # hook up result if - unwrap_result(f) + if poll_future.done(): + # hook up result if already done + unwrap_result(poll_future) else: - f.add_done_callback(unwrap_result) + poll_future.add_done_callback(unwrap_result) + + def cancel_poll(future): + """Cancel underlying poll if request has been cancelled""" + if not poll_future.done(): + try: + poll_future.cancel() + except RuntimeError: + # RuntimeError may be called during teardown + pass + + future.add_done_callback(cancel_poll) + return future # overrides only necessary for updated types diff --git a/zmq/tests/test_asyncio.py b/zmq/tests/test_asyncio.py index 01a94c335..92f47d452 100644 --- a/zmq/tests/test_asyncio.py +++ b/zmq/tests/test_asyncio.py @@ -342,6 +342,19 @@ def test_shadow(): assert async_s.type == s.type +async def test_poll_leak(): + ctx = zmq.asyncio.Context() + with ctx, ctx.socket(zmq.PULL) as s: + assert len(s._recv_futures) == 0 + for i in range(10): + f = asyncio.ensure_future(s.poll(timeout=1000, flags=zmq.PollEvent.POLLIN)) + f.cancel() + await asyncio.sleep(0) + # one more sleep allows further chained cleanup + await asyncio.sleep(0.1) + assert len(s._recv_futures) == 0 + + class ProcessForTeardownTest(Process): def run(self): """Leave context, socket and event loop upon implicit disposal"""