Skip to content

Commit

Permalink
Merge #52034
Browse files Browse the repository at this point in the history
52034: kvtenant: implement SystemConfigProvider, remove Gossip from SQL-only server r=nvanbenschoten a=nvanbenschoten

Fixes #49445.

This commit introduces a new SystemConfigProvider abstraction, which is capable of providing the SystemConfig, as well as notifying clients of updates to the SystemConfig. Gossip already implements this interface. The commit then updates SQL to use this new dependency in place of Gossip whenever it needs to access the SystemConfig.

After making this change, it then updates the kvtenant.Proxy to implement the new SystemConfigProvider interface. This is powered by the GossipSubscription RPC that was added in #50520. The commit updates the subscription to also match on the "system-db" gossip key, and just like that, it can provide the SystemConfig to SQL [*].

Finally, with the kvtenant.Proxy serving the role of a SystemConfigProvider to SQL when applicable, we're able to remove gossip entirely from StartTenant. SQL-only servers will no longer join the gossip network, which is a nice milestone for all of this work.

[*] There are a few remaining questions about how exactly we want to enforce an access control policy on the system config gossip pattern. See the updated comment in `Node.GossipSubscription`. For now, we're just returning the entire SystemConfig to the subscription.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
craig[bot] and nvanbenschoten committed Aug 3, 2020
2 parents b6cdf0f + 6c1a90b commit 470510e
Show file tree
Hide file tree
Showing 31 changed files with 304 additions and 207 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func splitAndFilterSpans(
}

// clusterNodeCount returns the approximate number of nodes in the cluster.
func clusterNodeCount(gw gossip.DeprecatedGossip) (int, error) {
func clusterNodeCount(gw gossip.OptionalGossip) (int, error) {
g, err := gw.OptionalErr(47970)
if err != nil {
return 0, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func createBenchmarkChangefeed(
Settings: settings,
DB: s.DB(),
Clock: feedClock,
Gossip: gossip.MakeExposedGossip(s.GossipI().(*gossip.Gossip)),
Gossip: gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
Spans: spans,
Targets: details.Targets,
Sink: buf,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Config struct {
Settings *cluster.Settings
DB *kv.DB
Clock *hlc.Clock
Gossip gossip.DeprecatedGossip
Gossip gossip.OptionalGossip
Spans []roachpb.Span
Targets jobspb.ChangefeedTargets
Sink EventBufferWriter
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type kvScanner interface {

type scanRequestScanner struct {
settings *cluster.Settings
gossip gossip.DeprecatedGossip
gossip gossip.OptionalGossip
db *kv.DB
}

Expand Down Expand Up @@ -244,7 +244,7 @@ func allRangeDescriptors(ctx context.Context, txn *kv.Txn) ([]roachpb.RangeDescr
}

// clusterNodeCount returns the approximate number of nodes in the cluster.
func clusterNodeCount(gw gossip.DeprecatedGossip) (int, error) {
func clusterNodeCount(gw gossip.OptionalGossip) (int, error) {
g, err := gw.OptionalErr(47971)
if err != nil {
return 0, err
Expand Down
113 changes: 88 additions & 25 deletions pkg/ccl/kvccl/kvtenantccl/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ import (
"context"
"io"
"math/rand"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
Expand Down Expand Up @@ -49,12 +52,17 @@ type Proxy struct {
rpcRetryOptions retry.Options
rpcDialTimeout time.Duration // for testing
rpcDial singleflight.Group
defaultZoneCfg *zonepb.ZoneConfig
addrs []string
startupC chan struct{}

mu syncutil.RWMutex
client roachpb.InternalClient
nodeDescs map[roachpb.NodeID]*roachpb.NodeDescriptor
mu struct {
syncutil.RWMutex
client roachpb.InternalClient
nodeDescs map[roachpb.NodeID]*roachpb.NodeDescriptor
systemConfig *config.SystemConfig
systemConfigChannels []chan<- struct{}
}
}

// Proxy is capable of providing information on each of the KV nodes in the
Expand All @@ -70,28 +78,29 @@ var _ kvcoord.NodeDescStore = (*Proxy)(nil)
// requested owned by the requesting tenant?).
var _ kvcoord.RangeDescriptorDB = (*Proxy)(nil)

// Proxy is capable of providing a filtered view of the SystemConfig containing
// only information applicable to secondary tenants. This obviates the need for
// SQL-only tenant processes to join the cluster-wide gossip network.
var _ config.SystemConfigProvider = (*Proxy)(nil)

// NewProxy creates a new Proxy.
func NewProxy(
ac log.AmbientContext, rpcContext *rpc.Context, rpcRetryOptions retry.Options, addrs []string,
) *Proxy {
ac.AddLogTag("tenant-proxy", nil)
func NewProxy(cfg kvtenant.ProxyConfig, addrs []string) *Proxy {
cfg.AmbientCtx.AddLogTag("tenant-proxy", nil)
return &Proxy{
AmbientContext: ac,
rpcContext: rpcContext,
rpcRetryOptions: rpcRetryOptions,
AmbientContext: cfg.AmbientCtx,
rpcContext: cfg.RPCContext,
rpcRetryOptions: cfg.RPCRetryOptions,
defaultZoneCfg: cfg.DefaultZoneConfig,
addrs: addrs,
startupC: make(chan struct{}),
nodeDescs: make(map[roachpb.NodeID]*roachpb.NodeDescriptor),
}
}

// proxyFactory implements kvtenant.ProxyFactory.
type proxyFactory struct{}

func (proxyFactory) NewProxy(
ac log.AmbientContext, rpcContext *rpc.Context, rpcRetryOptions retry.Options, addrs []string,
) (kvtenant.Proxy, error) {
return NewProxy(ac, rpcContext, rpcRetryOptions, addrs), nil
func (proxyFactory) NewProxy(cfg kvtenant.ProxyConfig, addrs []string) (kvtenant.Proxy, error) {
return NewProxy(cfg, addrs), nil
}

// Start launches the proxy's worker thread and waits for it to receive an
Expand Down Expand Up @@ -158,16 +167,18 @@ func (p *Proxy) runGossipSubscription(ctx context.Context) {
}

var gossipSubsHandlers = map[string]func(*Proxy, context.Context, string, roachpb.Value){
// Subscribe to all *NodeDescriptor updates in the gossip network.
// Subscribe to all *NodeDescriptor updates.
gossip.MakePrefixPattern(gossip.KeyNodeIDPrefix): (*Proxy).updateNodeAddress,
// TODO(nvanbenschoten): subscribe to updates to the tenant zones key.
// Subscribe to a filtered view of *SystemConfig updates.
gossip.KeySystemConfig: (*Proxy).updateSystemConfig,
}

// gossipSubsPatterns holds every key of gossipSubsHandlers, i.e. the gossip
// patterns that have a registered handler. Map iteration order is random, so
// the keys are sorted to make the resulting slice deterministic.
var gossipSubsPatterns = func() []string {
	keys := make([]string, len(gossipSubsHandlers))
	next := 0
	for k := range gossipSubsHandlers {
		keys[next] = k
		next++
	}
	sort.Strings(keys)
	return keys
}()

Expand All @@ -176,7 +187,7 @@ var gossipSubsPatterns = func() []string {
func (p *Proxy) updateNodeAddress(ctx context.Context, key string, content roachpb.Value) {
desc := new(roachpb.NodeDescriptor)
if err := content.GetProto(desc); err != nil {
log.Errorf(ctx, "%v", err)
log.Errorf(ctx, "could not unmarshal node descriptor: %v", err)
return
}

Expand All @@ -188,20 +199,72 @@ func (p *Proxy) updateNodeAddress(ctx context.Context, key string, content roach
// nothing ever removes them from Gossip.nodeDescs. Fix this.
p.mu.Lock()
defer p.mu.Unlock()
p.nodeDescs[desc.NodeID] = desc
if p.mu.nodeDescs == nil {
p.mu.nodeDescs = make(map[roachpb.NodeID]*roachpb.NodeDescriptor)
}
p.mu.nodeDescs[desc.NodeID] = desc
}

// GetNodeDescriptor implements the kvcoord.NodeDescStore interface.
func (p *Proxy) GetNodeDescriptor(nodeID roachpb.NodeID) (*roachpb.NodeDescriptor, error) {
p.mu.RLock()
defer p.mu.RUnlock()
desc, ok := p.nodeDescs[nodeID]
desc, ok := p.mu.nodeDescs[nodeID]
if !ok {
return nil, errors.Errorf("unable to look up descriptor for n%d", nodeID)
}
return desc, nil
}

// updateSystemConfig is the handler for updates to a filtered view of the
// "system-db" gossip key. It decodes the gossiped value into a fresh
// SystemConfig built on the Proxy's default zone config, installs it as the
// cached SystemConfig, and signals every registered notification channel.
//
// The key argument is not consulted. If the value cannot be unmarshaled the
// error is logged and the previously cached config is left in place.
func (p *Proxy) updateSystemConfig(ctx context.Context, key string, content roachpb.Value) {
	newCfg := config.NewSystemConfig(p.defaultZoneCfg)
	err := content.GetProto(&newCfg.SystemConfigEntries)
	if err != nil {
		log.Errorf(ctx, "could not unmarshal system config: %v", err)
		return
	}

	p.mu.Lock()
	defer p.mu.Unlock()
	p.mu.systemConfig = newCfg
	for i := range p.mu.systemConfigChannels {
		// Non-blocking send: a channel that cannot accept the signal right
		// now is skipped rather than stalling the update while holding the
		// lock.
		select {
		case p.mu.systemConfigChannels[i] <- struct{}{}:
		default:
		}
	}
}

// GetSystemConfig implements the config.SystemConfigProvider interface. It
// returns the Proxy's currently cached SystemConfig, read under the read lock.
func (p *Proxy) GetSystemConfig() *config.SystemConfig {
	// TODO(nvanbenschoten): we need to wait in `(*Proxy).Start()` until the
	// system config is populated. As is, there's a small chance that we return
	// nil, which SQL does not handle.
	p.mu.RLock()
	cached := p.mu.systemConfig
	p.mu.RUnlock()
	return cached
}

// RegisterSystemConfigChannel implements the config.SystemConfigProvider
// interface. It creates a notification channel, adds it to the Proxy's list
// of system config channels, and returns its receive side. If a SystemConfig
// is already cached, the channel is signaled once before being returned.
func (p *Proxy) RegisterSystemConfigChannel() <-chan struct{} {
	// A buffer of one lets a notification be left pending on the channel so
	// the proxy does not have to block on it.
	notifyC := make(chan struct{}, 1)

	p.mu.Lock()
	defer p.mu.Unlock()
	p.mu.systemConfigChannels = append(p.mu.systemConfigChannels, notifyC)

	// Nothing to announce yet; the first update will signal the channel.
	if p.mu.systemConfig == nil {
		return notifyC
	}

	// A config is already available, so notify right away. The channel is
	// freshly created and empty, so this send cannot block.
	notifyC <- struct{}{}
	return notifyC
}

// RangeLookup implements the kvcoord.RangeDescriptorDB interface.
func (p *Proxy) RangeLookup(
ctx context.Context, key roachpb.RKey, useReverseScan bool,
Expand Down Expand Up @@ -256,7 +319,7 @@ func (p *Proxy) FirstRange() (*roachpb.RangeDescriptor, error) {
// context is canceled.
func (p *Proxy) getClient(ctx context.Context) (roachpb.InternalClient, error) {
p.mu.RLock()
if c := p.client; c != nil {
if c := p.mu.client; c != nil {
p.mu.RUnlock()
return c, nil
}
Expand All @@ -269,7 +332,7 @@ func (p *Proxy) getClient(ctx context.Context) (roachpb.InternalClient, error) {
return nil, err
}
// NB: read lock not needed.
return p.client, nil
return p.mu.client, nil
})
p.mu.RUnlock()

Expand Down Expand Up @@ -299,7 +362,7 @@ func (p *Proxy) dialAddrs(ctx context.Context) error {
}
client := roachpb.NewInternalClient(conn)
p.mu.Lock()
p.client = client
p.mu.client = client
p.mu.Unlock()
return nil
}
Expand All @@ -326,7 +389,7 @@ func (p *Proxy) tryForgetClient(ctx context.Context, c roachpb.InternalClient) {
// Compare-and-swap to avoid thrashing.
p.mu.Lock()
defer p.mu.Unlock()
if p.client == c {
p.client = nil
if p.mu.client == c {
p.mu.client = nil
}
}
Loading

0 comments on commit 470510e

Please sign in to comment.