Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1014 from alexcrichton/ping-pong-fixes
Browse files Browse the repository at this point in the history
A few small Rust ping-pong fixes
  • Loading branch information
bbangert authored Sep 12, 2017
2 parents b179642 + 3b5075a commit 9f7ac5a
Showing 1 changed file with 61 additions and 20 deletions.
81 changes: 61 additions & 20 deletions autopush_rs/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,15 +462,23 @@ impl Future for PingManager {

fn poll(&mut self) -> Poll<(), Error> {
// If it's time for us to send a ping, then queue up a ping to get sent
// and start the clock for that ping to time out.
// and start the clock for that ping to time out once the ping is
// actually sent out on the socket.
while let Async::Ready(_) = self.ping_interval.poll()? {
match self.timeout {
TimeoutState::None => {}
_ => continue,
}
debug!("scheduling a ping to be sent");
self.socket.borrow_mut().ping = true;
let timeout = Timeout::new(self.srv.opts.auto_ping_timeout, &self.srv.handle)?;
self.timeout = TimeoutState::Ping(timeout);
}
{
let mut socket = self.socket.borrow_mut();
if socket.ping && socket.send_ping()?.is_ready() {
let timeout = Timeout::new(self.srv.opts.auto_ping_timeout,
&self.srv.handle)?;
self.timeout = TimeoutState::Ping(timeout);
}
}

// If the client takes too long to respond to our websocket ping or too
Expand All @@ -488,24 +496,33 @@ impl Future for PingManager {
}
TimeoutState::Ping(ref mut timeout) => {
if timeout.poll()?.is_ready() {
if let CloseState::Exchange(ref mut client) = self.client {
client.shutdown();
}
return Err("pong not received within timeout".into())
// We may not actually be reading messages from the
// websocket right now, could have been waiting on something
// else. Instead of immediately returning an error here wait
// for the stream to return `NotReady` when looking for
// messages, as then we're extra sure that no pong was
// received after this timeout elapsed.
debug!("ping timeout fired, scheduling error to maybe happen");
self.socket.borrow_mut().pong_timeout = true;
// `timeout` is cleared in the clause below
}
}
}

// Received pongs will clear our ping timeout, but not the close
// timeout.
if self.socket.borrow_mut().poll_pong().is_ready() {
if let TimeoutState::Ping(_) = self.timeout {
if let TimeoutState::Ping(_) = self.timeout {
let mut socket = self.socket.borrow_mut();
if socket.poll_pong().is_ready() || socket.pong_timeout {
debug!("clearing ping timeout");
self.timeout = TimeoutState::None;
}
}

// Ensure the scheduled ping is actually flushed out
self.socket.borrow_mut().poll_complete()?;
// Be sure to always flush out any buffered messages/pings
self.socket.borrow_mut().poll_complete().chain_err(|| {
"failed routine `poll_complete` call"
})?;

// At this point looks our state of ping management A-OK, so try to
// make progress on our client, and when done with that execute the
Expand All @@ -531,6 +548,7 @@ struct WebpushSocket<T> {
inner: T,
pong: Pong,
ping: bool,
pong_timeout: bool,
}

enum Pong {
Expand All @@ -545,6 +563,7 @@ impl<T> WebpushSocket<T> {
inner: t,
pong: Pong::None,
ping: false,
pong_timeout: false,
}
}

Expand All @@ -554,6 +573,7 @@ impl<T> WebpushSocket<T> {
Pong::Received => return Async::Ready(()),
Pong::Waiting(_) => {}
}
debug!("waiting for a pong");
self.pong = Pong::Waiting(task::current());
Async::NotReady
}
Expand All @@ -562,9 +582,16 @@ impl<T> WebpushSocket<T> {
where T: Sink<SinkItem = Message>, Error: From<T::SinkError>
{
if self.ping {
debug!("sending a ping");
match self.inner.start_send(Message::Ping(Vec::new()))? {
AsyncSink::Ready => self.ping = false,
AsyncSink::NotReady(_) => return Ok(Async::NotReady),
AsyncSink::Ready => {
debug!("ping sent");
self.ping = false;
}
AsyncSink::NotReady(_) => {
debug!("ping not ready to be sent");
return Ok(Async::NotReady)
}
}
}
Ok(Async::Ready(()))
Expand All @@ -580,32 +607,46 @@ impl<T> Stream for WebpushSocket<T>

fn poll(&mut self) -> Poll<Option<ClientMessage>, Error> {
loop {
match try_ready!(self.inner.poll()) {
Some(Message::Text(ref s)) => {
let msg = match self.inner.poll()? {
Async::Ready(Some(msg)) => msg,
Async::Ready(None) => return Ok(None.into()),
Async::NotReady => {
// If we don't have any more messages and our pong timeout
// elapsed (set above) then this is where we start
// triggering errors.
if self.pong_timeout {
return Err("failed to receive a pong in time".into())
}
return Ok(Async::NotReady)
}
};
match msg {
Message::Text(ref s) => {
trace!("text message {}", s);
let msg = serde_json::from_str(s).chain_err(|| "invalid json text")?;
return Ok(Some(msg).into())
}

Some(Message::Binary(_)) => {
Message::Binary(_) => {
return Err("binary messages not accepted".into())
}

// sending a pong is already managed by lower layers, just go to
// the next message
Some(Message::Ping(_)) => {}
Message::Ping(_) => {}

// Wake up tasks waiting for a pong, if any.
Some(Message::Pong(_)) => {
Message::Pong(_) => {
self.pong_timeout = false;
match mem::replace(&mut self.pong, Pong::Received) {
Pong::None => {}
Pong::Received => {}
Pong::Waiting(task) => {
debug!("notifying a task of a pong");
task.notify();
}
}
}

None => return Ok(None.into()),
}
}
}
Expand Down

0 comments on commit 9f7ac5a

Please sign in to comment.