From ceb331da53ac06d9067373795b6da139feb6b9ab Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 28 Jul 2020 16:58:53 -0400 Subject: [PATCH] kvtenant: implement SystemConfigProvider, remove Gossip from SQL-only server Fixes #49445. This commit introduces a new SystemConfigProvider abstraction, which is capable of providing the SystemConfig, as well as notifying clients of updates to the SystemConfig. Gossip already implements this interface. The commit then updates SQL to use this new dependency in place of Gossip whenever it needs to access the SystemConfig. After making this change, it then updates the kvtenant.Proxy to implement the new SystemConfigProvider interface. This is powered by the GossipSubscription RPC that was added in #50520. The commit updates the subscription to also match on the "system-db" gossip key, and just like that, it can provide the SystemConfig to SQL [*]. Finally, with the kvtenant.Proxy serving the role of a SystemConfigProvider to SQL when applicable, we're able to remove gossip entirely from StartTenant. SQL-only servers will no longer join the gossip network, which is a nice milestone for all of this work. [*] there are a few remaining questions about how exactly we want to enforce an access control policy on the system config gossip pattern. See the updated comment in `Node.GossipSubscription`. For now, we're just returning the entire SystemConfig to the subscription. --- pkg/ccl/kvccl/kvtenantccl/proxy.go | 114 ++++++++++++++++++------ pkg/ccl/kvccl/kvtenantccl/proxy_test.go | 80 +++++++++++++++-- pkg/config/provider.go | 25 ++++++ pkg/gossip/gossip.go | 45 ---------- pkg/kv/kvclient/kvtenant/proxy.go | 22 ++++- pkg/server/node.go | 8 ++ pkg/server/server.go | 1 + pkg/server/server_sql.go | 5 ++ pkg/server/testserver.go | 57 +++--------- pkg/sql/conn_executor.go | 4 +- pkg/sql/exec_util.go | 1 + pkg/sql/gcjob/gc_job.go | 2 +- pkg/sql/gcjob/refresh_statuses.go | 6 +- pkg/sql/opt_catalog.go | 2 +- 14 files changed, 238 insertions(+), 134 deletions(-) create mode 100644 pkg/config/provider.go diff --git a/pkg/ccl/kvccl/kvtenantccl/proxy.go b/pkg/ccl/kvccl/kvtenantccl/proxy.go index e0aee54366a4..27054a998b11 100644 --- a/pkg/ccl/kvccl/kvtenantccl/proxy.go +++ b/pkg/ccl/kvccl/kvtenantccl/proxy.go @@ -14,8 +14,11 @@ import ( "context" "io" "math/rand" + "sort" "time" + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" @@ -49,12 +52,17 @@ type Proxy struct { rpcRetryOptions retry.Options rpcDialTimeout time.Duration // for testing rpcDial singleflight.Group + defaultZoneCfg *zonepb.ZoneConfig addrs []string startupC chan struct{} - mu syncutil.RWMutex - client roachpb.InternalClient - nodeDescs map[roachpb.NodeID]*roachpb.NodeDescriptor + mu struct { + syncutil.RWMutex + client roachpb.InternalClient + nodeDescs map[roachpb.NodeID]*roachpb.NodeDescriptor + systemConfig *config.SystemConfig + systemConfigChannels []chan<- struct{} + } } // Proxy is capable of providing information on each of the KV nodes in the @@ -70,28 +78,29 @@ var _ kvcoord.NodeDescStore = (*Proxy)(nil) // requested owned by the requesting tenant?). var _ kvcoord.RangeDescriptorDB = (*Proxy)(nil) +// Proxy is capable of providing a filtered view of the SystemConfig containing +// only information applicable to secondary tenants. This obviates the need for +// SQL-only tenant processes to join the cluster-wide gossip network. +var _ config.SystemConfigProvider = (*Proxy)(nil) + // NewProxy creates a new Proxy. -func NewProxy( - ac log.AmbientContext, rpcContext *rpc.Context, rpcRetryOptions retry.Options, addrs []string, -) *Proxy { - ac.AddLogTag("tenant-proxy", nil) +func NewProxy(cfg kvtenant.ProxyConfig, addrs []string) *Proxy { + cfg.AmbientCtx.AddLogTag("tenant-proxy", nil) return &Proxy{ - AmbientContext: ac, - rpcContext: rpcContext, - rpcRetryOptions: rpcRetryOptions, + AmbientContext: cfg.AmbientCtx, + rpcContext: cfg.RPCContext, + rpcRetryOptions: cfg.RPCRetryOptions, + defaultZoneCfg: cfg.DefaultZoneConfig, addrs: addrs, startupC: make(chan struct{}), - nodeDescs: make(map[roachpb.NodeID]*roachpb.NodeDescriptor), } } // proxyFactory implements kvtenant.ProxyFactory. type proxyFactory struct{} -func (proxyFactory) NewProxy( - ac log.AmbientContext, rpcContext *rpc.Context, rpcRetryOptions retry.Options, addrs []string, -) (kvtenant.Proxy, error) { - return NewProxy(ac, rpcContext, rpcRetryOptions, addrs), nil +func (proxyFactory) NewProxy(cfg kvtenant.ProxyConfig, addrs []string) (kvtenant.Proxy, error) { + return NewProxy(cfg, addrs), nil } // Start launches the proxy's worker thread and waits for it to receive an @@ -158,9 +167,10 @@ func (p *Proxy) runGossipSubscription(ctx context.Context) { } var gossipSubsHandlers = map[string]func(*Proxy, context.Context, string, roachpb.Value){ - // Subscribe to all *NodeDescriptor updates in the gossip network. + // Subscribe to all *NodeDescriptor updates. gossip.MakePrefixPattern(gossip.KeyNodeIDPrefix): (*Proxy).updateNodeAddress, - // TODO(nvanbenschoten): subscribe to updates to the tenant zones key. + // Subscribe to a filtered view of *SystemConfig updates. + gossip.KeySystemConfig: (*Proxy).updateSystemConfig, } var gossipSubsPatterns = func() []string { @@ -168,6 +178,7 @@ var gossipSubsPatterns = func() []string { for pattern := range gossipSubsHandlers { patterns = append(patterns, pattern) } + sort.Strings(patterns) return patterns }() @@ -176,7 +187,7 @@ var gossipSubsPatterns = func() []string { func (p *Proxy) updateNodeAddress(ctx context.Context, key string, content roachpb.Value) { desc := new(roachpb.NodeDescriptor) if err := content.GetProto(desc); err != nil { - log.Errorf(ctx, "%v", err) + log.Errorf(ctx, "could not unmarshal node descriptor: %v", err) return } @@ -188,20 +199,73 @@ func (p *Proxy) updateNodeAddress(ctx context.Context, key string, content roach // nothing ever removes them from Gossip.nodeDescs. Fix this. p.mu.Lock() defer p.mu.Unlock() - p.nodeDescs[desc.NodeID] = desc + if p.mu.nodeDescs == nil { + p.mu.nodeDescs = make(map[roachpb.NodeID]*roachpb.NodeDescriptor) + } + p.mu.nodeDescs[desc.NodeID] = desc } // GetNodeDescriptor implements the kvcoord.NodeDescStore interface. func (p *Proxy) GetNodeDescriptor(nodeID roachpb.NodeID) (*roachpb.NodeDescriptor, error) { p.mu.RLock() defer p.mu.RUnlock() - desc, ok := p.nodeDescs[nodeID] + desc, ok := p.mu.nodeDescs[nodeID] if !ok { return nil, errors.Errorf("unable to look up descriptor for n%d", nodeID) } return desc, nil } +// updateSystemConfig handles updates to a filtered view of the "system-db" +// gossip key, performing the corresponding update to the Proxy's cached +// SystemConfig. +func (p *Proxy) updateSystemConfig(ctx context.Context, key string, content roachpb.Value) { + cfg := config.NewSystemConfig(p.defaultZoneCfg) + if err := content.GetProto(&cfg.SystemConfigEntries); err != nil { + log.Errorf(ctx, "could not unmarshal system config: %v", err) + return + } + + p.mu.Lock() + defer p.mu.Unlock() + p.mu.systemConfig = cfg + for _, c := range p.mu.systemConfigChannels { + select { + case c <- struct{}{}: + default: + } + } +} + +// GetSystemConfig implements the config.SystemConfigProvider interface. +func (p *Proxy) GetSystemConfig() *config.SystemConfig { + // TODO DURING REVIEW: how does this work with Gossip when the SystemConfig + // hasn't reached a newly-started node? The comment on Gossip says that it + // returns nil, but then do we wait for this to be non-nil somewhere? Is SQL + // really able to handle a nil SystemConfig on all paths? I doubt that. + p.mu.RLock() + defer p.mu.RUnlock() + return p.mu.systemConfig +} + +// RegisterSystemConfigChannel implements the config.SystemConfigProvider +// interface. +func (p *Proxy) RegisterSystemConfigChannel() <-chan struct{} { + // Create channel that receives new system config notifications. + // The channel has a size of 1 to prevent proxy from blocking on it. + c := make(chan struct{}, 1) + + p.mu.Lock() + defer p.mu.Unlock() + p.mu.systemConfigChannels = append(p.mu.systemConfigChannels, c) + + // Notify the channel right away if we have a config. + if p.mu.systemConfig != nil { + c <- struct{}{} + } + return c +} + // RangeLookup implements the kvcoord.RangeDescriptorDB interface. func (p *Proxy) RangeLookup( ctx context.Context, key roachpb.RKey, useReverseScan bool, @@ -256,7 +320,7 @@ func (p *Proxy) FirstRange() (*roachpb.RangeDescriptor, error) { // context is canceled. func (p *Proxy) getClient(ctx context.Context) (roachpb.InternalClient, error) { p.mu.RLock() - if c := p.client; c != nil { + if c := p.mu.client; c != nil { p.mu.RUnlock() return c, nil } @@ -269,7 +333,7 @@ func (p *Proxy) getClient(ctx context.Context) (roachpb.InternalClient, error) { return nil, err } // NB: read lock not needed. - return p.client, nil + return p.mu.client, nil }) p.mu.RUnlock() @@ -299,7 +363,7 @@ func (p *Proxy) dialAddrs(ctx context.Context) error { } client := roachpb.NewInternalClient(conn) p.mu.Lock() - p.client = client + p.mu.client = client p.mu.Unlock() return nil } @@ -326,7 +390,7 @@ func (p *Proxy) tryForgetClient(ctx context.Context, c roachpb.InternalClient) { // Compare-and-swap to avoid thrashing. p.mu.Lock() defer p.mu.Unlock() - if p.client == c { - p.client = nil + if p.mu.client == c { + p.mu.client = nil } } diff --git a/pkg/ccl/kvccl/kvtenantccl/proxy_test.go b/pkg/ccl/kvccl/kvtenantccl/proxy_test.go index 64f89000f5e8..4dd1fcb5b959 100644 --- a/pkg/ccl/kvccl/kvtenantccl/proxy_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/proxy_test.go @@ -14,7 +14,9 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -73,6 +75,18 @@ func gossipEventForNodeDesc(desc *roachpb.NodeDescriptor) *roachpb.GossipSubscri } } +func gossipEventForSystemConfig(cfg *config.SystemConfigEntries) *roachpb.GossipSubscriptionEvent { + val, err := protoutil.Marshal(cfg) + if err != nil { + panic(err) + } + return &roachpb.GossipSubscriptionEvent{ + Key: gossip.KeySystemConfig, + Content: roachpb.MakeValueFromBytesAndTimestamp(val, hlc.Timestamp{}), + PatternMatched: gossip.KeySystemConfig, + } +} + func waitForNodeDesc(t *testing.T, p *Proxy, nodeID roachpb.NodeID) { t.Helper() testutils.SucceedsSoon(t, func() error { @@ -81,7 +95,8 @@ func waitForNodeDesc(t *testing.T, p *Proxy, nodeID roachpb.NodeID) { }) } -// TestProxyGossipSubscription tests Proxy's role as a kvcoord.NodeDescStore. +// TestProxyGossipSubscription tests Proxy's roles as a kvcoord.NodeDescStore +// and as a config.SystemConfigProvider. func TestProxyGossipSubscription(t *testing.T) { defer leaktest.AfterTest(t)() @@ -95,8 +110,9 @@ func TestProxyGossipSubscription(t *testing.T) { gossipSubC := make(chan *roachpb.GossipSubscriptionEvent) defer close(gossipSubC) gossipSubFn := func(req *roachpb.GossipSubscriptionRequest, stream roachpb.Internal_GossipSubscriptionServer) error { - assert.Len(t, req.Patterns, 1) + assert.Len(t, req.Patterns, 2) assert.Equal(t, "node:.*", req.Patterns[0]) + assert.Equal(t, "system-db", req.Patterns[1]) for gossipSub := range gossipSubC { if err := stream.Send(gossipSub); err != nil { return err @@ -108,8 +124,13 @@ func TestProxyGossipSubscription(t *testing.T) { ln, err := netutil.ListenAndServeGRPC(stopper, s, util.TestAddr) require.NoError(t, err) + cfg := kvtenant.ProxyConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + RPCContext: rpcContext, + RPCRetryOptions: rpcRetryOpts, + } addrs := []string{ln.Addr().String()} - p := NewProxy(log.AmbientContext{Tracer: tracing.NewTracer()}, rpcContext, rpcRetryOpts, addrs) + p := NewProxy(cfg, addrs) // Start should block until the first GossipSubscription response. startedC := make(chan error) @@ -158,6 +179,42 @@ func TestProxyGossipSubscription(t *testing.T) { desc, err = p.GetNodeDescriptor(3) require.Equal(t, node3, desc) require.NoError(t, err) + + // Test config.SystemConfigProvider impl. Should not have a SystemConfig yet. + sysCfg := p.GetSystemConfig() + require.Nil(t, sysCfg) + sysCfgC := p.RegisterSystemConfigChannel() + require.Len(t, sysCfgC, 0) + + // Return first SystemConfig response. + sysCfgEntries := &config.SystemConfigEntries{Values: []roachpb.KeyValue{ + {Key: roachpb.Key("a")}, + {Key: roachpb.Key("b")}, + }} + gossipSubC <- gossipEventForSystemConfig(sysCfgEntries) + + // Test config.SystemConfigProvider impl. Wait for update first. + <-sysCfgC + sysCfg = p.GetSystemConfig() + require.NotNil(t, sysCfg) + require.Equal(t, sysCfgEntries.Values, sysCfg.Values) + + // Return updated SystemConfig response. + sysCfgEntriesUp := &config.SystemConfigEntries{Values: []roachpb.KeyValue{ + {Key: roachpb.Key("a")}, + {Key: roachpb.Key("c")}, + }} + gossipSubC <- gossipEventForSystemConfig(sysCfgEntriesUp) + + // Test config.SystemConfigProvider impl. Wait for update first. + <-sysCfgC + sysCfg = p.GetSystemConfig() + require.NotNil(t, sysCfg) + require.Equal(t, sysCfgEntriesUp.Values, sysCfg.Values) + + // A newly registered SystemConfig channel will be immediately notified. + sysCfgC2 := p.RegisterSystemConfigChannel() + require.Len(t, sysCfgC2, 1) } // TestProxyGossipSubscription tests Proxy's role as a kvcoord.RangeDescriptorDB. @@ -187,8 +244,13 @@ func TestProxyRangeLookup(t *testing.T) { ln, err := netutil.ListenAndServeGRPC(stopper, s, util.TestAddr) require.NoError(t, err) + cfg := kvtenant.ProxyConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + RPCContext: rpcContext, + RPCRetryOptions: rpcRetryOpts, + } addrs := []string{ln.Addr().String()} - p := NewProxy(log.AmbientContext{Tracer: tracing.NewTracer()}, rpcContext, rpcRetryOpts, addrs) + p := NewProxy(cfg, addrs) // NOTE: we don't actually start the proxy worker. That's ok, as // RangeDescriptorDB methods don't require it to be running. @@ -256,8 +318,9 @@ func TestProxyRetriesUnreachable(t *testing.T) { gossipEventForNodeDesc(node2), } gossipSubFn := func(req *roachpb.GossipSubscriptionRequest, stream roachpb.Internal_GossipSubscriptionServer) error { - assert.Len(t, req.Patterns, 1) + assert.Len(t, req.Patterns, 2) assert.Equal(t, "node:.*", req.Patterns[0]) + assert.Equal(t, "system-db", req.Patterns[1]) for _, event := range gossipSubEvents { if err := stream.Send(event); err != nil { return err @@ -278,8 +341,13 @@ func TestProxyRetriesUnreachable(t *testing.T) { }) // Add listen address into list of other bogus addresses. + cfg := kvtenant.ProxyConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + RPCContext: rpcContext, + RPCRetryOptions: rpcRetryOpts, + } addrs := []string{"1.1.1.1:9999", ln.Addr().String(), "2.2.2.2:9999"} - p := NewProxy(log.AmbientContext{Tracer: tracing.NewTracer()}, rpcContext, rpcRetryOpts, addrs) + p := NewProxy(cfg, addrs) p.rpcDialTimeout = 5 * time.Millisecond // speed up test // Start should block until the first GossipSubscription response. diff --git a/pkg/config/provider.go b/pkg/config/provider.go new file mode 100644 index 000000000000..c7012cab5f19 --- /dev/null +++ b/pkg/config/provider.go @@ -0,0 +1,25 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package config + +// SystemConfigProvider is capable of providing the SystemConfig, as well as +// notifying clients of updates to the SystemConfig. +type SystemConfigProvider interface { + // GetSystemConfig returns the local unmarshaled version of the system + // config. Returns nil if the system config hasn't been set yet. + GetSystemConfig() *SystemConfig + + // RegisterSystemConfigChannel registers a channel to signify updates for + // the system config. It is notified after registration (if a system config + // is already set), and whenever a new system config is successfully + // unmarshaled. + RegisterSystemConfigChannel() <-chan struct{} +} diff --git a/pkg/gossip/gossip.go b/pkg/gossip/gossip.go index a7115e0e0d66..a2df1dc62553 100644 --- a/pkg/gossip/gossip.go +++ b/pkg/gossip/gossip.go @@ -1120,15 +1120,6 @@ var Redundant redundantCallbacks // received. The callback method is invoked with the info key which // matched pattern. Returns a function to unregister the callback. func (g *Gossip) RegisterCallback(pattern string, method Callback, opts ...CallbackOption) func() { - if pattern == KeySystemConfig { - ctx := g.AnnotateCtx(context.TODO()) - log.Warningf( - ctx, - "raw gossip callback registered on %s, consider using RegisterSystemConfigChannel", - KeySystemConfig, - ) - } - g.mu.Lock() unregister := g.mu.is.registerCallback(pattern, method, opts...) g.mu.Unlock() @@ -1669,42 +1660,6 @@ type DeprecatedGossip struct { w errorutil.TenantSQLDeprecatedWrapper } -// Start calls .Start() on the underlying Gossip instance, which is assumed to -// be non-nil. -func (dg DeprecatedGossip) Start(advertAddr net.Addr, resolvers []resolver.Resolver) { - dg.w.Deprecated(0).(*Gossip).Start(advertAddr, resolvers) -} - -// deprecated trades a Github issue tracking the removal of the call for the -// wrapped Gossip instance. -func (dg DeprecatedGossip) deprecated(issueNo int) *Gossip { - // NB: some tests use a nil Gossip. - g, _ := dg.w.Deprecated(issueNo).(*Gossip) - return g -} - -// DeprecatedSystemConfig calls GetSystemConfig on the wrapped Gossip instance. -// -// Use of Gossip from within the SQL layer is **deprecated**. Please do not -// introduce new uses of it. -func (dg DeprecatedGossip) DeprecatedSystemConfig(issueNo int) *config.SystemConfig { - g := dg.deprecated(issueNo) - if g == nil { - return nil // a few unit tests - } - return g.GetSystemConfig() -} - -// DeprecatedRegisterSystemConfigChannel calls RegisterSystemConfigChannel on -// the wrapped Gossip instance. -// -// Use of Gossip from within the SQL layer is **deprecated**. Please do not -// introduce new uses of it. -func (dg DeprecatedGossip) DeprecatedRegisterSystemConfigChannel(issueNo int) <-chan struct{} { - g := dg.deprecated(issueNo) - return g.RegisterSystemConfigChannel() -} - // OptionalErr returns the Gossip instance if the wrapper was set up to allow // it. Otherwise, it returns an error referring to the optionally passed in // issues. diff --git a/pkg/kv/kvclient/kvtenant/proxy.go b/pkg/kv/kvclient/kvtenant/proxy.go index a82e1c22f9f0..52478a95a451 100644 --- a/pkg/kv/kvclient/kvtenant/proxy.go +++ b/pkg/kv/kvclient/kvtenant/proxy.go @@ -16,6 +16,8 @@ import ( "context" "net" + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -47,12 +49,26 @@ type Proxy interface { // nodes while being subject to additional validation (e.g. is the Range being // requested owned by the requesting tenant?). kvcoord.RangeDescriptorDB + + // Proxy is capable of providing a filtered view of the SystemConfig + // containing only information applicable to secondary tenants. This + // obviates the need for SQL-only tenant processes to join the cluster-wide + // gossip network. + config.SystemConfigProvider +} + +// ProxyConfig encompasses the configuration required to create a Proxy. +type ProxyConfig struct { + AmbientCtx log.AmbientContext + RPCContext *rpc.Context + RPCRetryOptions retry.Options + DefaultZoneConfig *zonepb.ZoneConfig } // ProxyFactory constructs a new tenant proxy from the provide network addresses // pointing to KV nodes. type ProxyFactory interface { - NewProxy(_ log.AmbientContext, _ *rpc.Context, _ retry.Options, addrs []string) (Proxy, error) + NewProxy(cfg ProxyConfig, addrs []string) (Proxy, error) } // Factory is a hook for binaries that include CCL code to inject a ProxyFactory. @@ -60,9 +76,7 @@ var Factory ProxyFactory = requiresCCLBinaryFactory{} type requiresCCLBinaryFactory struct{} -func (requiresCCLBinaryFactory) NewProxy( - _ log.AmbientContext, _ *rpc.Context, _ retry.Options, _ []string, -) (Proxy, error) { +func (requiresCCLBinaryFactory) NewProxy(_ ProxyConfig, _ []string) (Proxy, error) { return nil, errors.Errorf(`tenant proxy requires a CCL binary`) } diff --git a/pkg/server/node.go b/pkg/server/node.go index 289ac79a9ca3..d36035a8b021 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1026,6 +1026,14 @@ func (n *Node) GossipSubscription( // and filter system config updates. Luckily, SystemConfigDeltaFilter // supports a "keyPrefix" that should help here. We'll also want to use // RegisterSystemConfigChannel for any SystemConfig patterns. + // + // UPDATE: the SystemConfig pattern story is even more complicated + // because of ZoneConfig inheritance/recursion. We'll also need to + // return the default zone config. In that case, it probably makes sense + // to perform the filtering here (based on whether a tenant marker is + // present in the ctx) without baking it into the protocol itself. So + // the request will simply specify "system-db" but we'll only return the + // subset of key/values that the tenant is allowed to / needs to see. callback := func(key string, content roachpb.Value) { callbackMu.Lock() diff --git a/pkg/server/server.go b/pkg/server/server.go index 7acfd3bf9433..9dc31cfcfea7 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -607,6 +607,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { runtime: runtimeSampler, rpcContext: rpcContext, nodeDescs: g, + systemConfigProvider: g, nodeDialer: nodeDialer, distSender: distSender, db: db, diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 975e016dad10..5f14d55fc8df 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/blobs" "github.com/cockroachdb/cockroach/pkg/blobs/blobspb" + "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/keys" @@ -147,6 +148,9 @@ type sqlServerArgs struct { // Used by DistSQLPlanner. nodeDescs kvcoord.NodeDescStore + // Used by the executor config. + systemConfigProvider config.SystemConfigProvider + // Used by DistSQLPlanner. nodeDialer *nodedialer.Dialer @@ -397,6 +401,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { AmbientCtx: cfg.AmbientCtx, DB: cfg.db, Gossip: cfg.gossip, + SystemConfig: cfg.systemConfigProvider, MetricsRecorder: cfg.recorder, DistSender: cfg.distSender, RPCContext: cfg.rpcContext, diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index b63f30503b9b..2f3a2322ed29 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -27,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/gossip" - "github.com/cockroachdb/cockroach/pkg/gossip/resolver" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" @@ -478,33 +477,13 @@ func makeSQLServerArgs( } rpcRetryOptions := base.DefaultRetryOptions() - // TODO(ajwerner): this use of Gossip needs to go. Tracked in: - // https://github.com/cockroachdb/cockroach/issues/47150 - var g *gossip.Gossip - { - var nodeID base.NodeIDContainer - nodeID.Set(context.Background(), fakeNodeID) - var clusterID base.ClusterIDContainer - dummyGossipGRPC := rpc.NewServer(rpcContext) // never Serve()s anything - g = gossip.New( - baseCfg.AmbientCtx, - &clusterID, - &nodeID, - rpcContext, - dummyGossipGRPC, - stopper, - registry, - baseCfg.Locality, - &baseCfg.DefaultZoneConfig, - ) - } - - tenantProxy, err := kvtenant.Factory.NewProxy( - baseCfg.AmbientCtx, - rpcContext, - rpcRetryOptions, - sqlCfg.TenantKVAddrs, - ) + tpCfg := kvtenant.ProxyConfig{ + AmbientCtx: baseCfg.AmbientCtx, + RPCContext: rpcContext, + RPCRetryOptions: rpcRetryOptions, + DefaultZoneConfig: &baseCfg.DefaultZoneConfig, + } + tenantProxy, err := kvtenant.Factory.NewProxy(tpCfg, sqlCfg.TenantKVAddrs) if err != nil { return sqlServerArgs{}, err } @@ -573,12 +552,11 @@ func makeSQLServerArgs( // server to register against (but they'll never get RPCs at the time of // writing): the blob service and DistSQL. dummyRPCServer := rpc.NewServer(rpcContext) - noStatusServer := serverpb.MakeOptionalStatusServer(nil) return sqlServerArgs{ sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{ - statusServer: noStatusServer, + statusServer: serverpb.MakeOptionalStatusServer(nil), nodeLiveness: sqlbase.MakeOptionalNodeLiveness(nil), - gossip: gossip.MakeUnexposedGossip(g), + gossip: gossip.MakeUnexposedGossip(nil), grpcServer: dummyRPCServer, recorder: dummyRecorder, isMeta1Leaseholder: func(_ context.Context, timestamp hlc.Timestamp) (bool, error) { @@ -603,6 +581,7 @@ func makeSQLServerArgs( runtime: status.NewRuntimeStatSampler(context.Background(), clock), rpcContext: rpcContext, nodeDescs: tenantProxy, + systemConfigProvider: tenantProxy, nodeDialer: nodeDialer, distSender: ds, db: db, @@ -693,22 +672,6 @@ func StartTenant( ) orphanedLeasesTimeThresholdNanos := args.clock.Now().WallTime - // TODO(ajwerner): this use of Gossip needs to go. Tracked in: - // https://github.com/cockroachdb/cockroach/issues/47150 - { - rs := make([]resolver.Resolver, len(sqlCfg.TenantKVAddrs)) - for i := range rs { - var err error - rs[i], err = resolver.NewResolver(sqlCfg.TenantKVAddrs[i]) - if err != nil { - return "", err - } - } - // NB: gossip server is not bound to any address, so the advertise addr does - // not matter. - args.gossip.Start(pgL.Addr(), rs) - } - if err := s.start(ctx, args.stopper, args.TestingKnobs, diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 39af07422f01..4f973cd03358 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -325,12 +325,12 @@ func makeMetrics(internal bool) Metrics { // Start starts the Server's background processing. func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) { if s.cfg.Codec.ForSystemTenant() { - gossipUpdateC := s.cfg.Gossip.DeprecatedRegisterSystemConfigChannel(47150) + gossipUpdateC := s.cfg.SystemConfig.RegisterSystemConfigChannel() stopper.RunWorker(ctx, func(ctx context.Context) { for { select { case <-gossipUpdateC: - sysCfg := s.cfg.Gossip.DeprecatedSystemConfig(47150) + sysCfg := s.cfg.SystemConfig.GetSystemConfig() s.dbCache.updateSystemConfig(sysCfg) case <-stopper.ShouldStop(): return diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index fbdc0d18ccf1..11f9f9525aab 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -653,6 +653,7 @@ type ExecutorConfig struct { AmbientCtx log.AmbientContext DB *kv.DB Gossip gossip.DeprecatedGossip + SystemConfig config.SystemConfigProvider DistSender *kvcoord.DistSender RPCContext *rpc.Context LeaseManager *lease.Manager diff --git a/pkg/sql/gcjob/gc_job.go b/pkg/sql/gcjob/gc_job.go index 4ba924e25dce..4f1798a8326b 100644 --- a/pkg/sql/gcjob/gc_job.go +++ b/pkg/sql/gcjob/gc_job.go @@ -126,7 +126,7 @@ func (r schemaChangeGCResumer) Resume( // TTL whenever we get an update on one of the tables/indexes (or the db) // that this job is responsible for, and computing the earliest deadline // from our set of cached TTL values. - cfg := execCfg.Gossip.DeprecatedSystemConfig(47150) + cfg := execCfg.SystemConfig.GetSystemConfig() zoneConfigUpdated := false zoneCfgFilter.ForModified(cfg, func(kv roachpb.KeyValue) { zoneConfigUpdated = true diff --git a/pkg/sql/gcjob/refresh_statuses.go b/pkg/sql/gcjob/refresh_statuses.go index f7e4bdcfe309..077b82d9622c 100644 --- a/pkg/sql/gcjob/refresh_statuses.go +++ b/pkg/sql/gcjob/refresh_statuses.go @@ -81,7 +81,7 @@ func updateStatusForGCElements( progress *jobspb.SchemaChangeGCProgress, ) (expired bool, timeToNextTrigger time.Time) { defTTL := execCfg.DefaultZoneConfig.GC.TTLSeconds - cfg := execCfg.Gossip.DeprecatedSystemConfig(47150) + cfg := execCfg.SystemConfig.GetSystemConfig() protectedtsCache := execCfg.ProtectedTimestampProvider earliestDeadline := timeutil.Unix(0, int64(math.MaxInt64)) @@ -270,8 +270,8 @@ func isProtected( func setupConfigWatcher( execCfg *sql.ExecutorConfig, ) (gossip.SystemConfigDeltaFilter, <-chan struct{}) { - k := execCfg.Codec.IndexPrefix(uint32(keys.ZonesTableID), uint32(keys.ZonesTablePrimaryIndexID)) + k := execCfg.Codec.IndexPrefix(keys.ZonesTableID, keys.ZonesTablePrimaryIndexID) zoneCfgFilter := gossip.MakeSystemConfigDeltaFilter(k) - gossipUpdateC := execCfg.Gossip.DeprecatedRegisterSystemConfigChannel(47150) + gossipUpdateC := execCfg.SystemConfig.RegisterSystemConfigChannel() return zoneCfgFilter, gossipUpdateC } diff --git a/pkg/sql/opt_catalog.go b/pkg/sql/opt_catalog.go index 7e241ee1162a..dd9115c590fc 100644 --- a/pkg/sql/opt_catalog.go +++ b/pkg/sql/opt_catalog.go @@ -76,7 +76,7 @@ func (oc *optCatalog) reset() { oc.dataSources = make(map[*sqlbase.ImmutableTableDescriptor]cat.DataSource) } - oc.cfg = oc.planner.execCfg.Gossip.DeprecatedSystemConfig(47150) + oc.cfg = oc.planner.execCfg.SystemConfig.GetSystemConfig() } // optSchema represents the parent database and schema for an object. It