Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manually pin Sleep futures #2914

Merged
merged 2 commits into the base branch on
Oct 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions tower-batch/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ where
// submitted, so that the batch latency of all entries is at most
// self.max_latency. However, we don't keep the timer running unless
// there is a pending request to prevent wakeups on idle services.
let mut timer: Option<Sleep> = None;
let mut timer: Option<Pin<Box<Sleep>>> = None;
let mut pending_items = 0usize;
loop {
match timer {
match timer.as_mut() {
None => match self.rx.next().await {
// The first message in a new batch.
Some(msg) => {
Expand All @@ -135,13 +135,13 @@ where
// Apply the provided span to request processing
.instrument(span)
.await;
timer = Some(sleep(self.max_latency));
timer = Some(Box::pin(sleep(self.max_latency)));
pending_items = 1;
}
// No more messages, ever.
None => return,
},
Some(mut sleep) => {
Some(sleep) => {
// Wait on either a new message or the batch timer.
// If both are ready, select! chooses one of them at random.
tokio::select! {
Expand All @@ -161,16 +161,15 @@ where
timer = None;
pending_items = 0;
} else {
// The timer is still running, set it back!
timer = Some(sleep);
// The timer is still running.
}
}
None => {
// No more messages, ever.
return;
}
},
() = &mut sleep => {
() = sleep => {
// The batch timer elapsed.
// XXX(hdevalence): what span should instrument this?
self.flush_service().await;
Expand Down
8 changes: 4 additions & 4 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! And it's unclear if these assumptions match the `zcashd` implementation.
//! It should be refactored into a cleaner set of request/response pairs (#1515).

use std::{collections::HashSet, sync::Arc};
use std::{collections::HashSet, pin::Pin, sync::Arc};

use futures::{
future::{self, Either},
Expand Down Expand Up @@ -320,7 +320,7 @@ pub struct Connection<S, Tx> {
/// A timeout for a client request. This is stored separately from
/// State so that we can move the future out of it independently of
/// other state handling.
pub(super) request_timer: Option<Sleep>,
pub(super) request_timer: Option<Pin<Box<Sleep>>>,

/// The `inbound` service, used to answer requests from this connection's peer.
pub(super) svc: S,
Expand Down Expand Up @@ -780,11 +780,11 @@ where
// send a response before dropping tx.
let _ = tx.send(Ok(Response::Nil));
self.state = AwaitingRequest;
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT)));
}
Ok((new_state @ AwaitingResponse { .. }, None)) => {
self.state = new_state;
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT)));
}
Err((e, tx)) => {
let e = SharedPeerError::from(e);
Expand Down