From 8f47e9cb5c4a462bf59acf90ad100db781ebf51a Mon Sep 17 00:00:00 2001
From: Pixelstorm
Date: Tue, 9 Apr 2024 13:05:18 +0100
Subject: [PATCH] Simplify even more and switch back to BinaryHeap

---
 quinn-proto/src/connection/streams/mod.rs   | 72 ++++++++++++---------
 quinn-proto/src/connection/streams/state.rs | 38 +++++------
 2 files changed, 60 insertions(+), 50 deletions(-)

diff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs
index 337c19ec35..e5728c751f 100644
--- a/quinn-proto/src/connection/streams/mod.rs
+++ b/quinn-proto/src/connection/streams/mod.rs
@@ -1,7 +1,4 @@
-use std::{
-    collections::{hash_map, BTreeMap},
-    ops::Bound,
-};
+use std::collections::{hash_map, BinaryHeap};
 
 use bytes::Bytes;
 use thiserror::Error;
@@ -237,7 +234,7 @@ impl<'a> SendStream<'a> {
         self.state.unacked_data += written.bytes as u64;
         trace!(stream = %self.id, "wrote {} bytes", written.bytes);
         if !was_pending {
-            push_pending(&mut self.state.pending, self.id, stream.priority);
+            self.state.pending.push_pending(self.id, stream.priority);
         }
         Ok(written)
     }
@@ -268,7 +265,7 @@ impl<'a> SendStream<'a> {
         let was_pending = stream.is_pending();
         stream.finish()?;
         if !was_pending {
-            push_pending(&mut self.state.pending, self.id, stream.priority);
+            self.state.pending.push_pending(self.id, stream.priority);
         }
         Ok(())
     }
@@ -335,43 +332,54 @@ impl<'a> SendStream<'a> {
     }
 }
 
-/// Push a pending stream ID with the given priority, queued after all preexisting streams for the priority
-fn push_pending(pending: &mut BTreeMap<PendingPriority, StreamId>, id: StreamId, priority: i32) {
-    // Get all preexisting streams of this priority
-    let mut range = pending.range((
-        Bound::Included(PendingPriority {
-            priority,
-            recency: 0,
-        }),
-        Bound::Included(PendingPriority {
-            priority,
+/// A queue of streams with pending outgoing data, sorted by priority
+struct PendingStreamsQueue {
+    streams: BinaryHeap<PendingStream>,
+    /// A monotonically decreasing counter, used to implement round-robin scheduling for streams of the same priority.
+    /// Underflowing is not a practical concern, as it is initialized to u64::MAX and only decremented by 1 in `push_pending`
+    recency: u64,
+}
+
+impl PendingStreamsQueue {
+    fn new() -> Self {
+        Self {
+            streams: BinaryHeap::new(),
             recency: u64::MAX,
-        }),
-    ));
-
-    // Determine the recency value for this stream
-    // Setting it to 1 below all preexisting streams of this priority causes it to be sorted after all of those streams in
-    // the `BTreeMap`. This implements round-robin scheduling for streams that are still pending even after being handled,
-    // as in that case they are removed and then immediately reinserted.
-    // If this is the only/first stream for this priority, recency is initialised to u64::MAX instead. As this function is
-    // the only place that recency is initialised or mutated, this ensures that it will practically never underflow.
-    let recency = range.next().map(|(p, _)| p.recency - 1).unwrap_or(u64::MAX);
-
-    pending.insert(PendingPriority { priority, recency }, id);
+        }
+    }
+
+    /// Push a pending stream ID with the given priority, queued after any already-queued streams for the priority
+    fn push_pending(&mut self, id: StreamId, priority: i32) {
+        // As the recency counter is monotonically decreasing, we know that using its value to sort this stream will queue it
+        // after all other queued streams of the same priority.
+        // This is enough to implement round-robin scheduling for streams that are still pending even after being handled,
+        // as in that case they are removed from the `BinaryHeap`, handled, and then immediately reinserted.
+        self.recency -= 1;
+        self.streams.push(PendingStream {
+            priority,
+            recency: self.recency,
+            id,
+        });
+    }
 }
 
-/// Key type for a [`BTreeMap`] for streams with pending data, to sort them by priority and recency
+/// The [`StreamId`] of a stream with pending data queued, ordered by its priority and recency
 #[derive(PartialEq, Eq, PartialOrd, Ord)]
-struct PendingPriority {
+struct PendingStream {
     /// The priority of the stream
     // Note that this field should be kept above the `recency` field, in order for the `Ord` derive to be correct
    // (See https://doc.rust-lang.org/stable/std/cmp/trait.Ord.html#derivable)
     priority: i32,
     /// A tie-breaker for streams of the same priority, used to improve fairness by implementing round-robin scheduling:
     /// Larger values are prioritized, so it is initialised to `u64::MAX`, and when a stream writes data, we know
-    /// that it currently has the highest recency value, so it is deprioritized by 'leapfrogging' its recency past
-    /// the recency values of the other streams, down to 1 less than the previous lowest recency value for that priority
+    /// that it currently has the highest recency value, so it is deprioritized by setting its recency to 1 less than the
+    /// previous lowest recency value, such that all other streams of this priority will get processed once before we get back
+    /// round to this one
     recency: u64,
+    /// The ID of the stream
+    // The way this type is used ensures that every instance has a unique `recency` value, so this field should be kept below
+    // the `priority` and `recency` fields, so that it does not interfere with the behaviour of the `Ord` derive
+    id: StreamId,
 }
 
 /// Application events about streams
diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs
index bbaade4076..e541bc199d 100644
--- a/quinn-proto/src/connection/streams/state.rs
+++ b/quinn-proto/src/connection/streams/state.rs
@@ -1,5 +1,5 @@
 use std::{
-    collections::{hash_map, BTreeMap, VecDeque},
+    collections::{hash_map, VecDeque},
     convert::TryFrom,
     mem,
 };
@@ -9,12 +9,12 @@ use rustc_hash::FxHashMap;
 use tracing::{debug, trace};
 
 use super::{
-    PendingPriority, Recv, Retransmits, Send, SendState, ShouldTransmit, StreamEvent, StreamHalf,
-    ThinRetransmits,
+    PendingStreamsQueue, Recv, Retransmits, Send, SendState, ShouldTransmit, StreamEvent,
+    StreamHalf, ThinRetransmits,
 };
 use crate::{
     coding::BufMutExt,
-    connection::{stats::FrameStats, streams::push_pending},
+    connection::stats::FrameStats,
     frame::{self, FrameStruct, StreamMetaVec},
     transport_parameters::TransportParameters,
     Dir, Side, StreamId, TransportError, VarInt, MAX_STREAM_COUNT,
@@ -53,7 +53,7 @@ pub struct StreamsState {
     /// permitted to open but which have not yet been opened.
     pub(super) send_streams: usize,
     /// Streams with outgoing data queued, sorted by priority
-    pub(super) pending: BTreeMap<PendingPriority, StreamId>,
+    pub(super) pending: PendingStreamsQueue,
 
     events: VecDeque<StreamEvent>,
     /// Streams blocked on connection-level flow control or stream window space
@@ -115,7 +115,7 @@ impl StreamsState {
             opened: [false, false],
             next_reported_remote: [0, 0],
             send_streams: 0,
-            pending: BTreeMap::new(),
+            pending: PendingStreamsQueue::new(),
             events: VecDeque::new(),
             connection_blocked: Vec::new(),
             max_data: 0,
@@ -192,7 +192,7 @@ impl StreamsState {
             }
         }
 
-        self.pending.clear();
+        self.pending.streams.clear();
         self.send_streams = 0;
         self.data_sent = 0;
         self.connection_blocked.clear();
@@ -345,9 +345,9 @@ impl StreamsState {
     /// Whether any stream data is queued, regardless of control frames
     pub(crate) fn can_send_stream_data(&self) -> bool {
         // Reset streams may linger in the pending stream list, but will never produce stream frames
-        self.pending.values().any(|id| {
+        self.pending.streams.iter().any(|stream| {
             self.send
-                .get(id)
+                .get(&stream.id)
                 .and_then(|s| s.as_ref())
                 .map_or(false, |s| !s.is_reset())
         })
@@ -503,10 +503,12 @@ impl StreamsState {
 
             // Pop the stream of the highest priority that currently has pending data
             // If the stream still has some pending data left after writing, it will be reinserted, otherwise not
-            let Some((_, id)) = self.pending.pop_last() else {
+            let Some(stream) = self.pending.streams.pop() else {
                 break;
             };
 
+            let id = stream.id;
+
             let stream = match self.send.get_mut(&id).and_then(|s| s.as_mut()) {
                 Some(s) => s,
                 // Stream was reset with pending data and the reset was acknowledged
@@ -534,7 +536,7 @@ impl StreamsState {
                 // If the stream still has pending data, reinsert it, possibly with an updated priority value
                 // Fairness with other streams is achieved by implementing round-robin scheduling,
                 // so that the other streams will have a chance to write data before we touch this stream again.
-                push_pending(&mut self.pending, id, stream.priority);
+                self.pending.push_pending(id, stream.priority);
             }
 
             let meta = frame::StreamMeta { id, offsets, fin };
@@ -614,7 +616,7 @@ impl StreamsState {
             Some(x) => x,
         };
         if !stream.is_pending() {
-            push_pending(&mut self.pending, frame.id, stream.priority);
+            self.pending.push_pending(frame.id, stream.priority);
         }
         stream.fin_pending |= frame.fin;
         stream.pending.retransmit(frame.offsets);
@@ -634,7 +636,7 @@ impl StreamsState {
                 continue;
             }
             if !stream.is_pending() {
-                push_pending(&mut self.pending, id, stream.priority);
+                self.pending.push_pending(id, stream.priority);
             }
             stream.pending.retransmit_all_for_0rtt();
         }
@@ -1294,7 +1296,7 @@ mod tests {
         assert_eq!(meta[2].id, id_low);
 
         assert!(!server.can_send_stream_data());
-        assert_eq!(server.pending.len(), 0);
+        assert_eq!(server.pending.streams.len(), 0);
     }
 
     #[test]
@@ -1323,7 +1325,7 @@ mod tests {
             conn_state: &state,
         };
         assert_eq!(mid.write(b"mid").unwrap(), 3);
-        assert_eq!(server.pending.len(), 1);
+        assert_eq!(server.pending.streams.len(), 1);
 
         let mut high = SendStream {
             id: id_high,
@@ -1333,7 +1335,7 @@ mod tests {
         };
         high.set_priority(1).unwrap();
         assert_eq!(high.write(&[0; 200]).unwrap(), 200);
-        assert_eq!(server.pending.len(), 2);
+        assert_eq!(server.pending.streams.len(), 2);
 
         // Requeue the high priority stream to lowest priority. The initial send
         // still uses high priority since it's queued that way. After that it will
@@ -1352,7 +1354,7 @@ mod tests {
         assert_eq!(meta[0].id, id_high);
 
         // After requeuing we should end up with 2 priorities - not 3
-        assert_eq!(server.pending.len(), 2);
+        assert_eq!(server.pending.streams.len(), 2);
 
         // Send the remaining data. The initial mid priority one should go first now
         let meta = server.write_stream_frames(&mut buf, 1000);
@@ -1361,7 +1363,7 @@ mod tests {
         assert_eq!(meta[1].id, id_high);
 
        assert!(!server.can_send_stream_data());
-        assert_eq!(server.pending.len(), 0);
+        assert_eq!(server.pending.streams.len(), 0);
     }
 
     #[test]
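Note (not part of the patch): below is a minimal, self-contained sketch of the scheduling behaviour the new `PendingStreamsQueue` relies on, for trying the idea outside quinn-proto. The names mirror the patch, but `StreamId` is stood in for by a plain `u64`, so this is an illustration under that assumption rather than the crate's actual code.

use std::collections::BinaryHeap;

// Simplified stand-in for quinn-proto's StreamId, just for this sketch.
type StreamId = u64;

#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct PendingStream {
    priority: i32,
    recency: u64,
    id: StreamId,
}

struct PendingStreamsQueue {
    streams: BinaryHeap<PendingStream>,
    recency: u64,
}

impl PendingStreamsQueue {
    fn new() -> Self {
        Self { streams: BinaryHeap::new(), recency: u64::MAX }
    }

    fn push_pending(&mut self, id: StreamId, priority: i32) {
        // Decrementing the counter queues this stream behind every
        // already-queued stream of the same priority (larger recency pops first).
        self.recency -= 1;
        self.streams.push(PendingStream { priority, recency: self.recency, id });
    }
}

fn main() {
    let mut queue = PendingStreamsQueue::new();
    queue.push_pending(1, 0);
    queue.push_pending(2, 0);
    queue.push_pending(3, 7); // higher priority

    // Highest priority comes out first.
    assert_eq!(queue.streams.pop().unwrap().id, 3);

    // Equal priorities come out in insertion order; popping and immediately
    // re-pushing stream 1 sends it to the back of priority 0, so stream 2 is
    // served before stream 1 comes around again.
    let next = queue.streams.pop().unwrap();
    assert_eq!(next.id, 1);
    queue.push_pending(next.id, next.priority);

    assert_eq!(queue.streams.pop().unwrap().id, 2);
    assert_eq!(queue.streams.pop().unwrap().id, 1);
}

Because `BinaryHeap` is a max-heap and the derived `Ord` compares `priority` before `recency`, a freshly re-pushed stream always sorts behind its equal-priority peers, which gives the round-robin fairness without any per-priority bookkeeping.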