Skip to content

Commit

Permalink
Merge #47693 #47746
Browse files Browse the repository at this point in the history
47693: sql: make statusServer optional r=andreimatei,nvanbenschoten a=tbg

SQL depends on the status server to list sessions and nodes. A
pure SQL server (used for multi-tenancy) will not have access to
these.

Wrap the status server in a `(serverpb.StatusServer, bool)` wrapper
that indicates when the status server is not present, and return the
newly introduced `pgerr.UnsupportedWithMultiTenancy()` in such cases.

Over time, we'll have to take stock of the functionality lost that
way and we may have to restore some of it, however at the moment
the focus is on discovering which functionality is affected, and
turning their resolution into work items which can be handed to
the SQL team.

The intention is that looking at the callers of the above error
constructor provides an overview to that end.

Release note: None

47746: roachtest: add stmt_timeout to consistency checks r=knz a=tbg

Context cancellation doesn't do anything since lib/pq uses the Postgres
cancellation protocol which we don't implement. See discussion on:

#34520

The TL;DR is that we ought to be using pgx instead, which would just
close the underlying conn on cancellation which is just what we want
here.

Release note: None

Co-authored-by: Tobias Schottdorf <tobias.schottdorf@gmail.com>
  • Loading branch information
craig[bot] and tbg committed Apr 21, 2020
3 parents 03d7efc + 0b07504 + ff23783 commit 3e6bf86
Show file tree
Hide file tree
Showing 15 changed files with 370 additions and 250 deletions.
6 changes: 6 additions & 0 deletions pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1490,7 +1490,13 @@ func (c *cluster) FailOnDeadNodes(ctx context.Context, t *test) {
// check since we know that such spurious errors are possibly without any relation
// to the check having failed.
func (c *cluster) CheckReplicaDivergenceOnDB(ctx context.Context, db *gosql.DB) error {
// NB: we set a statement_timeout since context cancellation won't work here,
// see:
// https://github.com/cockroachdb/cockroach/pull/34520
//
// We've seen the consistency checks hang indefinitely in some cases.
rows, err := db.QueryContext(ctx, `
SET statement_timeout = '3m';
SELECT t.range_id, t.start_key_pretty, t.status, t.detail
FROM
crdb_internal.check_consistency(true, '', '') as t
Expand Down
48 changes: 26 additions & 22 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
nodeCountFn := func() int64 {
return nodeLiveness.Metrics().LiveNodes.Value()
}
tsServer := ts.MakeServer(cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig, stopper)
sTS := ts.MakeServer(cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig, stopper)

// The InternalExecutor will be further initialized later, as we create more
// of the server's components. There's a circular dependency - many things
Expand Down Expand Up @@ -478,14 +478,14 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {

lateBoundServer := &Server{}
// TODO(tbg): give adminServer only what it needs (and avoid circular deps).
adminServer := newAdminServer(lateBoundServer)
sAdmin := newAdminServer(lateBoundServer)
sessionRegistry := sql.NewSessionRegistry()

statusServer := newStatusServer(
sStatus := newStatusServer(
cfg.AmbientCtx,
st,
cfg.Config,
adminServer,
sAdmin,
db,
g,
recorder,
Expand All @@ -498,36 +498,40 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
internalExecutor,
)
// TODO(tbg): don't pass all of Server into this to avoid this hack.
authenticationServer := newAuthenticationServer(lateBoundServer)
for i, gw := range []grpcGatewayServer{adminServer, statusServer, authenticationServer, &tsServer} {
sAuth := newAuthenticationServer(lateBoundServer)
for i, gw := range []grpcGatewayServer{sAdmin, sStatus, sAuth, &sTS} {
if reflect.ValueOf(gw).IsNil() {
return nil, errors.Errorf("%d: nil", i)
}
gw.RegisterService(grpcServer.Server)
}

sqlServer, err := newSQLServer(ctx, sqlServerArgs{
sqlServerOptionalArgs: sqlServerOptionalArgs{
rpcContext: rpcContext,
distSender: distSender,
statusServer: func() (*statusServer, bool) {
return sStatus, true
},
nodeLiveness: nodeLiveness,
gossip: g,
nodeDialer: nodeDialer,
grpcServer: grpcServer.Server,
recorder: recorder,
nodeIDContainer: nodeIDContainer,
externalStorage: externalStorage,
externalStorageFromURI: externalStorageFromURI,
isMeta1Leaseholder: node.stores.IsMeta1Leaseholder,
},
Config: &cfg, // NB: s.cfg has a populated AmbientContext.
stopper: stopper,
clock: clock,
rpcContext: rpcContext,
distSender: distSender,
status: statusServer,
nodeLiveness: nodeLiveness,
protectedtsProvider: protectedtsProvider,
gossip: g,
nodeDialer: nodeDialer,
grpcServer: grpcServer.Server,
recorder: recorder,
runtime: runtimeSampler,
db: db,
registry: registry,
circularInternalExecutor: internalExecutor,
nodeIDContainer: nodeIDContainer,
externalStorage: externalStorage,
externalStorageFromURI: externalStorageFromURI,
jobRegistry: jobRegistry,
isMeta1Leaseholder: node.stores.IsMeta1Leaseholder,
})
if err != nil {
return nil, err
Expand All @@ -553,11 +557,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
registry: registry,
recorder: recorder,
runtime: runtimeSampler,
admin: adminServer,
status: statusServer,
authentication: authenticationServer,
admin: sAdmin,
status: sStatus,
authentication: sAuth,
tsDB: tsDB,
tsServer: &tsServer,
tsServer: &sTS,
raftTransport: raftTransport,
stopper: stopper,
debug: debugServer,
Expand Down
78 changes: 50 additions & 28 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/colexec"
Expand Down Expand Up @@ -82,14 +83,13 @@ type sqlServer struct {
stmtDiagnosticsRegistry *stmtdiagnostics.Registry
}

type sqlServerArgs struct {
*Config
stopper *stop.Stopper

// SQL uses the clock to assign timestamps to transactions, among many
// others.
clock *hlc.Clock

// sqlServerOptionalArgs are the arguments supplied to newSQLServer which
// are only available if the SQL server runs as part of a KV node.
//
// TODO(tbg): give all of these fields a wrapper that can signal whether the
// respective object is available. When it is not, return
// UnsupportedWithMultiTenancy.
type sqlServerOptionalArgs struct {
// DistSQL uses rpcContext to set up flows. Less centrally, the executor
// also uses rpcContext in a number of places to learn whether the server
// is running insecure, and to read the cluster name.
Expand All @@ -99,16 +99,16 @@ type sqlServerArgs struct {
// uses range descriptors and leaseholders, which DistSender maintains,
// for debugging and DistSQL planning purposes.
distSender *kvcoord.DistSender
// The executorConfig depends on the status server.
// The status server is handed the stmtDiagnosticsRegistry.
status *statusServer
// statusServer gives access to the Status service.
//
// In a SQL tenant server, this is not available (returning false) and
// pgerror.UnsupportedWithMultiTenancy should be returned.
statusServer func() (*statusServer, bool)
// Narrowed down version of *NodeLiveness.
nodeLiveness interface {
jobs.NodeLiveness // jobs uses this
IsLive(roachpb.NodeID) (bool, error) // DistSQLPlanner wants this
}
// The executorConfig uses the provider.
protectedtsProvider protectedts.Provider
// Gossip is relied upon by distSQLCfg (execinfra.ServerConfig), the executor
// config, the DistSQL planner, the table statistics cache, the statements
// diagnostics registry, and the lease manager.
Expand All @@ -121,6 +121,26 @@ type sqlServerArgs struct {
recorder *status.MetricsRecorder
// For the temporaryObjectCleaner.
isMeta1Leaseholder func(hlc.Timestamp) (bool, error)
// DistSQL, lease management, and others want to know the node they're on.
nodeIDContainer *base.NodeIDContainer

// Used by backup/restore.
externalStorage cloud.ExternalStorageFactory
externalStorageFromURI cloud.ExternalStorageFromURIFactory
}

type sqlServerArgs struct {
sqlServerOptionalArgs

*Config
stopper *stop.Stopper

// SQL uses the clock to assign timestamps to transactions, among many
// others.
clock *hlc.Clock

// The executorConfig uses the provider.
protectedtsProvider protectedts.Provider
// DistSQLCfg holds on to this to check for node CPU utilization in
// samplerProcessor.
runtime *status.RuntimeStatSampler
Expand All @@ -136,23 +156,21 @@ type sqlServerArgs struct {
//
// TODO(tbg): make this less hacky.
circularInternalExecutor *sql.InternalExecutor // empty initially
// DistSQL, lease management, and others want to know the node they're on.
//
// TODO(tbg): replace this with a method that can refuse to return a result
// because once we have multi-tenancy, a NodeID will not be available.
nodeIDContainer *base.NodeIDContainer

// Used by backup/restore.
externalStorage cloud.ExternalStorageFactory
externalStorageFromURI cloud.ExternalStorageFromURIFactory

// The protected timestamps KV subsystem depends on this, so it is bound
// early but only gets filled in newSQLServer.
// The protected timestamps KV subsystem depends on this, so we pass a
// pointer to an empty struct in this configuration, which newSQLServer
// fills.
jobRegistry *jobs.Registry
}

func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
sessionRegistry := cfg.status.sessionRegistry
var sessionRegistry *sql.SessionRegistry
if statusServer, ok := cfg.statusServer(); ok {
sessionRegistry = statusServer.sessionRegistry
} else {
sessionRegistry = sql.NewSessionRegistry()
}

execCfg := &sql.ExecutorConfig{}
var jobAdoptionStopFile string
for _, spec := range cfg.Stores.Specs {
Expand Down Expand Up @@ -364,7 +382,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
LeaseManager: leaseMgr,
Clock: cfg.clock,
DistSQLSrv: distSQLServer,
StatusServer: cfg.status,
StatusServer: func() (serverpb.StatusServer, bool) { return cfg.statusServer() },
SessionRegistry: sessionRegistry,
JobRegistry: jobRegistry,
VirtualSchemas: virtualSchemas,
Expand Down Expand Up @@ -511,7 +529,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
execCfg.InternalExecutor = cfg.circularInternalExecutor
stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry(
cfg.circularInternalExecutor, cfg.db, cfg.gossip, cfg.Settings)
cfg.status.setStmtDiagnosticsRequester(stmtDiagnosticsRegistry)
if statusServer, ok := cfg.statusServer(); ok {
statusServer.setStmtDiagnosticsRequester(stmtDiagnosticsRegistry)
}
execCfg.StmtDiagnosticsRecorder = stmtDiagnosticsRegistry

leaseMgr.RefreshLeases(cfg.stopper, cfg.db, cfg.gossip)
Expand All @@ -522,7 +542,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
cfg.db,
cfg.registry,
distSQLServer.ServerConfig.SessionBoundInternalExecutorFactory,
cfg.status,
func() (serverpb.StatusServer, bool) {
return cfg.statusServer()
},
cfg.isMeta1Leaseholder,
sqlExecutorTestingKnobs,
)
Expand Down
Loading

0 comments on commit 3e6bf86

Please sign in to comment.