diff --git a/ipa-core/src/helpers/buffers/ordering_sender.rs b/ipa-core/src/helpers/buffers/ordering_sender.rs index e8a21bda6..426d60492 100644 --- a/ipa-core/src/helpers/buffers/ordering_sender.rs +++ b/ipa-core/src/helpers/buffers/ordering_sender.rs @@ -55,14 +55,11 @@ impl State { } fn save_waker(v: &mut Option, cx: &Context<'_>) { - // here used to be a check that new waker will wake the same task. - // however, the contract for `will_wake` states that it is a best-effort and even if - // both wakes wake the same task, `will_wake` may still return `false`. - // This is exactly what happened once we started using HTTP/2 - somewhere deep inside hyper - // h2 implementation, there is a new waker (with the same vtable) that is used to poll - // this stream again. This does not happen when we use HTTP/1.1, but it does not matter for - // this code. - v.replace(cx.waker().clone()); + if let Some(waker) = v { + waker.clone_from(cx.waker()); + } else { + v.replace(cx.waker().clone()); + } } fn wake(v: &mut Option) { @@ -182,7 +179,6 @@ impl WaitingShard { match self.wakers[j].i.cmp(&i) { Ordering::Greater => (), Ordering::Equal => { - assert!(item.w.will_wake(&self.wakers[j].w)); self.wakers[j] = item; return Ok(()); } diff --git a/ipa-core/src/helpers/buffers/unordered_receiver.rs b/ipa-core/src/helpers/buffers/unordered_receiver.rs index 573024626..ef5e2a09f 100644 --- a/ipa-core/src/helpers/buffers/unordered_receiver.rs +++ b/ipa-core/src/helpers/buffers/unordered_receiver.rs @@ -59,7 +59,7 @@ where if recv.is_next(this.i) { recv.poll_next(cx) } else { - recv.add_waker(this.i, cx.waker().clone()); + recv.add_waker(this.i, cx.waker()); Poll::Pending } } @@ -190,7 +190,7 @@ where /// /// [`recv`]: UnorderedReceiver::recv /// [`poll`]: Future::poll - fn add_waker(&mut self, i: usize, waker: Waker) { + fn add_waker(&mut self, i: usize, waker: &Waker) { assert!( i > self.next, "Awaiting a read (record = {i}) that has already been fulfilled. Read cursor is currently at {}", self.next @@ -198,20 +198,17 @@ where // We don't save a waker at `self.next`, so `>` and not `>=`. if i > self.next + self.wakers.len() { #[cfg(feature = "stall-detection")] - let overflow = (waker, i); + let overflow = (waker.clone(), i); #[cfg(not(feature = "stall-detection"))] - let overflow = waker; + let overflow = waker.clone(); self.overflow_wakers.push(overflow); } else { let index = i % self.wakers.len(); - if let Some(old) = self.wakers[index].as_ref() { - // We are OK with having multiple polls of the same `Receiver` - // (or two `Receiver`s for the same item being polled). - // However, as we are only tracking one waker, they both need - // to be woken when we invoke the waker we get. - assert!(waker.will_wake(old)); + if let Some(old) = self.wakers[index].as_mut() { + old.clone_from(waker); + } else { + self.wakers[index] = Some(waker.clone()); } - self.wakers[index] = Some(waker); } } diff --git a/ipa-core/src/helpers/transport/stream/collection.rs b/ipa-core/src/helpers/transport/stream/collection.rs index b5b1a8bcf..d67e58283 100644 --- a/ipa-core/src/helpers/transport/stream/collection.rs +++ b/ipa-core/src/helpers/transport/stream/collection.rs @@ -81,29 +81,24 @@ impl StreamCollection { let mut streams = self.inner.lock().unwrap(); match streams.entry(key.clone()) { - Entry::Occupied(mut entry) => { - match entry.get_mut() { - StreamState::Waiting(old_waker) => { - let will_wake = old_waker.will_wake(waker); - drop(streams); // avoid mutex poisoning - assert!(will_wake); - None - } - rs @ StreamState::Ready(_) => { - let StreamState::Ready(stream) = - std::mem::replace(rs, StreamState::Completed) - else { - unreachable!(); - }; + Entry::Occupied(mut entry) => match entry.get_mut() { + StreamState::Waiting(old_waker) => { + old_waker.clone_from(waker); + None + } + rs @ StreamState::Ready(_) => { + let StreamState::Ready(stream) = std::mem::replace(rs, StreamState::Completed) + else { + unreachable!(); + }; - Some(stream) - } - StreamState::Completed => { - drop(streams); - panic!("{key:?} stream has been consumed already") - } + Some(stream) } - } + StreamState::Completed => { + drop(streams); + panic!("{key:?} stream has been consumed already") + } + }, Entry::Vacant(entry) => { entry.insert(StreamState::Waiting(waker.clone())); None