Simplify even more and switch back to BinaryHeap
BigWingBeat committed Apr 9, 2024
1 parent fd2e50e commit 8f47e9c
Showing 2 changed files with 60 additions and 50 deletions.
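The change replaces the `BTreeMap<PendingPriority, StreamId>` pending-streams map with a `BinaryHeap` wrapped in a new `PendingStreamsQueue` type, driven by a monotonically decreasing `recency` counter. Below is a minimal, self-contained sketch of that scheduling idea, not the commit itself; the `u64` stand-in for quinn's `StreamId`, the trimmed-down type shapes, and the `main` driver are assumptions made for illustration.

    use std::collections::BinaryHeap;

    // Stand-in for quinn's `StreamId`, used only in this sketch.
    type StreamId = u64;

    // Ordered by `priority` first, then `recency`; `BinaryHeap` is a max-heap,
    // so the highest-priority stream pops first.
    #[derive(PartialEq, Eq, PartialOrd, Ord)]
    struct PendingStream {
        priority: i32,
        recency: u64,
        id: StreamId,
    }

    struct PendingStreamsQueue {
        streams: BinaryHeap<PendingStream>,
        // Monotonically decreasing tie-breaker: earlier pushes keep larger values.
        recency: u64,
    }

    impl PendingStreamsQueue {
        fn new() -> Self {
            Self { streams: BinaryHeap::new(), recency: u64::MAX }
        }

        // Queue a stream after any already-queued streams of the same priority.
        fn push_pending(&mut self, id: StreamId, priority: i32) {
            self.recency -= 1;
            self.streams.push(PendingStream { priority, recency: self.recency, id });
        }
    }

    fn main() {
        let mut queue = PendingStreamsQueue::new();
        queue.push_pending(1, 0);
        queue.push_pending(2, 0);
        queue.push_pending(3, 1);
        // Highest priority first, then FIFO within a priority: 3, 1, 2.
        let order: Vec<StreamId> = std::iter::from_fn(|| queue.streams.pop().map(|s| s.id)).collect();
        assert_eq!(order, vec![3, 1, 2]);
    }

Compared with the previous `BTreeMap` approach, pushing no longer needs a range scan to find the lowest recency for the given priority; the shared counter supplies the tie-breaker directly, and the new doc comment argues that underflow of that counter is not a practical concern.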
72 changes: 40 additions & 32 deletions quinn-proto/src/connection/streams/mod.rs
@@ -1,7 +1,4 @@
-use std::{
-collections::{hash_map, BTreeMap},
-ops::Bound,
-};
+use std::collections::{hash_map, BinaryHeap};

use bytes::Bytes;
use thiserror::Error;
@@ -237,7 +234,7 @@ impl<'a> SendStream<'a> {
self.state.unacked_data += written.bytes as u64;
trace!(stream = %self.id, "wrote {} bytes", written.bytes);
if !was_pending {
-push_pending(&mut self.state.pending, self.id, stream.priority);
+self.state.pending.push_pending(self.id, stream.priority);
}
Ok(written)
}
@@ -268,7 +265,7 @@ impl<'a> SendStream<'a> {
let was_pending = stream.is_pending();
stream.finish()?;
if !was_pending {
-push_pending(&mut self.state.pending, self.id, stream.priority);
+self.state.pending.push_pending(self.id, stream.priority);
}

Ok(())
@@ -335,43 +332,54 @@ impl<'a> SendStream<'a> {
}
}

-/// Push a pending stream ID with the given priority, queued after all preexisting streams for the priority
-fn push_pending(pending: &mut BTreeMap<PendingPriority, StreamId>, id: StreamId, priority: i32) {
-// Get all preexisting streams of this priority
-let mut range = pending.range((
-Bound::Included(PendingPriority {
-priority,
-recency: 0,
-}),
-Bound::Included(PendingPriority {
-priority,
-recency: u64::MAX,
-}),
-));
-
-// Determine the recency value for this stream
-// Setting it to 1 below all preexisting streams of this priority causes it to be sorted after all of those streams in
-// the `BTreeMap`. This implements round-robin scheduling for streams that are still pending even after being handled,
-// as in that case they are removed and then immediately reinserted.
-// If this is the only/first stream for this priority, recency is initialised to u64::MAX instead. As this function is
-// the only place that recency is initialised or mutated, this ensures that it will practically never underflow.
-let recency = range.next().map(|(p, _)| p.recency - 1).unwrap_or(u64::MAX);
-
-pending.insert(PendingPriority { priority, recency }, id);
-}
+/// A queue of streams with pending outgoing data, sorted by priority
+struct PendingStreamsQueue {
+streams: BinaryHeap<PendingStream>,
+/// A monotonically decreasing counter, used to implement round-robin scheduling for streams of the same priority.
+/// Underflowing is not a practical concern, as it is initialized to u64::MAX and only decremented by 1 in `push_pending`
+recency: u64,
+}
+
+impl PendingStreamsQueue {
+fn new() -> Self {
+Self {
+streams: BinaryHeap::new(),
+recency: u64::MAX,
+}
+}
+
+/// Push a pending stream ID with the given priority, queued after any already-queued streams for the priority
+fn push_pending(&mut self, id: StreamId, priority: i32) {
+// As the recency counter is monotonically decreasing, we know that using its value to sort this stream will queue it
+// after all other queued streams of the same priority.
+// This is enough to implement round-robin scheduling for streams that are still pending even after being handled,
+// as in that case they are removed from the `BinaryHeap`, handled, and then immediately reinserted.
+self.recency -= 1;
+self.streams.push(PendingStream {
+priority,
+recency: self.recency,
+id,
+});
+}
+}

-/// Key type for a [`BTreeMap`] for streams with pending data, to sort them by priority and recency
+/// The [`StreamId`] of a stream with pending data queued, ordered by its priority and recency
#[derive(PartialEq, Eq, PartialOrd, Ord)]
-struct PendingPriority {
+struct PendingStream {
/// The priority of the stream
// Note that this field should be kept above the `recency` field, in order for the `Ord` derive to be correct
// (See https://doc.rust-lang.org/stable/std/cmp/trait.Ord.html#derivable)
priority: i32,
/// A tie-breaker for streams of the same priority, used to improve fairness by implementing round-robin scheduling:
/// Larger values are prioritized, so it is initialised to `u64::MAX`, and when a stream writes data, we know
-/// that it currently has the highest recency value, so it is deprioritized by 'leapfrogging' its recency past
-/// the recency values of the other streams, down to 1 less than the previous lowest recency value for that priority
+/// that it currently has the highest recency value, so it is deprioritized by setting its recency to 1 less than the
+/// previous lowest recency value, such that all other streams of this priority will get processed once before we get back
+/// round to this one
recency: u64,
+/// The ID of the stream
+// The way this type is used ensures that every instance has a unique `recency` value, so this field should be kept below
+// the `priority` and `recency` fields, so that it does not interfere with the behaviour of the `Ord` derive
+id: StreamId,
}

/// Application events about streams
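The field-ordering notes above rely on `#[derive(Ord)]` producing a lexicographic comparison in field declaration order (see the std docs linked in the diff). A small illustration of that point, with a plain `u64` standing in for `StreamId` (an assumption for the example):

    // Because `priority` is declared before `recency`, it is compared first:
    // no recency value, however large, can outrank a higher priority.
    #[derive(PartialEq, Eq, PartialOrd, Ord)]
    struct PendingStream {
        priority: i32,
        recency: u64,
        id: u64, // simplified stand-in for quinn's `StreamId`
    }

    fn main() {
        let high = PendingStream { priority: 1, recency: 0, id: 7 };
        let low = PendingStream { priority: 0, recency: u64::MAX, id: 9 };
        // In a max-heap (`BinaryHeap`), `high` would therefore pop before `low`.
        assert!(high > low);
    }

The same reasoning covers the comment about keeping `id` last: every queued entry gets a unique `recency`, so comparison never reaches the `id` field.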
38 changes: 20 additions & 18 deletions quinn-proto/src/connection/streams/state.rs
@@ -1,5 +1,5 @@
use std::{
-collections::{hash_map, BTreeMap, VecDeque},
+collections::{hash_map, VecDeque},
convert::TryFrom,
mem,
};
@@ -9,12 +9,12 @@ use rustc_hash::FxHashMap;
use tracing::{debug, trace};

use super::{
-PendingPriority, Recv, Retransmits, Send, SendState, ShouldTransmit, StreamEvent, StreamHalf,
-ThinRetransmits,
+PendingStreamsQueue, Recv, Retransmits, Send, SendState, ShouldTransmit, StreamEvent,
+StreamHalf, ThinRetransmits,
};
use crate::{
coding::BufMutExt,
-connection::{stats::FrameStats, streams::push_pending},
+connection::stats::FrameStats,
frame::{self, FrameStruct, StreamMetaVec},
transport_parameters::TransportParameters,
Dir, Side, StreamId, TransportError, VarInt, MAX_STREAM_COUNT,
@@ -53,7 +53,7 @@ pub struct StreamsState {
/// permitted to open but which have not yet been opened.
pub(super) send_streams: usize,
/// Streams with outgoing data queued, sorted by priority
-pub(super) pending: BTreeMap<PendingPriority, StreamId>,
+pub(super) pending: PendingStreamsQueue,

events: VecDeque<StreamEvent>,
/// Streams blocked on connection-level flow control or stream window space
@@ -115,7 +115,7 @@ impl StreamsState {
opened: [false, false],
next_reported_remote: [0, 0],
send_streams: 0,
-pending: BTreeMap::new(),
+pending: PendingStreamsQueue::new(),
events: VecDeque::new(),
connection_blocked: Vec::new(),
max_data: 0,
@@ -192,7 +192,7 @@ impl StreamsState {
}
}

-self.pending.clear();
+self.pending.streams.clear();
self.send_streams = 0;
self.data_sent = 0;
self.connection_blocked.clear();
@@ -345,9 +345,9 @@ impl StreamsState {
/// Whether any stream data is queued, regardless of control frames
pub(crate) fn can_send_stream_data(&self) -> bool {
// Reset streams may linger in the pending stream list, but will never produce stream frames
-self.pending.values().any(|id| {
+self.pending.streams.iter().any(|stream| {
self.send
-.get(id)
+.get(&stream.id)
.and_then(|s| s.as_ref())
.map_or(false, |s| !s.is_reset())
})
@@ -503,10 +503,12 @@ impl StreamsState {

// Pop the stream of the highest priority that currently has pending data
// If the stream still has some pending data left after writing, it will be reinserted, otherwise not
-let Some((_, id)) = self.pending.pop_last() else {
+let Some(stream) = self.pending.streams.pop() else {
break;
};

+let id = stream.id;

let stream = match self.send.get_mut(&id).and_then(|s| s.as_mut()) {
Some(s) => s,
// Stream was reset with pending data and the reset was acknowledged
@@ -534,7 +536,7 @@
// If the stream still has pending data, reinsert it, possibly with an updated priority value
// Fairness with other streams is achieved by implementing round-robin scheduling,
// so that the other streams will have a chance to write data before we touch this stream again.
-push_pending(&mut self.pending, id, stream.priority);
+self.pending.push_pending(id, stream.priority);
}

let meta = frame::StreamMeta { id, offsets, fin };
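The comments in this hunk describe a pop-write-reinsert cycle: the highest-priority stream is popped, written, and pushed back with a fresh (smaller) recency if it still has data. The sketch below shows the resulting round-robin behaviour using plain `(priority, recency, id)` tuples rather than quinn's types; the helper name and the fixed four-round loop are assumptions for the example:

    use std::collections::BinaryHeap;

    // Mirrors the push_pending pattern: each push takes a strictly smaller
    // recency, so equal-priority entries come back out in FIFO order.
    fn push_pending(heap: &mut BinaryHeap<(i32, u64, u64)>, recency: &mut u64, id: u64, priority: i32) {
        *recency -= 1;
        heap.push((priority, *recency, id));
    }

    fn main() {
        let mut heap = BinaryHeap::new();
        let mut recency = u64::MAX;
        push_pending(&mut heap, &mut recency, 1, 0);
        push_pending(&mut heap, &mut recency, 2, 0);

        // Each round: pop the front stream, pretend it still has pending data, reinsert it.
        let mut served = Vec::new();
        for _ in 0..4 {
            let (priority, _, id) = heap.pop().unwrap();
            served.push(id);
            push_pending(&mut heap, &mut recency, id, priority);
        }
        // The two equal-priority streams alternate: round-robin scheduling.
        assert_eq!(served, vec![1, 2, 1, 2]);
    }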
@@ -614,7 +616,7 @@
Some(x) => x,
};
if !stream.is_pending() {
-push_pending(&mut self.pending, frame.id, stream.priority);
+self.pending.push_pending(frame.id, stream.priority);
}
stream.fin_pending |= frame.fin;
stream.pending.retransmit(frame.offsets);
@@ -634,7 +636,7 @@
continue;
}
if !stream.is_pending() {
-push_pending(&mut self.pending, id, stream.priority);
+self.pending.push_pending(id, stream.priority);
}
stream.pending.retransmit_all_for_0rtt();
}
@@ -1294,7 +1296,7 @@ mod tests {
assert_eq!(meta[2].id, id_low);

assert!(!server.can_send_stream_data());
-assert_eq!(server.pending.len(), 0);
+assert_eq!(server.pending.streams.len(), 0);
}

#[test]
@@ -1323,7 +1325,7 @@
conn_state: &state,
};
assert_eq!(mid.write(b"mid").unwrap(), 3);
-assert_eq!(server.pending.len(), 1);
+assert_eq!(server.pending.streams.len(), 1);

let mut high = SendStream {
id: id_high,
@@ -1333,7 +1335,7 @@
};
high.set_priority(1).unwrap();
assert_eq!(high.write(&[0; 200]).unwrap(), 200);
-assert_eq!(server.pending.len(), 2);
+assert_eq!(server.pending.streams.len(), 2);

// Requeue the high priority stream to lowest priority. The initial send
// still uses high priority since it's queued that way. After that it will
@@ -1352,7 +1354,7 @@
assert_eq!(meta[0].id, id_high);

// After requeuing we should end up with 2 priorities - not 3
-assert_eq!(server.pending.len(), 2);
+assert_eq!(server.pending.streams.len(), 2);

// Send the remaining data. The initial mid priority one should go first now
let meta = server.write_stream_frames(&mut buf, 1000);
@@ -1361,7 +1363,7 @@
assert_eq!(meta[1].id, id_high);

assert!(!server.can_send_stream_data());
-assert_eq!(server.pending.len(), 0);
+assert_eq!(server.pending.streams.len(), 0);
}

#[test]
