diff --git a/autopush_rs/src/server/mod.rs b/autopush_rs/src/server/mod.rs index 09ee633c..5c4b549a 100644 --- a/autopush_rs/src/server/mod.rs +++ b/autopush_rs/src/server/mod.rs @@ -462,15 +462,23 @@ impl Future for PingManager { fn poll(&mut self) -> Poll<(), Error> { // If it's time for us to send a ping, then queue up a ping to get sent - // and start the clock for that ping to time out. + // and start the clock for that ping to time out once the ping is + // actually sent out on the socket. while let Async::Ready(_) = self.ping_interval.poll()? { match self.timeout { TimeoutState::None => {} _ => continue, } + debug!("scheduling a ping to be sent"); self.socket.borrow_mut().ping = true; - let timeout = Timeout::new(self.srv.opts.auto_ping_timeout, &self.srv.handle)?; - self.timeout = TimeoutState::Ping(timeout); + } + { + let mut socket = self.socket.borrow_mut(); + if socket.ping && socket.send_ping()?.is_ready() { + let timeout = Timeout::new(self.srv.opts.auto_ping_timeout, + &self.srv.handle)?; + self.timeout = TimeoutState::Ping(timeout); + } } // If the client takes too long to respond to our websocket ping or too @@ -488,24 +496,33 @@ impl Future for PingManager { } TimeoutState::Ping(ref mut timeout) => { if timeout.poll()?.is_ready() { - if let CloseState::Exchange(ref mut client) = self.client { - client.shutdown(); - } - return Err("pong not received within timeout".into()) + // We may not actually be reading messages from the + // websocket right now, could have been waiting on something + // else. Instead of immediately returning an error here wait + // for the stream to return `NotReady` when looking for + // messages, as then we're extra sure that no pong was + // received after this timeout elapsed. + debug!("ping timeout fired, scheduling error to maybe happen"); + self.socket.borrow_mut().pong_timeout = true; + // `timeout` is cleared in the clause below } } } // Received pongs will clear our ping timeout, but not the close // timeout. - if self.socket.borrow_mut().poll_pong().is_ready() { - if let TimeoutState::Ping(_) = self.timeout { + if let TimeoutState::Ping(_) = self.timeout { + let mut socket = self.socket.borrow_mut(); + if socket.poll_pong().is_ready() || socket.pong_timeout { + debug!("clearing ping timeout"); self.timeout = TimeoutState::None; } } - // Ensure the scheduled ping is actually flushed out - self.socket.borrow_mut().poll_complete()?; + // Be sure to always flush out any buffered messages/pings + self.socket.borrow_mut().poll_complete().chain_err(|| { + "failed routine `poll_complete` call" + })?; // At this point looks our state of ping management A-OK, so try to // make progress on our client, and when done with that execute the @@ -531,6 +548,7 @@ struct WebpushSocket { inner: T, pong: Pong, ping: bool, + pong_timeout: bool, } enum Pong { @@ -545,6 +563,7 @@ impl WebpushSocket { inner: t, pong: Pong::None, ping: false, + pong_timeout: false, } } @@ -554,6 +573,7 @@ impl WebpushSocket { Pong::Received => return Async::Ready(()), Pong::Waiting(_) => {} } + debug!("waiting for a pong"); self.pong = Pong::Waiting(task::current()); Async::NotReady } @@ -562,9 +582,16 @@ impl WebpushSocket { where T: Sink, Error: From { if self.ping { + debug!("sending a ping"); match self.inner.start_send(Message::Ping(Vec::new()))? { - AsyncSink::Ready => self.ping = false, - AsyncSink::NotReady(_) => return Ok(Async::NotReady), + AsyncSink::Ready => { + debug!("ping sent"); + self.ping = false; + } + AsyncSink::NotReady(_) => { + debug!("ping not ready to be sent"); + return Ok(Async::NotReady) + } } } Ok(Async::Ready(())) @@ -580,32 +607,46 @@ impl Stream for WebpushSocket fn poll(&mut self) -> Poll, Error> { loop { - match try_ready!(self.inner.poll()) { - Some(Message::Text(ref s)) => { + let msg = match self.inner.poll()? { + Async::Ready(Some(msg)) => msg, + Async::Ready(None) => return Ok(None.into()), + Async::NotReady => { + // If we don't have any more messages and our pong timeout + // elapsed (set above) then this is where we start + // triggering errors. + if self.pong_timeout { + return Err("failed to receive a pong in time".into()) + } + return Ok(Async::NotReady) + } + }; + match msg { + Message::Text(ref s) => { + trace!("text message {}", s); let msg = serde_json::from_str(s).chain_err(|| "invalid json text")?; return Ok(Some(msg).into()) } - Some(Message::Binary(_)) => { + Message::Binary(_) => { return Err("binary messages not accepted".into()) } // sending a pong is already managed by lower layers, just go to // the next message - Some(Message::Ping(_)) => {} + Message::Ping(_) => {} // Wake up tasks waiting for a pong, if any. - Some(Message::Pong(_)) => { + Message::Pong(_) => { + self.pong_timeout = false; match mem::replace(&mut self.pong, Pong::Received) { Pong::None => {} Pong::Received => {} Pong::Waiting(task) => { + debug!("notifying a task of a pong"); task.notify(); } } } - - None => return Ok(None.into()), } } }