Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for stream cloning #203

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions tractor/_portal.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,18 @@ async def receive(self):
try:
msg = await self._rx_chan.receive()
return msg['yield']

except trio.ClosedResourceError:
# when the send is closed we assume the stream has
# terminated and signal this local iterator to stop
await self.aclose()
raise StopAsyncIteration

except trio.Cancelled:
# relay cancels to the remote task
await self.aclose()
raise

except KeyError:
# internal error should never get here
assert msg.get('cid'), (
Expand All @@ -102,14 +105,25 @@ async def aclose(self):
"""Cancel associated remote actor task and local memory channel
on close.
"""
if self._rx_chan._closed:
rx_chan = self._rx_chan
stats = rx_chan.statistics()

if rx_chan._closed:
log.warning(f"{self} is already closed")
return

if stats.open_receive_channels > 1:
# if we've been cloned don't kill the stream
log.debug("there are still consumers running keeping stream alive")
return

if self._shielded:
log.warning(f"{self} is shielded, portal channel being kept alive")
return

# close the local mem chan
rx_chan.close()
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This likely needs to be put above.

Thanks to @richardsheridan for catching 🏄🏼


cid = self._cid
with trio.move_on_after(0.5) as cs:
cs.shield = True
Expand All @@ -131,11 +145,16 @@ async def aclose(self):
"May have failed to cancel remote task "
f"{cid} for {self._portal.channel.uid}")

with trio.CancelScope(shield=True):
await self._rx_chan.aclose()

def clone(self):
return self
"""Clone this receive channel allowing for multi-task
consumption from the same channel.

"""
return ReceiveStream(
self._cid,
self._rx_chan.clone(),
self._portal,
)


class Portal:
Expand Down