Skip to content

Commit

Permalink
server,sqlliveness: rework shutting down SQL pods on sqlliveness prob…
Browse files Browse the repository at this point in the history
…lems

This patch changes how stopping the stopper and, in turn, terminating
the process happens for tenant SQL pods when the sqlliveness session is
expired. Before this patch, the liveness code was calling the
Stopper.Stop() directly. This patch replaces that with bubbling an
event up to the cli layer, which is where the logic around terminating
the process lives. The cli layer has more complex shutdown logic - it has
the opportunity to perform draining if it wants (or at least it will
in #90143), and it also prints a message with the cause of the exit.

An alternative considered was to extend the Stopper.Stop() interface
somehow with a termination message, and continue to use the stopper as
the mechanism used by arbitrary modules to signal the desire to
terminate the server. I've decided against it, though, in favor of
introducing another mechanism for signaling requests to terminate up to
the cli layer, on the argument that the stopper is passed around so
widely and used in a lot of non-production libraries, such that it's
hard to trace and tell what production modules call Stop() on it and
under which circumstances. Also, stopper.Stop() can deadlock if
called from inside a stopper task (and our code generally runs inside
stopper tasks), so we'd also need to add a new async stop interface to
the stopper.
Thus, the goal of this patch is to keep the number of callers to
Stopper.Stop() small - right now there are two of them: one in the cli,
and one in the drainServer. In a followup patch, I might remove the
drainServer call, and then we'd be left with exactly one caller to
Stopper.Stop().

Fixes #85540

Release note: None
Epic: None
  • Loading branch information
andreimatei committed Oct 26, 2022
1 parent bb8ffd3 commit e873a0e
Show file tree
Hide file tree
Showing 17 changed files with 251 additions and 131 deletions.
4 changes: 1 addition & 3 deletions pkg/cli/mt_start_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,11 @@ func runStartSQL(cmd *cobra.Command, args []string) error {
// logging to files, periodic memory output, heap and goroutine dumps.
// Then use them here.

serverStartupErrC := make(chan error, 1)
var serverStatusMu serverStatus
serverStatusMu.started = true

return waitForShutdown(
func() serverShutdownInterface { return tenantServer },
stopper,
serverStartupErrC, signalCh,
stopper, tenantServer.ShutdownRequested(), signalCh,
&serverStatusMu)
}
90 changes: 55 additions & 35 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ If problems persist, please see %s.`
// if we get stuck on something during initialization (#10138).
var serverStatusMu serverStatus
var s *server.Server
serverStartupErrC := make(chan error, 1)
shutdownErrC := make(chan error, 1)
go func() {
// Ensure that the log files see the startup messages immediately.
defer log.Flush()
Expand All @@ -599,11 +599,11 @@ If problems persist, please see %s.`
defer startupSpan.Finish()

// Any error beyond this point should be reported through the
// serverStartupErrC defined above. However, in Go the code pattern "if err
// shutdownErrC defined above. However, in Go the code pattern "if err
// != nil { return err }" is more common. Expecting contributors
// to remember to write "if err != nil { serverStartupErrC <- err }" beyond
// to remember to write "if err != nil { shutdownErrC <- err }" beyond
// this point is optimistic. To avoid any mistake, we capture all
// the error returns in a closure, and do the serverStartupErrC reporting,
// the error returns in a closure, and do the shutdownErrC reporting,
// if needed, when that function returns.
if err := func() error {
// Instantiate the server.
Expand Down Expand Up @@ -674,18 +674,48 @@ If problems persist, please see %s.`
return reportServerInfo(ctx, tBegin, &serverCfg, s.ClusterSettings(),
true /* isHostNode */, initialStart, uuid.UUID{} /* tenantClusterID */)
}(); err != nil {
serverStartupErrC <- err
shutdownErrC <- newServerStartupError(err)
} else {
// Start a goroutine that watches for shutdown requests and forwards them
// to shutdownErrC.
go func() {
select {
case err := <-s.ShutdownRequested():
shutdownErrC <- err
case <-stopper.ShouldQuiesce():
}
}()
}
}()

return waitForShutdown(
// NB: we delay the access to s, as it is assigned
// asynchronously in a goroutine above.
func() serverShutdownInterface { return s },
stopper, serverStartupErrC, signalCh,
stopper, shutdownErrC, signalCh,
&serverStatusMu)
}

// serverStartupError wraps errors encountered during server startup. Its
// type is what waitForShutdown checks for when deciding whether to skip
// draining.
type serverStartupError struct {
	// cause is the underlying error that made startup fail.
	cause error
}

// Compile-time check that *serverStartupError satisfies errors.Wrapper.
var _ errors.Wrapper = (*serverStartupError)(nil)

// Error implements the error interface. The message is the text of the
// wrapped cause, prefixed with a note that the failure happened during
// server startup.
func (e *serverStartupError) Error() string {
	return fmt.Sprintf("error during server startup: %s", e.cause)
}

// Unwrap implements the errors.Wrapper interface. It returns the error that
// was wrapped when this serverStartupError was created, i.e. the original
// startup failure.
func (e *serverStartupError) Unwrap() error {
return e.cause
}

// newServerStartupError wraps cause in a *serverStartupError, marking it as a
// failure that happened while the server was starting up.
//
// cause is not checked for nil: the returned error is always non-nil, even
// when cause is nil.
func newServerStartupError(cause error) error {
return &serverStartupError{cause: cause}
}

// serverStatus coordinates the async goroutine that starts the server
// up (e.g. in runStart) and the async goroutine that stops the server
// (in waitForShutdown).
Expand Down Expand Up @@ -743,24 +773,21 @@ type serverShutdownInterface interface {
Drain(ctx context.Context, verbose bool) (uint64, redact.RedactableString, error)
}

// waitForShutdown lets the server run asynchronously and waits for
// shutdown, either due to the server spontaneously shutting down
// (signaled by stopper), or due to a server error (signaled on
// serverStartupErrC), by receiving a signal (signaled by signalCh).
// waitForShutdown blocks until interrupted by a shutdown signal, which can come
// in several forms:
// - a call to stopper.Stop(). This is done, for example, by the DrainServer.
// - a shutdown request coming from an internal module being signaled on shutdownC.
// - receiving a Unix signal on signalCh.
//
// Depending on what interruption is received, the server might be drained
// before shutting down.
func waitForShutdown(
getS func() serverShutdownInterface,
stopper *stop.Stopper,
serverStartupErrC chan error,
shutdownC <-chan error,
signalCh chan os.Signal,
serverStatusMu *serverStatus,
) (returnErr error) {
// The remainder of the main function executes concurrently with the
// start up goroutine started above.
//
// It is concerned with determining when the server should stop
// because the main process is being shut down, e.g. via a RPC call
// or a signal.

// We'll want to log any shutdown activity against a separate span.
// We cannot use s.AnnotateCtx here because the server might not have
// been assigned yet (the goroutine above runs asynchronously).
Expand All @@ -769,33 +796,26 @@ func waitForShutdown(

stopWithoutDrain := make(chan struct{}) // closed if interrupted very early

// Block until one of the signals above is received or the stopper
// is stopped externally (for example, via the quit endpoint).
select {
case err := <-serverStartupErrC:
// An error in errChat signals that the early server startup failed.
case err := <-shutdownC:
returnErr = err
// At this point, we do not expect any application load, etc., and
// therefore we are OK with an expedited shutdown: pass false to
// shouldDrain.
startShutdownAsync(getS, stopper, serverStatusMu, stopWithoutDrain, false /* shouldDrain */)
// There's no point in draining if the server didn't even fully start.
drain := !errors.HasType(err, &serverStartupError{})
startShutdownAsync(getS, stopper, serverStatusMu, stopWithoutDrain, drain)
// We do not return here, on purpose, because we want the common
// shutdown logic below to apply for this case as well.

case <-stopper.ShouldQuiesce():
// Receiving a signal on ShouldQuiesce means that a shutdown was
// requested via the Drain RPC. The RPC code takes ownership
// of calling stopper.Stop.
//
// We fall through to the common logic below so that an operator
// looking at a server running in the foreground on their terminal
// can see what is going on.
// Receiving a signal on ShouldQuiesce means that a shutdown was requested
// via the Drain RPC. The RPC handler called stopper.Stop(), so there's no
// need (and it's also too late) for us to call startShutdownAsync().

// StartAlwaysFlush both flushes and ensures that subsequent log
// writes are flushed too.
log.StartAlwaysFlush()
// We do not return here, on purpose, because we want the common
// shutdown logic below to apply for this case as well.
// We fall through to the common logic below so that an operator
// looking at a server running in the foreground on their terminal
// can see what is going on.

case sig := <-signalCh:
// We start flushing log writes from here, because if a
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ go_library(
"//pkg/sql/sqlinstance",
"//pkg/sql/sqlinstance/instanceprovider",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slinstance",
"//pkg/sql/sqlliveness/slprovider",
"//pkg/sql/sqlstats",
"//pkg/sql/sqlstats/insights",
Expand Down
9 changes: 9 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ type Server struct {
}

// NewServer creates a Server from a server.Config.
//
// The caller is responsible for listening on the server's ShutdownRequested()
// channel and calling stopper.Stop().
func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
if err := cfg.ValidateAddrs(context.Background()); err != nil {
return nil, err
Expand Down Expand Up @@ -1782,6 +1785,12 @@ func (s *Server) Stop() {
s.stopper.Stop(context.Background())
}

// ShutdownRequested returns a channel that is signaled when a subsystem wants
// the server to be shut down.
//
// The channel is the one returned by the ShutdownRequested() method of the
// server's SQL server (s.sqlServer); this method only forwards it.
func (s *Server) ShutdownRequested() <-chan error {
return s.sqlServer.ShutdownRequested()
}

// TempDir returns the filepath of the temporary directory used for temp storage.
// It is empty for an in-memory temp storage.
func (s *Server) TempDir() string {
Expand Down
89 changes: 84 additions & 5 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instanceprovider"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
Expand Down Expand Up @@ -135,6 +136,7 @@ import (
type SQLServer struct {
ambientCtx log.AmbientContext
stopper *stop.Stopper
stopTrigger *stopTrigger
sqlIDContainer *base.SQLIDContainer
pgServer *pgwire.Server
distSQLServer *distsql.ServerImpl
Expand Down Expand Up @@ -237,6 +239,47 @@ type sqlServerOptionalTenantArgs struct {
tenantConnect kvtenant.Connector
}

// stopTrigger is used by modules to signal the desire to stop the server. When
// signaled, the stopTrigger notifies listeners on a channel.
type stopTrigger struct {
mu struct {
syncutil.Mutex
// shutdownErr records the error handed to signalStop(). A non-nil value
// means the trigger has already been signaled.
shutdownErr error
}
// c carries the shutdown error to the reader of C(). newStopTrigger creates
// it with a buffer of one element.
c chan error
}

// newStopTrigger returns a stopTrigger that has not been signaled yet. Its
// notification channel has room for exactly one shutdown error.
func newStopTrigger() *stopTrigger {
return &stopTrigger{
// The channel is buffered so that there's no requirement that anyone ever
// calls C() and reads from this channel.
c: make(chan error, 1),
}
}

// signalStop is used to signal that the server should shut down. The shutdown
// is asynchronous: this method only records err and hands it to the channel
// returned by C(); it does not stop anything itself.
//
// Once a non-nil err has been recorded, later calls are ignored. err should
// be non-nil, because a nil err cannot be told apart from "not signaled yet".
// This method never blocks.
func (s *stopTrigger) signalStop(err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.mu.shutdownErr != nil {
		// Someone else already triggered the shutdown.
		return
	}
	s.mu.shutdownErr = err
	// The send must not block, since it happens while holding the lock. For a
	// non-nil err, the check above guarantees that at most one value is ever
	// written to the buffered channel. A nil err does not mark the trigger as
	// signaled, so a repeated call could find the buffer still full; in that
	// case the value is dropped rather than blocking forever under the mutex.
	select {
	case s.c <- err:
	default:
	}
}

// C returns the channel that is signaled by signalStop(). The value received
// is the error that was passed to signalStop().
//
// Generally, there should be only one caller to C(); shutdown requests are
// delivered once, not broadcast.
func (s *stopTrigger) C() <-chan error {
return s.c
}

type sqlServerArgs struct {
sqlServerOptionalKVArgs
sqlServerOptionalTenantArgs
Expand Down Expand Up @@ -404,6 +447,21 @@ func newRootSQLMemoryMonitor(opts monitorAndMetricsOptions) monitorAndMetrics {
}
}

// stopperSessionEventListener implements slinstance.SessionEventListener by
// translating a session deletion event into a request to stop the server.
type stopperSessionEventListener struct {
	// trigger is signaled when the session is reported as deleted.
	trigger *stopTrigger
}

// Compile-time check that the listener satisfies the slinstance interface.
var _ slinstance.SessionEventListener = (*stopperSessionEventListener)(nil)

// OnSessionDeleted requests a server shutdown by signaling the trigger with an
// error that names the deleted sqlliveness session as the cause.
func (l *stopperSessionEventListener) OnSessionDeleted() {
	cause := errors.New("sql liveness session deleted")
	l.trigger.signalStop(cause)
}

// newSQLServer constructs a new SQLServer. The caller is responsible for
// listening to the server's ShutdownRequested() channel and stopping
// cfg.stopper when signaled.
func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// NB: ValidateAddrs also fills in defaults.
if err := cfg.Config.ValidateAddrs(ctx); err != nil {
Expand All @@ -428,15 +486,29 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
tracingService := service.New(cfg.Tracer)
tracingservicepb.RegisterTracingServer(cfg.grpcServer, tracingService)

sqllivenessKnobs, _ := cfg.TestingKnobs.SQLLivenessKnobs.(*sqlliveness.TestingKnobs)
cfg.sqlLivenessProvider = slprovider.New(
cfg.AmbientCtx,
cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs,
)
stopTrigger := newStopTrigger()

// If the node id is already populated, we only need to create a placeholder
// instance provider without initializing the instance, since this is not a
// SQL pod server.
_, isNotSQLPod := cfg.nodeIDContainer.OptionalNodeID()

sqllivenessKnobs, _ := cfg.TestingKnobs.SQLLivenessKnobs.(*sqlliveness.TestingKnobs)
var sessionEventsConsumer slinstance.SessionEventListener
if !isNotSQLPod {
// For SQL pods, we want the process to shutdown when the session liveness
// record is found to be deleted. This is because, if the session is
// deleted, the instance ID used by this server may have been stolen by
// another server, or it may be stolen in the future. This server shouldn't
// use the instance ID anymore, and there's no mechanism for allocating a
// new one after startup.
sessionEventsConsumer = &stopperSessionEventListener{trigger: stopTrigger}
}
cfg.sqlLivenessProvider = slprovider.New(
cfg.AmbientCtx,
cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs, sessionEventsConsumer,
)

if isNotSQLPod {
cfg.sqlInstanceProvider = sqlinstance.NewFakeSQLProvider()
} else {
Expand Down Expand Up @@ -1160,6 +1232,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
return &SQLServer{
ambientCtx: cfg.BaseConfig.AmbientCtx,
stopper: cfg.stopper,
stopTrigger: stopTrigger,
sqlIDContainer: cfg.nodeIDContainer,
pgServer: pgServer,
distSQLServer: distSQLServer,
Expand Down Expand Up @@ -1615,3 +1688,9 @@ func prepareUnixSocket(
func (s *SQLServer) LogicalClusterID() uuid.UUID {
return s.execCfg.NodeInfo.LogicalClusterID()
}

// ShutdownRequested returns a channel that is signaled when a subsystem wants
// the server to be shut down.
//
// The channel is the one exposed by the server's stopTrigger, so a shutdown
// request is delivered once rather than broadcast to every reader.
func (s *SQLServer) ShutdownRequested() <-chan error {
return s.stopTrigger.C()
}
12 changes: 11 additions & 1 deletion pkg/server/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ func (s *SQLServerWrapper) Drain(
return s.drainServer.runDrain(ctx, verbose)
}

// NewTenantServer creates a tenant-specific, SQL-only server against a KV backend.
// NewTenantServer creates a tenant-specific, SQL-only server against a KV
// backend.
//
// The caller is responsible for listening to the server's ShutdownRequested()
// channel and stopping cfg.stopper when signaled.
func NewTenantServer(
ctx context.Context, stopper *stop.Stopper, baseCfg BaseConfig, sqlCfg SQLConfig,
) (*SQLServerWrapper, error) {
Expand Down Expand Up @@ -690,6 +694,12 @@ func (s *SQLServerWrapper) StartDiagnostics(ctx context.Context) {
s.sqlServer.StartDiagnostics(ctx)
}

// ShutdownRequested returns a channel that is signaled when a subsystem wants
// the server to be shut down.
//
// The channel is the one returned by the ShutdownRequested() method of the
// wrapped SQL server (s.sqlServer); this method only forwards it.
func (s *SQLServerWrapper) ShutdownRequested() <-chan error {
return s.sqlServer.ShutdownRequested()
}

func makeTenantSQLServerArgs(
startupCtx context.Context, stopper *stop.Stopper, baseCfg BaseConfig, sqlCfg SQLConfig,
) (sqlServerArgs, error) {
Expand Down
18 changes: 18 additions & 0 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,15 @@ func (ts *TestServer) Start(ctx context.Context) error {
ts.Stopper().Stop(context.Background())
return err
}
go func() {
// If the server requests a shutdown, do that simply by stopping the
// stopper.
select {
case <-ts.Server.ShutdownRequested():
ts.Stopper().Stop(ts.Server.AnnotateCtx(context.Background()))
case <-ts.Stopper().ShouldQuiesce():
}
}()
return nil
}

Expand Down Expand Up @@ -905,6 +914,15 @@ func (ts *TestServer) StartTenant(
if err != nil {
return nil, err
}
go func() {
// If the server requests a shutdown, do that simply by stopping the
// tenant's stopper.
select {
case <-sw.ShutdownRequested():
stopper.Stop(sw.AnnotateCtx(context.Background()))
case <-stopper.ShouldQuiesce():
}
}()

if err := sw.Start(ctx); err != nil {
return nil, err
Expand Down
Loading

0 comments on commit e873a0e

Please sign in to comment.