From 569f5b8315ae0dbac549f9e236fb73e2d6bb7d4c Mon Sep 17 00:00:00 2001
From: Sebastien Boeuf
Date: Tue, 29 Jan 2019 11:12:45 -0800
Subject: [PATCH] agent: Properly stop the gRPC server

This commit attempts to close cleanly the gRPC server so that tracing
will be ended properly.

Fixes #445

Signed-off-by: Sebastien Boeuf
---
Hand-edited after export: the "agent grpc server quits" warning now logs
servErr (the value returned by Serve()) instead of the stale err
variable, waitForStopServer() gained a doc comment, and the wording of a
few comments was tidied. Hunk offsets and the diffstat were adjusted to
match; the commit id and index blob ids are still those of the original
export. Whitespace was reconstructed from a flattened copy of the patch.

 agent.go | 64 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 grpc.go  |  4 ++++
 2 files changed, 65 insertions(+), 3 deletions(-)

diff --git a/agent.go b/agent.go
index ecc0e3a57b..83ec53d84a 100644
--- a/agent.go
+++ b/agent.go
@@ -120,6 +120,7 @@ type sandbox struct {
 	enableGrpcTrace   bool
 	sandboxPidNs      bool
 	storages          map[string]*sandboxStorage
+	stopServer        chan struct{}
 }
 
 var agentFields = logrus.Fields{
@@ -524,6 +525,48 @@ func (s *sandbox) teardownSharedPidNs() error {
 	return nil
 }
 
+// waitForStopServer blocks until the stopServer channel is closed, then
+// stops the gRPC server. It first attempts GracefulStop() for up to one
+// minute and falls back to stopGRPC() if that does not complete in time.
+// If no server is set, it returns without doing anything.
+func (s *sandbox) waitForStopServer() {
+	fieldLogger := agentLog.WithField("subsystem", "stopserverwatcher")
+
+	fieldLogger.Info("Waiting for stopServer signal...")
+
+	// Wait for DestroySandbox() to signal this goroutine that the
+	// server must be stopped.
+	<-s.stopServer
+
+	fieldLogger.Info("stopServer signal received")
+
+	if s.server == nil {
+		fieldLogger.Info("No server initialized, nothing to stop")
+		return
+	}
+
+	defer fieldLogger.Info("gRPC server stopped")
+
+	// Try to gracefully stop the server for a minute
+	timeout := time.Minute
+	done := make(chan struct{})
+	go func() {
+		s.server.GracefulStop()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		s.server = nil
+		return
+	case <-time.After(timeout):
+		fieldLogger.WithField("timeout", timeout).Warn("Could not gracefully stop the server")
+	}
+
+	fieldLogger.Info("Force stopping the server now")
+	s.stopGRPC()
+}
+
 func (s *sandbox) listenToUdevEvents() {
 	fieldLogger := agentLog.WithField("subsystem", "udevlistener")
 
@@ -810,6 +853,7 @@ func (s *sandbox) startGRPC() {
 		defer s.wg.Done()
 
 		var err error
+		var servErr error
 		for {
 			agentLog.Info("agent grpc server starts")
 
@@ -833,8 +877,8 @@ func (s *sandbox) startGRPC() {
 			}
 
 			// l is closed when Serve() returns
-			err = grpcServer.Serve(l)
-			if err != nil {
-				agentLog.WithError(err).Warn("agent grpc server quits")
+			servErr = grpcServer.Serve(l)
+			if servErr != nil {
+				agentLog.WithError(servErr).Warn("agent grpc server quits")
 			}
 
@@ -842,6 +886,17 @@ func (s *sandbox) startGRPC() {
 			if err != nil {
 				agentLog.WithError(err).Warn("agent grpc channel teardown failed")
 			}
+
+			// Based on the definition of grpc.Serve(), the function
+			// returns nil in case of a proper stop triggered by either
+			// grpc.GracefulStop() or grpc.Stop(). Those calls can only
+			// be issued by the chain of events coming from DestroySandbox
+			// and explicitly mean the server should not try to listen
+			// again, as the sandbox is being completely removed.
+			if servErr == nil {
+				agentLog.Info("agent grpc server has been explicitly stopped")
+				return
+			}
 		}
 	}()
 }
@@ -1019,6 +1074,7 @@ func realMain() {
 		pciDeviceMap:   make(map[string]string),
 		deviceWatchers: make(map[string](chan string)),
 		storages:       make(map[string]*sandboxStorage),
+		stopServer:     make(chan struct{}),
 	}
 
 	if err = s.initLogger(); err != nil {
@@ -1046,6 +1102,8 @@ func realMain() {
 	// Start gRPC server.
 	s.startGRPC()
 
+	go s.waitForStopServer()
+
 	go s.listenToUdevEvents()
 
 	s.wg.Wait()
diff --git a/grpc.go b/grpc.go
index 71581bec09..03a7e5a207 100644
--- a/grpc.go
+++ b/grpc.go
@@ -1341,6 +1341,10 @@ func (a *agentGRPC) DestroySandbox(ctx context.Context, req *pb.DestroySandboxRe
 		return emptyResp, err
 	}
 
+	// Close stopServer channel to signal the main agent code to stop
+	// the server once all gRPC calls have completed.
+	close(a.sandbox.stopServer)
+
 	a.sandbox.hostname = ""
 	a.sandbox.id = ""
 	a.sandbox.containers = make(map[string]*container)