Skip to content

Commit

Permalink
Look up the time through trait Runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith authored and djc committed May 1, 2024
1 parent d5849f0 commit 7cf0a02
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 20 deletions.
6 changes: 3 additions & 3 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ pub(crate) struct State {

impl State {
fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
let now = Instant::now();
let now = self.runtime.now();
let mut transmits = 0;

let max_datagrams = self.socket.max_transmit_segments();
Expand Down Expand Up @@ -1170,7 +1170,7 @@ impl State {

// A timer expired, so the caller needs to check for
// new transmits, which might cause new timers to be set.
self.inner.handle_timeout(Instant::now());
self.inner.handle_timeout(self.runtime.now());
self.timer_deadline = None;
true
}
Expand Down Expand Up @@ -1213,7 +1213,7 @@ impl State {
}

fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
self.inner.close(Instant::now(), error_code, reason);
self.inner.close(self.runtime.now(), error_code, reason);
self.terminate(ConnectionError::LocallyClosed, shared);
self.wake();
}
Expand Down
35 changes: 18 additions & 17 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl Endpoint {

let (ch, conn) = endpoint
.inner
.connect(Instant::now(), config, addr, server_name)?;
.connect(self.runtime.now(), config, addr, server_name)?;

let socket = endpoint.socket.clone();
Ok(endpoint
Expand Down Expand Up @@ -324,7 +324,7 @@ impl Future for EndpointDriver {
endpoint.driver = Some(cx.waker().clone());
}

let now = Instant::now();
let now = endpoint.runtime.now();
let mut keep_going = false;
keep_going |= endpoint.drive_recv(cx, now)?;
keep_going |= endpoint.handle_events(cx, &self.0.shared);
Expand Down Expand Up @@ -373,12 +373,11 @@ impl EndpointInner {
) -> Result<Connecting, ConnectionError> {
let mut state = self.state.lock().unwrap();
let mut response_buffer = Vec::new();
match state.inner.accept(
incoming,
Instant::now(),
&mut response_buffer,
server_config,
) {
let now = state.runtime.now();
match state
.inner
.accept(incoming, now, &mut response_buffer, server_config)
{
Ok((handle, conn)) => {
let socket = state.socket.clone();
let runtime = state.runtime.clone();
Expand Down Expand Up @@ -441,20 +440,21 @@ pub(crate) struct Shared {

impl State {
fn drive_recv(&mut self, cx: &mut Context, now: Instant) -> Result<bool, io::Error> {
self.recv_state.recv_limiter.start_cycle(Instant::now);
let get_time = || self.runtime.now();
self.recv_state.recv_limiter.start_cycle(get_time);
if let Some(socket) = &self.prev_socket {
// We don't care about the `PollProgress` from old sockets.
let poll_res = self
.recv_state
.poll_socket(cx, &mut self.inner, &**socket, now);
let poll_res =
self.recv_state
.poll_socket(cx, &mut self.inner, &**socket, &*self.runtime, now);
if poll_res.is_err() {
self.prev_socket = None;
}
};
let poll_res = self
.recv_state
.poll_socket(cx, &mut self.inner, &*self.socket, now);
self.recv_state.recv_limiter.finish_cycle(Instant::now);
let poll_res =
self.recv_state
.poll_socket(cx, &mut self.inner, &*self.socket, &*self.runtime, now);
self.recv_state.recv_limiter.finish_cycle(get_time);
let poll_res = poll_res?;
if poll_res.received_connection_packet {
// Traffic has arrived on self.socket, therefore there is no need for the abandoned
Expand Down Expand Up @@ -712,6 +712,7 @@ impl RecvState {
cx: &mut Context,
endpoint: &mut proto::Endpoint,
socket: &dyn AsyncUdpSocket,
runtime: &dyn Runtime,
now: Instant,
) -> Result<PollProgress, io::Error> {
let mut received_connection_packet = false;
Expand Down Expand Up @@ -786,7 +787,7 @@ impl RecvState {
return Err(e);
}
}
if !self.recv_limiter.allow_work(Instant::now) {
if !self.recv_limiter.allow_work(|| runtime.now()) {
return Ok(PollProgress {
received_connection_packet,
keep_going: true,
Expand Down
2 changes: 2 additions & 0 deletions quinn/src/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ mod tracking {
///
/// The purpose will be recorded in the list of last lock owners
pub(crate) fn lock(&self, purpose: &'static str) -> MutexGuard<T> {
// We don't bother dispatching through Runtime::now because they're pure performance
// diagnostics.
let now = Instant::now();
let guard = self.inner.lock().unwrap();

Expand Down
6 changes: 6 additions & 0 deletions quinn/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ pub trait Runtime: Send + Sync + Debug + 'static {
fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>);
/// Convert `t` into the socket type used by this runtime
fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result<Arc<dyn AsyncUdpSocket>>;
/// Look up the current time
///
/// Allows simulating the flow of time for testing.
fn now(&self) -> Instant {
Instant::now()
}
}

/// Abstract implementation of an async timer for runtime independence
Expand Down
4 changes: 4 additions & 0 deletions quinn/src/runtime/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ impl Runtime for TokioRuntime {
io: tokio::net::UdpSocket::from_std(sock)?,
}))
}

fn now(&self) -> Instant {
tokio::time::Instant::now().into_std()
}
}

impl AsyncTimer for Sleep {
Expand Down

0 comments on commit 7cf0a02

Please sign in to comment.