Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not pre-allocate memory on streams #1682

Closed
wants to merge 9 commits into from
30 changes: 16 additions & 14 deletions quinn-proto/src/connection/streams/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
cell::RefCell,
collections::{BinaryHeap, VecDeque},
collections::{hash_map, BinaryHeap, VecDeque},
};

use bytes::Bytes;
Expand Down Expand Up @@ -129,13 +129,13 @@ impl<'a> RecvStream<'a> {
/// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
/// attempts to operate on a stream will yield `UnknownStream` errors.
pub fn stop(&mut self, error_code: VarInt) -> Result<(), UnknownStream> {
let stream =
match self.state.recv.get_mut(&self.id).map(|s| {
s.get_or_insert_with(|| Box::new(Recv::new(self.state.stream_receive_window)))
}) {
Some(s) => s,
None => return Err(UnknownStream { _private: () }),
};
let mut entry = match self.state.recv.entry(self.id) {
hash_map::Entry::Occupied(s) => s,
hash_map::Entry::Vacant(_) => return Err(UnknownStream { _private: () }),
};
let stream = entry
.get_mut()
.get_or_insert_with(|| Box::new(Recv::new(self.state.stream_receive_window)));

let (read_credits, stop_sending) = stream.stop()?;
if stop_sending.should_transmit() {
Expand All @@ -149,7 +149,7 @@ impl<'a> RecvStream<'a> {
// connection-level flow control to account for discarded data. Otherwise, we can discard
// state immediately.
if !stream.receiving_unknown_size() {
self.state.recv.remove(&self.id);
entry.remove();
self.state.stream_freed(self.id, StreamHalf::Recv);
}

Expand Down Expand Up @@ -257,11 +257,12 @@ impl<'a> SendStream<'a> {
///
/// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
pub fn finish(&mut self) -> Result<(), FinishError> {
let max_send_data = self.state.max_send_data(&self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.and_then(|s| s.as_mut())
.map(|s| s.get_or_insert_with(|| Box::new(Send::new(max_send_data))))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like there are a few recurring patterns here that would make good candidates for moving into a method somewhere to raise the level of abstraction. We could address that after merging this, but would be great if you can take a whack at it.

.ok_or(FinishError::UnknownStream)?;

let was_pending = stream.is_pending();
Expand All @@ -278,11 +279,12 @@ impl<'a> SendStream<'a> {
/// # Panics
/// - when applied to a receive stream
pub fn reset(&mut self, error_code: VarInt) -> Result<(), UnknownStream> {
let max_send_data = self.state.max_send_data(&self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.and_then(|s| s.as_mut())
.map(|s| s.get_or_insert_with(|| Box::new(Send::new(max_send_data))))
.ok_or(UnknownStream { _private: () })?;

if matches!(stream.state, SendState::ResetSent) {
Expand All @@ -306,11 +308,12 @@ impl<'a> SendStream<'a> {
/// # Panics
/// - when applied to a receive stream
pub fn set_priority(&mut self, priority: i32) -> Result<(), UnknownStream> {
let max_send_data = self.state.max_send_data(&self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.and_then(|s| s.as_mut())
.map(|s| s.get_or_insert_with(|| Box::new(Send::new(max_send_data))))
.ok_or(UnknownStream { _private: () })?;

stream.priority = priority;
Expand All @@ -326,10 +329,9 @@ impl<'a> SendStream<'a> {
.state
.send
.get(&self.id)
.and_then(|s| s.as_ref())
.ok_or(UnknownStream { _private: () })?;

Ok(stream.priority)
Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default())
}
}

Expand Down
2 changes: 1 addition & 1 deletion quinn-proto/src/connection/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl<'a> Chunks<'a> {
.stopped
{
true => return Err(ReadableError::UnknownStream),
false => entry.remove().unwrap(), // this can't fail at this point
false => entry.remove().unwrap(), // this can't fail due to the previous get_or_insert_with
};

recv.assembler.ensure_ordering(ordered)?;
Expand Down
50 changes: 29 additions & 21 deletions quinn-proto/src/connection/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,11 +605,16 @@ impl StreamsState {
}

pub(crate) fn received_ack_of(&mut self, frame: frame::StreamMeta) {
let stream = match self.send.get_mut(&frame.id).and_then(|s| s.as_mut()) {
None => return,
Some(s) => s,
let max_send_data = self.max_send_data(&frame.id);
let mut entry = match self.send.entry(frame.id) {
hash_map::Entry::Vacant(_) => return,
hash_map::Entry::Occupied(e) => e,
};

let stream = entry
.get_mut()
.get_or_insert_with(|| Box::new(Send::new(max_send_data)));
Ralith marked this conversation as resolved.
Show resolved Hide resolved

if stream.is_reset() {
// We account for outstanding data on reset streams at time of reset
return;
Expand All @@ -621,7 +626,7 @@ impl StreamsState {
return;
}

self.send.remove(&id);
entry.remove_entry();
self.stream_freed(id, StreamHalf::Send);
self.events.push_back(StreamEvent::Finished { id });
}
Expand All @@ -643,7 +648,10 @@ impl StreamsState {
for dir in Dir::iter() {
for index in 0..self.next[dir as usize] {
let id = StreamId::new(Side::Client, dir, index);
let stream = self.send.get_mut(&id).and_then(|s| s.as_mut()).unwrap();
let stream = match self.send.get_mut(&id).and_then(|s| s.as_mut()) {
Some(stream) => stream,
None => continue,
};
if stream.pending.is_fully_acked() && !stream.fin_pending {
// Stream data can't be acked in 0-RTT, so we must not have sent anything on
// this stream
Expand Down Expand Up @@ -833,25 +841,25 @@ impl StreamsState {
let bi = id.dir() == Dir::Bi;
// bidirectional OR (unidirectional AND NOT remote)
if bi || !remote {
if remote {
assert!(self.send.insert(id, None).is_none());
} else {
assert!(self
.send
.insert(id, Some(Box::new(Send::new(self.max_send_data(&id)))))
.is_none());
}
// if remote {
assert!(self.send.insert(id, None).is_none());
// } else {
// assert!(self
// .send
// .insert(id, Some(Box::new(Send::new(self.max_send_data(&id)))))
// .is_none());
// }
Ralith marked this conversation as resolved.
Show resolved Hide resolved
}
// bidirectional OR (unidirectional AND remote)
if bi || remote {
if remote {
assert!(self.recv.insert(id, None).is_none());
} else {
assert!(self
.recv
.insert(id, Some(Box::new(Recv::new(self.stream_receive_window))))
.is_none());
}
// if remote {
assert!(self.recv.insert(id, None).is_none());
// } else {
// assert!(self
// .recv
// .insert(id, Some(Box::new(Recv::new(self.stream_receive_window))))
// .is_none());
// }
}
}

Expand Down
Loading