diff --git a/internal/quic/conn_flow_test.go b/internal/quic/conn_flow_test.go index d5ee74ebda..03e0757a6d 100644 --- a/internal/quic/conn_flow_test.go +++ b/internal/quic/conn_flow_test.go @@ -394,3 +394,37 @@ func TestConnOutflowMetaAndData(t *testing.T) { data: data, }) } + +func TestConnOutflowResentData(t *testing.T) { + tc, s := newTestConnAndLocalStream(t, clientSide, bidiStream, + permissiveTransportParameters, + func(p *transportParameters) { + p.initialMaxData = 10 + }) + tc.ignoreFrame(frameTypeAck) + + data := makeTestData(15) + s.Write(data[:8]) + tc.wantFrame("data is under MAX_DATA limit, all sent", + packetType1RTT, debugFrameStream{ + id: s.id, + data: data[:8], + }) + + // Lose the last STREAM packet. + const pto = false + tc.triggerLossOrPTO(packetType1RTT, false) + tc.wantFrame("lost STREAM data is retransmitted", + packetType1RTT, debugFrameStream{ + id: s.id, + data: data[:8], + }) + + s.Write(data[8:]) + tc.wantFrame("new data is sent up to the MAX_DATA limit", + packetType1RTT, debugFrameStream{ + id: s.id, + off: 8, + data: data[8:10], + }) +} diff --git a/internal/quic/stream.go b/internal/quic/stream.go index 9310811c1b..89036b19b6 100644 --- a/internal/quic/stream.go +++ b/internal/quic/stream.go @@ -39,6 +39,7 @@ type Stream struct { outgate gate out pipe // buffered data to send outwin int64 // maximum MAX_STREAM_DATA received from the peer + outmaxsent int64 // maximum data offset we've sent to the peer outmaxbuf int64 // maximum amount of data we will buffer outunsent rangeset[int64] // ranges buffered but not yet sent outacked rangeset[int64] // ranges sent and acknowledged @@ -494,8 +495,12 @@ func (s *Stream) outUnlockNoQueue() streamState { case s.outblocked.shouldSend(): // STREAM_DATA_BLOCKED state = streamOutSendMeta case len(s.outunsent) > 0: // STREAM frame with data - state = streamOutSendData - case s.outclosed.shouldSend(): // STREAM frame with FIN bit, all data already sent + if s.outunsent.min() < s.outmaxsent { + state = streamOutSendMeta // resent data, will not consume flow control + } else { + state = streamOutSendData // new data, requires flow control + } + case s.outclosed.shouldSend() && s.out.end == s.outmaxsent: // empty STREAM frame with FIN bit state = streamOutSendMeta case s.outopened.shouldSend(): // STREAM frame with no data state = streamOutSendMeta @@ -725,7 +730,11 @@ func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto b for { // STREAM off, size := dataToSend(min(s.out.start, s.outwin), min(s.out.end, s.outwin), s.outunsent, s.outacked, pto) - size = min(size, s.conn.streams.outflow.avail()) + if end := off + size; end > s.outmaxsent { + // This will require connection-level flow control to send. + end = min(end, s.outmaxsent+s.conn.streams.outflow.avail()) + size = end - off + } fin := s.outclosed.isSet() && off+size == s.out.end shouldSend := size > 0 || // have data to send s.outopened.shouldSendPTO(pto) || // should open the stream @@ -738,8 +747,12 @@ func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto b return false } s.out.copy(off, b) - s.conn.streams.outflow.consume(int64(len(b))) - s.outunsent.sub(off, off+int64(len(b))) + end := off + int64(len(b)) + if end > s.outmaxsent { + s.conn.streams.outflow.consume(end - s.outmaxsent) + s.outmaxsent = end + } + s.outunsent.sub(off, end) s.frameOpensStream(pnum) if fin { s.outclosed.setSent(pnum) diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go index 86eebc6989..7c1377faee 100644 --- a/internal/quic/stream_test.go +++ b/internal/quic/stream_test.go @@ -1094,6 +1094,44 @@ func TestStreamCloseUnblocked(t *testing.T) { } } +func TestStreamCloseWriteWhenBlockedByStreamFlowControl(t *testing.T) { + ctx := canceledContext() + tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters, + func(p *transportParameters) { + //p.initialMaxData = 0 + p.initialMaxStreamDataUni = 0 + }) + tc.ignoreFrame(frameTypeStreamDataBlocked) + if _, err := s.WriteContext(ctx, []byte{0, 1}); err != nil { + t.Fatalf("s.Write = %v", err) + } + s.CloseWrite() + tc.wantIdle("stream write is blocked by flow control") + + tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{ + id: s.id, + max: 1, + }) + tc.wantFrame("send data up to flow control limit", + packetType1RTT, debugFrameStream{ + id: s.id, + data: []byte{0}, + }) + tc.wantIdle("stream write is again blocked by flow control") + + tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{ + id: s.id, + max: 2, + }) + tc.wantFrame("send remaining data and FIN", + packetType1RTT, debugFrameStream{ + id: s.id, + off: 1, + data: []byte{1}, + fin: true, + }) +} + func TestStreamPeerResetsWithUnreadAndUnsentData(t *testing.T) { testStreamTypes(t, "", func(t *testing.T, styp streamType) { ctx := canceledContext()