Skip to content

Commit

Permalink
kvtenant: implement SystemConfigProvider, remove Gossip from SQL-only…
Browse files Browse the repository at this point in the history
… server

Fixes cockroachdb#49445.

This commit introduces a new SystemConfigProvider abstraction, which is
capable of providing the SystemConfig, as well as notifying clients of
updates to the SystemConfig. Gossip already implements this interface.
The commit then updates SQL to use this new dependency in place of
Gossip whenever it needs to access the SystemConfig.

After making this change, it then updates the kvtenant.Proxy to
implement the new SystemConfigProvider interface. This is powered by the
GossipSubscription RPC that was added in cockroachdb#50520. The commit updates the
subscription to also match on the "system-db" gossip key, and just like
that, it can provide the SystemConfig to SQL [*].

Finally, with the kvtenant.Proxy serving the role of a
SystemConfigProvider to SQL when applicable, we're able to remove gossip
entirely from StartTenant. SQL-only servers will no longer join the
gossip network, which is a nice milestone for all of this work.

[*] there are a few remaining questions about how exactly we want to
enforce an access control policy on the system config gossip pattern.
See the updated comment in `Node.GossipSubscription`. For now, we're
just returning the entire SystemConfig to the subscription.
  • Loading branch information
nvanbenschoten committed Aug 3, 2020
1 parent c4e1643 commit 8759499
Show file tree
Hide file tree
Showing 15 changed files with 255 additions and 135 deletions.
113 changes: 88 additions & 25 deletions pkg/ccl/kvccl/kvtenantccl/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ import (
"context"
"io"
"math/rand"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
Expand Down Expand Up @@ -49,12 +52,17 @@ type Proxy struct {
rpcRetryOptions retry.Options
rpcDialTimeout time.Duration // for testing
rpcDial singleflight.Group
defaultZoneCfg *zonepb.ZoneConfig
addrs []string
startupC chan struct{}

mu syncutil.RWMutex
client roachpb.InternalClient
nodeDescs map[roachpb.NodeID]*roachpb.NodeDescriptor
mu struct {
syncutil.RWMutex
client roachpb.InternalClient
nodeDescs map[roachpb.NodeID]*roachpb.NodeDescriptor
systemConfig *config.SystemConfig
systemConfigChannels []chan<- struct{}
}
}

// Proxy is capable of providing information on each of the KV nodes in the
Expand All @@ -70,28 +78,29 @@ var _ kvcoord.NodeDescStore = (*Proxy)(nil)
// requested owned by the requesting tenant?).
var _ kvcoord.RangeDescriptorDB = (*Proxy)(nil)

// Proxy is capable of providing a filtered view of the SystemConfig containing
// only information applicable to secondary tenants. This obviates the need for
// SQL-only tenant processes to join the cluster-wide gossip network.
var _ config.SystemConfigProvider = (*Proxy)(nil)

// NewProxy creates a new Proxy.
func NewProxy(
ac log.AmbientContext, rpcContext *rpc.Context, rpcRetryOptions retry.Options, addrs []string,
) *Proxy {
ac.AddLogTag("tenant-proxy", nil)
func NewProxy(cfg kvtenant.ProxyConfig, addrs []string) *Proxy {
cfg.AmbientCtx.AddLogTag("tenant-proxy", nil)
return &Proxy{
AmbientContext: ac,
rpcContext: rpcContext,
rpcRetryOptions: rpcRetryOptions,
AmbientContext: cfg.AmbientCtx,
rpcContext: cfg.RPCContext,
rpcRetryOptions: cfg.RPCRetryOptions,
defaultZoneCfg: cfg.DefaultZoneConfig,
addrs: addrs,
startupC: make(chan struct{}),
nodeDescs: make(map[roachpb.NodeID]*roachpb.NodeDescriptor),
}
}

// proxyFactory implements kvtenant.ProxyFactory.
type proxyFactory struct{}

func (proxyFactory) NewProxy(
ac log.AmbientContext, rpcContext *rpc.Context, rpcRetryOptions retry.Options, addrs []string,
) (kvtenant.Proxy, error) {
return NewProxy(ac, rpcContext, rpcRetryOptions, addrs), nil
func (proxyFactory) NewProxy(cfg kvtenant.ProxyConfig, addrs []string) (kvtenant.Proxy, error) {
return NewProxy(cfg, addrs), nil
}

// Start launches the proxy's worker thread and waits for it to receive an
Expand Down Expand Up @@ -158,16 +167,18 @@ func (p *Proxy) runGossipSubscription(ctx context.Context) {
}

var gossipSubsHandlers = map[string]func(*Proxy, context.Context, string, roachpb.Value){
// Subscribe to all *NodeDescriptor updates in the gossip network.
// Subscribe to all *NodeDescriptor updates.
gossip.MakePrefixPattern(gossip.KeyNodeIDPrefix): (*Proxy).updateNodeAddress,
// TODO(nvanbenschoten): subscribe to updates to the tenant zones key.
// Subscribe to a filtered view of *SystemConfig updates.
gossip.KeySystemConfig: (*Proxy).updateSystemConfig,
}

var gossipSubsPatterns = func() []string {
patterns := make([]string, 0, len(gossipSubsHandlers))
for pattern := range gossipSubsHandlers {
patterns = append(patterns, pattern)
}
sort.Strings(patterns)
return patterns
}()

Expand All @@ -176,7 +187,7 @@ var gossipSubsPatterns = func() []string {
func (p *Proxy) updateNodeAddress(ctx context.Context, key string, content roachpb.Value) {
desc := new(roachpb.NodeDescriptor)
if err := content.GetProto(desc); err != nil {
log.Errorf(ctx, "%v", err)
log.Errorf(ctx, "could not unmarshal node descriptor: %v", err)
return
}

Expand All @@ -188,20 +199,72 @@ func (p *Proxy) updateNodeAddress(ctx context.Context, key string, content roach
// nothing ever removes them from Gossip.nodeDescs. Fix this.
p.mu.Lock()
defer p.mu.Unlock()
p.nodeDescs[desc.NodeID] = desc
if p.mu.nodeDescs == nil {
p.mu.nodeDescs = make(map[roachpb.NodeID]*roachpb.NodeDescriptor)
}
p.mu.nodeDescs[desc.NodeID] = desc
}

// GetNodeDescriptor implements the kvcoord.NodeDescStore interface.
func (p *Proxy) GetNodeDescriptor(nodeID roachpb.NodeID) (*roachpb.NodeDescriptor, error) {
p.mu.RLock()
defer p.mu.RUnlock()
desc, ok := p.nodeDescs[nodeID]
desc, ok := p.mu.nodeDescs[nodeID]
if !ok {
return nil, errors.Errorf("unable to look up descriptor for n%d", nodeID)
}
return desc, nil
}

// updateSystemConfig handles updates to a filtered view of the "system-db"
// gossip key, performing the corresponding update to the Proxy's cached
// SystemConfig.
func (p *Proxy) updateSystemConfig(ctx context.Context, key string, content roachpb.Value) {
cfg := config.NewSystemConfig(p.defaultZoneCfg)
if err := content.GetProto(&cfg.SystemConfigEntries); err != nil {
log.Errorf(ctx, "could not unmarshal system config: %v", err)
return
}

p.mu.Lock()
defer p.mu.Unlock()
p.mu.systemConfig = cfg
for _, c := range p.mu.systemConfigChannels {
select {
case c <- struct{}{}:
default:
}
}
}

// GetSystemConfig implements the config.SystemConfigProvider interface.
func (p *Proxy) GetSystemConfig() *config.SystemConfig {
// TODO(nvanbenschoten): we need to wait in `(*Proxy).Start()` until the
// system config is populated. As is, there's a small chance that we return
// nil, which SQL does not handle.
p.mu.RLock()
defer p.mu.RUnlock()
return p.mu.systemConfig
}

// RegisterSystemConfigChannel implements the config.SystemConfigProvider
// interface.
func (p *Proxy) RegisterSystemConfigChannel() <-chan struct{} {
// Create channel that receives new system config notifications.
// The channel has a size of 1 to prevent proxy from having to block on it.
c := make(chan struct{}, 1)

p.mu.Lock()
defer p.mu.Unlock()
p.mu.systemConfigChannels = append(p.mu.systemConfigChannels, c)

// Notify the channel right away if we have a config.
if p.mu.systemConfig != nil {
c <- struct{}{}
}
return c
}

// RangeLookup implements the kvcoord.RangeDescriptorDB interface.
func (p *Proxy) RangeLookup(
ctx context.Context, key roachpb.RKey, useReverseScan bool,
Expand Down Expand Up @@ -256,7 +319,7 @@ func (p *Proxy) FirstRange() (*roachpb.RangeDescriptor, error) {
// context is canceled.
func (p *Proxy) getClient(ctx context.Context) (roachpb.InternalClient, error) {
p.mu.RLock()
if c := p.client; c != nil {
if c := p.mu.client; c != nil {
p.mu.RUnlock()
return c, nil
}
Expand All @@ -269,7 +332,7 @@ func (p *Proxy) getClient(ctx context.Context) (roachpb.InternalClient, error) {
return nil, err
}
// NB: read lock not needed.
return p.client, nil
return p.mu.client, nil
})
p.mu.RUnlock()

Expand Down Expand Up @@ -299,7 +362,7 @@ func (p *Proxy) dialAddrs(ctx context.Context) error {
}
client := roachpb.NewInternalClient(conn)
p.mu.Lock()
p.client = client
p.mu.client = client
p.mu.Unlock()
return nil
}
Expand All @@ -326,7 +389,7 @@ func (p *Proxy) tryForgetClient(ctx context.Context, c roachpb.InternalClient) {
// Compare-and-swap to avoid thrashing.
p.mu.Lock()
defer p.mu.Unlock()
if p.client == c {
p.client = nil
if p.mu.client == c {
p.mu.client = nil
}
}
80 changes: 74 additions & 6 deletions pkg/ccl/kvccl/kvtenantccl/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -73,6 +75,18 @@ func gossipEventForNodeDesc(desc *roachpb.NodeDescriptor) *roachpb.GossipSubscri
}
}

func gossipEventForSystemConfig(cfg *config.SystemConfigEntries) *roachpb.GossipSubscriptionEvent {
val, err := protoutil.Marshal(cfg)
if err != nil {
panic(err)
}
return &roachpb.GossipSubscriptionEvent{
Key: gossip.KeySystemConfig,
Content: roachpb.MakeValueFromBytesAndTimestamp(val, hlc.Timestamp{}),
PatternMatched: gossip.KeySystemConfig,
}
}

func waitForNodeDesc(t *testing.T, p *Proxy, nodeID roachpb.NodeID) {
t.Helper()
testutils.SucceedsSoon(t, func() error {
Expand All @@ -81,7 +95,8 @@ func waitForNodeDesc(t *testing.T, p *Proxy, nodeID roachpb.NodeID) {
})
}

// TestProxyGossipSubscription tests Proxy's role as a kvcoord.NodeDescStore.
// TestProxyGossipSubscription tests Proxy's roles as a kvcoord.NodeDescStore
// and as a config.SystemConfigProvider.
func TestProxyGossipSubscription(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand All @@ -95,8 +110,9 @@ func TestProxyGossipSubscription(t *testing.T) {
gossipSubC := make(chan *roachpb.GossipSubscriptionEvent)
defer close(gossipSubC)
gossipSubFn := func(req *roachpb.GossipSubscriptionRequest, stream roachpb.Internal_GossipSubscriptionServer) error {
assert.Len(t, req.Patterns, 1)
assert.Len(t, req.Patterns, 2)
assert.Equal(t, "node:.*", req.Patterns[0])
assert.Equal(t, "system-db", req.Patterns[1])
for gossipSub := range gossipSubC {
if err := stream.Send(gossipSub); err != nil {
return err
Expand All @@ -108,8 +124,13 @@ func TestProxyGossipSubscription(t *testing.T) {
ln, err := netutil.ListenAndServeGRPC(stopper, s, util.TestAddr)
require.NoError(t, err)

cfg := kvtenant.ProxyConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
RPCContext: rpcContext,
RPCRetryOptions: rpcRetryOpts,
}
addrs := []string{ln.Addr().String()}
p := NewProxy(log.AmbientContext{Tracer: tracing.NewTracer()}, rpcContext, rpcRetryOpts, addrs)
p := NewProxy(cfg, addrs)

// Start should block until the first GossipSubscription response.
startedC := make(chan error)
Expand Down Expand Up @@ -158,6 +179,42 @@ func TestProxyGossipSubscription(t *testing.T) {
desc, err = p.GetNodeDescriptor(3)
require.Equal(t, node3, desc)
require.NoError(t, err)

// Test config.SystemConfigProvider impl. Should not have a SystemConfig yet.
sysCfg := p.GetSystemConfig()
require.Nil(t, sysCfg)
sysCfgC := p.RegisterSystemConfigChannel()
require.Len(t, sysCfgC, 0)

// Return first SystemConfig response.
sysCfgEntries := &config.SystemConfigEntries{Values: []roachpb.KeyValue{
{Key: roachpb.Key("a")},
{Key: roachpb.Key("b")},
}}
gossipSubC <- gossipEventForSystemConfig(sysCfgEntries)

// Test config.SystemConfigProvider impl. Wait for update first.
<-sysCfgC
sysCfg = p.GetSystemConfig()
require.NotNil(t, sysCfg)
require.Equal(t, sysCfgEntries.Values, sysCfg.Values)

// Return updated SystemConfig response.
sysCfgEntriesUp := &config.SystemConfigEntries{Values: []roachpb.KeyValue{
{Key: roachpb.Key("a")},
{Key: roachpb.Key("c")},
}}
gossipSubC <- gossipEventForSystemConfig(sysCfgEntriesUp)

// Test config.SystemConfigProvider impl. Wait for update first.
<-sysCfgC
sysCfg = p.GetSystemConfig()
require.NotNil(t, sysCfg)
require.Equal(t, sysCfgEntriesUp.Values, sysCfg.Values)

// A newly registered SystemConfig channel will be immediately notified.
sysCfgC2 := p.RegisterSystemConfigChannel()
require.Len(t, sysCfgC2, 1)
}

// TestProxyGossipSubscription tests Proxy's role as a kvcoord.RangeDescriptorDB.
Expand Down Expand Up @@ -187,8 +244,13 @@ func TestProxyRangeLookup(t *testing.T) {
ln, err := netutil.ListenAndServeGRPC(stopper, s, util.TestAddr)
require.NoError(t, err)

cfg := kvtenant.ProxyConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
RPCContext: rpcContext,
RPCRetryOptions: rpcRetryOpts,
}
addrs := []string{ln.Addr().String()}
p := NewProxy(log.AmbientContext{Tracer: tracing.NewTracer()}, rpcContext, rpcRetryOpts, addrs)
p := NewProxy(cfg, addrs)
// NOTE: we don't actually start the proxy worker. That's ok, as
// RangeDescriptorDB methods don't require it to be running.

Expand Down Expand Up @@ -256,8 +318,9 @@ func TestProxyRetriesUnreachable(t *testing.T) {
gossipEventForNodeDesc(node2),
}
gossipSubFn := func(req *roachpb.GossipSubscriptionRequest, stream roachpb.Internal_GossipSubscriptionServer) error {
assert.Len(t, req.Patterns, 1)
assert.Len(t, req.Patterns, 2)
assert.Equal(t, "node:.*", req.Patterns[0])
assert.Equal(t, "system-db", req.Patterns[1])
for _, event := range gossipSubEvents {
if err := stream.Send(event); err != nil {
return err
Expand All @@ -278,8 +341,13 @@ func TestProxyRetriesUnreachable(t *testing.T) {
})

// Add listen address into list of other bogus addresses.
cfg := kvtenant.ProxyConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
RPCContext: rpcContext,
RPCRetryOptions: rpcRetryOpts,
}
addrs := []string{"1.1.1.1:9999", ln.Addr().String(), "2.2.2.2:9999"}
p := NewProxy(log.AmbientContext{Tracer: tracing.NewTracer()}, rpcContext, rpcRetryOpts, addrs)
p := NewProxy(cfg, addrs)
p.rpcDialTimeout = 5 * time.Millisecond // speed up test

// Start should block until the first GossipSubscription response.
Expand Down
Loading

0 comments on commit 8759499

Please sign in to comment.