Skip to content

Commit

Permalink
transport: Fix deadlock in transport caused by GOAWAY race with new s…
Browse files Browse the repository at this point in the history
…tream creation (#5652)

* transport: Fix deadlock in transport caused by GOAWAY race with new stream creation
  • Loading branch information
zasweq authored Sep 21, 2022
1 parent 9c3e589 commit b1d7f56
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 7 deletions.
23 changes: 17 additions & 6 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1232,18 +1232,29 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
if upperLimit == 0 { // This is the first GoAway Frame.
upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
}

t.prevGoAwayID = id
if len(t.activeStreams) == 0 {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
return
}

streamsToClose := make([]*Stream, 0)
for streamID, stream := range t.activeStreams {
if streamID > id && streamID <= upperLimit {
// The stream was unprocessed by the server.
atomic.StoreUint32(&stream.unprocessed, 1)
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
if streamID > id && streamID <= upperLimit {
atomic.StoreUint32(&stream.unprocessed, 1)
streamsToClose = append(streamsToClose, stream)
}
}
}
t.prevGoAwayID = id
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
// Called outside t.mu because closeStream can take controlBuf's mu, which
// could induce deadlock and is not allowed.
for _, stream := range streamsToClose {
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
}
}

Expand Down
109 changes: 109 additions & 0 deletions test/clienttester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package test

import (
"bytes"
"io"
"net"
"testing"

"golang.org/x/net/http2"
)

var (
clientPreface = []byte(http2.ClientPreface)
)

func newClientTester(t *testing.T, conn net.Conn) *clientTester {
ct := &clientTester{
t: t,
conn: conn,
}
ct.fr = http2.NewFramer(conn, conn)
ct.greet()
return ct
}

type clientTester struct {
t *testing.T
conn net.Conn
fr *http2.Framer
}

// greet() performs the necessary steps for http2 connection establishment on
// the server side.
func (ct *clientTester) greet() {
ct.wantClientPreface()
ct.wantSettingsFrame()
ct.writeSettingsFrame()
ct.writeSettingsAck()

for {
f, err := ct.fr.ReadFrame()
if err != nil {
ct.t.Errorf("error reading frame from client side: %v", err)
}
switch f := f.(type) {
case *http2.SettingsFrame:
if f.IsAck() { // HTTP/2 handshake completed.
return
}
default:
ct.t.Errorf("during greet, unexpected frame type %T", f)
}
}
}

func (ct *clientTester) wantClientPreface() {
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(ct.conn, preface); err != nil {
ct.t.Errorf("Error at server-side while reading preface from client. Err: %v", err)
}
if !bytes.Equal(preface, clientPreface) {
ct.t.Errorf("received bogus greeting from client %q", preface)
}
}

func (ct *clientTester) wantSettingsFrame() {
frame, err := ct.fr.ReadFrame()
if err != nil {
ct.t.Errorf("error reading initial settings frame from client: %v", err)
}
_, ok := frame.(*http2.SettingsFrame)
if !ok {
ct.t.Errorf("initial frame sent from client is not a settings frame, type %T", frame)
}
}

func (ct *clientTester) writeSettingsFrame() {
if err := ct.fr.WriteSettings(); err != nil {
ct.t.Fatalf("Error writing initial SETTINGS frame from client to server: %v", err)
}
}

func (ct *clientTester) writeSettingsAck() {
if err := ct.fr.WriteSettingsAck(); err != nil {
ct.t.Fatalf("Error writing ACK of client's SETTINGS: %v", err)
}
}

func (ct *clientTester) writeGoAway(maxStreamID uint32, code http2.ErrCode, debugData []byte) {
if err := ct.fr.WriteGoAway(maxStreamID, code, debugData); err != nil {
ct.t.Fatalf("Error writing GOAWAY: %v", err)
}
}
56 changes: 55 additions & 1 deletion test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7407,7 +7407,6 @@ func (s *httpServer) start(t *testing.T, lis net.Listener) {
return
}
writer.Flush() // necessary since client is expecting preface before declaring connection fully setup.

var sid uint32
// Loop until conn is closed and framer returns io.EOF
for requestNum := 0; ; requestNum = (requestNum + 1) % len(s.responses) {
Expand Down Expand Up @@ -8130,3 +8129,58 @@ func (s) TestRecvWhileReturningStatus(t *testing.T) {
}
}
}

// TestGoAwayStreamIDSmallerThanCreatedStreams tests the scenario where a server
// sends a goaway with a stream id that is smaller than some created streams on
// the client, while the client is simultaneously creating new streams. This
// should not induce a deadlock.
func (s) TestGoAwayStreamIDSmallerThanCreatedStreams(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("error listening: %v", err)
}

ctCh := testutils.NewChannel()
go func() {
conn, err := lis.Accept()
if err != nil {
t.Errorf("error in lis.Accept(): %v", err)
}
ct := newClientTester(t, conn)
ctCh.Send(ct)
}()

cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("error dialing: %v", err)
}
defer cc.Close()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

val, err := ctCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout waiting for client transport (should be given after http2 creation)")
}
ct := val.(*clientTester)

tc := testpb.NewTestServiceClient(cc)
someStreamsCreated := grpcsync.NewEvent()
goAwayWritten := grpcsync.NewEvent()
go func() {
for i := 0; i < 20; i++ {
if i == 10 {
<-goAwayWritten.Done()
}
tc.FullDuplexCall(ctx)
if i == 4 {
someStreamsCreated.Fire()
}
}
}()

<-someStreamsCreated.Done()
ct.writeGoAway(1, http2.ErrCodeNo, []byte{})
goAwayWritten.Fire()
}

0 comments on commit b1d7f56

Please sign in to comment.