From 39690fc311fd0ddc408f0beda907ccd1ce08942c Mon Sep 17 00:00:00 2001 From: Azhng Date: Tue, 1 Feb 2022 23:40:42 +0000 Subject: [PATCH] server: remove contention event registry from baseStatusServer Previously, baseStatusServer holds a reference to contention.Registry. This reference of contention.Registry is used to power the /ListContentionEvents endpoint. However, this is not ideal for two reasons: 1. baseStatusServer already holds a reference to *server.SQLServer, which in turn contains contention.Registry through its ExecutorConfig field. This means that there's no good reason to have another field in baseStatusServer to hold this additional reference. 2. The ongoing contention event registry work will make contention registry depend on status server to perform transaction ID resolution protocol. As it stand today, the status server's construction depends on the creation of contention.Registry. By introducing transaction ID resolution protocol into contention.Registry, we will be introducing a cyclical reference, which can lead to ugly API design. This commit removes the baseStatusServer's reference on contention.Registry, and instead directly fetching from executor config. Release note: None --- pkg/server/server.go | 4 ++-- pkg/server/status.go | 33 +++++++++++++++------------------ pkg/server/tenant.go | 5 ++--- pkg/server/tenant_status.go | 18 ++++++++---------- 4 files changed, 27 insertions(+), 33 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index d8b927743089..745b7de438e4 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -629,7 +629,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { sAdmin := newAdminServer(lateBoundServer, adminAuthzCheck, internalExecutor) sHTTP := newHTTPServer(cfg) sessionRegistry := sql.NewSessionRegistry() - contentionRegistry := contention.NewRegistry() flowScheduler := flowinfra.NewFlowScheduler(cfg.AmbientCtx, stopper, st) sStatus := newStatusServer( @@ -647,10 +646,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { node.stores, stopper, sessionRegistry, - contentionRegistry, flowScheduler, internalExecutor, ) + + contentionRegistry := contention.NewRegistry() // TODO(tbg): don't pass all of Server into this to avoid this hack. sAuth := newAuthenticationServer(lateBoundServer) for i, gw := range []grpcGatewayServer{sAdmin, sStatus, sAuth, &sTS} { diff --git a/pkg/server/status.go b/pkg/server/status.go index dc8ec377232a..75384ac69e6a 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -135,14 +135,13 @@ type baseStatusServer struct { serverpb.UnimplementedStatusServer log.AmbientContext - privilegeChecker *adminPrivilegeChecker - sessionRegistry *sql.SessionRegistry - contentionRegistry *contention.Registry - flowScheduler *flowinfra.FlowScheduler - st *cluster.Settings - sqlServer *SQLServer - rpcCtx *rpc.Context - stopper *stop.Stopper + privilegeChecker *adminPrivilegeChecker + sessionRegistry *sql.SessionRegistry + flowScheduler *flowinfra.FlowScheduler + st *cluster.Settings + sqlServer *SQLServer + rpcCtx *rpc.Context + stopper *stop.Stopper } // getLocalSessions returns a list of local sessions on this node. Note that the @@ -307,7 +306,7 @@ func (b *baseStatusServer) ListLocalContentionEvents( } return &serverpb.ListContentionEventsResponse{ - Events: b.contentionRegistry.Serialize(), + Events: b.sqlServer.execCfg.ContentionRegistry.Serialize(), }, nil } @@ -415,21 +414,19 @@ func newStatusServer( stores *kvserver.Stores, stopper *stop.Stopper, sessionRegistry *sql.SessionRegistry, - contentionRegistry *contention.Registry, flowScheduler *flowinfra.FlowScheduler, internalExecutor *sql.InternalExecutor, ) *statusServer { ambient.AddLogTag("status", nil) server := &statusServer{ baseStatusServer: &baseStatusServer{ - AmbientContext: ambient, - privilegeChecker: adminAuthzCheck, - sessionRegistry: sessionRegistry, - contentionRegistry: contentionRegistry, - flowScheduler: flowScheduler, - st: st, - rpcCtx: rpcCtx, - stopper: stopper, + AmbientContext: ambient, + privilegeChecker: adminAuthzCheck, + sessionRegistry: sessionRegistry, + flowScheduler: flowScheduler, + st: st, + rpcCtx: rpcCtx, + stopper: stopper, }, cfg: cfg, admin: adminServer, diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 502fecd082fa..1bb5f89066a2 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -163,9 +163,10 @@ func StartTenant( // the SQL server object. tenantStatusServer := newTenantStatusServer( baseCfg.AmbientCtx, &adminPrivilegeChecker{ie: args.circularInternalExecutor}, - args.sessionRegistry, args.contentionRegistry, args.flowScheduler, baseCfg.Settings, nil, + args.sessionRegistry, args.flowScheduler, baseCfg.Settings, nil, args.rpcContext, args.stopper, ) + args.contentionRegistry = contention.NewRegistry() args.sqlStatusServer = tenantStatusServer s, err := newSQLServer(ctx, args) tenantStatusServer.sqlServer = s @@ -493,7 +494,6 @@ func makeTenantSQLServerArgs( // writing): the blob service and DistSQL. dummyRPCServer := rpc.NewServer(rpcContext) sessionRegistry := sql.NewSessionRegistry() - contentionRegistry := contention.NewRegistry() flowScheduler := flowinfra.NewFlowScheduler(baseCfg.AmbientCtx, stopper, st) return sqlServerArgs{ sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{ @@ -529,7 +529,6 @@ func makeTenantSQLServerArgs( registry: registry, recorder: recorder, sessionRegistry: sessionRegistry, - contentionRegistry: contentionRegistry, flowScheduler: flowScheduler, circularInternalExecutor: circularInternalExecutor, circularJobRegistry: circularJobRegistry, diff --git a/pkg/server/tenant_status.go b/pkg/server/tenant_status.go index 67a317536c37..b12043d82961 100644 --- a/pkg/server/tenant_status.go +++ b/pkg/server/tenant_status.go @@ -77,7 +77,6 @@ func newTenantStatusServer( ambient log.AmbientContext, privilegeChecker *adminPrivilegeChecker, sessionRegistry *sql.SessionRegistry, - contentionRegistry *contention.Registry, flowScheduler *flowinfra.FlowScheduler, st *cluster.Settings, sqlServer *SQLServer, @@ -87,15 +86,14 @@ func newTenantStatusServer( ambient.AddLogTag("tenant-status", nil) return &tenantStatusServer{ baseStatusServer: baseStatusServer{ - AmbientContext: ambient, - privilegeChecker: privilegeChecker, - sessionRegistry: sessionRegistry, - contentionRegistry: contentionRegistry, - flowScheduler: flowScheduler, - st: st, - sqlServer: sqlServer, - rpcCtx: rpcCtx, - stopper: stopper, + AmbientContext: ambient, + privilegeChecker: privilegeChecker, + sessionRegistry: sessionRegistry, + flowScheduler: flowScheduler, + st: st, + sqlServer: sqlServer, + rpcCtx: rpcCtx, + stopper: stopper, }, } }