Skip to content

Commit

Permalink
Remove will_wake calls
Browse files Browse the repository at this point in the history
We used `will_wake` in correctness checks to make sure we don't hang the future/stream forever if a wrong waker is used, but the contract for this function is best-effort and it started generating more and more false positives.

The recent one, found by @aleiserson, is inside rust-lang (rust-lang/rust#119863), so it is very hard to get around.

We don't have a good replacement for this check unless we implement our own waker, but it is probably too much for now
  • Loading branch information
akoshelev committed May 3, 2024
1 parent 20c89ed commit d34154b
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 33 deletions.
1 change: 0 additions & 1 deletion ipa-core/src/helpers/buffers/ordering_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ impl WaitingShard {
match self.wakers[j].i.cmp(&i) {
Ordering::Greater => (),
Ordering::Equal => {
assert!(item.w.will_wake(&self.wakers[j].w));
self.wakers[j] = item;
return Ok(());
}
Expand Down
19 changes: 8 additions & 11 deletions ipa-core/src/helpers/buffers/unordered_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ where
if recv.is_next(this.i) {
recv.poll_next(cx)
} else {
recv.add_waker(this.i, cx.waker().clone());
recv.add_waker(this.i, cx.waker());
Poll::Pending
}
}
Expand Down Expand Up @@ -190,28 +190,25 @@ where
///
/// [`recv`]: UnorderedReceiver::recv
/// [`poll`]: Future::poll
fn add_waker(&mut self, i: usize, waker: Waker) {
fn add_waker(&mut self, i: usize, waker: &Waker) {
assert!(
i > self.next,
"Awaiting a read (record = {i}) that has already been fulfilled. Read cursor is currently at {}", self.next
);
// We don't save a waker at `self.next`, so `>` and not `>=`.
if i > self.next + self.wakers.len() {
#[cfg(feature = "stall-detection")]
let overflow = (waker, i);
let overflow = (waker.clone(), i);
#[cfg(not(feature = "stall-detection"))]
let overflow = waker;
let overflow = waker.clone();
self.overflow_wakers.push(overflow);
} else {
let index = i % self.wakers.len();
if let Some(old) = self.wakers[index].as_ref() {
// We are OK with having multiple polls of the same `Receiver`
// (or two `Receiver`s for the same item being polled).
// However, as we are only tracking one waker, they both need
// to be woken when we invoke the waker we get.
assert!(waker.will_wake(old));
if let Some(old) = self.wakers[index].as_mut() {
old.clone_from(waker);
} else {
self.wakers[index] = Some(waker.clone());
}
self.wakers[index] = Some(waker);
}
}

Expand Down
37 changes: 16 additions & 21 deletions ipa-core/src/helpers/transport/stream/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,29 +81,24 @@ impl<I: TransportIdentity, S: Stream> StreamCollection<I, S> {
let mut streams = self.inner.lock().unwrap();

match streams.entry(key.clone()) {
Entry::Occupied(mut entry) => {
match entry.get_mut() {
StreamState::Waiting(old_waker) => {
let will_wake = old_waker.will_wake(waker);
drop(streams); // avoid mutex poisoning
assert!(will_wake);
None
}
rs @ StreamState::Ready(_) => {
let StreamState::Ready(stream) =
std::mem::replace(rs, StreamState::Completed)
else {
unreachable!();
};
Entry::Occupied(mut entry) => match entry.get_mut() {
StreamState::Waiting(old_waker) => {
old_waker.clone_from(waker);
None
}
rs @ StreamState::Ready(_) => {
let StreamState::Ready(stream) = std::mem::replace(rs, StreamState::Completed)
else {
unreachable!();
};

Some(stream)
}
StreamState::Completed => {
drop(streams);
panic!("{key:?} stream has been consumed already")
}
Some(stream)
}
}
StreamState::Completed => {
drop(streams);
panic!("{key:?} stream has been consumed already")
}
},
Entry::Vacant(entry) => {
entry.insert(StreamState::Waiting(waker.clone()));
None
Expand Down

0 comments on commit d34154b

Please sign in to comment.