From 5416c9c6b94291ff0fd6308fa9586b8cefd820ef Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 10 Aug 2020 17:42:07 -0400 Subject: [PATCH] kv/kvtenant: rename Proxy to Connector See discussion in #50503. We found "proxy" confusing, because the object was in the same process as the rest of the SQL-only process instead of run as a stand-alone component. "Connector" doesn't create the same confusion. --- .../kvtenantccl/{proxy.go => connector.go} | 230 +++++++++--------- .../{proxy_test.go => connector_test.go} | 87 +++---- pkg/kv/kvclient/kvtenant/connector.go | 96 ++++++++ pkg/kv/kvclient/kvtenant/proxy.go | 94 ------- pkg/server/server_sql.go | 10 +- pkg/server/testserver.go | 16 +- 6 files changed, 270 insertions(+), 263 deletions(-) rename pkg/ccl/kvccl/kvtenantccl/{proxy.go => connector.go} (61%) rename pkg/ccl/kvccl/kvtenantccl/{proxy_test.go => connector_test.go} (85%) create mode 100644 pkg/kv/kvclient/kvtenant/connector.go delete mode 100644 pkg/kv/kvclient/kvtenant/proxy.go diff --git a/pkg/ccl/kvccl/kvtenantccl/proxy.go b/pkg/ccl/kvccl/kvtenantccl/connector.go similarity index 61% rename from pkg/ccl/kvccl/kvtenantccl/proxy.go rename to pkg/ccl/kvccl/kvtenantccl/connector.go index 3e4fc990a6dc..cf46bc64c3e6 100644 --- a/pkg/ccl/kvccl/kvtenantccl/proxy.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector.go @@ -37,18 +37,19 @@ import ( ) func init() { - kvtenant.Factory = proxyFactory{} + kvtenant.Factory = connectorFactory{} } -// Proxy mediates the communication of cluster-wide state to sandboxed SQL-only -// tenant processes through a restricted interface. A Proxy is seeded with a set -// of one or more network addresses that reference existing KV nodes in the -// cluster (or a load-balancer which fans out to some/all KV nodes). On startup, -// it establishes contact with one of these nodes to learn about the topology of -// the cluster and bootstrap the rest of SQL <-> KV network communication. +// Connector mediates the communication of cluster-wide state to sandboxed +// SQL-only tenant processes through a restricted interface. A Connector is +// seeded with a set of one or more network addresses that reference existing KV +// nodes in the cluster (or a load-balancer which fans out to some/all KV +// nodes). On startup, it establishes contact with one of these nodes to learn +// about the topology of the cluster and bootstrap the rest of SQL <-> KV +// network communication. // -// See below for the Proxy's roles. -type Proxy struct { +// See below for the Connector's roles. +type Connector struct { log.AmbientContext rpcContext *rpc.Context @@ -68,28 +69,29 @@ type Proxy struct { } } -// Proxy is capable of providing information on each of the KV nodes in the +// Connector is capable of providing information on each of the KV nodes in the // cluster in the form of NodeDescriptors. This obviates the need for SQL-only // tenant processes to join the cluster-wide gossip network. -var _ kvcoord.NodeDescStore = (*Proxy)(nil) +var _ kvcoord.NodeDescStore = (*Connector)(nil) -// Proxy is capable of providing Range addressing information in the form of +// Connector is capable of providing Range addressing information in the form of // RangeDescriptors through delegated RangeLookup requests. This is necessary // because SQL-only tenants are restricted from reading Range Metadata keys // directly. Instead, the RangeLookup requests are proxied through existing KV // nodes while being subject to additional validation (e.g. is the Range being // requested owned by the requesting tenant?). -var _ kvcoord.RangeDescriptorDB = (*Proxy)(nil) - -// Proxy is capable of providing a filtered view of the SystemConfig containing -// only information applicable to secondary tenants. This obviates the need for -// SQL-only tenant processes to join the cluster-wide gossip network. -var _ config.SystemConfigProvider = (*Proxy)(nil) - -// NewProxy creates a new Proxy. -func NewProxy(cfg kvtenant.ProxyConfig, addrs []string) *Proxy { - cfg.AmbientCtx.AddLogTag("tenant-proxy", nil) - return &Proxy{ +var _ kvcoord.RangeDescriptorDB = (*Connector)(nil) + +// Connector is capable of providing a filtered view of the SystemConfig +// containing only information applicable to secondary tenants. This obviates +// the need for SQL-only tenant processes to join the cluster-wide gossip +// network. +var _ config.SystemConfigProvider = (*Connector)(nil) + +// NewConnector creates a new Connector. +func NewConnector(cfg kvtenant.ConnectorConfig, addrs []string) *Connector { + cfg.AmbientCtx.AddLogTag("tenant-connector", nil) + return &Connector{ AmbientContext: cfg.AmbientCtx, rpcContext: cfg.RPCContext, rpcRetryOptions: cfg.RPCRetryOptions, @@ -99,22 +101,24 @@ func NewProxy(cfg kvtenant.ProxyConfig, addrs []string) *Proxy { } } -// proxyFactory implements kvtenant.ProxyFactory. -type proxyFactory struct{} +// connectorFactory implements kvtenant.ConnectorFactory. +type connectorFactory struct{} -func (proxyFactory) NewProxy(cfg kvtenant.ProxyConfig, addrs []string) (kvtenant.Proxy, error) { - return NewProxy(cfg, addrs), nil +func (connectorFactory) NewConnector( + cfg kvtenant.ConnectorConfig, addrs []string, +) (kvtenant.Connector, error) { + return NewConnector(cfg, addrs), nil } -// Start launches the proxy's worker thread and waits for it to receive an +// Start launches the connector's worker thread and waits for it to receive an // initial GossipSubscription event. -func (p *Proxy) Start(ctx context.Context) error { - startupC := p.startupC - p.rpcContext.Stopper.RunWorker(context.Background(), func(ctx context.Context) { - ctx = p.AnnotateCtx(ctx) - ctx, cancel := p.rpcContext.Stopper.WithCancelOnQuiesce(ctx) +func (c *Connector) Start(ctx context.Context) error { + startupC := c.startupC + c.rpcContext.Stopper.RunWorker(context.Background(), func(ctx context.Context) { + ctx = c.AnnotateCtx(ctx) + ctx, cancel := c.rpcContext.Stopper.WithCancelOnQuiesce(ctx) defer cancel() - p.runGossipSubscription(ctx) + c.runGossipSubscription(ctx) }) // Synchronously block until the first GossipSubscription event. select { @@ -125,9 +129,9 @@ func (p *Proxy) Start(ctx context.Context) error { } } -func (p *Proxy) runGossipSubscription(ctx context.Context) { +func (c *Connector) runGossipSubscription(ctx context.Context) { for ctx.Err() == nil { - client, err := p.getClient(ctx) + client, err := c.getClient(ctx) if err != nil { continue } @@ -136,7 +140,7 @@ func (p *Proxy) runGossipSubscription(ctx context.Context) { }) if err != nil { log.Warningf(ctx, "error issuing GossipSubscription RPC: %v", err) - p.tryForgetClient(ctx, client) + c.tryForgetClient(ctx, client) continue } for { @@ -147,7 +151,7 @@ func (p *Proxy) runGossipSubscription(ctx context.Context) { } // Soft RPC error. Drop client and retry. log.Warningf(ctx, "error consuming GossipSubscription RPC: %v", err) - p.tryForgetClient(ctx, client) + c.tryForgetClient(ctx, client) break } if e.Error != nil { @@ -160,20 +164,20 @@ func (p *Proxy) runGossipSubscription(ctx context.Context) { log.Errorf(ctx, "unknown GossipSubscription pattern: %q", e.PatternMatched) continue } - handler(p, ctx, e.Key, e.Content) - if p.startupC != nil { - close(p.startupC) - p.startupC = nil + handler(c, ctx, e.Key, e.Content) + if c.startupC != nil { + close(c.startupC) + c.startupC = nil } } } } -var gossipSubsHandlers = map[string]func(*Proxy, context.Context, string, roachpb.Value){ +var gossipSubsHandlers = map[string]func(*Connector, context.Context, string, roachpb.Value){ // Subscribe to all *NodeDescriptor updates. - gossip.MakePrefixPattern(gossip.KeyNodeIDPrefix): (*Proxy).updateNodeAddress, + gossip.MakePrefixPattern(gossip.KeyNodeIDPrefix): (*Connector).updateNodeAddress, // Subscribe to a filtered view of *SystemConfig updates. - gossip.KeySystemConfig: (*Proxy).updateSystemConfig, + gossip.KeySystemConfig: (*Connector).updateSystemConfig, } var gossipSubsPatterns = func() []string { @@ -186,8 +190,8 @@ var gossipSubsPatterns = func() []string { }() // updateNodeAddress handles updates to "node" gossip keys, performing the -// corresponding update to the Proxy's cached NodeDescriptor set. -func (p *Proxy) updateNodeAddress(ctx context.Context, key string, content roachpb.Value) { +// corresponding update to the Connector's cached NodeDescriptor set. +func (c *Connector) updateNodeAddress(ctx context.Context, key string, content roachpb.Value) { desc := new(roachpb.NodeDescriptor) if err := content.GetProto(desc); err != nil { log.Errorf(ctx, "could not unmarshal node descriptor: %v", err) @@ -200,19 +204,19 @@ func (p *Proxy) updateNodeAddress(ctx context.Context, key string, content roach // replaced network addresses, but that logic has been dead since 5bce267. // Other than that, gossip callbacks are not invoked on info expiration, so // nothing ever removes them from Gossip.nodeDescs. Fix this. - p.mu.Lock() - defer p.mu.Unlock() - if p.mu.nodeDescs == nil { - p.mu.nodeDescs = make(map[roachpb.NodeID]*roachpb.NodeDescriptor) + c.mu.Lock() + defer c.mu.Unlock() + if c.mu.nodeDescs == nil { + c.mu.nodeDescs = make(map[roachpb.NodeID]*roachpb.NodeDescriptor) } - p.mu.nodeDescs[desc.NodeID] = desc + c.mu.nodeDescs[desc.NodeID] = desc } // GetNodeDescriptor implements the kvcoord.NodeDescStore interface. -func (p *Proxy) GetNodeDescriptor(nodeID roachpb.NodeID) (*roachpb.NodeDescriptor, error) { - p.mu.RLock() - defer p.mu.RUnlock() - desc, ok := p.mu.nodeDescs[nodeID] +func (c *Connector) GetNodeDescriptor(nodeID roachpb.NodeID) (*roachpb.NodeDescriptor, error) { + c.mu.RLock() + defer c.mu.RUnlock() + desc, ok := c.mu.nodeDescs[nodeID] if !ok { return nil, errors.Errorf("unable to look up descriptor for n%d", nodeID) } @@ -220,19 +224,19 @@ func (p *Proxy) GetNodeDescriptor(nodeID roachpb.NodeID) (*roachpb.NodeDescripto } // updateSystemConfig handles updates to a filtered view of the "system-db" -// gossip key, performing the corresponding update to the Proxy's cached +// gossip key, performing the corresponding update to the Connector's cached // SystemConfig. -func (p *Proxy) updateSystemConfig(ctx context.Context, key string, content roachpb.Value) { - cfg := config.NewSystemConfig(p.defaultZoneCfg) +func (c *Connector) updateSystemConfig(ctx context.Context, key string, content roachpb.Value) { + cfg := config.NewSystemConfig(c.defaultZoneCfg) if err := content.GetProto(&cfg.SystemConfigEntries); err != nil { log.Errorf(ctx, "could not unmarshal system config: %v", err) return } - p.mu.Lock() - defer p.mu.Unlock() - p.mu.systemConfig = cfg - for _, c := range p.mu.systemConfigChannels { + c.mu.Lock() + defer c.mu.Unlock() + c.mu.systemConfig = cfg + for _, c := range c.mu.systemConfigChannels { select { case c <- struct{}{}: default: @@ -241,41 +245,41 @@ func (p *Proxy) updateSystemConfig(ctx context.Context, key string, content roac } // GetSystemConfig implements the config.SystemConfigProvider interface. -func (p *Proxy) GetSystemConfig() *config.SystemConfig { - // TODO(nvanbenschoten): we need to wait in `(*Proxy).Start()` until the +func (c *Connector) GetSystemConfig() *config.SystemConfig { + // TODO(nvanbenschoten): we need to wait in `(*Connector).Start()` until the // system config is populated. As is, there's a small chance that we return // nil, which SQL does not handle. - p.mu.RLock() - defer p.mu.RUnlock() - return p.mu.systemConfig + c.mu.RLock() + defer c.mu.RUnlock() + return c.mu.systemConfig } // RegisterSystemConfigChannel implements the config.SystemConfigProvider // interface. -func (p *Proxy) RegisterSystemConfigChannel() <-chan struct{} { - // Create channel that receives new system config notifications. - // The channel has a size of 1 to prevent proxy from having to block on it. - c := make(chan struct{}, 1) +func (c *Connector) RegisterSystemConfigChannel() <-chan struct{} { + // Create channel that receives new system config notifications. The channel + // has a size of 1 to prevent connector from having to block on it. + ch := make(chan struct{}, 1) - p.mu.Lock() - defer p.mu.Unlock() - p.mu.systemConfigChannels = append(p.mu.systemConfigChannels, c) + c.mu.Lock() + defer c.mu.Unlock() + c.mu.systemConfigChannels = append(c.mu.systemConfigChannels, ch) // Notify the channel right away if we have a config. - if p.mu.systemConfig != nil { - c <- struct{}{} + if c.mu.systemConfig != nil { + ch <- struct{}{} } - return c + return ch } // RangeLookup implements the kvcoord.RangeDescriptorDB interface. -func (p *Proxy) RangeLookup( +func (c *Connector) RangeLookup( ctx context.Context, key roachpb.RKey, useReverseScan bool, ) ([]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error) { // Proxy range lookup requests through the Internal service. - ctx = p.AnnotateCtx(ctx) + ctx = c.AnnotateCtx(ctx) for ctx.Err() == nil { - client, err := p.getClient(ctx) + client, err := c.getClient(ctx) if err != nil { continue } @@ -303,7 +307,7 @@ func (p *Proxy) RangeLookup( return nil, nil, err } // Soft RPC error. Drop client and retry. - p.tryForgetClient(ctx, client) + c.tryForgetClient(ctx, client) continue } if resp.Error != nil { @@ -316,7 +320,7 @@ func (p *Proxy) RangeLookup( } // FirstRange implements the kvcoord.RangeDescriptorDB interface. -func (p *Proxy) FirstRange() (*roachpb.RangeDescriptor, error) { +func (c *Connector) FirstRange() (*roachpb.RangeDescriptor, error) { return nil, status.Error(codes.Unauthenticated, "kvtenant.Proxy does not have access to FirstRange") } @@ -324,24 +328,24 @@ func (p *Proxy) FirstRange() (*roachpb.RangeDescriptor, error) { // not, the method attempts to dial one of the configured addresses. The method // blocks until either a connection is successfully established or the provided // context is canceled. -func (p *Proxy) getClient(ctx context.Context) (roachpb.InternalClient, error) { - p.mu.RLock() - if c := p.mu.client; c != nil { - p.mu.RUnlock() - return c, nil +func (c *Connector) getClient(ctx context.Context) (roachpb.InternalClient, error) { + c.mu.RLock() + if client := c.mu.client; client != nil { + c.mu.RUnlock() + return client, nil } - ch, _ := p.rpcDial.DoChan("dial", func() (interface{}, error) { - dialCtx := p.AnnotateCtx(context.Background()) - dialCtx, cancel := p.rpcContext.Stopper.WithCancelOnQuiesce(dialCtx) + ch, _ := c.rpcDial.DoChan("dial", func() (interface{}, error) { + dialCtx := c.AnnotateCtx(context.Background()) + dialCtx, cancel := c.rpcContext.Stopper.WithCancelOnQuiesce(dialCtx) defer cancel() - err := p.rpcContext.Stopper.RunTaskWithErr(dialCtx, "kvtenant.Proxy: dial", p.dialAddrs) + err := c.rpcContext.Stopper.RunTaskWithErr(dialCtx, "kvtenant.Connector: dial", c.dialAddrs) if err != nil { return nil, err } // NB: read lock not needed. - return p.mu.client, nil + return c.mu.client, nil }) - p.mu.RUnlock() + c.mu.RUnlock() select { case res := <-ch: @@ -356,47 +360,47 @@ func (p *Proxy) getClient(ctx context.Context) (roachpb.InternalClient, error) { // dialAddrs attempts to dial each of the configured addresses in a retry loop. // The method will only return a non-nil error on context cancellation. -func (p *Proxy) dialAddrs(ctx context.Context) error { - for r := retry.StartWithCtx(ctx, p.rpcRetryOptions); r.Next(); { +func (c *Connector) dialAddrs(ctx context.Context) error { + for r := retry.StartWithCtx(ctx, c.rpcRetryOptions); r.Next(); { // Try each address on each retry iteration. - randStart := rand.Intn(len(p.addrs)) - for i := range p.addrs { - addr := p.addrs[(i+randStart)%len(p.addrs)] - conn, err := p.dialAddr(ctx, addr) + randStart := rand.Intn(len(c.addrs)) + for i := range c.addrs { + addr := c.addrs[(i+randStart)%len(c.addrs)] + conn, err := c.dialAddr(ctx, addr) if err != nil { log.Warningf(ctx, "error dialing tenant KV address %s: %v", addr, err) continue } client := roachpb.NewInternalClient(conn) - p.mu.Lock() - p.mu.client = client - p.mu.Unlock() + c.mu.Lock() + c.mu.client = client + c.mu.Unlock() return nil } } return ctx.Err() } -func (p *Proxy) dialAddr(ctx context.Context, addr string) (conn *grpc.ClientConn, err error) { - if p.rpcDialTimeout == 0 { - return p.rpcContext.GRPCUnvalidatedDial(addr).Connect(ctx) +func (c *Connector) dialAddr(ctx context.Context, addr string) (conn *grpc.ClientConn, err error) { + if c.rpcDialTimeout == 0 { + return c.rpcContext.GRPCUnvalidatedDial(addr).Connect(ctx) } - err = contextutil.RunWithTimeout(ctx, "dial addr", p.rpcDialTimeout, func(ctx context.Context) error { - conn, err = p.rpcContext.GRPCUnvalidatedDial(addr).Connect(ctx) + err = contextutil.RunWithTimeout(ctx, "dial addr", c.rpcDialTimeout, func(ctx context.Context) error { + conn, err = c.rpcContext.GRPCUnvalidatedDial(addr).Connect(ctx) return err }) return conn, err } -func (p *Proxy) tryForgetClient(ctx context.Context, c roachpb.InternalClient) { +func (c *Connector) tryForgetClient(ctx context.Context, client roachpb.InternalClient) { if ctx.Err() != nil { // Error (may be) due to context. Don't forget client. return } // Compare-and-swap to avoid thrashing. - p.mu.Lock() - defer p.mu.Unlock() - if p.mu.client == c { - p.mu.client = nil + c.mu.Lock() + defer c.mu.Unlock() + if c.mu.client == client { + c.mu.client = nil } } diff --git a/pkg/ccl/kvccl/kvtenantccl/proxy_test.go b/pkg/ccl/kvccl/kvtenantccl/connector_test.go similarity index 85% rename from pkg/ccl/kvccl/kvtenantccl/proxy_test.go rename to pkg/ccl/kvccl/kvtenantccl/connector_test.go index a02da79cbf43..075b6ba7c8e6 100644 --- a/pkg/ccl/kvccl/kvtenantccl/proxy_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector_test.go @@ -88,17 +88,17 @@ func gossipEventForSystemConfig(cfg *config.SystemConfigEntries) *roachpb.Gossip } } -func waitForNodeDesc(t *testing.T, p *Proxy, nodeID roachpb.NodeID) { +func waitForNodeDesc(t *testing.T, c *Connector, nodeID roachpb.NodeID) { t.Helper() testutils.SucceedsSoon(t, func() error { - _, err := p.GetNodeDescriptor(nodeID) + _, err := c.GetNodeDescriptor(nodeID) return err }) } -// TestProxyGossipSubscription tests Proxy's roles as a kvcoord.NodeDescStore -// and as a config.SystemConfigProvider. -func TestProxyGossipSubscription(t *testing.T) { +// TestConnectorGossipSubscription tests Connector's roles as a +// kvcoord.NodeDescStore and as a config.SystemConfigProvider. +func TestConnectorGossipSubscription(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() @@ -125,18 +125,18 @@ func TestProxyGossipSubscription(t *testing.T) { ln, err := netutil.ListenAndServeGRPC(stopper, s, util.TestAddr) require.NoError(t, err) - cfg := kvtenant.ProxyConfig{ + cfg := kvtenant.ConnectorConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, RPCContext: rpcContext, RPCRetryOptions: rpcRetryOpts, } addrs := []string{ln.Addr().String()} - p := NewProxy(cfg, addrs) + c := NewConnector(cfg, addrs) // Start should block until the first GossipSubscription response. startedC := make(chan error) go func() { - startedC <- p.Start(ctx) + startedC <- c.Start(ctx) }() select { case err := <-startedC: @@ -152,14 +152,14 @@ func TestProxyGossipSubscription(t *testing.T) { require.NoError(t, <-startedC) // Test kvcoord.NodeDescStore impl. Wait for full update first. - waitForNodeDesc(t, p, 2) - desc, err := p.GetNodeDescriptor(1) + waitForNodeDesc(t, c, 2) + desc, err := c.GetNodeDescriptor(1) require.Equal(t, node1, desc) require.NoError(t, err) - desc, err = p.GetNodeDescriptor(2) + desc, err = c.GetNodeDescriptor(2) require.Equal(t, node2, desc) require.NoError(t, err) - desc, err = p.GetNodeDescriptor(3) + desc, err = c.GetNodeDescriptor(3) require.Nil(t, desc) require.Regexp(t, "unable to look up descriptor for n3", err) @@ -170,21 +170,21 @@ func TestProxyGossipSubscription(t *testing.T) { gossipSubC <- gossipEventForNodeDesc(node3) // Test kvcoord.NodeDescStore impl. Wait for full update first. - waitForNodeDesc(t, p, 3) - desc, err = p.GetNodeDescriptor(1) + waitForNodeDesc(t, c, 3) + desc, err = c.GetNodeDescriptor(1) require.Equal(t, node1Up, desc) require.NoError(t, err) - desc, err = p.GetNodeDescriptor(2) + desc, err = c.GetNodeDescriptor(2) require.Equal(t, node2, desc) require.NoError(t, err) - desc, err = p.GetNodeDescriptor(3) + desc, err = c.GetNodeDescriptor(3) require.Equal(t, node3, desc) require.NoError(t, err) // Test config.SystemConfigProvider impl. Should not have a SystemConfig yet. - sysCfg := p.GetSystemConfig() + sysCfg := c.GetSystemConfig() require.Nil(t, sysCfg) - sysCfgC := p.RegisterSystemConfigChannel() + sysCfgC := c.RegisterSystemConfigChannel() require.Len(t, sysCfgC, 0) // Return first SystemConfig response. @@ -196,7 +196,7 @@ func TestProxyGossipSubscription(t *testing.T) { // Test config.SystemConfigProvider impl. Wait for update first. <-sysCfgC - sysCfg = p.GetSystemConfig() + sysCfg = c.GetSystemConfig() require.NotNil(t, sysCfg) require.Equal(t, sysCfgEntries.Values, sysCfg.Values) @@ -209,17 +209,18 @@ func TestProxyGossipSubscription(t *testing.T) { // Test config.SystemConfigProvider impl. Wait for update first. <-sysCfgC - sysCfg = p.GetSystemConfig() + sysCfg = c.GetSystemConfig() require.NotNil(t, sysCfg) require.Equal(t, sysCfgEntriesUp.Values, sysCfg.Values) // A newly registered SystemConfig channel will be immediately notified. - sysCfgC2 := p.RegisterSystemConfigChannel() + sysCfgC2 := c.RegisterSystemConfigChannel() require.Len(t, sysCfgC2, 1) } -// TestProxyGossipSubscription tests Proxy's role as a kvcoord.RangeDescriptorDB. -func TestProxyRangeLookup(t *testing.T) { +// TestConnectorGossipSubscription tests Connector's role as a +// kvcoord.RangeDescriptorDB. +func TestConnectorRangeLookup(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() @@ -245,14 +246,14 @@ func TestProxyRangeLookup(t *testing.T) { ln, err := netutil.ListenAndServeGRPC(stopper, s, util.TestAddr) require.NoError(t, err) - cfg := kvtenant.ProxyConfig{ + cfg := kvtenant.ConnectorConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, RPCContext: rpcContext, RPCRetryOptions: rpcRetryOpts, } addrs := []string{ln.Addr().String()} - p := NewProxy(cfg, addrs) - // NOTE: we don't actually start the proxy worker. That's ok, as + c := NewConnector(cfg, addrs) + // NOTE: we don't actually start the connector worker. That's ok, as // RangeDescriptorDB methods don't require it to be running. // Success case. @@ -261,7 +262,7 @@ func TestProxyRangeLookup(t *testing.T) { rangeLookupRespC <- &roachpb.RangeLookupResponse{ Descriptors: descs, PrefetchedDescriptors: preDescs, } - resDescs, resPreDescs, err := p.RangeLookup(ctx, roachpb.RKey("a"), false /* useReverseScan */) + resDescs, resPreDescs, err := c.RangeLookup(ctx, roachpb.RKey("a"), false /* useReverseScan */) require.Equal(t, descs, resDescs) require.Equal(t, preDescs, resPreDescs) require.NoError(t, err) @@ -270,7 +271,7 @@ func TestProxyRangeLookup(t *testing.T) { rangeLookupRespC <- &roachpb.RangeLookupResponse{ Error: roachpb.NewErrorf("hit error"), } - resDescs, resPreDescs, err = p.RangeLookup(ctx, roachpb.RKey("a"), false /* useReverseScan */) + resDescs, resPreDescs, err = c.RangeLookup(ctx, roachpb.RKey("a"), false /* useReverseScan */) require.Nil(t, resDescs) require.Nil(t, resPreDescs) require.Regexp(t, "hit error", err) @@ -287,22 +288,22 @@ func TestProxyRangeLookup(t *testing.T) { blockingC <- struct{}{} cancel() }() - resDescs, resPreDescs, err = p.RangeLookup(canceledCtx, roachpb.RKey("a"), false /* useReverseScan */) + resDescs, resPreDescs, err = c.RangeLookup(canceledCtx, roachpb.RKey("a"), false /* useReverseScan */) require.Nil(t, resDescs) require.Nil(t, resPreDescs) require.Regexp(t, context.Canceled.Error(), err) // FirstRange always returns error. - desc, err := p.FirstRange() + desc, err := c.FirstRange() require.Nil(t, desc) require.Regexp(t, "does not have access to FirstRange", err) require.True(t, grpcutil.IsAuthenticationError(err)) } -// TestProxyRetriesUnreachable tests that Proxy iterates over each of its -// provided addresses and retries until it is able to establish a connection on -// one of them. -func TestProxyRetriesUnreachable(t *testing.T) { +// TestConnectorRetriesUnreachable tests that Connector iterates over each of +// its provided addresses and retries until it is able to establish a connection +// on one of them. +func TestConnectorRetriesUnreachable(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() @@ -343,19 +344,19 @@ func TestProxyRetriesUnreachable(t *testing.T) { }) // Add listen address into list of other bogus addresses. - cfg := kvtenant.ProxyConfig{ + cfg := kvtenant.ConnectorConfig{ AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, RPCContext: rpcContext, RPCRetryOptions: rpcRetryOpts, } addrs := []string{"1.1.1.1:9999", ln.Addr().String(), "2.2.2.2:9999"} - p := NewProxy(cfg, addrs) - p.rpcDialTimeout = 5 * time.Millisecond // speed up test + c := NewConnector(cfg, addrs) + c.rpcDialTimeout = 5 * time.Millisecond // speed up test // Start should block until the first GossipSubscription response. startedC := make(chan error) go func() { - startedC <- p.Start(ctx) + startedC <- c.Start(ctx) }() select { case err := <-startedC: @@ -363,7 +364,7 @@ func TestProxyRetriesUnreachable(t *testing.T) { case <-time.After(25 * time.Millisecond): } - // Begin serving on gRPC server. Proxy should quickly connect + // Begin serving on gRPC server. Connector should quickly connect // and complete startup. stopper.RunWorker(ctx, func(context.Context) { netutil.FatalIfUnexpected(s.Serve(ln)) @@ -371,14 +372,14 @@ func TestProxyRetriesUnreachable(t *testing.T) { require.NoError(t, <-startedC) // Test kvcoord.NodeDescStore impl. Wait for full update first. - waitForNodeDesc(t, p, 2) - desc, err := p.GetNodeDescriptor(1) + waitForNodeDesc(t, c, 2) + desc, err := c.GetNodeDescriptor(1) require.Equal(t, node1, desc) require.NoError(t, err) - desc, err = p.GetNodeDescriptor(2) + desc, err = c.GetNodeDescriptor(2) require.Equal(t, node2, desc) require.NoError(t, err) - desc, err = p.GetNodeDescriptor(3) + desc, err = c.GetNodeDescriptor(3) require.Nil(t, desc) require.Regexp(t, "unable to look up descriptor for n3", err) } diff --git a/pkg/kv/kvclient/kvtenant/connector.go b/pkg/kv/kvclient/kvtenant/connector.go new file mode 100644 index 000000000000..297d9f387663 --- /dev/null +++ b/pkg/kv/kvclient/kvtenant/connector.go @@ -0,0 +1,96 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package kvtenant provides utilities required by SQL-only tenant processes in +// order to interact with the key-value layer. +package kvtenant + +import ( + "context" + "net" + + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/errors" +) + +// Connector mediates the communication of cluster-wide state to sandboxed +// SQL-only tenant processes through a restricted interface. A Connector is +// seeded with a set of one or more network addresses that reference existing +// KV nodes in the cluster (or a load-balancer which fans out to some/all KV +// nodes). On startup, it establishes contact with one of these nodes to learn +// about the topology of the cluster and bootstrap the rest of SQL <-> KV +// network communication. +type Connector interface { + // Start starts the connector. + Start(context.Context) error + + // Connector is capable of providing information on each of the KV nodes in + // the cluster in the form of NodeDescriptors. This obviates the need for + // SQL-only tenant processes to join the cluster-wide gossip network. + kvcoord.NodeDescStore + + // Connector is capable of providing Range addressing information in the + // form of RangeDescriptors through delegated RangeLookup requests. This is + // necessary because SQL-only tenants are restricted from reading Range + // Metadata keys directly. Instead, the RangeLookup requests are proxied + // through existing KV nodes while being subject to additional validation + // (e.g. is the Range being requested owned by the requesting tenant?). + kvcoord.RangeDescriptorDB + + // Connector is capable of providing a filtered view of the SystemConfig + // containing only information applicable to secondary tenants. This + // obviates the need for SQL-only tenant processes to join the cluster-wide + // gossip network. + config.SystemConfigProvider +} + +// ConnectorConfig encompasses the configuration required to create a Connector. +type ConnectorConfig struct { + AmbientCtx log.AmbientContext + RPCContext *rpc.Context + RPCRetryOptions retry.Options + DefaultZoneConfig *zonepb.ZoneConfig +} + +// ConnectorFactory constructs a new tenant Connector from the provide network +// addresses pointing to KV nodes. +type ConnectorFactory interface { + NewConnector(cfg ConnectorConfig, addrs []string) (Connector, error) +} + +// Factory is a hook for binaries that include CCL code to inject a +// ConnectorFactory. +var Factory ConnectorFactory = requiresCCLBinaryFactory{} + +type requiresCCLBinaryFactory struct{} + +func (requiresCCLBinaryFactory) NewConnector(_ ConnectorConfig, _ []string) (Connector, error) { + return nil, errors.Errorf(`tenant connector requires a CCL binary`) +} + +// AddressResolver wraps a Connector in an adapter that allows it be used as a +// nodedialer.AddressResolver. Addresses are resolved to a node's tenant KV +// address. See NodeDescriptor.CheckedTenantAddress. +func AddressResolver(c Connector) nodedialer.AddressResolver { + return func(nodeID roachpb.NodeID) (net.Addr, error) { + nd, err := c.GetNodeDescriptor(nodeID) + if err != nil { + return nil, err + } + return nd.CheckedTenantAddress(), nil + } +} diff --git a/pkg/kv/kvclient/kvtenant/proxy.go b/pkg/kv/kvclient/kvtenant/proxy.go deleted file mode 100644 index 52478a95a451..000000000000 --- a/pkg/kv/kvclient/kvtenant/proxy.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -// Package kvtenant provides utilities required by SQL-only tenant processes in -// order to interact with the key-value layer. -package kvtenant - -import ( - "context" - "net" - - "github.com/cockroachdb/cockroach/pkg/config" - "github.com/cockroachdb/cockroach/pkg/config/zonepb" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" - "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/errors" -) - -// Proxy mediates the communication of cluster-wide state to sandboxed SQL-only -// tenant processes through a restricted interface. A Proxy is seeded with a set -// of one or more network addresses that reference existing KV nodes in the -// cluster (or a load-balancer which fans out to some/all KV nodes). On startup, -// it establishes contact with one of these nodes to learn about the topology of -// the cluster and bootstrap the rest of SQL <-> KV network communication. -type Proxy interface { - // Start starts the proxy. - Start(context.Context) error - - // Proxy is capable of providing information on each of the KV nodes in the - // cluster in the form of NodeDescriptors. This obviates the need for SQL-only - // tenant processes to join the cluster-wide gossip network. - kvcoord.NodeDescStore - - // Proxy is capable of providing Range addressing information in the form of - // RangeDescriptors through delegated RangeLookup requests. This is necessary - // because SQL-only tenants are restricted from reading Range Metadata keys - // directly. Instead, the RangeLookup requests are proxied through existing KV - // nodes while being subject to additional validation (e.g. is the Range being - // requested owned by the requesting tenant?). - kvcoord.RangeDescriptorDB - - // Proxy is capable of providing a filtered view of the SystemConfig - // containing only information applicable to secondary tenants. This - // obviates the need for SQL-only tenant processes to join the cluster-wide - // gossip network. - config.SystemConfigProvider -} - -// ProxyConfig encompasses the configuration required to create a Proxy. -type ProxyConfig struct { - AmbientCtx log.AmbientContext - RPCContext *rpc.Context - RPCRetryOptions retry.Options - DefaultZoneConfig *zonepb.ZoneConfig -} - -// ProxyFactory constructs a new tenant proxy from the provide network addresses -// pointing to KV nodes. -type ProxyFactory interface { - NewProxy(cfg ProxyConfig, addrs []string) (Proxy, error) -} - -// Factory is a hook for binaries that include CCL code to inject a ProxyFactory. -var Factory ProxyFactory = requiresCCLBinaryFactory{} - -type requiresCCLBinaryFactory struct{} - -func (requiresCCLBinaryFactory) NewProxy(_ ProxyConfig, _ []string) (Proxy, error) { - return nil, errors.Errorf(`tenant proxy requires a CCL binary`) -} - -// AddressResolver wraps a Proxy in an adapter that allows it be used as a -// nodedialer.AddressResolver. Addresses are resolved to a node's tenant KV -// address. See NodeDescriptor.CheckedTenantAddress. -func AddressResolver(p Proxy) nodedialer.AddressResolver { - return func(nodeID roachpb.NodeID) (net.Addr, error) { - nd, err := p.GetNodeDescriptor(nodeID) - if err != nil { - return nil, err - } - return nd.CheckedTenantAddress(), nil - } -} diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index aa39ca09b7c1..02fee8d7b0a3 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -74,7 +74,7 @@ type sqlServer struct { internalExecutor *sql.InternalExecutor leaseMgr *lease.Manager blobService *blobs.Service - tenantProxy kvtenant.Proxy + tenantConnect kvtenant.Connector // sessionRegistry can be queried for info on running SQL sessions. It is // shared between the sql.Server and the statusServer. sessionRegistry *sql.SessionRegistry @@ -121,7 +121,7 @@ type sqlServerOptionalKVArgs struct { // sqlServerOptionalTenantArgs are the arguments supplied to newSQLServer which // are only available if the SQL server runs as part of a standalone SQL node. type sqlServerOptionalTenantArgs struct { - tenantProxy kvtenant.Proxy + tenantConnect kvtenant.Connector } type sqlServerArgs struct { @@ -592,7 +592,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { internalExecutor: cfg.circularInternalExecutor, leaseMgr: leaseMgr, blobService: blobService, - tenantProxy: cfg.tenantProxy, + tenantConnect: cfg.tenantConnect, sessionRegistry: cfg.sessionRegistry, jobRegistry: jobRegistry, statsRefresher: statsRefresher, @@ -615,8 +615,8 @@ func (s *sqlServer) start( ) error { // If necessary, start the tenant proxy first, to ensure all other // components can properly route to KV nodes. - if s.tenantProxy != nil { - if err := s.tenantProxy.Start(ctx); err != nil { + if s.tenantConnect != nil { + if err := s.tenantConnect.Start(ctx); err != nil { return err } } diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 0a82fba7f8e3..1c8c1f606989 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -486,28 +486,28 @@ func makeSQLServerArgs( } rpcRetryOptions := base.DefaultRetryOptions() - tpCfg := kvtenant.ProxyConfig{ + tcCfg := kvtenant.ConnectorConfig{ AmbientCtx: baseCfg.AmbientCtx, RPCContext: rpcContext, RPCRetryOptions: rpcRetryOptions, DefaultZoneConfig: &baseCfg.DefaultZoneConfig, } - tenantProxy, err := kvtenant.Factory.NewProxy(tpCfg, sqlCfg.TenantKVAddrs) + tenantConnect, err := kvtenant.Factory.NewConnector(tcCfg, sqlCfg.TenantKVAddrs) if err != nil { return sqlServerArgs{}, err } - resolver := kvtenant.AddressResolver(tenantProxy) + resolver := kvtenant.AddressResolver(tenantConnect) nodeDialer := nodedialer.New(rpcContext, resolver) dsCfg := kvcoord.DistSenderConfig{ AmbientCtx: baseCfg.AmbientCtx, Settings: st, Clock: clock, - NodeDescs: tenantProxy, + NodeDescs: tenantConnect, RPCRetryOptions: &rpcRetryOptions, RPCContext: rpcContext, NodeDialer: nodeDialer, - RangeDescriptorDB: tenantProxy, + RangeDescriptorDB: tenantConnect, TestingKnobs: dsKnobs, } ds := kvcoord.NewDistSender(dsCfg) @@ -581,7 +581,7 @@ func makeSQLServerArgs( }, }, sqlServerOptionalTenantArgs: sqlServerOptionalTenantArgs{ - tenantProxy: tenantProxy, + tenantConnect: tenantConnect, }, SQLConfig: &sqlCfg, BaseConfig: &baseCfg, @@ -589,8 +589,8 @@ func makeSQLServerArgs( clock: clock, runtime: status.NewRuntimeStatSampler(context.Background(), clock), rpcContext: rpcContext, - nodeDescs: tenantProxy, - systemConfigProvider: tenantProxy, + nodeDescs: tenantConnect, + systemConfigProvider: tenantConnect, nodeDialer: nodeDialer, distSender: ds, db: db,