Skip to content

Commit

Permalink
quic: fast path for stream reads
Browse files Browse the repository at this point in the history
Keep a reference to the next chunk of bytes available for reading
in an unsynchronized buffer. Read and ReadByte calls read from this
buffer when possible, avoiding the need to lock the stream.

This change makes it unnecessary to wrap a stream in a *bytes.Buffer
when making small reads, at the expense of making reads
concurrency-unsafe. Since the quic package is a low-level one and
this lets us avoid an extra buffer in the HTTP/3 implementation,
the tradeoff seems worthwhile.

For golang/go#58547

Change-Id: Ib3ca446311974571c2367295b302f36a6349b00d
Reviewed-on: https://go-review.googlesource.com/c/net/+/564495
Reviewed-by: Jonathan Amsterdam <jba@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
  • Loading branch information
neild committed Feb 21, 2024
1 parent cc568ea commit 08d27e3
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 49 deletions.
52 changes: 26 additions & 26 deletions internal/quic/conn_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,29 @@ func TestConnInflowReturnOnRead(t *testing.T) {
})
tc.writeFrames(packetType1RTT, debugFrameStream{
id: s.id,
data: make([]byte, 64),
data: make([]byte, 8),
})
const readSize = 8
if n, err := s.Read(make([]byte, readSize)); n != readSize || err != nil {
t.Fatalf("s.Read() = %v, %v; want %v, nil", n, err, readSize)
}
tc.wantFrame("available window increases, send a MAX_DATA",
packetType1RTT, debugFrameMaxData{
max: 64 + readSize,
})
if n, err := s.Read(make([]byte, 64)); n != 64-readSize || err != nil {
t.Fatalf("s.Read() = %v, %v; want %v, nil", n, err, 64-readSize)
if n, err := s.Read(make([]byte, 8)); n != 8 || err != nil {
t.Fatalf("s.Read() = %v, %v; want %v, nil", n, err, 8)
}
tc.wantFrame("available window increases, send a MAX_DATA",
packetType1RTT, debugFrameMaxData{
max: 128,
max: 64 + 8,
})
// Peer can write up to the new limit.
tc.writeFrames(packetType1RTT, debugFrameStream{
id: s.id,
off: 64,
off: 8,
data: make([]byte, 64),
})
tc.wantIdle("connection is idle")
if n, err := s.Read(make([]byte, 64)); n != 64 || err != nil {
t.Fatalf("offset 64: s.Read() = %v, %v; want %v, nil", n, err, 64)
if n, err := s.Read(make([]byte, 64+1)); n != 64 {
t.Fatalf("s.Read() = %v, %v; want %v, anything", n, err, 64)
}
tc.wantFrame("available window increases, send a MAX_DATA",
packetType1RTT, debugFrameMaxData{
max: 64 + 8 + 64,
})
tc.wantIdle("connection is idle")
}

func TestConnInflowReturnOnRacingReads(t *testing.T) {
Expand All @@ -63,11 +59,11 @@ func TestConnInflowReturnOnRacingReads(t *testing.T) {
tc.ignoreFrame(frameTypeAck)
tc.writeFrames(packetType1RTT, debugFrameStream{
id: newStreamID(clientSide, uniStream, 0),
data: make([]byte, 32),
data: make([]byte, 16),
})
tc.writeFrames(packetType1RTT, debugFrameStream{
id: newStreamID(clientSide, uniStream, 1),
data: make([]byte, 32),
data: make([]byte, 1),
})
s1, err := tc.conn.AcceptStream(ctx)
if err != nil {
Expand Down Expand Up @@ -203,7 +199,6 @@ func TestConnInflowResetViolation(t *testing.T) {
}

func TestConnInflowMultipleStreams(t *testing.T) {
ctx := canceledContext()
tc := newTestConn(t, serverSide, func(c *Config) {
c.MaxConnReadBufferSize = 128
})
Expand All @@ -219,21 +214,26 @@ func TestConnInflowMultipleStreams(t *testing.T) {
} {
tc.writeFrames(packetType1RTT, debugFrameStream{
id: id,
data: make([]byte, 32),
data: make([]byte, 1),
})
s, err := tc.conn.AcceptStream(ctx)
if err != nil {
t.Fatalf("AcceptStream() = %v", err)
}
s := tc.acceptStream()
streams = append(streams, s)
if n, err := s.Read(make([]byte, 1)); err != nil || n != 1 {
t.Fatalf("s.Read() = %v, %v; want 1, nil", n, err)
}
}
tc.wantIdle("streams have read data, but not enough to update MAX_DATA")

if n, err := streams[0].Read(make([]byte, 32)); err != nil || n != 31 {
t.Fatalf("s.Read() = %v, %v; want 31, nil", n, err)
for _, s := range streams {
tc.writeFrames(packetType1RTT, debugFrameStream{
id: s.id,
off: 1,
data: make([]byte, 31),
})
}

if n, err := streams[0].Read(make([]byte, 32)); n != 31 {
t.Fatalf("s.Read() = %v, %v; want 31, anything", n, err)
}
tc.wantFrame("read enough data to trigger a MAX_DATA update",
packetType1RTT, debugFrameMaxData{
Expand Down
20 changes: 15 additions & 5 deletions internal/quic/conn_loss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,9 @@ func TestLostMaxDataFrame(t *testing.T) {
tc.writeFrames(packetType1RTT, debugFrameStream{
id: s.id,
off: 0,
data: make([]byte, maxWindowSize),
data: make([]byte, maxWindowSize-1),
})
if n, err := s.Read(buf[:maxWindowSize-1]); err != nil || n != maxWindowSize-1 {
if n, err := s.Read(buf[:maxWindowSize]); err != nil || n != maxWindowSize-1 {
t.Fatalf("Read() = %v, %v; want %v, nil", n, err, maxWindowSize-1)
}
tc.wantFrame("conn window is extended after reading data",
Expand All @@ -319,7 +319,12 @@ func TestLostMaxDataFrame(t *testing.T) {
})

// MAX_DATA = 64, which is only one more byte, so we don't send the frame.
if n, err := s.Read(buf); err != nil || n != 1 {
tc.writeFrames(packetType1RTT, debugFrameStream{
id: s.id,
off: maxWindowSize - 1,
data: make([]byte, 1),
})
if n, err := s.Read(buf[:1]); err != nil || n != 1 {
t.Fatalf("Read() = %v, %v; want %v, nil", n, err, 1)
}
tc.wantIdle("read doesn't extend window enough to send another MAX_DATA")
Expand Down Expand Up @@ -348,9 +353,9 @@ func TestLostMaxStreamDataFrame(t *testing.T) {
tc.writeFrames(packetType1RTT, debugFrameStream{
id: s.id,
off: 0,
data: make([]byte, maxWindowSize),
data: make([]byte, maxWindowSize-1),
})
if n, err := s.Read(buf[:maxWindowSize-1]); err != nil || n != maxWindowSize-1 {
if n, err := s.Read(buf[:maxWindowSize]); err != nil || n != maxWindowSize-1 {
t.Fatalf("Read() = %v, %v; want %v, nil", n, err, maxWindowSize-1)
}
tc.wantFrame("stream window is extended after reading data",
Expand All @@ -360,6 +365,11 @@ func TestLostMaxStreamDataFrame(t *testing.T) {
})

// MAX_STREAM_DATA = 64, which is only one more byte, so we don't send the frame.
tc.writeFrames(packetType1RTT, debugFrameStream{
id: s.id,
off: maxWindowSize - 1,
data: make([]byte, 1),
})
if n, err := s.Read(buf); err != nil || n != 1 {
t.Fatalf("Read() = %v, %v; want %v, nil", n, err, 1)
}
Expand Down
23 changes: 18 additions & 5 deletions internal/quic/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ import (
// Writing past the end of the window extends it.
// Data may be discarded from the start of the pipe, advancing the window.
type pipe struct {
start int64
end int64
head *pipebuf
tail *pipebuf
start int64 // stream position of first stored byte
end int64 // stream position just past the last stored byte
head *pipebuf // if non-nil, then head.off + len(head.b) > start
tail *pipebuf // if non-nil, then tail.off + len(tail.b) == end
}

type pipebuf struct {
off int64
off int64 // stream position of b[0]
b []byte
next *pipebuf
}
Expand Down Expand Up @@ -111,6 +111,7 @@ func (p *pipe) copy(off int64, b []byte) {

// read calls f with the data in [off, off+n)
// The data may be provided sequentially across multiple calls to f.
// Note that read (unlike an io.Reader) does not consume the read data.
func (p *pipe) read(off int64, n int, f func([]byte) error) error {
if off < p.start {
panic("invalid read range")
Expand All @@ -135,6 +136,18 @@ func (p *pipe) read(off int64, n int, f func([]byte) error) error {
return nil
}

// peek returns a reference to up to n bytes of internal data buffer, starting at p.start.
// The returned slice is valid until the next call to discardBefore.
// The length of the returned slice will be in the range [0,n].
func (p *pipe) peek(n int64) []byte {
pb := p.head
if pb == nil {
return nil
}
b := pb.b[p.start-pb.off:]
return b[:min(int64(len(b)), n)]
}

// discardBefore discards all data prior to off.
func (p *pipe) discardBefore(off int64) {
for p.head != nil && p.head.end() < off {
Expand Down
67 changes: 56 additions & 11 deletions internal/quic/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Stream struct {
inctx context.Context
outctx context.Context

// ingate's lock guards all receive-related state.
// ingate's lock guards receive-related state.
//
// The gate condition is set if a read from the stream will not block,
// either because the stream has available data or because the read will fail.
Expand All @@ -37,7 +37,7 @@ type Stream struct {
inclosed sentVal // set by CloseRead
inresetcode int64 // RESET_STREAM code received from the peer; -1 if not reset

// outgate's lock guards all send-related state.
// outgate's lock guards send-related state.
//
// The gate condition is set if a write to the stream will not block,
// either because the stream has available flow control or because
Expand All @@ -57,6 +57,10 @@ type Stream struct {
outresetcode uint64 // reset code to send in RESET_STREAM
outdone chan struct{} // closed when all data sent

// Unsynchronized buffers, used for lock-free fast path.
inbuf []byte // received data
inbufoff int // bytes of inbuf which have been consumed

// Atomic stream state bits.
//
// These bits provide a fast way to coordinate between the
Expand Down Expand Up @@ -202,16 +206,35 @@ func (s *Stream) IsWriteOnly() bool {
// returning all data sent by the peer.
// If the peer aborts reads on the stream, Read returns
// an error wrapping StreamResetCode.
//
// It is not safe to call Read concurrently.
func (s *Stream) Read(b []byte) (n int, err error) {
if s.IsWriteOnly() {
return 0, errors.New("read from write-only stream")
}
if len(s.inbuf) > s.inbufoff {
// Fast path: If s.inbuf contains unread bytes, return them immediately
// without taking a lock.
n = copy(b, s.inbuf[s.inbufoff:])
s.inbufoff += n
return n, nil
}
if err := s.ingate.waitAndLock(s.inctx, s.conn.testHooks); err != nil {
return 0, err
}
if s.inbufoff > 0 {
// Discard bytes consumed by the fast path above.
s.in.discardBefore(s.in.start + int64(s.inbufoff))
s.inbufoff = 0
s.inbuf = nil
}
// bytesRead contains the number of bytes of connection-level flow control to return.
// We return flow control for bytes read by this Read call, as well as bytes moved
// to the fast-path read buffer (s.inbuf).
var bytesRead int64
defer func() {
s.inUnlock()
s.conn.handleStreamBytesReadOffLoop(int64(n)) // must be done with ingate unlocked
s.conn.handleStreamBytesReadOffLoop(bytesRead) // must be done with ingate unlocked
}()
if s.inresetcode != -1 {
return 0, fmt.Errorf("stream reset by peer: %w", StreamErrorCode(s.inresetcode))
Expand All @@ -229,27 +252,48 @@ func (s *Stream) Read(b []byte) (n int, err error) {
if size := int(s.inset[0].end - s.in.start); size < len(b) {
b = b[:size]
}
bytesRead = int64(len(b))
start := s.in.start
end := start + int64(len(b))
s.in.copy(start, b)
s.in.discardBefore(end)
if end == s.insize {
// We have read up to the end of the stream.
// No need to update stream flow control.
return len(b), io.EOF
}
if len(s.inset) > 0 && s.inset[0].start <= s.in.start && s.inset[0].end > s.in.start {
// If we have more readable bytes available, put the next chunk of data
// in s.inbuf for lock-free reads.
s.inbuf = s.in.peek(s.inset[0].end - s.in.start)
bytesRead += int64(len(s.inbuf))
}
if s.insize == -1 || s.insize > s.inwin {
if shouldUpdateFlowControl(s.inmaxbuf, s.in.start+s.inmaxbuf-s.inwin) {
newWindow := s.in.start + int64(len(s.inbuf)) + s.inmaxbuf
addedWindow := newWindow - s.inwin
if shouldUpdateFlowControl(s.inmaxbuf, addedWindow) {
// Update stream flow control with a STREAM_MAX_DATA frame.
s.insendmax.setUnsent()
}
}
if end == s.insize {
return len(b), io.EOF
}
return len(b), nil
}

// ReadByte reads and returns a single byte from the stream.
//
// It is not safe to call ReadByte concurrently.
func (s *Stream) ReadByte() (byte, error) {
if len(s.inbuf) > s.inbufoff {
b := s.inbuf[s.inbufoff]
s.inbufoff++
return b, nil
}
var b [1]byte
_, err := s.Read(b[:])
return b[0], err
n, err := s.Read(b[:])
if n > 0 {
return b[0], err
}
return 0, err
}

// shouldUpdateFlowControl determines whether to send a flow control window update.
Expand Down Expand Up @@ -507,8 +551,9 @@ func (s *Stream) inUnlock() {
// inUnlockNoQueue is inUnlock,
// but reports whether s has frames to write rather than notifying the Conn.
func (s *Stream) inUnlockNoQueue() streamState {
canRead := s.inset.contains(s.in.start) || // data available to read
s.insize == s.in.start || // at EOF
nextByte := s.in.start + int64(len(s.inbuf))
canRead := s.inset.contains(nextByte) || // data available to read
s.insize == s.in.start+int64(len(s.inbuf)) || // at EOF
s.inresetcode != -1 || // reset by peer
s.inclosed.isSet() // closed locally
defer s.ingate.unlock(canRead)
Expand Down
30 changes: 28 additions & 2 deletions internal/quic/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,32 @@ func TestStreamReceiveDuplicateDataDoesNotViolateLimits(t *testing.T) {
})
}

func TestStreamReceiveEmptyEOF(t *testing.T) {
// A stream receives some data, we read a byte of that data
// (causing the rest to be pulled into the s.inbuf buffer),
// and then we receive a FIN with no additional data.
testStreamTypes(t, "", func(t *testing.T, styp streamType) {
tc, s := newTestConnAndRemoteStream(t, serverSide, styp, permissiveTransportParameters)
want := []byte{1, 2, 3}
tc.writeFrames(packetType1RTT, debugFrameStream{
id: s.id,
data: want,
})
if got, err := s.ReadByte(); got != want[0] || err != nil {
t.Fatalf("s.ReadByte() = %v, %v; want %v, nil", got, err, want[0])
}

tc.writeFrames(packetType1RTT, debugFrameStream{
id: s.id,
off: 3,
fin: true,
})
if got, err := io.ReadAll(s); !bytes.Equal(got, want[1:]) || err != nil {
t.Fatalf("io.ReadAll(s) = {%x}, %v; want {%x}, nil", got, err, want[1:])
}
})
}

func finalSizeTest(t *testing.T, wantErr transportError, f func(tc *testConn, sid streamID) (finalSize int64), opts ...any) {
testStreamTypes(t, "", func(t *testing.T, styp streamType) {
for _, test := range []struct {
Expand Down Expand Up @@ -1156,8 +1182,8 @@ func TestStreamPeerResetsWithUnreadAndUnsentData(t *testing.T) {
code: sentCode,
})
wantErr := StreamErrorCode(sentCode)
if n, err := s.Read(got); n != 0 || !errors.Is(err, wantErr) {
t.Fatalf("Read reset stream: got %v, %v; want 0, %v", n, err, wantErr)
if _, err := io.ReadAll(s); !errors.Is(err, wantErr) {
t.Fatalf("Read reset stream: ReadAll got error %v; want %v", err, wantErr)
}
})
}
Expand Down

0 comments on commit 08d27e3

Please sign in to comment.