Skip to content

Commit

Permalink
sql: mark use of Gossip as deprecated
Browse files Browse the repository at this point in the history
This commit wraps Gossip in a wrapper when it is used in SQL. This small
wrapper helpfully keeps track, in code, of work that needs to be done to
enable multi-tenancy, and ensures that any deprecated use of Gossip is
associated with a Github issue. The wrapper also makes sure we're not
introducing new dependencies on Gossip.

Release note: None
  • Loading branch information
tbg committed Apr 23, 2020
1 parent c73fb58 commit d0896b5
Show file tree
Hide file tree
Showing 31 changed files with 266 additions and 83 deletions.
23 changes: 16 additions & 7 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,19 @@ func splitAndFilterSpans(
}

// clusterNodeCount returns the approximate number of nodes in the cluster.
func clusterNodeCount(g *gossip.Gossip) int {
func clusterNodeCount(gw gossip.DeprecatedGossip) (int, error) {
g, err := gw.Optional(47970)
if err != nil {
return 0, err
}
var nodes int
_ = g.IterateInfos(gossip.KeyNodeIDPrefix, func(_ string, _ gossip.Info) error {
nodes++
return nil
})
return nodes
_ = g.IterateInfos(
gossip.KeyNodeIDPrefix, func(_ string, _ gossip.Info) error {
nodes++
return nil
},
)
return nodes, nil
}

type spanAndTime struct {
Expand Down Expand Up @@ -516,7 +522,10 @@ func (b *backupResumer) Resume(
log.Warningf(ctx, "unable to load backup checkpoint while resuming job %d: %v", *b.job.ID(), err)
}

numClusterNodes := clusterNodeCount(p.ExecCfg().Gossip)
numClusterNodes, err := clusterNodeCount(p.ExecCfg().Gossip)
if err != nil {
return err
}

res, err := backup(
ctx,
Expand Down
6 changes: 5 additions & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,11 @@ func (r *restoreResumer) Resume(
return nil
}

numClusterNodes := clusterNodeCount(p.ExecCfg().Gossip)
numClusterNodes, err := clusterNodeCount(p.ExecCfg().Gossip)
if err != nil {
return err
}

res, err := restore(
ctx,
p.ExecCfg().DB,
Expand Down
11 changes: 7 additions & 4 deletions pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,13 @@ func createBenchmarkChangefeed(
}
_, withDiff := details.Opts[changefeedbase.OptDiff]
kvfeedCfg := kvfeed.Config{
Settings: settings,
DB: s.DB(),
Clock: feedClock,
Gossip: s.GossipI().(*gossip.Gossip),
Settings: settings,
DB: s.DB(),
Clock: feedClock,
Gossip: gossip.MakeDeprecatedGossip(
s.GossipI().(*gossip.Gossip),
false, /* tenant */
),
Spans: spans,
Targets: details.Targets,
Sink: buf,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Config struct {
Settings *cluster.Settings
DB *kv.DB
Clock *hlc.Clock
Gossip *gossip.Gossip
Gossip gossip.DeprecatedGossip
Spans []roachpb.Span
Targets jobspb.ChangefeedTargets
Sink EventBufferWriter
Expand Down
16 changes: 12 additions & 4 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type kvScanner interface {

type scanRequestScanner struct {
settings *cluster.Settings
gossip *gossip.Gossip
gossip gossip.DeprecatedGossip
db *kv.DB
}

Expand All @@ -58,7 +58,11 @@ func (p *scanRequestScanner) Scan(
// Export requests for the various watched spans are executed in parallel,
// with a semaphore-enforced limit based on a cluster setting.
// The spans here generally correspond with range boundaries.
maxConcurrentExports := clusterNodeCount(p.gossip) *
approxNodeCount, err := clusterNodeCount(p.gossip)
if err != nil {
return err
}
maxConcurrentExports := approxNodeCount *
int(kvserver.ExportRequestsLimit.Get(&p.settings.SV))
exportsSem := make(chan struct{}, maxConcurrentExports)
g := ctxgroup.WithContext(ctx)
Expand Down Expand Up @@ -240,11 +244,15 @@ func allRangeDescriptors(ctx context.Context, txn *kv.Txn) ([]roachpb.RangeDescr
}

// clusterNodeCount returns the approximate number of nodes in the cluster.
func clusterNodeCount(g *gossip.Gossip) int {
func clusterNodeCount(gw gossip.DeprecatedGossip) (int, error) {
g, err := gw.Optional(47971)
if err != nil {
return 0, err
}
var nodes int
_ = g.IterateInfos(gossip.KeyNodeIDPrefix, func(_ string, _ gossip.Info) error {
nodes++
return nil
})
return nodes
return nodes, nil
}
37 changes: 37 additions & 0 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -1614,3 +1615,39 @@ func (g *Gossip) findClient(match func(*client) bool) *client {
}
return nil
}

// MakeDeprecatedGossip initializes a DeprecatedGossip instance. See
// MakeTenantSQLDeprecatedWrapper for details.
func MakeDeprecatedGossip(g *Gossip, tenant bool) DeprecatedGossip {
return DeprecatedGossip{
w: errorutil.MakeTenantSQLDeprecatedWrapper(g, tenant),
}
}

// DeprecatedGossip is a Gossip instance in a SQL tenant server, which is in the
// process of being phased out.
//
// See the comments on TenantSQLDeprecatedWrapper for details.
type DeprecatedGossip struct {
w errorutil.TenantSQLDeprecatedWrapper
}

// Deprecated trades a Github issue tracking the removal of the call for the
// wrapped Gossip instance.
func (dg DeprecatedGossip) Deprecated(issueNo int) *Gossip {
// NB: some tests use a nil Gossip.
g, _ := dg.w.Deprecated(issueNo).(*Gossip)
return g
}

// Optional returns the Gossip instance if the wrapper was set up to allow it.
// Otherwise, it returns an error referring to the optionally passed in issues.
func (dg DeprecatedGossip) Optional(issueNos ...int) (*Gossip, error) {
v, err := dg.w.Optional(issueNos...)
if err != nil {
return nil, err
}
// NB: some tests use a nil Gossip.
g, _ := v.(*Gossip)
return g, nil
}
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
return sStatus, true
},
nodeLiveness: nodeLiveness,
gossip: g,
gossip: gossip.MakeDeprecatedGossip(g, false /* tenant */),
nodeDialer: nodeDialer,
grpcServer: grpcServer.Server,
recorder: recorder,
Expand Down
14 changes: 9 additions & 5 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type sqlServerOptionalArgs struct {
// Gossip is relied upon by distSQLCfg (execinfra.ServerConfig), the executor
// config, the DistSQL planner, the table statistics cache, the statements
// diagnostics registry, and the lease manager.
gossip *gossip.Gossip
gossip gossip.DeprecatedGossip
// Used by DistSQLConfig and DistSQLPlanner.
nodeDialer *nodedialer.Dialer
// To register blob and DistSQL servers.
Expand Down Expand Up @@ -400,15 +400,15 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
cfg.rpcContext,
distSQLServer,
cfg.distSender,
cfg.gossip,
cfg.gossip.Deprecated(47900),
cfg.stopper,
cfg.nodeLiveness,
cfg.nodeDialer,
),

TableStatsCache: stats.NewTableStatisticsCache(
cfg.SQLTableStatCacheSize,
cfg.gossip,
cfg.gossip.Deprecated(47925),
cfg.db,
cfg.circularInternalExecutor,
),
Expand Down Expand Up @@ -527,13 +527,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
)
execCfg.InternalExecutor = cfg.circularInternalExecutor
stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry(
cfg.circularInternalExecutor, cfg.db, cfg.gossip, cfg.Settings)
cfg.circularInternalExecutor,
cfg.db,
cfg.gossip.Deprecated(47893),
cfg.Settings,
)
if statusServer, ok := cfg.statusServer(); ok {
statusServer.setStmtDiagnosticsRequester(stmtDiagnosticsRegistry)
}
execCfg.StmtDiagnosticsRecorder = stmtDiagnosticsRegistry

leaseMgr.RefreshLeases(cfg.stopper, cfg.db, cfg.gossip)
leaseMgr.RefreshLeases(cfg.stopper, cfg.db, cfg.gossip.Deprecated(47150))
leaseMgr.PeriodicallyRefreshSomeLeases()

temporaryObjectCleaner := sql.NewTemporaryObjectCleaner(
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ func testSQLServerArgs(ts *TestServer) sqlServerArgs {
distSender: ds,
statusServer: noStatusServer,
nodeLiveness: nl,
gossip: g,
gossip: gossip.MakeDeprecatedGossip(g, true /* tenant */),
nodeDialer: nd,
grpcServer: dummyRPCServer,
recorder: dummyRecorder,
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,8 @@ func injectTableStats(
// update is handled asynchronously).
params.extendedEvalCtx.ExecCfg.TableStatsCache.InvalidateTableStats(params.ctx, desc.ID)

return stats.GossipTableStatAdded(params.extendedEvalCtx.ExecCfg.Gossip, desc.ID)
return stats.GossipTableStatAdded(
params.extendedEvalCtx.ExecCfg.Gossip.Deprecated(47925), desc.ID)
}

func (p *planner) removeColumnComment(
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/cancel_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -46,7 +47,7 @@ func (n *cancelQueriesNode) Next(params runParams) (bool, error) {

statusServer, ok := params.extendedEvalCtx.StatusServer()
if !ok {
return false, pgerror.UnsupportedWithMultiTenancy()
return false, errorutil.UnsupportedWithMultiTenancy()
}

queryIDString, ok := tree.AsDString(datum)
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/cancel_sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -46,7 +47,7 @@ func (n *cancelSessionsNode) Next(params runParams) (bool, error) {

statusServer, ok := params.extendedEvalCtx.StatusServer()
if !ok {
return false, pgerror.UnsupportedWithMultiTenancy()
return false, errorutil.UnsupportedWithMultiTenancy()
}
sessionIDString, ok := tree.AsDString(datum)
if !ok {
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,13 @@ func makeMetrics(internal bool) Metrics {

// Start starts the Server's background processing.
func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) {
gossipUpdateC := s.cfg.Gossip.RegisterSystemConfigChannel()
g := s.cfg.Gossip.Deprecated(47150)
gossipUpdateC := g.RegisterSystemConfigChannel()
stopper.RunWorker(ctx, func(ctx context.Context) {
for {
select {
case <-gossipUpdateC:
sysCfg := s.cfg.Gossip.GetSystemConfig()
sysCfg := g.GetSystemConfig()
s.dbCache.updateSystemConfig(sysCfg)
case <-stopper.ShouldStop():
return
Expand Down
Loading

0 comments on commit d0896b5

Please sign in to comment.