Skip to content

Commit

Permalink
quic: implement stream flush
Browse files Browse the repository at this point in the history
Do not commit data written to a stream to the network until
the user explicitly flushes the stream, the stream output
buffer fills, or the output buffer contains enough data to
fill a packet.

We could write data immediately (as net.TCPConn does),
but this can require the user to put their own buffer in
front of the stream. Since we necessarily need to maintain
a retransmit buffer in the stream, this is redundant.

We could do something like Nagle's algorithm, but nobody
wants that.

So make flushes explicit.

For golang/go#58547

Change-Id: I29dc9d79556c7a358a360ef79beb38b45040b6bc
Reviewed-on: https://go-review.googlesource.com/c/net/+/543083
Auto-Submit: Damien Neil <dneil@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
  • Loading branch information
neild authored and gopherbot committed Nov 17, 2023
1 parent d87f99b commit 399218d
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 30 deletions.
4 changes: 1 addition & 3 deletions internal/quic/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,10 @@ func newConn(now time.Time, side connSide, cids newServerConnIDs, peerAddr netip
}
}

// The smallest allowed maximum QUIC datagram size is 1200 bytes.
// TODO: PMTU discovery.
const maxDatagramSize = 1200
c.logConnectionStarted(cids.originalDstConnID, peerAddr)
c.keysAppData.init()
c.loss.init(c.side, maxDatagramSize, now)
c.loss.init(c.side, smallestMaxDatagramSize, now)
c.streamsInit()
c.lifetimeInit()
c.restartIdleTimer(now)
Expand Down
7 changes: 7 additions & 0 deletions internal/quic/conn_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func TestConnOutflowBlocked(t *testing.T) {
if n != len(data) || err != nil {
t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data))
}
s.Flush()

tc.wantFrame("stream writes data up to MAX_DATA limit",
packetType1RTT, debugFrameStream{
Expand Down Expand Up @@ -310,6 +311,7 @@ func TestConnOutflowMaxDataDecreases(t *testing.T) {
if n != len(data) || err != nil {
t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data))
}
s.Flush()

tc.wantFrame("stream writes data up to MAX_DATA limit",
packetType1RTT, debugFrameStream{
Expand Down Expand Up @@ -337,7 +339,9 @@ func TestConnOutflowMaxDataRoundRobin(t *testing.T) {
}

s1.Write(make([]byte, 10))
s1.Flush()
s2.Write(make([]byte, 10))
s2.Flush()

tc.writeFrames(packetType1RTT, debugFrameMaxData{
max: 1,
Expand Down Expand Up @@ -378,6 +382,7 @@ func TestConnOutflowMetaAndData(t *testing.T) {

data := makeTestData(32)
s.Write(data)
s.Flush()

s.CloseRead()
tc.wantFrame("CloseRead sends a STOP_SENDING, not flow controlled",
Expand Down Expand Up @@ -405,6 +410,7 @@ func TestConnOutflowResentData(t *testing.T) {

data := makeTestData(15)
s.Write(data[:8])
s.Flush()
tc.wantFrame("data is under MAX_DATA limit, all sent",
packetType1RTT, debugFrameStream{
id: s.id,
Expand All @@ -421,6 +427,7 @@ func TestConnOutflowResentData(t *testing.T) {
})

s.Write(data[8:])
s.Flush()
tc.wantFrame("new data is sent up to the MAX_DATA limit",
packetType1RTT, debugFrameStream{
id: s.id,
Expand Down
5 changes: 4 additions & 1 deletion internal/quic/conn_loss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestLostStreamFrameEmpty(t *testing.T) {
if err != nil {
t.Fatalf("NewStream: %v", err)
}
c.Write(nil) // open the stream
c.Flush() // open the stream
tc.wantFrame("created bidirectional stream 0",
packetType1RTT, debugFrameStream{
id: newStreamID(clientSide, bidiStream, 0),
Expand Down Expand Up @@ -213,13 +213,15 @@ func TestLostStreamWithData(t *testing.T) {
p.initialMaxStreamDataUni = 1 << 20
})
s.Write(data[:4])
s.Flush()
tc.wantFrame("send [0,4)",
packetType1RTT, debugFrameStream{
id: s.id,
off: 0,
data: data[:4],
})
s.Write(data[4:8])
s.Flush()
tc.wantFrame("send [4,8)",
packetType1RTT, debugFrameStream{
id: s.id,
Expand Down Expand Up @@ -263,6 +265,7 @@ func TestLostStreamPartialLoss(t *testing.T) {
})
for i := range data {
s.Write(data[i : i+1])
s.Flush()
tc.wantFrame(fmt.Sprintf("send STREAM frame with byte %v", i),
packetType1RTT, debugFrameStream{
id: s.id,
Expand Down
16 changes: 8 additions & 8 deletions internal/quic/conn_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,33 @@ func TestStreamsCreate(t *testing.T) {
tc := newTestConn(t, clientSide, permissiveTransportParameters)
tc.handshake()

c, err := tc.conn.NewStream(ctx)
s, err := tc.conn.NewStream(ctx)
if err != nil {
t.Fatalf("NewStream: %v", err)
}
c.Write(nil) // open the stream
s.Flush() // open the stream
tc.wantFrame("created bidirectional stream 0",
packetType1RTT, debugFrameStream{
id: 0, // client-initiated, bidi, number 0
data: []byte{},
})

c, err = tc.conn.NewSendOnlyStream(ctx)
s, err = tc.conn.NewSendOnlyStream(ctx)
if err != nil {
t.Fatalf("NewStream: %v", err)
}
c.Write(nil) // open the stream
s.Flush() // open the stream
tc.wantFrame("created unidirectional stream 0",
packetType1RTT, debugFrameStream{
id: 2, // client-initiated, uni, number 0
data: []byte{},
})

c, err = tc.conn.NewStream(ctx)
s, err = tc.conn.NewStream(ctx)
if err != nil {
t.Fatalf("NewStream: %v", err)
}
c.Write(nil) // open the stream
s.Flush() // open the stream
tc.wantFrame("created bidirectional stream 1",
packetType1RTT, debugFrameStream{
id: 4, // client-initiated, uni, number 4
Expand Down Expand Up @@ -177,11 +177,11 @@ func TestStreamsStreamSendOnly(t *testing.T) {
tc := newTestConn(t, serverSide, permissiveTransportParameters)
tc.handshake()

c, err := tc.conn.NewSendOnlyStream(ctx)
s, err := tc.conn.NewSendOnlyStream(ctx)
if err != nil {
t.Fatalf("NewStream: %v", err)
}
c.Write(nil) // open the stream
s.Flush() // open the stream
tc.wantFrame("created unidirectional stream 0",
packetType1RTT, debugFrameStream{
id: 3, // server-initiated, uni, number 0
Expand Down
6 changes: 5 additions & 1 deletion internal/quic/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,14 @@ const defaultKeepAlivePeriod = 0
// https://www.rfc-editor.org/rfc/rfc9002.html#section-6.1.2-6
const timerGranularity = 1 * time.Millisecond

// The smallest allowed maximum datagram size.
// https://www.rfc-editor.org/rfc/rfc9000#section-14
const smallestMaxDatagramSize = 1200

// Minimum size of a UDP datagram sent by a client carrying an Initial packet,
// or a server containing an ack-eliciting Initial packet.
// https://www.rfc-editor.org/rfc/rfc9000#section-14.1
const paddedInitialDatagramSize = 1200
const paddedInitialDatagramSize = smallestMaxDatagramSize

// Maximum number of streams of a given type which may be created.
// https://www.rfc-editor.org/rfc/rfc9000.html#section-4.6-2
Expand Down
55 changes: 41 additions & 14 deletions internal/quic/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ type Stream struct {
// the write will fail.
outgate gate
out pipe // buffered data to send
outflushed int64 // offset of last flush call
outwin int64 // maximum MAX_STREAM_DATA received from the peer
outmaxsent int64 // maximum data offset we've sent to the peer
outmaxbuf int64 // maximum amount of data we will buffer
outunsent rangeset[int64] // ranges buffered but not yet sent
outunsent rangeset[int64] // ranges buffered but not yet sent (only flushed data)
outacked rangeset[int64] // ranges sent and acknowledged
outopened sentVal // set if we should open the stream
outclosed sentVal // set by CloseWrite
Expand Down Expand Up @@ -240,8 +241,6 @@ func (s *Stream) Write(b []byte) (n int, err error) {
// WriteContext writes data to the stream write buffer.
// Buffered data is only sent when the buffer is sufficiently full.
// Call the Flush method to ensure buffered data is sent.
//
// TODO: Implement Flush.
func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error) {
if s.IsReadOnly() {
return 0, errors.New("write to read-only stream")
Expand Down Expand Up @@ -269,10 +268,6 @@ func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error)
s.outUnlock()
return n, errors.New("write to closed stream")
}
// We set outopened here rather than below,
// so if this is a zero-length write we still
// open the stream despite not writing any data to it.
s.outopened.set()
if len(b) == 0 {
break
}
Expand All @@ -282,13 +277,26 @@ func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error)
// Amount to write is min(the full buffer, data up to the write limit).
// This is a number of bytes.
nn := min(int64(len(b)), lim-s.out.end)
// Copy the data into the output buffer and mark it as unsent.
if s.out.end <= s.outwin {
s.outunsent.add(s.out.end, min(s.out.end+nn, s.outwin))
}
// Copy the data into the output buffer.
s.out.writeAt(b[:nn], s.out.end)
b = b[nn:]
n += int(nn)
// Possibly flush the output buffer.
// We automatically flush if:
// - We have enough data to consume the send window.
// Sending this data may cause the peer to extend the window.
// - We have buffered as much data as we're willing do.
// We need to send data to clear out buffer space.
// - We have enough data to fill a 1-RTT packet using the smallest
// possible maximum datagram size (1200 bytes, less header byte,
// connection ID, packet number, and AEAD overhead).
const autoFlushSize = smallestMaxDatagramSize - 1 - connIDLen - 1 - aeadOverhead
shouldFlush := s.out.end >= s.outwin || // peer send window is full
s.out.end >= lim || // local send buffer is full
(s.out.end-s.outflushed) >= autoFlushSize // enough data buffered
if shouldFlush {
s.flushLocked()
}
if s.out.end > s.outwin {
// We're blocked by flow control.
// Send a STREAM_DATA_BLOCKED frame to let the peer know.
Expand All @@ -301,6 +309,23 @@ func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error)
return n, nil
}

// Flush flushes data written to the stream.
// It does not wait for the peer to acknowledge receipt of the data.
// Use CloseContext to wait for the peer's acknowledgement.
func (s *Stream) Flush() {
s.outgate.lock()
defer s.outUnlock()
s.flushLocked()
}

func (s *Stream) flushLocked() {
s.outopened.set()
if s.outflushed < s.outwin {
s.outunsent.add(s.outflushed, min(s.outwin, s.out.end))
}
s.outflushed = s.out.end
}

// Close closes the stream.
// See CloseContext for more details.
func (s *Stream) Close() error {
Expand Down Expand Up @@ -363,6 +388,7 @@ func (s *Stream) CloseWrite() {
s.outgate.lock()
defer s.outUnlock()
s.outclosed.set()
s.flushLocked()
}

// Reset aborts writes on the stream and notifies the peer
Expand Down Expand Up @@ -612,8 +638,8 @@ func (s *Stream) handleMaxStreamData(maxStreamData int64) error {
if maxStreamData <= s.outwin {
return nil
}
if s.out.end > s.outwin {
s.outunsent.add(s.outwin, min(maxStreamData, s.out.end))
if s.outflushed > s.outwin {
s.outunsent.add(s.outwin, min(maxStreamData, s.outflushed))
}
s.outwin = maxStreamData
if s.out.end > s.outwin {
Expand Down Expand Up @@ -741,10 +767,11 @@ func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto b
}
for {
// STREAM
off, size := dataToSend(min(s.out.start, s.outwin), min(s.out.end, s.outwin), s.outunsent, s.outacked, pto)
off, size := dataToSend(min(s.out.start, s.outwin), min(s.outflushed, s.outwin), s.outunsent, s.outacked, pto)
if end := off + size; end > s.outmaxsent {
// This will require connection-level flow control to send.
end = min(end, s.outmaxsent+s.conn.streams.outflow.avail())
end = max(end, off)
size = end - off
}
fin := s.outclosed.isSet() && off+size == s.out.end
Expand Down
Loading

0 comments on commit 399218d

Please sign in to comment.