Drop messages on the client for closed streams/subscriptions #133
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Why
Each stream/subscription has a messages channel with a capacity of 128 messages. In our main receive loop, we push messages into the channel, blocking until the channel has room. This adds some backpressure, but becomes problematic if the stream is not making any progress. For example, the client could start a stream and then decide to cancel it and not read any of the messages. If the server sends >128 messages, it will fill up the stream's channel leading to a deadlock for the session.
This will be more correctly fixed when river v2 support is landed, as that adds support for proper cancellation. In the meantime, we can close the channel when we know we are not going to be reading from it anymore, and then drop any messages destiined for a closed channel.
rpc/upload are not affected because the server is only allowed to send 1 payload, and the channel has a buffer size of 1, so there will always be room.
Note
A deadlock can still occur if the client holds a reference to the
AsyncGenerator
but doesn't service it. This is likely a client bug if it happens, but we can probably add some timeouts to putting messages in the stream channel for defense in depth. I'll do this as a followup as I need to think a little bit more about how to properly handle that case. This PR as-is should be a quick win for our usage since we shouldn't be holding references to async generators that we aren't also actively servicing.What changed
ChannelClosed
errors when adding message to streamTest plan