-
Notifications
You must be signed in to change notification settings - Fork 173
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace async-channel #708
Conversation
Use tokio::sync::Notify to signal to the server when a subscriber has gone away without calling unsubscribe
Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…rpsee into dp-replace-async_channel
Either::Right((None, _)) => { | ||
self.close(&SubscriptionClosed::new(SubscriptionClosedReason::ConnectionReset)); | ||
break Ok(()); | ||
if let Some(close_notify) = self.close_notify.clone() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need the clone here @dvdplm?!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't move out the Arc<Notify>
and I can't use a reference either (because send
takes a mutable reference below). :/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be possible with take
here but it didn't work when I tried so I just wonder why :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
take()
compiles but then the tests fail. :/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Played a bit with this, and not sure there is a super clean solution. You either clone it here, or somehow decouple the closed_fut
from self
lifetime wise, which is really hard with pinned futures.
@dvdplm can you remove the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!
Either::Right((None, _)) => { | ||
self.close(&SubscriptionClosed::new(SubscriptionClosedReason::ConnectionReset)); | ||
break Ok(()); | ||
if let Some(close_notify) = self.close_notify.clone() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Played a bit with this, and not sure there is a super clean solution. You either clone it here, or somehow decouple the closed_fut
from self
lifetime wise, which is really hard with pinned futures.
Replace
async-channel
withtokio::sync::Notify
to use a dependency less and (possibly?) be a bit faster by using a lighter primitive (maybe?). The problem theasync-channel
is solving is: how does theSubscriptionSink
know about rude subscribers that leaves without callingunsubscribe_*
? Using a full-on channel for this might be overkill and this is an attempt at fixing that.Fixes #691