Skip to content

Commit

Permalink
Merge pull request #1889 from minrk/asyncio-poll-leak
Browse files Browse the repository at this point in the history
ensure never-resolved Socket.poll futures get cleaned up
  • Loading branch information
minrk committed Jul 27, 2023
2 parents 93cdeaf + e604a77 commit 4a26a05
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 8 deletions.
28 changes: 20 additions & 8 deletions zmq/_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,31 +383,43 @@ def poll(self, timeout=None, flags=_zmq.POLLIN) -> Awaitable[int]: # type: igno

p = self._poller_class()
p.register(self, flags)
f = cast(Future, p.poll(timeout))
poll_future = cast(Future, p.poll(timeout))

future = self._Future()

def unwrap_result(f):
if future.done():
return
if f.cancelled():
if poll_future.cancelled():
try:
future.cancel()
except RuntimeError:
# RuntimeError may be called during teardown
pass
return
if f.exception():
future.set_exception(f.exception())
future.set_exception(poll_future.exception())
else:
evts = dict(f.result())
evts = dict(poll_future.result())
future.set_result(evts.get(self, 0))

if f.done():
# hook up result if
unwrap_result(f)
if poll_future.done():
# hook up result if already done
unwrap_result(poll_future)
else:
f.add_done_callback(unwrap_result)
poll_future.add_done_callback(unwrap_result)

def cancel_poll(future):
"""Cancel underlying poll if request has been cancelled"""
if not poll_future.done():
try:
poll_future.cancel()
except RuntimeError:
# RuntimeError may be called during teardown
pass

future.add_done_callback(cancel_poll)

return future

# overrides only necessary for updated types
Expand Down
13 changes: 13 additions & 0 deletions zmq/tests/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,19 @@ def test_shadow():
assert async_s.type == s.type


async def test_poll_leak():
ctx = zmq.asyncio.Context()
with ctx, ctx.socket(zmq.PULL) as s:
assert len(s._recv_futures) == 0
for i in range(10):
f = asyncio.ensure_future(s.poll(timeout=1000, flags=zmq.PollEvent.POLLIN))
f.cancel()
await asyncio.sleep(0)
# one more sleep allows further chained cleanup
await asyncio.sleep(0.1)
assert len(s._recv_futures) == 0


class ProcessForTeardownTest(Process):
def run(self):
"""Leave context, socket and event loop upon implicit disposal"""
Expand Down

0 comments on commit 4a26a05

Please sign in to comment.