Skip to content

Commit

Permalink
Merge #78276
Browse files Browse the repository at this point in the history
78276: ccl/sqlproxyccl: rename tenant.Resolver to tenant.DirectoryCache  r=JeffSwenson,andy-kimball a=jaylim-crl

Informs #76000.

#### ccl/sqlproxyccl: move the TenantResolver interface to the tenant package 

Previously, the TenantResolver interface lived in the base sqlproxyccl package,
and there was no way to enforce that the tenant.Directory struct implements
this interface since that would result in a cyclic dependency. This commit
moves the TenantResolver interface into the tenant package, and enforced that
the tenant.Directory struct implements that.

This would then allow us to rename the tenant.Directory struct to
tenant.directoryCache, and the tenant.Resolver interface to
tenant.DirectoryCache.

Release note: None

#### ccl/sqlproxyccl: rename tenant.Resolver to tenant.DirectoryCache 

This commit renames the tenant.Resolver interface to tenant.DirectoryCache.
The existing Directory struct was also renamed to directoryCache to prevent
confusions since there were previously two definitions of a "directory": one
pointing to the cache of entries, whereas another referring to the actual
directory server.

Release note: None

Co-authored-by: Jay <jay@cockroachlabs.com>
  • Loading branch information
craig[bot] and jaylim-crl committed Mar 23, 2022
2 parents 0bf39a8 + 281754d commit 5bd50f5
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 119 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ go_test(
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/sqlproxyccl/denylist",
"//pkg/ccl/sqlproxyccl/interceptor",
"//pkg/ccl/sqlproxyccl/tenant",
"//pkg/ccl/sqlproxyccl/tenantdirsvr",
"//pkg/ccl/sqlproxyccl/throttler",
"//pkg/ccl/utilccl",
Expand Down
84 changes: 28 additions & 56 deletions pkg/ccl/sqlproxyccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -34,39 +35,6 @@ const sessionRevivalTokenStartupParam = "crdb:session_revival_token_base64"
// remoteAddrStartupParam contains the remote address of the original client.
const remoteAddrStartupParam = "crdb:remote_addr"

// TenantResolver is an interface for the tenant directory. Currently only
// tenant.Directory implements it.
//
// TODO(jaylim-crl): Rename this to Directory, and the current tenant.Directory
// to tenant.directory. This needs to be moved into the tenant package as well.
// This is added here to aid testing.
type TenantResolver interface {
// EnsureTenantAddr returns an IP address of one of the given tenant's SQL
// processes based on the tenantID and clusterName fields. This should block
// until the process associated with the IP is ready.
//
// If no matching pods are found (e.g. cluster name mismatch, or tenant was
// deleted), this will return a GRPC NotFound error.
EnsureTenantAddr(
ctx context.Context,
tenantID roachpb.TenantID,
clusterName string,
) (string, error)

// LookupTenantAddrs returns the IP addresses for all available SQL
// processes for the given tenant. It returns a GRPC NotFound error if the
// tenant does not exist.
//
// Unlike EnsureTenantAddr which blocks until there is an associated
// process, LookupTenantAddrs will just return an empty set if no processes
// are available for the tenant.
LookupTenantAddrs(ctx context.Context, tenantID roachpb.TenantID) ([]string, error)

// ReportFailure is used to indicate to the resolver that a connection
// attempt to connect to a particular SQL tenant pod have failed.
ReportFailure(ctx context.Context, tenantID roachpb.TenantID, addr string) error
}

// connector is a per-session tenant-associated component that can be used to
// obtain a connection to the tenant cluster. This will also handle the
// authentication phase. All connections returned by the connector should
Expand All @@ -79,24 +47,24 @@ type connector struct {
ClusterName string
TenantID roachpb.TenantID

// Directory corresponds to the tenant directory, which will be used to
// resolve tenants to their corresponding IP addresses. If this isn't set,
// we will fallback to use RoutingRule.
// DirectoryCache corresponds to the tenant directory cache, which will be
// used to resolve tenants to their corresponding IP addresses. If this
// isn't set, we will fallback to use RoutingRule.
//
// TODO(jaylim-crl): Replace this with a Directory interface, and remove
// the RoutingRule field. RoutingRule should not be in here.
// TODO(jaylim-crl): Remove the RoutingRule field. RoutingRule should not
// be in here.
//
// NOTE: This field is optional.
Directory TenantResolver
DirectoryCache tenant.DirectoryCache

// RoutingRule refers to the static rule that will be used when resolving
// tenants. This will be used directly whenever the Directory field isn't
// specified, or as a fallback if one was specified.
// tenants. This will be used directly whenever the DirectoryCache field
// isn't specified, or as a fallback if one was specified.
//
// The literal "{{clusterName}}" will be replaced with ClusterName within
// the RoutingRule string.
//
// NOTE: This field is optional, if Directory isn't set.
// NOTE: This field is optional, if DirectoryCache isn't set.
RoutingRule string

// StartupMsg represents the startup message associated with the client.
Expand Down Expand Up @@ -263,11 +231,12 @@ func (c *connector) dialTenantCluster(ctx context.Context) (net.Conn, error) {
dialSQLServerErrs = 0
}

// Report the failure to the directory so that it can refresh
// any stale information that may have caused the problem.
if c.Directory != nil {
if err = reportFailureToDirectory(
ctx, c.TenantID, serverAddr, c.Directory,
// Report the failure to the directory cache so that it can
// refresh any stale information that may have caused the
// problem.
if c.DirectoryCache != nil {
if err = reportFailureToDirectoryCache(
ctx, c.TenantID, serverAddr, c.DirectoryCache,
); err != nil {
reportFailureErrs++
if reportFailureErr.ShouldLog() {
Expand Down Expand Up @@ -315,9 +284,9 @@ func (c *connector) lookupAddr(ctx context.Context) (string, error) {
return c.testingKnobs.lookupAddr(ctx)
}

// First try to lookup tenant in the directory (if available).
if c.Directory != nil {
addr, err := c.Directory.EnsureTenantAddr(ctx, c.TenantID, c.ClusterName)
// First try to lookup tenant in the directory cache (if available).
if c.DirectoryCache != nil {
addr, err := c.DirectoryCache.EnsureTenantAddr(ctx, c.TenantID, c.ClusterName)
switch {
case err == nil:
return addr, nil
Expand All @@ -337,7 +306,7 @@ func (c *connector) lookupAddr(ctx context.Context) (string, error) {
// map to a GRPC NotFound error.
//
// TODO(jaylim-crl): This code is temporary. Remove this once we have fully
// replaced this with a Directory interface. This fallback does not need
// replaced this with a Directory GRPC server. This fallback does not need
// to exist.
addr := strings.ReplaceAll(
c.RoutingRule, "{{clusterName}}",
Expand Down Expand Up @@ -407,10 +376,13 @@ func isRetriableConnectorError(err error) bool {
return errors.Is(err, errRetryConnectorSentinel)
}

// reportFailureToDirectory is a hookable function that calls the given tenant
// directory's ReportFailure method.
var reportFailureToDirectory = func(
ctx context.Context, tenantID roachpb.TenantID, addr string, directory TenantResolver,
// reportFailureToDirectoryCache is a hookable function that calls the given
// tenant directory cache's ReportFailure method.
var reportFailureToDirectoryCache = func(
ctx context.Context,
tenantID roachpb.TenantID,
addr string,
directoryCache tenant.DirectoryCache,
) error {
return directory.ReportFailure(ctx, tenantID, addr)
return directoryCache.ReportFailure(ctx, tenantID, addr)
}
30 changes: 16 additions & 14 deletions pkg/ccl/sqlproxyccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -434,7 +435,7 @@ func TestConnector_dialTenantCluster(t *testing.T) {
c := &connector{
TenantID: roachpb.MakeTenantID(42),
}
c.Directory = &testTenantResolver{
c.DirectoryCache = &testTenantDirectoryCache{
reportFailureFn: func(fnCtx context.Context, tenantID roachpb.TenantID, addr string) error {
reportFailureFnCount++
require.Equal(t, ctx, fnCtx)
Expand Down Expand Up @@ -532,7 +533,7 @@ func TestConnector_lookupAddr(t *testing.T) {
ClusterName: "my-foo",
TenantID: roachpb.MakeTenantID(10),
}
c.Directory = &testTenantResolver{
c.DirectoryCache = &testTenantDirectoryCache{
ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) {
ensureTenantAddrFnCount++
require.Equal(t, ctx, fnCtx)
Expand All @@ -555,7 +556,7 @@ func TestConnector_lookupAddr(t *testing.T) {
TenantID: roachpb.MakeTenantID(10),
RoutingRule: "foo.bar",
}
c.Directory = &testTenantResolver{
c.DirectoryCache = &testTenantDirectoryCache{
ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) {
ensureTenantAddrFnCount++
require.Equal(t, ctx, fnCtx)
Expand Down Expand Up @@ -590,7 +591,7 @@ func TestConnector_lookupAddr(t *testing.T) {
TenantID: roachpb.MakeTenantID(10),
RoutingRule: "foo.bar",
}
c.Directory = &testTenantResolver{
c.DirectoryCache = &testTenantDirectoryCache{
ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) {
ensureTenantAddrFnCount++
require.Equal(t, ctx, fnCtx)
Expand All @@ -613,7 +614,7 @@ func TestConnector_lookupAddr(t *testing.T) {
TenantID: roachpb.MakeTenantID(10),
RoutingRule: "foo.bar",
}
c.Directory = &testTenantResolver{
c.DirectoryCache = &testTenantDirectoryCache{
ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) {
ensureTenantAddrFnCount++
require.Equal(t, ctx, fnCtx)
Expand Down Expand Up @@ -731,31 +732,32 @@ func TestRetriableConnectorError(t *testing.T) {
require.True(t, errors.Is(err, errRetryConnectorSentinel))
}

var _ TenantResolver = &testTenantResolver{}
var _ tenant.DirectoryCache = &testTenantDirectoryCache{}

// testTenantResolver is a test implementation of the tenant resolver.
type testTenantResolver struct {
// testTenantDirectoryCache is a test implementation of the tenant directory
// cache.
type testTenantDirectoryCache struct {
ensureTenantAddrFn func(ctx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error)
lookupTenantAddrsFn func(ctx context.Context, tenantID roachpb.TenantID) ([]string, error)
reportFailureFn func(ctx context.Context, tenantID roachpb.TenantID, addr string) error
}

// EnsureTenantAddr implements the TenantResolver interface.
func (r *testTenantResolver) EnsureTenantAddr(
// EnsureTenantAddr implements the DirectoryCache interface.
func (r *testTenantDirectoryCache) EnsureTenantAddr(
ctx context.Context, tenantID roachpb.TenantID, clusterName string,
) (string, error) {
return r.ensureTenantAddrFn(ctx, tenantID, clusterName)
}

// LookupTenantAddrs implements the TenantResolver interface.
func (r *testTenantResolver) LookupTenantAddrs(
// LookupTenantAddrs implements the DirectoryCache interface.
func (r *testTenantDirectoryCache) LookupTenantAddrs(
ctx context.Context, tenantID roachpb.TenantID,
) ([]string, error) {
return r.lookupTenantAddrsFn(ctx, tenantID)
}

// ReportFailure implements the TenantResolver interface.
func (r *testTenantResolver) ReportFailure(
// ReportFailure implements the DirectoryCache interface.
func (r *testTenantDirectoryCache) ReportFailure(
ctx context.Context, tenantID roachpb.TenantID, addr string,
) error {
return r.reportFailureFn(ctx, tenantID, addr)
Expand Down
12 changes: 6 additions & 6 deletions pkg/ccl/sqlproxyccl/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ type proxyHandler struct {
// idleMonitor will detect idle connections to DRAINING pods.
idleMonitor *idle.Monitor

// directory is optional and if set, will be used to resolve
// backend id to IP addresses.
directory *tenant.Directory
// directoryCache is optional and if set, will be used to resolve tenants
// to their IP addresses.
directoryCache tenant.DirectoryCache

// CertManger keeps up to date the certificates used.
certManager *certmgr.CertManager
Expand Down Expand Up @@ -196,7 +196,7 @@ func newProxyHandler(
}

client := tenant.NewDirectoryClient(conn)
handler.directory, err = tenant.NewDirectory(ctx, stopper, client, dirOpts...)
handler.directoryCache, err = tenant.NewDirectoryCache(ctx, stopper, client, dirOpts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -280,8 +280,8 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn *proxyConn
RoutingRule: handler.RoutingRule,
StartupMsg: backendStartupMsg,
}
if handler.directory != nil {
connector.Directory = handler.directory
if handler.directoryCache != nil {
connector.DirectoryCache = handler.directoryCache
}

// TLS options for the proxy are split into Insecure and SkipVerify.
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/denylist"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenantdirsvr"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -676,17 +677,17 @@ func TestDirectoryConnect(t *testing.T) {

// Ensure that Directory.ReportFailure is being called correctly.
countReports := 0
defer testutils.TestingHook(&reportFailureToDirectory, func(
ctx context.Context, tenantID roachpb.TenantID, addr string, directory TenantResolver,
defer testutils.TestingHook(&reportFailureToDirectoryCache, func(
ctx context.Context, tenantID roachpb.TenantID, addr string, directoryCache tenant.DirectoryCache,
) error {
require.Equal(t, roachpb.MakeTenantID(28), tenantID)
addrs, err := directory.LookupTenantAddrs(ctx, tenantID)
addrs, err := directoryCache.LookupTenantAddrs(ctx, tenantID)
require.NoError(t, err)
require.Len(t, addrs, 1)
require.Equal(t, addrs[0], addr)

countReports++
err = directory.ReportFailure(ctx, tenantID, addr)
err = directoryCache.ReportFailure(ctx, tenantID, addr)
require.NoError(t, err)
return err
})()
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/tenant/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ go_proto_library(
go_library(
name = "tenant",
srcs = [
"directory.go",
"directory_cache.go",
"entry.go",
"pod.go",
],
Expand All @@ -47,7 +47,7 @@ go_test(
name = "tenant_test",
size = "large",
srcs = [
"directory_test.go",
"directory_cache_test.go",
"main_test.go",
"pod_test.go",
],
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/sqlproxyccl/tenant/directory.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 The Cockroach Authors.
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
Expand Down
Loading

0 comments on commit 5bd50f5

Please sign in to comment.