diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index eee22b613..3df0159b2 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -952,7 +952,7 @@ pub(crate) struct State { impl State { fn drive_transmit(&mut self, cx: &mut Context) -> io::Result { - let now = Instant::now(); + let now = self.runtime.now(); let mut transmits = 0; let max_datagrams = self.socket.max_transmit_segments(); @@ -1170,7 +1170,7 @@ impl State { // A timer expired, so the caller needs to check for // new transmits, which might cause new timers to be set. - self.inner.handle_timeout(Instant::now()); + self.inner.handle_timeout(self.runtime.now()); self.timer_deadline = None; true } @@ -1213,7 +1213,7 @@ impl State { } fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) { - self.inner.close(Instant::now(), error_code, reason); + self.inner.close(self.runtime.now(), error_code, reason); self.terminate(ConnectionError::LocallyClosed, shared); self.wake(); } diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index cf307f0df..771e835c5 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -197,7 +197,7 @@ impl Endpoint { let (ch, conn) = endpoint .inner - .connect(Instant::now(), config, addr, server_name)?; + .connect(self.runtime.now(), config, addr, server_name)?; let socket = endpoint.socket.clone(); Ok(endpoint @@ -324,7 +324,7 @@ impl Future for EndpointDriver { endpoint.driver = Some(cx.waker().clone()); } - let now = Instant::now(); + let now = endpoint.runtime.now(); let mut keep_going = false; keep_going |= endpoint.drive_recv(cx, now)?; keep_going |= endpoint.handle_events(cx, &self.0.shared); @@ -373,12 +373,11 @@ impl EndpointInner { ) -> Result { let mut state = self.state.lock().unwrap(); let mut response_buffer = Vec::new(); - match state.inner.accept( - incoming, - Instant::now(), - &mut response_buffer, - server_config, - ) { + let now = state.runtime.now(); + match state + .inner + .accept(incoming, now, &mut response_buffer, server_config) + { Ok((handle, conn)) => { let socket = state.socket.clone(); let runtime = state.runtime.clone(); @@ -441,20 +440,21 @@ pub(crate) struct Shared { impl State { fn drive_recv(&mut self, cx: &mut Context, now: Instant) -> Result { - self.recv_state.recv_limiter.start_cycle(Instant::now); + let get_time = || self.runtime.now(); + self.recv_state.recv_limiter.start_cycle(get_time); if let Some(socket) = &self.prev_socket { // We don't care about the `PollProgress` from old sockets. - let poll_res = self - .recv_state - .poll_socket(cx, &mut self.inner, &**socket, now); + let poll_res = + self.recv_state + .poll_socket(cx, &mut self.inner, &**socket, &*self.runtime, now); if poll_res.is_err() { self.prev_socket = None; } }; - let poll_res = self - .recv_state - .poll_socket(cx, &mut self.inner, &*self.socket, now); - self.recv_state.recv_limiter.finish_cycle(Instant::now); + let poll_res = + self.recv_state + .poll_socket(cx, &mut self.inner, &*self.socket, &*self.runtime, now); + self.recv_state.recv_limiter.finish_cycle(get_time); let poll_res = poll_res?; if poll_res.received_connection_packet { // Traffic has arrived on self.socket, therefore there is no need for the abandoned @@ -712,6 +712,7 @@ impl RecvState { cx: &mut Context, endpoint: &mut proto::Endpoint, socket: &dyn AsyncUdpSocket, + runtime: &dyn Runtime, now: Instant, ) -> Result { let mut received_connection_packet = false; @@ -786,7 +787,7 @@ impl RecvState { return Err(e); } } - if !self.recv_limiter.allow_work(Instant::now) { + if !self.recv_limiter.allow_work(|| runtime.now()) { return Ok(PollProgress { received_connection_packet, keep_going: true, diff --git a/quinn/src/mutex.rs b/quinn/src/mutex.rs index dc276d118..c110bcec8 100644 --- a/quinn/src/mutex.rs +++ b/quinn/src/mutex.rs @@ -44,6 +44,8 @@ mod tracking { /// /// The purpose will be recorded in the list of last lock owners pub(crate) fn lock(&self, purpose: &'static str) -> MutexGuard { + // We don't bother dispatching through Runtime::now because they're pure performance + // diagnostics. let now = Instant::now(); let guard = self.inner.lock().unwrap(); diff --git a/quinn/src/runtime.rs b/quinn/src/runtime.rs index 823c617f7..2a7a9c59f 100644 --- a/quinn/src/runtime.rs +++ b/quinn/src/runtime.rs @@ -19,6 +19,12 @@ pub trait Runtime: Send + Sync + Debug + 'static { fn spawn(&self, future: Pin + Send>>); /// Convert `t` into the socket type used by this runtime fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result>; + /// Look up the current time + /// + /// Allows simulating the flow of time for testing. + fn now(&self) -> Instant { + Instant::now() + } } /// Abstract implementation of an async timer for runtime independence diff --git a/quinn/src/runtime/tokio.rs b/quinn/src/runtime/tokio.rs index 42f63b214..ad321a240 100644 --- a/quinn/src/runtime/tokio.rs +++ b/quinn/src/runtime/tokio.rs @@ -33,6 +33,10 @@ impl Runtime for TokioRuntime { io: tokio::net::UdpSocket::from_std(sock)?, })) } + + fn now(&self) -> Instant { + tokio::time::Instant::now().into_std() + } } impl AsyncTimer for Sleep {