Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: Wait until resources finish cleaning up in Stop() and GracefulStop() #6489

Merged
merged 31 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ad96f3c
transport/http2server: wait in Close() until loopyWriter terminated
fho Jul 31, 2023
26fd669
Revert "transport/http2server: wait in Close() until loopyWriter term…
fho Aug 7, 2023
cb7cfa7
server: Wait in GracefulStop and Stop until loopyWriter goroutine exits
fho Aug 8, 2023
7eb46c0
server: remove stream handler waitgroup
fho Aug 11, 2023
5c056bd
Merge remote-tracking branch 'upstream/master' into wait_for_loopy_ro…
fho Aug 11, 2023
60700e0
Merge remote-tracking branch 'upstream/master' into wait_for_loopy_ro…
fho Aug 28, 2023
f0250c2
http2_server: Merge HandleStreams defer statements into one
fho Aug 28, 2023
e73ad2c
server: do not abort GracefulStop() when Stop() is called
fho Aug 28, 2023
17fd57e
server: remove obsolete s.cv.Broadcast() call
fho Aug 28, 2023
62a55c3
server: simplify conn removal loop in GracefulStop
fho Aug 28, 2023
6143cf7
server: stopServerWorkers also in GracefulStop
fho Aug 28, 2023
8a4ebe0
server: consolidate shared Stop and GracefulStop code
fho Aug 28, 2023
8bba089
Merge remote-tracking branch 'upstream/master' into wait_for_loopy_ro…
fho Sep 7, 2023
ecd6289
Merge remote-tracking branch 'upstream/master' into wait_for_loopy_ro…
fho Oct 20, 2023
c0b97bb
server: close listeners and wait for serveWG threads before draining
fho Oct 20, 2023
b55bee0
Revert "server: close listeners and wait for serveWG threads before d…
fho Oct 20, 2023
4b1e3eb
move shared Stop() and GracefulStop() code into stop(graceful)
fho Oct 20, 2023
acc0243
server: fix: stop sequence
fho Oct 26, 2023
8a58769
Merge remote-tracking branch 'upstream/master' into wait_for_loopy_ro…
fho Oct 26, 2023
e85749a
tests: add server.Stop() and server.GracefulStop testcases
fho Oct 20, 2023
11254c2
Delete internal/transport/http2_server.go.orig
fho Oct 26, 2023
ecfd5d1
tests: lower sleep timeout from 1s to 100ms and use time.After
fho Oct 26, 2023
3feef89
fixup! tests: lower sleep timeout from 1s to 100ms and use time.After
fho Oct 27, 2023
c03201f
fixup! tests: add server.Stop() and server.GracefulStop testcases
fho Oct 27, 2023
e7b4971
fixup! tests: add server.Stop() and server.GracefulStop testcases
fho Oct 27, 2023
cb549f2
replace "grpc call" with rpc in docs and error messages
fho Oct 30, 2023
8354300
fixup! replace "grpc call" with rpc in docs and error messages
fho Oct 30, 2023
8bc0f7b
tests: convert in-code comments to godoc method comments
fho Oct 30, 2023
d976997
server: make waitForServerConnRemoval lock comment part of godoc
fho Oct 30, 2023
702ac45
server: inline waitForServerConnRemoval, finishEventLog
fho Oct 30, 2023
66b367c
server: add Locked suffix to functions requiring caller to acquire lock
fho Oct 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ var serverConnectionCounter uint64

// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
done chan struct{}
conn net.Conn
loopy *loopyWriter
readerDone chan struct{} // sync point to enable testing.
writerDone chan struct{} // sync point to enable testing.
peer peer.Peer
inTapHandle tap.ServerInHandle
framer *framer
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
done chan struct{}
conn net.Conn
loopy *loopyWriter
readerDone chan struct{} // sync point to enable testing.
loopyWriterDone chan struct{}
peer peer.Peer
inTapHandle tap.ServerInHandle
framer *framer
// The max number of concurrent streams.
maxStreams uint32
// controlBuf delivers all the control related tasks (e.g., window
Expand Down Expand Up @@ -251,7 +251,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
peer: peer,
framer: framer,
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
loopyWriterDone: make(chan struct{}),
maxStreams: config.MaxStreams,
inTapHandle: config.InTapHandle,
fc: &trInFlow{limit: uint32(icwz)},
Expand Down Expand Up @@ -323,7 +323,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
t.loopy.run()
close(t.writerDone)
close(t.loopyWriterDone)
}()
go t.keepalive()
return t, nil
Expand Down Expand Up @@ -608,7 +608,10 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
defer close(t.readerDone)
defer func() {
<-t.loopyWriterDone
close(t.readerDone)
}()
for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1685,7 +1685,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
client.Close(errors.New("closed manually by test"))
st.Close(errors.New("closed manually by test"))
<-st.readerDone
<-st.writerDone
<-st.loopyWriterDone
<-client.readerDone
<-client.writerDone
for _, cstream := range clientStreams {
Expand Down
109 changes: 54 additions & 55 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,15 +992,11 @@
}
}()

var wg sync.WaitGroup
streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
st.HandleStreams(ctx, func(stream *transport.Stream) {
wg.Add(1)

streamQuota.acquire()
f := func() {
defer streamQuota.release()
defer wg.Done()
s.handleStream(st, stream)
}

Expand All @@ -1014,7 +1010,6 @@
}
go f()
})
wg.Wait()
}

var _ http.Handler = (*Server)(nil)
Expand Down Expand Up @@ -1854,62 +1849,66 @@
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
s.quit.Fire()
s.stop(false)
}

defer func() {
s.serveWG.Wait()
s.done.Fire()
}()
// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
s.stop(true)
}

func (s *Server) stop(graceful bool) {
s.quit.Fire()
defer s.done.Fire()

s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })

s.mu.Lock()
listeners := s.lis
s.lis = nil
conns := s.conns
s.conns = nil
// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
s.cv.Broadcast()
s.closeListeners()
// Wait for serving threads to be ready to exit. Only then can we be sure no
// new conns will be created.
s.mu.Unlock()
s.serveWG.Wait()

for lis := range listeners {
lis.Close()
}
for _, cs := range conns {
for st := range cs {
st.Close(errors.New("Server.Stop called"))
}
s.mu.Lock()
defer s.mu.Unlock()

if graceful {
s.drainAllServerTransports()
} else {
s.closeServerTransports()
}

if s.opts.numServerWorkers > 0 {
s.stopServerWorkers()
}

s.mu.Lock()
if s.events != nil {
s.events.Finish()
s.events = nil
}
s.mu.Unlock()
s.waitForServerConnRemoval()
s.finishEventLog()
fho marked this conversation as resolved.
Show resolved Hide resolved
}

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
s.quit.Fire()
defer s.done.Fire()

s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
s.mu.Lock()
if s.conns == nil {
s.mu.Unlock()
return
// waitForServerConnRemoval blocks until s.conns is empty.
// s.mu must be held by the caller.
fho marked this conversation as resolved.
Show resolved Hide resolved
func (s *Server) waitForServerConnRemoval() {
for len(s.conns) != 0 {
s.cv.Wait()
}
s.conns = nil
}

for lis := range s.lis {
lis.Close()
// s.mu must be held by the caller.
func (s *Server) closeServerTransports() {
fho marked this conversation as resolved.
Show resolved Hide resolved
for _, conns := range s.conns {
for st := range conns {
st.Close(errors.New("Server.Stop called"))
}
}
s.lis = nil
}

// s.mu must be held by the caller.
func (s *Server) drainAllServerTransports() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: let's keep the names consistent between this and above. Maybe name this one drainServerTransports

Copy link
Contributor Author

@fho fho Oct 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Server.drainServerTransports already exist, that is why I added All to the name.
Currently I don't have a better naming idea.

fho marked this conversation as resolved.
Show resolved Hide resolved
if !s.drain {
for _, conns := range s.conns {
for st := range conns {
Expand All @@ -1918,22 +1917,22 @@
}
s.drain = true
}
}

// Wait for serving threads to be ready to exit. Only then can we be sure no
// new conns will be created.
s.mu.Unlock()
s.serveWG.Wait()
s.mu.Lock()

for len(s.conns) != 0 {
s.cv.Wait()
}
s.conns = nil
// s.mu must be held by the caller.
func (s *Server) finishEventLog() {
fho marked this conversation as resolved.
Show resolved Hide resolved
if s.events != nil {
s.events.Finish()
s.events = nil
}
s.mu.Unlock()
}

// s.mu must be held by the caller.
func (s *Server) closeListeners() {
fho marked this conversation as resolved.
Show resolved Hide resolved
for lis := range s.lis {

Check warning on line 1932 in server.go

View check run for this annotation

Codecov / codecov/patch

server.go#L1930-L1932

Added lines #L1930 - L1932 were not covered by tests
lis.Close()
}
s.lis = nil
}

// contentSubtype must be lowercase
Expand Down
87 changes: 87 additions & 0 deletions test/gracefulstop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net"
"sync"
"testing"
"time"

"golang.org/x/net/http2"
"google.golang.org/grpc"
Expand Down Expand Up @@ -216,3 +217,89 @@ func (s) TestGracefulStopClosesConnAfterLastStream(t *testing.T) {
<-gracefulStopDone // Wait for GracefulStop to return.
})
}

func (s) TestGracefulStopBlocksUntilGRPCConnectionsTerminate(t *testing.T) {
// This test ensures that GracefulStop() blocks until all ongoing
// client gRPC calls have finished.
fho marked this conversation as resolved.
Show resolved Hide resolved
unblockGRPCCall := make(chan struct{})
grpcCallExecuting := make(chan struct{})
ss := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
close(grpcCallExecuting)
<-unblockGRPCCall
return &testpb.SimpleResponse{}, nil
},
}

err := ss.Start(nil)
if err != nil {
t.Fatalf("StubServer.start failed: %s", err)
}
t.Cleanup(ss.Stop)

grpcClientCallReturned := make(chan struct{})
go func() {
clt := ss.Client
_, err := clt.UnaryCall(context.Background(), &testpb.SimpleRequest{})
if err != nil {
t.Errorf("grpc call failed with error: %s", err)
}
close(grpcClientCallReturned)
}()

gracefulStopReturned := make(chan struct{})
<-grpcCallExecuting
go func() {
ss.S.GracefulStop()
close(gracefulStopReturned)
}()

select {
case <-gracefulStopReturned:
t.Error("GracefulStop returned before GRPC method call ended")
case <-time.After(defaultTestShortTimeout):
}

unblockGRPCCall <- struct{}{}
<-grpcClientCallReturned
<-gracefulStopReturned
}

func (s) TestStopAbortsBlockingGRPCCall(t *testing.T) {
// This test ensures that when Stop() is called while an ongoing gRPC
// call is blocked:
// - Stop() returns
// - and the gRPC call on the client side fails with a connection
//   closed error
fho marked this conversation as resolved.
Show resolved Hide resolved
unblockGRPCCall := make(chan struct{})
grpcCallExecuting := make(chan struct{})
ss := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
close(grpcCallExecuting)
<-unblockGRPCCall
return &testpb.SimpleResponse{}, nil
},
}

err := ss.Start(nil)
if err != nil {
t.Fatalf("StubServer.start failed: %s", err)
}
t.Cleanup(ss.Stop)

grpcClientCallReturned := make(chan struct{})
go func() {
clt := ss.Client
_, err := clt.UnaryCall(context.Background(), &testpb.SimpleRequest{})
if err == nil || !isConnClosedErr(err) {
t.Errorf("expected grpc call to fail with connection closed error, got: %v", err)
}
close(grpcClientCallReturned)
}()

<-grpcCallExecuting
ss.S.Stop()

unblockGRPCCall <- struct{}{}
<-grpcClientCallReturned
}
Loading