Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: mark use of Gossip as deprecated #47972

Merged
merged 1 commit into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,19 @@ func splitAndFilterSpans(
}

// clusterNodeCount returns the approximate number of nodes in the cluster.
func clusterNodeCount(g *gossip.Gossip) int {
func clusterNodeCount(gw gossip.DeprecatedGossip) (int, error) {
g, err := gw.OptionalErr(47970)
if err != nil {
return 0, err
}
var nodes int
_ = g.IterateInfos(gossip.KeyNodeIDPrefix, func(_ string, _ gossip.Info) error {
nodes++
return nil
})
return nodes
_ = g.IterateInfos(
gossip.KeyNodeIDPrefix, func(_ string, _ gossip.Info) error {
nodes++
return nil
},
)
return nodes, nil
}

type spanAndTime struct {
Expand Down Expand Up @@ -516,7 +522,10 @@ func (b *backupResumer) Resume(
log.Warningf(ctx, "unable to load backup checkpoint while resuming job %d: %v", *b.job.ID(), err)
}

numClusterNodes := clusterNodeCount(p.ExecCfg().Gossip)
numClusterNodes, err := clusterNodeCount(p.ExecCfg().Gossip)
if err != nil {
return err
}

res, err := backup(
ctx,
Expand Down
6 changes: 5 additions & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,11 @@ func (r *restoreResumer) Resume(
return nil
}

numClusterNodes := clusterNodeCount(p.ExecCfg().Gossip)
numClusterNodes, err := clusterNodeCount(p.ExecCfg().Gossip)
if err != nil {
return err
}

res, err := restore(
ctx,
p.ExecCfg().DB,
Expand Down
11 changes: 7 additions & 4 deletions pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,13 @@ func createBenchmarkChangefeed(
}
_, withDiff := details.Opts[changefeedbase.OptDiff]
kvfeedCfg := kvfeed.Config{
Settings: settings,
DB: s.DB(),
Clock: feedClock,
Gossip: s.GossipI().(*gossip.Gossip),
Settings: settings,
DB: s.DB(),
Clock: feedClock,
Gossip: gossip.MakeDeprecatedGossip(
s.GossipI().(*gossip.Gossip),
true, /* exposed */
),
Spans: spans,
Targets: details.Targets,
Sink: buf,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Config struct {
Settings *cluster.Settings
DB *kv.DB
Clock *hlc.Clock
Gossip *gossip.Gossip
Gossip gossip.DeprecatedGossip
Spans []roachpb.Span
Targets jobspb.ChangefeedTargets
Sink EventBufferWriter
Expand Down
16 changes: 12 additions & 4 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type kvScanner interface {

type scanRequestScanner struct {
settings *cluster.Settings
gossip *gossip.Gossip
gossip gossip.DeprecatedGossip
db *kv.DB
}

Expand All @@ -58,7 +58,11 @@ func (p *scanRequestScanner) Scan(
// Export requests for the various watched spans are executed in parallel,
// with a semaphore-enforced limit based on a cluster setting.
// The spans here generally correspond with range boundaries.
maxConcurrentExports := clusterNodeCount(p.gossip) *
approxNodeCount, err := clusterNodeCount(p.gossip)
if err != nil {
return err
}
maxConcurrentExports := approxNodeCount *
int(kvserver.ExportRequestsLimit.Get(&p.settings.SV))
exportsSem := make(chan struct{}, maxConcurrentExports)
g := ctxgroup.WithContext(ctx)
Expand Down Expand Up @@ -240,11 +244,15 @@ func allRangeDescriptors(ctx context.Context, txn *kv.Txn) ([]roachpb.RangeDescr
}

// clusterNodeCount returns the approximate number of nodes in the cluster.
func clusterNodeCount(g *gossip.Gossip) int {
func clusterNodeCount(gw gossip.DeprecatedGossip) (int, error) {
g, err := gw.OptionalErr(47971)
if err != nil {
return 0, err
}
var nodes int
_ = g.IterateInfos(gossip.KeyNodeIDPrefix, func(_ string, _ gossip.Info) error {
nodes++
return nil
})
return nodes
return nodes, nil
}
50 changes: 50 additions & 0 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -1614,3 +1615,52 @@ func (g *Gossip) findClient(match func(*client) bool) *client {
}
return nil
}

// MakeDeprecatedGossip initializes a DeprecatedGossip instance.
//
// Use of Gossip from within the SQL layer is **deprecated**. Please do not
// introduce new uses of it.
//
// See TenantSQLDeprecatedWrapper for details.
func MakeDeprecatedGossip(g *Gossip, exposed bool) DeprecatedGossip {
return DeprecatedGossip{
w: errorutil.MakeTenantSQLDeprecatedWrapper(g, exposed),
}
}

// DeprecatedGossip is a Gossip instance in a SQL tenant server.
//
// Use of Gossip from within the SQL layer is **deprecated**. Please do not
// introduce new uses of it.
//
// See TenantSQLDeprecatedWrapper for details.
type DeprecatedGossip struct {
w errorutil.TenantSQLDeprecatedWrapper
}

// Deprecated trades a Github issue tracking the removal of the call for the
// wrapped Gossip instance.
//
// Use of Gossip from within the SQL layer is **deprecated**. Please do not
// introduce new uses of it.
func (dg DeprecatedGossip) Deprecated(issueNo int) *Gossip {
// NB: some tests use a nil Gossip.
g, _ := dg.w.Deprecated(issueNo).(*Gossip)
return g
}

// OptionalErr returns the Gossip instance if the wrapper was set up to allow
// it. Otherwise, it returns an error referring to the optionally passed in
// issues.
//
// Use of Gossip from within the SQL layer is **deprecated**. Please do not
// introduce new uses of it.
func (dg DeprecatedGossip) OptionalErr(issueNos ...int) (*Gossip, error) {
v, err := dg.w.OptionalErr(issueNos...)
if err != nil {
return nil, err
}
// NB: some tests use a nil Gossip.
g, _ := v.(*Gossip)
return g, nil
}
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
return sStatus, true
},
nodeLiveness: nodeLiveness,
gossip: g,
gossip: gossip.MakeDeprecatedGossip(g, true /* exposed */),
nodeDialer: nodeDialer,
grpcServer: grpcServer.Server,
recorder: recorder,
Expand Down
14 changes: 9 additions & 5 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type sqlServerOptionalArgs struct {
// Gossip is relied upon by distSQLCfg (execinfra.ServerConfig), the executor
// config, the DistSQL planner, the table statistics cache, the statements
// diagnostics registry, and the lease manager.
gossip *gossip.Gossip
gossip gossip.DeprecatedGossip
// Used by DistSQLConfig and DistSQLPlanner.
nodeDialer *nodedialer.Dialer
// To register blob and DistSQL servers.
Expand Down Expand Up @@ -400,15 +400,15 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
cfg.rpcContext,
distSQLServer,
cfg.distSender,
cfg.gossip,
cfg.gossip.Deprecated(distsql.GossipIssueNo),
cfg.stopper,
cfg.nodeLiveness,
cfg.nodeDialer,
),

TableStatsCache: stats.NewTableStatisticsCache(
cfg.SQLTableStatCacheSize,
cfg.gossip,
cfg.gossip.Deprecated(47925),
cfg.db,
cfg.circularInternalExecutor,
),
Expand Down Expand Up @@ -527,13 +527,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
)
execCfg.InternalExecutor = cfg.circularInternalExecutor
stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry(
cfg.circularInternalExecutor, cfg.db, cfg.gossip, cfg.Settings)
cfg.circularInternalExecutor,
cfg.db,
cfg.gossip.Deprecated(47893),
cfg.Settings,
)
if statusServer, ok := cfg.statusServer(); ok {
statusServer.setStmtDiagnosticsRequester(stmtDiagnosticsRegistry)
}
execCfg.StmtDiagnosticsRecorder = stmtDiagnosticsRegistry

leaseMgr.RefreshLeases(cfg.stopper, cfg.db, cfg.gossip)
leaseMgr.RefreshLeases(cfg.stopper, cfg.db, cfg.gossip.Deprecated(47150))
leaseMgr.PeriodicallyRefreshSomeLeases()

temporaryObjectCleaner := sql.NewTemporaryObjectCleaner(
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ func testSQLServerArgs(ts *TestServer) sqlServerArgs {
distSender: ds,
statusServer: noStatusServer,
nodeLiveness: nl,
gossip: g,
gossip: gossip.MakeDeprecatedGossip(g, false /* exposed */),
nodeDialer: nd,
grpcServer: dummyRPCServer,
recorder: dummyRecorder,
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,8 @@ func injectTableStats(
// update is handled asynchronously).
params.extendedEvalCtx.ExecCfg.TableStatsCache.InvalidateTableStats(params.ctx, desc.ID)

return stats.GossipTableStatAdded(params.extendedEvalCtx.ExecCfg.Gossip, desc.ID)
return stats.GossipTableStatAdded(
params.extendedEvalCtx.ExecCfg.Gossip.Deprecated(47925), desc.ID)
}

func (p *planner) removeColumnComment(
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/cancel_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -46,7 +47,7 @@ func (n *cancelQueriesNode) Next(params runParams) (bool, error) {

statusServer, ok := params.extendedEvalCtx.StatusServer()
if !ok {
return false, pgerror.UnsupportedWithMultiTenancy()
return false, errorutil.UnsupportedWithMultiTenancy()
}

queryIDString, ok := tree.AsDString(datum)
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/cancel_sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -46,7 +47,7 @@ func (n *cancelSessionsNode) Next(params runParams) (bool, error) {

statusServer, ok := params.extendedEvalCtx.StatusServer()
if !ok {
return false, pgerror.UnsupportedWithMultiTenancy()
return false, errorutil.UnsupportedWithMultiTenancy()
}
sessionIDString, ok := tree.AsDString(datum)
if !ok {
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,13 @@ func makeMetrics(internal bool) Metrics {

// Start starts the Server's background processing.
func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) {
gossipUpdateC := s.cfg.Gossip.RegisterSystemConfigChannel()
g := s.cfg.Gossip.Deprecated(47150)
gossipUpdateC := g.RegisterSystemConfigChannel()
stopper.RunWorker(ctx, func(ctx context.Context) {
for {
select {
case <-gossipUpdateC:
sysCfg := s.cfg.Gossip.GetSystemConfig()
sysCfg := g.GetSystemConfig()
s.dbCache.updateSystemConfig(sysCfg)
case <-stopper.ShouldStop():
return
Expand Down
Loading