Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #1060 maximum number of streams on the client should be capped … #1071

Merged
merged 7 commits into from
Mar 1, 2017
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2671,6 +2671,45 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) {
}
}

const defaultMaxStreamsClient = 100

func TestClientExceedMaxStreamsLimit(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about TestExceedDefaultMaxStreamsLimit, as opposed to the previous test TestExceedMaxStreamsLimit.

defer leakCheck(t)()
for _, e := range listTestEnv() {
testClientExceedMaxStreamsLimit(t, e)
}
}

func testClientExceedMaxStreamsLimit(t *testing.T, e env) {
te := newTest(t, e)
te.declareLogNoise(
"http2Client.notifyError got notified that the client transport was broken",
"Conn.resetTransport failed to create client transport",
"grpc: the connection is closing",
)
te.maxStream = 0 // Server allows infinite streams. The cap should be on client side.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add something saying that "the server won't send the settings frame for MaxConcurrentStreams"?

te.startServer(&testServer{security: e.security})
defer te.tearDown()

cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)

// Create as many streams as a client can.
for i := 0; i < defaultMaxStreamsClient; i++ {
if _, err := tc.StreamingInputCall(te.ctx); err != nil {
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
}
}

// Trying to create one more should timeout.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_, err := tc.StreamingInputCall(ctx)
if err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
}
}

func TestStreamsQuotaRecovery(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
Expand Down
5 changes: 3 additions & 2 deletions transport/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ const (
// The default value of flow control window size in HTTP2 spec.
defaultWindowSize = 65535
// The initial window size for flow control.
initialWindowSize = defaultWindowSize // for an RPC
initialConnWindowSize = defaultWindowSize * 16 // for a connection
initialWindowSize = defaultWindowSize // for an RPC
initialConnWindowSize = defaultWindowSize * 16 // for a connection
defaultMaxStreamsClient = 100
)

// The following defines various control items which could flow through
Expand Down
64 changes: 24 additions & 40 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
state: reachable,
activeStreams: make(map[uint32]*Stream),
creds: opts.PerRPCCredentials,
maxStreams: math.MaxInt32,
maxStreams: defaultMaxStreamsClient,
streamsQuota: newQuotaPool(defaultMaxStreamsClient),
streamSendQuota: defaultWindowSize,
statsHandler: opts.StatsHandler,
}
Expand Down Expand Up @@ -337,31 +338,26 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.mu.Unlock()
return nil, ErrConnClosing
}
checkStreamsQuota := t.streamsQuota != nil
t.mu.Unlock()
if checkStreamsQuota {
sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
if err != nil {
return nil, err
}
// Returns the quota balance back.
if sq > 1 {
t.streamsQuota.add(sq - 1)
}
sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
if err != nil {
return nil, err
}
// Returns the quota balance back.
if sq > 1 {
t.streamsQuota.add(sq - 1)
}
if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
// Return the quota back now because there is no stream returned to the caller.
if _, ok := err.(StreamError); ok && checkStreamsQuota {
if _, ok := err.(StreamError); ok {
t.streamsQuota.add(1)
}
return nil, err
}
t.mu.Lock()
if t.state == draining {
t.mu.Unlock()
if checkStreamsQuota {
t.streamsQuota.add(1)
}
t.streamsQuota.add(1)
// Need to make t writable again so that the rpc in flight can still proceed.
t.writableChan <- 0
return nil, ErrStreamDrain
Expand All @@ -374,16 +370,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
s.clientStatsCtx = userCtx
t.activeStreams[s.id] = s

// This stream is not counted when applySetings(...) initialize t.streamsQuota.
// Reset t.streamsQuota to the right value.
var reset bool
if !checkStreamsQuota && t.streamsQuota != nil {
reset = true
}
t.mu.Unlock()
if reset {
t.streamsQuota.add(-1)
}

// HPACK encodes various headers. Note that once WriteField(...) is
// called, the corresponding headers/continuation frame has to be sent
Expand Down Expand Up @@ -491,15 +478,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
// CloseStream clears the footprint of a stream when the stream is not needed any more.
// This must not be executed in reader's goroutine.
func (t *http2Client) CloseStream(s *Stream, err error) {
var updateStreams bool
t.mu.Lock()
if t.activeStreams == nil {
t.mu.Unlock()
return
}
if t.streamsQuota != nil {
updateStreams = true
}
delete(t.activeStreams, s.id)
if t.state == draining && len(t.activeStreams) == 0 {
// The transport is draining and s is the last live stream on t.
Expand All @@ -508,10 +491,16 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
return
}
t.mu.Unlock()
if updateStreams {
t.streamsQuota.add(1)
}
var rstStream bool
defer func() {
if !rstStream {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comments here to explain what rstStream means and our special handling for streams quota pool.

t.streamsQuota.add(1)
return
}
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
}()
s.mu.Lock()
rstStream = s.rstStream
if q := s.fc.resetPendingData(); q > 0 {
if n := t.fc.onRead(q); n > 0 {
t.controlBuf.put(&windowUpdate{0, n})
Expand All @@ -528,7 +517,7 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
s.state = streamDone
s.mu.Unlock()
if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded {
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
rstStream = true
}
}

Expand Down Expand Up @@ -769,10 +758,10 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
s.state = streamDone
s.statusCode = codes.Internal
s.statusDesc = err.Error()
s.rstStream = true
close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return
}
s.mu.Unlock()
Expand Down Expand Up @@ -1043,16 +1032,10 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
s.Val = math.MaxInt32
}
t.mu.Lock()
reset := t.streamsQuota != nil
if !reset {
t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams))
}
ms := t.maxStreams
t.maxStreams = int(s.Val)
t.mu.Unlock()
if reset {
t.streamsQuota.add(int(s.Val) - ms)
}
t.streamsQuota.add(int(s.Val) - ms)
case http2.SettingInitialWindowSize:
t.mu.Lock()
for _, stream := range t.activeStreams {
Expand Down Expand Up @@ -1085,6 +1068,7 @@ func (t *http2Client) controller() {
t.framer.writeSettings(true, i.ss...)
}
case *resetStream:
t.streamsQuota.add(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explain a little bit about the race.

t.framer.writeRSTStream(true, i.streamID, i.code)
case *flushIO:
t.framer.flushWrite()
Expand Down
3 changes: 3 additions & 0 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ type Stream struct {
// the status received from the server.
statusCode codes.Code
statusDesc string
// rstStream is a flag that is true when a RST stream frame
// is sent to the server signifying that this stream is closing.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rstStream indicates whether a RSTStream needs to be sent to the server to signify that this stream is closing.

"is sent" is a bit confusing.

rstStream bool
}

// RecvCompress returns the compression algorithm applied to the inbound
Expand Down
5 changes: 4 additions & 1 deletion transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,10 @@ func TestMaxStreams(t *testing.T) {
case <-cc.streamsQuota.acquire():
t.Fatalf("streamsQuota.acquire() becomes readable mistakenly.")
default:
if cc.streamsQuota.quota != 0 {
cc.streamsQuota.mu.Lock()
quota := cc.streamsQuota.quota
cc.streamsQuota.mu.Unlock()
if quota != 0 {
t.Fatalf("streamsQuota.quota got non-zero quota mistakenly.")
}
}
Expand Down