ignore received frames on a stream locally reset #174
Conversation
self.flow.send_data(sz);

// Track the data as in-flight
self.in_flight_data += sz;
There is an open question of whether we should immediately release this connection capacity. If we don't, since there is no related stream to release the capacity, the connection capacity is essentially "leaked" forever. At the same time, if we locally reset this stream, but the remote just keeps sending us data anyways, giving capacity back would allow them to just keep flooding the connection with this stream we no longer want.
Perhaps if we add in the "for some time" part, by checking a time elapsed, then the eventual connection error would be a fine deterrent and allow us to free the connection window automatically.
Even without this question answered, it is still better that we count this anyway: it is correct that the window has been used, and we are not required to give the capacity back.
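To make the trade-off concrete, here is a minimal sketch (invented types, not the h2 internals) of the two separate steps being discussed: charging received bytes to the connection window, and optionally releasing that capacity back so a WINDOW_UPDATE can be sent.

```rust
// Hypothetical connection-level flow state; not the h2 internals.
struct ConnFlow {
    window: u32,     // remaining connection receive window
    to_release: u32, // capacity queued to go back out as a WINDOW_UPDATE
}

impl ConnFlow {
    // Charge `sz` bytes against the connection window; the data was sent,
    // so the window *was* used even if we drop the frame.
    fn consume(&mut self, sz: u32) -> Result<(), &'static str> {
        self.window = self.window.checked_sub(sz).ok_or("flow control violation")?;
        Ok(())
    }

    // Optionally hand the capacity back. If we never do this for a locally
    // reset stream, that connection capacity is "leaked" forever.
    fn release(&mut self, sz: u32) {
        self.to_release += sz;
    }
}

fn main() {
    let mut flow = ConnFlow { window: 65_535, to_release: 0 };
    flow.consume(1_024).unwrap(); // DATA arrives on a locally reset stream
    flow.release(1_024);          // choose to give the capacity back
    assert_eq!(flow.window, 65_535 - 1_024);
    assert_eq!(flow.to_release, 1_024);
}
```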
IMO we should immediately release capacity. The window updates will be sequenced after the RST_STREAM frame, so the remote should not send any more data.
Additionally, we won't ever increment the reset stream's window, so even if the remote keeps sending data, the window of that stream will eventually get depleted.
Thanks for this PR. This is definitely a very tricky change. I added some comments inline.
Also, I didn't see how accumulated state was ever freed. Specifically, when are locally reset streams released?
The spec says "some amount of time", which is obviously tricky.
I would think that we could use one of the existing pointers to create a linked list of streams that are pending reset. The question is when to purge.
I would say that, for now, we should use an Instant and maybe keep them around for ~30 seconds. Of course, this could cause unbounded growth and could expose a DoS vulnerability, given that remotes can trigger local resets.
So, on top of keeping state around for at most 30 seconds, I would probably set a numeric limit as some factor of max concurrency. Then, once this limit is reached, we start aggressively purging to stay below the limit.
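A rough sketch of that purge policy (names and numbers are illustrative, not the eventual h2 API): keep locally reset streams with the time they were reset, evict by age, and also evict the oldest entries whenever a hard cap is exceeded.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

// Hypothetical holder for locally reset streams awaiting expiration.
struct PendingReset {
    queue: VecDeque<(u32, Instant)>, // (stream id, when it was reset)
    max_age: Duration,               // e.g. ~30 seconds
    max_len: usize,                  // e.g. some factor of max concurrency
}

impl PendingReset {
    fn push(&mut self, id: u32, now: Instant) {
        self.queue.push_back((id, now));
        self.purge(now);
    }

    fn purge(&mut self, now: Instant) {
        // Age-based eviction: the spec's "for some time".
        while let Some(&(_, at)) = self.queue.front() {
            if now.duration_since(at) > self.max_age {
                self.queue.pop_front();
            } else {
                break;
            }
        }
        // Count-based eviction: bound memory even if remotes trigger many resets.
        while self.queue.len() > self.max_len {
            self.queue.pop_front();
        }
    }
}

fn main() {
    let mut pending = PendingReset {
        queue: VecDeque::new(),
        max_age: Duration::from_secs(30),
        max_len: 10,
    };
    pending.push(1, Instant::now());
    assert_eq!(pending.queue.len(), 1);
}
```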
src/proto/streams/recv.rs
Outdated
// Update connection level flow control
self.flow.send_data(sz);
if stream.recv_flow.window_size() < sz {
    return Err(RecvError::Stream {
This is surprising to me. If this check is violated, it indicates a buggy peer. Would you mind adding a comment referencing the spec explaining why this is a stream level error?
src/proto/streams/state.rs
Outdated
 }

 #[derive(Debug, Copy, Clone)]
-enum Peer {
+enum OpenPeer {
I'd rather avoid this rename (in this commit at least, we can approach it later). Also, see below.
src/proto/streams/state.rs
Outdated
     AwaitingHeaders,
     Streaming,
 }

 #[derive(Debug, Copy, Clone)]
 enum Cause {
-    Proto(Reason),
     EndStream,
+    Proto(Peer, Reason),
Instead of adding a Peer variant, could we add a Cause::LocallyReset variant? I believe that is the case that we really care about.
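For illustration, the suggested shape might look like this (Reason here is a stand-in for h2's error-code type):

```rust
// `Reason` is a stand-in for h2's error-code type.
#[derive(Debug, Copy, Clone)]
struct Reason(u32);

#[derive(Debug, Copy, Clone)]
enum Cause {
    EndStream,
    Proto(Reason),        // reset initiated by the remote peer
    LocallyReset(Reason), // reset we initiated ourselves
}

fn main() {
    let cause = Cause::LocallyReset(Reason(0x8)); // 0x8 = CANCEL
    println!("{:?}", cause);
}
```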
src/proto/streams/store.rs
Outdated
/// frames on that stream "for some time".
// We could store an `Instant` here perhaps, and upon lookup, if the
// elapsed time has been too long, pop it from this Map and return false.
reset_streams: OrderMap<StreamId, ()>,
Ideally we wouldn't introduce a second map. Would it be possible to track the locally reset state as part of the stream state and have a single lookup path?
It also looks like introducing a second map doubles the number of hash lookups for all received data frames (from below).
So, we could keep the streams around afterwards, but a Stream is rather big in bytes, and we only care about a) the ID, b) that it was reset locally, c) how long ago. Additionally, locally resetting shouldn't be that uncommon, but hopefully receiving frames on a reset stream is less common, and so having a second map is the better situation...
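Illustrative only (HashMap standing in for the OrderMap used in the PR), the second map would need just a handful of bytes per entry rather than a full Stream:

```rust
use std::collections::HashMap;
use std::time::Instant;

type StreamId = u32; // stand-in for h2's StreamId newtype

fn main() {
    // Membership means "we reset this stream locally"; the value records when.
    let mut reset_streams: HashMap<StreamId, Instant> = HashMap::new();
    reset_streams.insert(1, Instant::now());
    assert!(reset_streams.contains_key(&1));
}
```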
@seanmonstar As of now, I wouldn't worry about the byte size of Stream. We will be shrinking it significantly.
src/proto/streams/streams.rs
Outdated
@@ -172,9 +172,19 @@ where

         let id = frame.stream_id();

+        if me.store.is_reset(&id) {
As mentioned above, this doubles all hash lookups for received streams.
Also, I think that the stream window flow should still be validated. Even though we just drop frames that the remote sent before seeing the RST_STREAM, we still want to ensure that the connection state is sane.
If we do keep a second smaller map, then I can do some stupid syntax crap to fix the borrow checker. Since a mutable stream is taken from the store, even the None case can't access the store again...
Regarding counting the stream window as well: since the stream is reset, the window should be closed.
Flow-controlled frames (i.e., DATA) received after sending RST_STREAM are counted toward the connection flow-control window.
Right, it needs to be counted against the connection flow control, but unless we keep the stream level flow state around too, we don't know if the received data was a valid size (i.e., whether it exceeded the max size the stream would have accepted had it not been locally reset).
Basically, we can't differentiate between the remote peer being correct and just not having seen the RST_STREAM vs. the remote being buggy and violating flow control.
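A hedged sketch of that check (a standalone function, illustrative only): validate the ignored frame against the stream window we kept around, so a flow-control violation stays distinguishable from a peer that simply hasn't seen the RST_STREAM yet.

```rust
// Hypothetical helper; `stream_window` would come from the flow state we
// decided to keep for the locally reset stream.
fn check_ignored_frame(stream_window: u32, sz: u32) -> Result<(), &'static str> {
    if sz > stream_window {
        // The peer is buggy: it violated flow control regardless of the reset.
        Err("stream flow control violated")
    } else {
        // The peer may simply not have seen our RST_STREAM yet; drop silently.
        Ok(())
    }
}

fn main() {
    assert!(check_ignored_frame(16_384, 1_000).is_ok());
    assert!(check_ignored_frame(512, 1_000).is_err());
}
```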
@seanmonstar Let me know if you can keep working on this or if I should take it over. I'd like to see this get over the finish line.
21bcb0e to 38382ea
The changes should be largely all here. I need to expose the config in the server builder, and resolve some merge conflicts...
- Adds config duration for how long to ignore frames on a reset stream
- Adds config for how many reset streams can be held at a time
38382ea to 066a3a1
Looking pretty good. I left inline comments, mostly minor points.
The one thing I did see was that, as far as I can tell, when ignoring data frames due to the stream being reset, connection level capacity can be leaked.
src/client.rs
Outdated
@@ -242,6 +265,8 @@ impl Builder {
 impl Default for Builder {
     fn default() -> Builder {
         Builder {
+            reset_stream_duration: Duration::from_secs(30),
+            reset_stream_max: 10,
I would probably make this much higher... at least 100. I might also consider making it relative to the max number of streams.... maybe say 50% or something like that.
At the end of the day, though, we don't know what is the right default yet.
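As a sketch of that suggestion (the numbers are illustrative, not a decided default), the cap could be derived from the advertised max concurrent streams with a floor of 100:

```rust
// Hypothetical default: at least 100, or half the max concurrent streams.
fn default_reset_stream_max(max_concurrent_streams: usize) -> usize {
    std::cmp::max(100, max_concurrent_streams / 2)
}

fn main() {
    assert_eq!(default_reset_stream_max(100), 100);
    assert_eq!(default_reset_stream_max(1_000), 500);
}
```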
src/lib.rs
Outdated
@@ -1,4 +1,4 @@
-#![deny(warnings, missing_debug_implementations, missing_docs)]
+//#![deny(warnings, missing_debug_implementations, missing_docs)]
Can this stay enabled? I assume this line was accidentally committed.
if stream.is_closed() {
    stream.unlink();
    if !stream.is_pending_reset_expiration() {
I think that I'm following this logic (and that it is correct), but it would be super helpful to add a comment describing how transition_after works now that it is getting pretty involved.
src/proto/streams/prioritize.rs
Outdated
@@ -530,6 +530,7 @@ impl Prioritize {
         trace!("pop_frame; stream={:?}", stream.id);

         let is_counted = stream.is_counted();
+        let is_pending_reset = stream.is_pending_reset_expiration();
This line was confusing at first. My initial thought was that, at this point, the stream could never be pending reset expiration. However, a stream transitions to pending_reset_expiration immediately when the client resets the stream. At this point, the RST_STREAM frame is still queued, and this is probably the point at which we are reading it. So, in fact, it is actually expected for is_pending_reset to be true sometimes.
Could you add some comments describing this?
src/proto/streams/recv.rs
Outdated
// if max allow is 0, this won't be able to evict,
// and then we'll just bail after
if let Some(evicted) = self.pending_reset_expired.pop(stream.store_mut()) {
    let is_counted = evicted.is_counted();
This line was confusing at first. My initial thought was that, at this point, the stream could never be "counted" as it has already been reset. However, a stream transitions to pending_reset_expiration immediately when the client resets the stream, but the stream transitions to "not counted" once the RST_STREAM frame is sent. So, there is a period in which the stream is in the "pending reset expired" queue while it is still counted. Under a specific workload, this line could be hit while the stream is still counted.
Could you add some comments describing this?
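A tiny illustration (not h2's real state machine) of the window described above, where a stream is already pending reset expiration but still counted until the RST_STREAM frame is actually sent:

```rust
#[derive(Debug, PartialEq)]
struct StreamFlags {
    is_counted: bool,
    is_pending_reset_expiration: bool,
}

fn main() {
    // Open stream, counted toward concurrency.
    let mut s = StreamFlags { is_counted: true, is_pending_reset_expiration: false };

    // The user resets the stream: it is queued for reset expiration right away,
    // but the RST_STREAM frame has not been written yet, so it stays counted.
    s.is_pending_reset_expiration = true;
    assert!(s.is_counted && s.is_pending_reset_expiration);

    // Once the RST_STREAM frame is actually sent, the stream stops being counted.
    s.is_counted = false;
    assert_eq!(
        s,
        StreamFlags { is_counted: false, is_pending_reset_expiration: true }
    );
}
```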
src/proto/streams/recv.rs
Outdated

if is_ignoring_frame {
    trace!("recv_data frame being ignored on locally reset {:?} for some time", stream.id);
    return Ok(());
So the connection level window is consumed right above, but I don't see where it is "released". Specifically, how is the connection level window not leaked in this case?
It would be nice to have a test for this.
// So, for violating the **stream** window, we can send either a
// stream or connection error. We've opted to send a stream
// error.
return Err(RecvError::Stream {
Similar as above, we need to be very sure that if this code path is hit, the connection level window is not leaked. Again, it would be nice to have a test for this.
Looking around this function, it appears there are existing conditions that could cause a stream error after consuming the connection window, and in all of those cases, the window is "leaked".
To fix this, we could add a check outside this function, wherever streams.recv_data() is being called, and if a stream error is returned, try to release the connection capacity again...
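A hedged sketch of that caller-side fix (all types below are invented scaffolding; only recv_data and release_connection_capacity follow the names in the discussion):

```rust
// Everything here is illustrative scaffolding around the suggested pattern.
struct Recv {
    conn_window_to_release: u32,
}

enum RecvError {
    Stream,
    Connection,
}

impl Recv {
    // Stand-in for streams.recv_data(): charges the connection window, then
    // may fail with a stream-level error.
    fn recv_data(&mut self, _payload_len: u32) -> Result<(), RecvError> {
        Err(RecvError::Stream)
    }

    fn release_connection_capacity(&mut self, sz: u32) {
        self.conn_window_to_release += sz;
    }
}

fn on_data_frame(recv: &mut Recv, payload_len: u32) -> Result<(), RecvError> {
    let res = recv.recv_data(payload_len);
    if matches!(res, Err(RecvError::Stream)) {
        // The stream is dead, but the connection window must not be leaked:
        // hand the capacity back so a WINDOW_UPDATE can eventually go out.
        recv.release_connection_capacity(payload_len);
    }
    res
}

fn main() {
    let mut recv = Recv { conn_window_to_release: 0 };
    let _ = on_data_frame(&mut recv, 1_024);
    assert_eq!(recv.conn_window_to_release, 1_024);
}
```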
src/proto/streams/streams.rs
Outdated
@@ -172,9 +175,21 @@ where

         let id = frame.stream_id();

+        /*
Should this comment block be removed?
src/proto/streams/recv.rs
Outdated
@@ -385,6 +401,7 @@ impl Recv {

         if is_ignoring_frame {
             trace!("recv_data frame being ignored on locally reset {:?} for some time", stream.id);
+            self.release_connection_capacity(sz, &mut None);
So, just to be extra clear (because I am very tired and I got pretty confused): could you add a comment that self.release_connection_capacity isn't exactly the opposite of self.consume_connection_window(sz), and that you still need to perform both fn calls to ensure that a WINDOW_UPDATE gets sent out?
While working through this PR, we uncovered some additional flow control issues. Since they are unrelated to this specific change, I created #183 to track.
PR looks good, feel free to merge whenever you are done w/ comments & all that.
This adds a map of stream IDs that we have reset locally, since we need to remember even after we've freed the related Stream. Upon receiving a data frame, that map is also checked to see if the stream had been reset. When we do decide to ignore a data frame, the connection flow control is still adjusted, since the sender had to have adjusted their understanding of this connection's window also.
Closes #74
Closes #32