Skip to content

Commit

Permalink
server: remove contention event registry from baseStatusServer
Browse files Browse the repository at this point in the history
Previously, baseStatusServer holds a reference to contention.Registry.
This reference of contention.Registry is used to power the
/ListContentionEvents endpoint. However, this is not ideal for two
reasons:
1. baseStatusServer already holds a reference to *server.SQLServer,
   which in turn contains contention.Registry through its ExecutorConfig
   field. This means that there's no good reason to have another field
   in baseStatusServer to hold this additional reference.
2. The ongoing contention event registry work will make contention
   registry depend on status server to perform transaction ID resolution
   protocol. As it stand today, the status server's construction depends
   on the creation of contention.Registry. By introducing transaction ID
   resolution protocol into contention.Registry, we will be introducing
   a cyclical reference, which can lead to ugly API design.

This commit removes the baseStatusServer's reference on
contention.Registry, and instead directly fetching from executor config.

Release note: None
  • Loading branch information
Azhng committed Feb 2, 2022
1 parent 8533644 commit 4a4ff9a
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 33 deletions.
4 changes: 2 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
sAdmin := newAdminServer(lateBoundServer, adminAuthzCheck, internalExecutor)
sHTTP := newHTTPServer(cfg)
sessionRegistry := sql.NewSessionRegistry()
contentionRegistry := contention.NewRegistry()
flowScheduler := flowinfra.NewFlowScheduler(cfg.AmbientCtx, stopper, st)

sStatus := newStatusServer(
Expand All @@ -647,10 +646,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
node.stores,
stopper,
sessionRegistry,
contentionRegistry,
flowScheduler,
internalExecutor,
)

contentionRegistry := contention.NewRegistry()
// TODO(tbg): don't pass all of Server into this to avoid this hack.
sAuth := newAuthenticationServer(lateBoundServer)
for i, gw := range []grpcGatewayServer{sAdmin, sStatus, sAuth, &sTS} {
Expand Down
33 changes: 15 additions & 18 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,13 @@ type baseStatusServer struct {
serverpb.UnimplementedStatusServer

log.AmbientContext
privilegeChecker *adminPrivilegeChecker
sessionRegistry *sql.SessionRegistry
contentionRegistry *contention.Registry
flowScheduler *flowinfra.FlowScheduler
st *cluster.Settings
sqlServer *SQLServer
rpcCtx *rpc.Context
stopper *stop.Stopper
privilegeChecker *adminPrivilegeChecker
sessionRegistry *sql.SessionRegistry
flowScheduler *flowinfra.FlowScheduler
st *cluster.Settings
sqlServer *SQLServer
rpcCtx *rpc.Context
stopper *stop.Stopper
}

// getLocalSessions returns a list of local sessions on this node. Note that the
Expand Down Expand Up @@ -307,7 +306,7 @@ func (b *baseStatusServer) ListLocalContentionEvents(
}

return &serverpb.ListContentionEventsResponse{
Events: b.contentionRegistry.Serialize(),
Events: b.sqlServer.execCfg.ContentionRegistry.Serialize(),
}, nil
}

Expand Down Expand Up @@ -415,21 +414,19 @@ func newStatusServer(
stores *kvserver.Stores,
stopper *stop.Stopper,
sessionRegistry *sql.SessionRegistry,
contentionRegistry *contention.Registry,
flowScheduler *flowinfra.FlowScheduler,
internalExecutor *sql.InternalExecutor,
) *statusServer {
ambient.AddLogTag("status", nil)
server := &statusServer{
baseStatusServer: &baseStatusServer{
AmbientContext: ambient,
privilegeChecker: adminAuthzCheck,
sessionRegistry: sessionRegistry,
contentionRegistry: contentionRegistry,
flowScheduler: flowScheduler,
st: st,
rpcCtx: rpcCtx,
stopper: stopper,
AmbientContext: ambient,
privilegeChecker: adminAuthzCheck,
sessionRegistry: sessionRegistry,
flowScheduler: flowScheduler,
st: st,
rpcCtx: rpcCtx,
stopper: stopper,
},
cfg: cfg,
admin: adminServer,
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,11 @@ func StartTenant(
// the SQL server object.
tenantStatusServer := newTenantStatusServer(
baseCfg.AmbientCtx, &adminPrivilegeChecker{ie: args.circularInternalExecutor},
args.sessionRegistry, args.contentionRegistry, args.flowScheduler, baseCfg.Settings, nil,
args.sessionRegistry, args.flowScheduler, baseCfg.Settings, nil,
args.rpcContext, args.stopper,
)
contentionRegistry := contention.NewRegistry()
args.contentionRegistry = contentionRegistry
args.sqlStatusServer = tenantStatusServer
s, err := newSQLServer(ctx, args)
tenantStatusServer.sqlServer = s
Expand Down Expand Up @@ -493,7 +495,6 @@ func makeTenantSQLServerArgs(
// writing): the blob service and DistSQL.
dummyRPCServer := rpc.NewServer(rpcContext)
sessionRegistry := sql.NewSessionRegistry()
contentionRegistry := contention.NewRegistry()
flowScheduler := flowinfra.NewFlowScheduler(baseCfg.AmbientCtx, stopper, st)
return sqlServerArgs{
sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{
Expand Down Expand Up @@ -529,7 +530,6 @@ func makeTenantSQLServerArgs(
registry: registry,
recorder: recorder,
sessionRegistry: sessionRegistry,
contentionRegistry: contentionRegistry,
flowScheduler: flowScheduler,
circularInternalExecutor: circularInternalExecutor,
circularJobRegistry: circularJobRegistry,
Expand Down
18 changes: 8 additions & 10 deletions pkg/server/tenant_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func newTenantStatusServer(
ambient log.AmbientContext,
privilegeChecker *adminPrivilegeChecker,
sessionRegistry *sql.SessionRegistry,
contentionRegistry *contention.Registry,
flowScheduler *flowinfra.FlowScheduler,
st *cluster.Settings,
sqlServer *SQLServer,
Expand All @@ -87,15 +86,14 @@ func newTenantStatusServer(
ambient.AddLogTag("tenant-status", nil)
return &tenantStatusServer{
baseStatusServer: baseStatusServer{
AmbientContext: ambient,
privilegeChecker: privilegeChecker,
sessionRegistry: sessionRegistry,
contentionRegistry: contentionRegistry,
flowScheduler: flowScheduler,
st: st,
sqlServer: sqlServer,
rpcCtx: rpcCtx,
stopper: stopper,
AmbientContext: ambient,
privilegeChecker: privilegeChecker,
sessionRegistry: sessionRegistry,
flowScheduler: flowScheduler,
st: st,
sqlServer: sqlServer,
rpcCtx: rpcCtx,
stopper: stopper,
},
}
}
Expand Down

0 comments on commit 4a4ff9a

Please sign in to comment.