Skip to content

Commit

Permalink
kvtenant: implement SystemConfigProvider, remove Gossip from SQL-only…
Browse files Browse the repository at this point in the history
… server

Fixes cockroachdb#49445.

This commit introduces a new SystemConfigProvider abstraction, which is
capable of providing the SystemConfig, as well as notifying clients of
updates to the SystemConfig. Gossip already implements this interface.
The commit then updates SQL to use this new dependency in place of
Gossip whenever it needs to access the SystemConfig.

After making this change, it then updates the kvtenant.Proxy to
implement the new SystemConfigProvider interface. This is powered by the
GossipSubscription RPC that was added in cockroachdb#50520. The commit updates the
subscription to also match on the "system-db" gossip key, and just like
that, it can provide the SystemConfig to SQL [*].

Finally, with the kvtenant.Proxy serving the role of a
SystemConfigProvider to SQL when applicable, we're able to remove gossip
entirely from StartTenant. SQL-only servers will no longer join the
gossip network, which is a nice milestone for all of this work.

[*] there are a few remaining questions about how exactly we want to
enforce an access control policy on the system config gossip pattern.
See the updated comment in `Node.GossipSubscription`. For now, we're
just returning the entire SystemConfig to the subscription.
  • Loading branch information
nvanbenschoten committed Jul 28, 2020
1 parent 5c97350 commit ceb331d
Show file tree
Hide file tree
Showing 14 changed files with 238 additions and 134 deletions.
114 changes: 89 additions & 25 deletions pkg/ccl/kvccl/kvtenantccl/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ import (
"context"
"io"
"math/rand"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
Expand Down Expand Up @@ -49,12 +52,17 @@ type Proxy struct {
rpcRetryOptions retry.Options
rpcDialTimeout time.Duration // for testing
rpcDial singleflight.Group
defaultZoneCfg *zonepb.ZoneConfig
addrs []string
startupC chan struct{}

mu syncutil.RWMutex
client roachpb.InternalClient
nodeDescs map[roachpb.NodeID]*roachpb.NodeDescriptor
mu struct {
syncutil.RWMutex
client roachpb.InternalClient
nodeDescs map[roachpb.NodeID]*roachpb.NodeDescriptor
systemConfig *config.SystemConfig
systemConfigChannels []chan<- struct{}
}
}

// Proxy is capable of providing information on each of the KV nodes in the
Expand All @@ -70,28 +78,29 @@ var _ kvcoord.NodeDescStore = (*Proxy)(nil)
// requested owned by the requesting tenant?).
var _ kvcoord.RangeDescriptorDB = (*Proxy)(nil)

// Proxy is capable of providing a filtered view of the SystemConfig containing
// only information applicable to secondary tenants. This obviates the need for
// SQL-only tenant processes to join the cluster-wide gossip network.
var _ config.SystemConfigProvider = (*Proxy)(nil)

// NewProxy creates a new Proxy.
func NewProxy(
ac log.AmbientContext, rpcContext *rpc.Context, rpcRetryOptions retry.Options, addrs []string,
) *Proxy {
ac.AddLogTag("tenant-proxy", nil)
func NewProxy(cfg kvtenant.ProxyConfig, addrs []string) *Proxy {
cfg.AmbientCtx.AddLogTag("tenant-proxy", nil)
return &Proxy{
AmbientContext: ac,
rpcContext: rpcContext,
rpcRetryOptions: rpcRetryOptions,
AmbientContext: cfg.AmbientCtx,
rpcContext: cfg.RPCContext,
rpcRetryOptions: cfg.RPCRetryOptions,
defaultZoneCfg: cfg.DefaultZoneConfig,
addrs: addrs,
startupC: make(chan struct{}),
nodeDescs: make(map[roachpb.NodeID]*roachpb.NodeDescriptor),
}
}

// proxyFactory implements kvtenant.ProxyFactory.
type proxyFactory struct{}

func (proxyFactory) NewProxy(
ac log.AmbientContext, rpcContext *rpc.Context, rpcRetryOptions retry.Options, addrs []string,
) (kvtenant.Proxy, error) {
return NewProxy(ac, rpcContext, rpcRetryOptions, addrs), nil
func (proxyFactory) NewProxy(cfg kvtenant.ProxyConfig, addrs []string) (kvtenant.Proxy, error) {
return NewProxy(cfg, addrs), nil
}

// Start launches the proxy's worker thread and waits for it to receive an
Expand Down Expand Up @@ -158,16 +167,18 @@ func (p *Proxy) runGossipSubscription(ctx context.Context) {
}

var gossipSubsHandlers = map[string]func(*Proxy, context.Context, string, roachpb.Value){
// Subscribe to all *NodeDescriptor updates in the gossip network.
// Subscribe to all *NodeDescriptor updates.
gossip.MakePrefixPattern(gossip.KeyNodeIDPrefix): (*Proxy).updateNodeAddress,
// TODO(nvanbenschoten): subscribe to updates to the tenant zones key.
// Subscribe to a filtered view of *SystemConfig updates.
gossip.KeySystemConfig: (*Proxy).updateSystemConfig,
}

var gossipSubsPatterns = func() []string {
patterns := make([]string, 0, len(gossipSubsHandlers))
for pattern := range gossipSubsHandlers {
patterns = append(patterns, pattern)
}
sort.Strings(patterns)
return patterns
}()

Expand All @@ -176,7 +187,7 @@ var gossipSubsPatterns = func() []string {
func (p *Proxy) updateNodeAddress(ctx context.Context, key string, content roachpb.Value) {
desc := new(roachpb.NodeDescriptor)
if err := content.GetProto(desc); err != nil {
log.Errorf(ctx, "%v", err)
log.Errorf(ctx, "could not unmarshal node descriptor: %v", err)
return
}

Expand All @@ -188,20 +199,73 @@ func (p *Proxy) updateNodeAddress(ctx context.Context, key string, content roach
// nothing ever removes them from Gossip.nodeDescs. Fix this.
p.mu.Lock()
defer p.mu.Unlock()
p.nodeDescs[desc.NodeID] = desc
if p.mu.nodeDescs == nil {
p.mu.nodeDescs = make(map[roachpb.NodeID]*roachpb.NodeDescriptor)
}
p.mu.nodeDescs[desc.NodeID] = desc
}

// GetNodeDescriptor implements the kvcoord.NodeDescStore interface.
func (p *Proxy) GetNodeDescriptor(nodeID roachpb.NodeID) (*roachpb.NodeDescriptor, error) {
p.mu.RLock()
defer p.mu.RUnlock()
desc, ok := p.nodeDescs[nodeID]
desc, ok := p.mu.nodeDescs[nodeID]
if !ok {
return nil, errors.Errorf("unable to look up descriptor for n%d", nodeID)
}
return desc, nil
}

// updateSystemConfig handles updates to a filtered view of the "system-db"
// gossip key, performing the corresponding update to the Proxy's cached
// SystemConfig.
func (p *Proxy) updateSystemConfig(ctx context.Context, key string, content roachpb.Value) {
cfg := config.NewSystemConfig(p.defaultZoneCfg)
if err := content.GetProto(&cfg.SystemConfigEntries); err != nil {
log.Errorf(ctx, "could not unmarshal system config: %v", err)
return
}

p.mu.Lock()
defer p.mu.Unlock()
p.mu.systemConfig = cfg
for _, c := range p.mu.systemConfigChannels {
select {
case c <- struct{}{}:
default:
}
}
}

// GetSystemConfig implements the config.SystemConfigProvider interface.
func (p *Proxy) GetSystemConfig() *config.SystemConfig {
// TODO DURING REVIEW: how does this work with Gossip when the SystemConfig
// hasn't reached a newly-started node? The comment on Gossip says that it
// returns nil, but then do we wait for this to be non-nil somewhere? Is SQL
// really able to handle a nil SystemConfig on all paths? I doubt that.
p.mu.RLock()
defer p.mu.RUnlock()
return p.mu.systemConfig
}

// RegisterSystemConfigChannel implements the config.SystemConfigProvider
// interface.
func (p *Proxy) RegisterSystemConfigChannel() <-chan struct{} {
// Create channel that receives new system config notifications.
// The channel has a size of 1 to prevent proxy from blocking on it.
c := make(chan struct{}, 1)

p.mu.Lock()
defer p.mu.Unlock()
p.mu.systemConfigChannels = append(p.mu.systemConfigChannels, c)

// Notify the channel right away if we have a config.
if p.mu.systemConfig != nil {
c <- struct{}{}
}
return c
}

// RangeLookup implements the kvcoord.RangeDescriptorDB interface.
func (p *Proxy) RangeLookup(
ctx context.Context, key roachpb.RKey, useReverseScan bool,
Expand Down Expand Up @@ -256,7 +320,7 @@ func (p *Proxy) FirstRange() (*roachpb.RangeDescriptor, error) {
// context is canceled.
func (p *Proxy) getClient(ctx context.Context) (roachpb.InternalClient, error) {
p.mu.RLock()
if c := p.client; c != nil {
if c := p.mu.client; c != nil {
p.mu.RUnlock()
return c, nil
}
Expand All @@ -269,7 +333,7 @@ func (p *Proxy) getClient(ctx context.Context) (roachpb.InternalClient, error) {
return nil, err
}
// NB: read lock not needed.
return p.client, nil
return p.mu.client, nil
})
p.mu.RUnlock()

Expand Down Expand Up @@ -299,7 +363,7 @@ func (p *Proxy) dialAddrs(ctx context.Context) error {
}
client := roachpb.NewInternalClient(conn)
p.mu.Lock()
p.client = client
p.mu.client = client
p.mu.Unlock()
return nil
}
Expand All @@ -326,7 +390,7 @@ func (p *Proxy) tryForgetClient(ctx context.Context, c roachpb.InternalClient) {
// Compare-and-swap to avoid thrashing.
p.mu.Lock()
defer p.mu.Unlock()
if p.client == c {
p.client = nil
if p.mu.client == c {
p.mu.client = nil
}
}
80 changes: 74 additions & 6 deletions pkg/ccl/kvccl/kvtenantccl/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -73,6 +75,18 @@ func gossipEventForNodeDesc(desc *roachpb.NodeDescriptor) *roachpb.GossipSubscri
}
}

func gossipEventForSystemConfig(cfg *config.SystemConfigEntries) *roachpb.GossipSubscriptionEvent {
val, err := protoutil.Marshal(cfg)
if err != nil {
panic(err)
}
return &roachpb.GossipSubscriptionEvent{
Key: gossip.KeySystemConfig,
Content: roachpb.MakeValueFromBytesAndTimestamp(val, hlc.Timestamp{}),
PatternMatched: gossip.KeySystemConfig,
}
}

func waitForNodeDesc(t *testing.T, p *Proxy, nodeID roachpb.NodeID) {
t.Helper()
testutils.SucceedsSoon(t, func() error {
Expand All @@ -81,7 +95,8 @@ func waitForNodeDesc(t *testing.T, p *Proxy, nodeID roachpb.NodeID) {
})
}

// TestProxyGossipSubscription tests Proxy's role as a kvcoord.NodeDescStore.
// TestProxyGossipSubscription tests Proxy's roles as a kvcoord.NodeDescStore
// and as a config.SystemConfigProvider.
func TestProxyGossipSubscription(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand All @@ -95,8 +110,9 @@ func TestProxyGossipSubscription(t *testing.T) {
gossipSubC := make(chan *roachpb.GossipSubscriptionEvent)
defer close(gossipSubC)
gossipSubFn := func(req *roachpb.GossipSubscriptionRequest, stream roachpb.Internal_GossipSubscriptionServer) error {
assert.Len(t, req.Patterns, 1)
assert.Len(t, req.Patterns, 2)
assert.Equal(t, "node:.*", req.Patterns[0])
assert.Equal(t, "system-db", req.Patterns[1])
for gossipSub := range gossipSubC {
if err := stream.Send(gossipSub); err != nil {
return err
Expand All @@ -108,8 +124,13 @@ func TestProxyGossipSubscription(t *testing.T) {
ln, err := netutil.ListenAndServeGRPC(stopper, s, util.TestAddr)
require.NoError(t, err)

cfg := kvtenant.ProxyConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
RPCContext: rpcContext,
RPCRetryOptions: rpcRetryOpts,
}
addrs := []string{ln.Addr().String()}
p := NewProxy(log.AmbientContext{Tracer: tracing.NewTracer()}, rpcContext, rpcRetryOpts, addrs)
p := NewProxy(cfg, addrs)

// Start should block until the first GossipSubscription response.
startedC := make(chan error)
Expand Down Expand Up @@ -158,6 +179,42 @@ func TestProxyGossipSubscription(t *testing.T) {
desc, err = p.GetNodeDescriptor(3)
require.Equal(t, node3, desc)
require.NoError(t, err)

// Test config.SystemConfigProvider impl. Should not have a SystemConfig yet.
sysCfg := p.GetSystemConfig()
require.Nil(t, sysCfg)
sysCfgC := p.RegisterSystemConfigChannel()
require.Len(t, sysCfgC, 0)

// Return first SystemConfig response.
sysCfgEntries := &config.SystemConfigEntries{Values: []roachpb.KeyValue{
{Key: roachpb.Key("a")},
{Key: roachpb.Key("b")},
}}
gossipSubC <- gossipEventForSystemConfig(sysCfgEntries)

// Test config.SystemConfigProvider impl. Wait for update first.
<-sysCfgC
sysCfg = p.GetSystemConfig()
require.NotNil(t, sysCfg)
require.Equal(t, sysCfgEntries.Values, sysCfg.Values)

// Return updated SystemConfig response.
sysCfgEntriesUp := &config.SystemConfigEntries{Values: []roachpb.KeyValue{
{Key: roachpb.Key("a")},
{Key: roachpb.Key("c")},
}}
gossipSubC <- gossipEventForSystemConfig(sysCfgEntriesUp)

// Test config.SystemConfigProvider impl. Wait for update first.
<-sysCfgC
sysCfg = p.GetSystemConfig()
require.NotNil(t, sysCfg)
require.Equal(t, sysCfgEntriesUp.Values, sysCfg.Values)

// A newly registered SystemConfig channel will be immediately notified.
sysCfgC2 := p.RegisterSystemConfigChannel()
require.Len(t, sysCfgC2, 1)
}

// TestProxyGossipSubscription tests Proxy's role as a kvcoord.RangeDescriptorDB.
Expand Down Expand Up @@ -187,8 +244,13 @@ func TestProxyRangeLookup(t *testing.T) {
ln, err := netutil.ListenAndServeGRPC(stopper, s, util.TestAddr)
require.NoError(t, err)

cfg := kvtenant.ProxyConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
RPCContext: rpcContext,
RPCRetryOptions: rpcRetryOpts,
}
addrs := []string{ln.Addr().String()}
p := NewProxy(log.AmbientContext{Tracer: tracing.NewTracer()}, rpcContext, rpcRetryOpts, addrs)
p := NewProxy(cfg, addrs)
// NOTE: we don't actually start the proxy worker. That's ok, as
// RangeDescriptorDB methods don't require it to be running.

Expand Down Expand Up @@ -256,8 +318,9 @@ func TestProxyRetriesUnreachable(t *testing.T) {
gossipEventForNodeDesc(node2),
}
gossipSubFn := func(req *roachpb.GossipSubscriptionRequest, stream roachpb.Internal_GossipSubscriptionServer) error {
assert.Len(t, req.Patterns, 1)
assert.Len(t, req.Patterns, 2)
assert.Equal(t, "node:.*", req.Patterns[0])
assert.Equal(t, "system-db", req.Patterns[1])
for _, event := range gossipSubEvents {
if err := stream.Send(event); err != nil {
return err
Expand All @@ -278,8 +341,13 @@ func TestProxyRetriesUnreachable(t *testing.T) {
})

// Add listen address into list of other bogus addresses.
cfg := kvtenant.ProxyConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
RPCContext: rpcContext,
RPCRetryOptions: rpcRetryOpts,
}
addrs := []string{"1.1.1.1:9999", ln.Addr().String(), "2.2.2.2:9999"}
p := NewProxy(log.AmbientContext{Tracer: tracing.NewTracer()}, rpcContext, rpcRetryOpts, addrs)
p := NewProxy(cfg, addrs)
p.rpcDialTimeout = 5 * time.Millisecond // speed up test

// Start should block until the first GossipSubscription response.
Expand Down
25 changes: 25 additions & 0 deletions pkg/config/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package config

// SystemConfigProvider is capable of providing the SystemConfig, as well as
// notifying clients of updates to the SystemConfig.
type SystemConfigProvider interface {
// GetSystemConfig returns the local unmarshaled version of the system
// config. Returns nil if the system config hasn't been set yet.
GetSystemConfig() *SystemConfig

// RegisterSystemConfigChannel registers a channel to signify updates for
// the system config. It is notified after registration (if a system config
// is already set), and whenever a new system config is successfully
// unmarshaled.
RegisterSystemConfigChannel() <-chan struct{}
}
Loading

0 comments on commit ceb331d

Please sign in to comment.