Skip to content

Commit

Permalink
transport/http2server: wait in Close() until loopyWriter terminated
Browse files Browse the repository at this point in the history
When Server.Stop() returns it is not guaranteed that the loopyWriter
go-routine terminated.

This can lead to a panic or a testing.(*common).logDepth() race
condition in Go Tests because t.Log is used after or during the testcase
terminates.
This can happen when:
- a GRPC server is started in a Go test,
- the GRPC logs are forwarded to t.Log,
- loopyWriter.run logs an error after server.Stop() and the Test
  method returns.

grpc@v1.57.0/internal/leakcheck is unable to detect that the
loopyWriter go-routine continues to run after server.Stop() because it
waits up to 10s for go-routines to terminate after a test finishes.
The loopyWriter returns much faster after Stop() returns.

To make server.Stop() wait until loopyWriter terminated:
- rename the existing writerDone field, which is only used in tests, to
  loopyWriterDone, the writerDone channel is closed when the loopyWriter
  go-routine exited
- change http2server.Close to wait until loopyWriterDone is closed
  • Loading branch information
fho committed Jul 31, 2023
1 parent 20c51a9 commit ad96f3c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
31 changes: 17 additions & 14 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,18 @@ var serverConnectionCounter uint64

// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
ctx context.Context
done chan struct{}
conn net.Conn
loopy *loopyWriter
readerDone chan struct{} // sync point to enable testing.
writerDone chan struct{} // sync point to enable testing.
remoteAddr net.Addr
localAddr net.Addr
authInfo credentials.AuthInfo // auth info about the connection
inTapHandle tap.ServerInHandle
framer *framer
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
ctx context.Context
done chan struct{}
conn net.Conn
loopy *loopyWriter
readerDone chan struct{} // sync point to enable testing.
loopyWriterDone chan struct{}
remoteAddr net.Addr
localAddr net.Addr
authInfo credentials.AuthInfo // auth info about the connection
inTapHandle tap.ServerInHandle
framer *framer
// The max number of concurrent streams.
maxStreams uint32
// controlBuf delivers all the control related tasks (e.g., window
Expand Down Expand Up @@ -257,7 +257,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
authInfo: authInfo,
framer: framer,
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
loopyWriterDone: make(chan struct{}),
maxStreams: maxStreams,
inTapHandle: config.InTapHandle,
fc: &trInFlow{limit: uint32(icwz)},
Expand Down Expand Up @@ -300,6 +300,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,

defer func() {
if err != nil {
close(t.loopyWriterDone)
t.Close(err)
}
}()
Expand Down Expand Up @@ -339,7 +340,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
t.loopy.run()
close(t.writerDone)
close(t.loopyWriterDone)
}()
go t.keepalive()
return t, nil
Expand Down Expand Up @@ -1249,6 +1250,8 @@ func (t *http2Server) Close(err error) {
connEnd := &stats.ConnEnd{}
sh.HandleConn(t.ctx, connEnd)
}

<-t.loopyWriterDone
}

// deleteStream deletes the stream s from transport's active streams.
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1702,7 +1702,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
client.Close(errors.New("closed manually by test"))
st.Close(errors.New("closed manually by test"))
<-st.readerDone
<-st.writerDone
<-st.loopyWriterDone
<-client.readerDone
<-client.writerDone
for _, cstream := range clientStreams {
Expand Down

0 comments on commit ad96f3c

Please sign in to comment.