From 014531145b1ce2dfb1ce96fdf7017725f9404223 Mon Sep 17 00:00:00 2001 From: Jay Date: Mon, 21 Mar 2022 20:08:46 +0000 Subject: [PATCH 1/2] ccl/sqlproxyccl: move the TenantResolver interface to the tenant package Previously, the TenantResolver interface lived in the base sqlproxyccl package, and there was no way to enforce that the tenant.Directory struct implements this interface since that would result in a cyclic dependency. This commit moves the TenantResolver interface into the tenant package, and enforced that the tenant.Directory struct implements that. This would then allow us to rename the tenant.Directory struct to tenant.directoryCache, and the tenant.Resolver interface to tenant.DirectoryCache. Release note: None --- pkg/ccl/sqlproxyccl/BUILD.bazel | 1 + pkg/ccl/sqlproxyccl/connector.go | 38 ++------------------- pkg/ccl/sqlproxyccl/connector_test.go | 3 +- pkg/ccl/sqlproxyccl/proxy_handler_test.go | 3 +- pkg/ccl/sqlproxyccl/tenant/directory.go | 40 +++++++++++++++++++++++ 5 files changed, 48 insertions(+), 37 deletions(-) diff --git a/pkg/ccl/sqlproxyccl/BUILD.bazel b/pkg/ccl/sqlproxyccl/BUILD.bazel index 60ca9b189b43..0767fb7ef2ea 100644 --- a/pkg/ccl/sqlproxyccl/BUILD.bazel +++ b/pkg/ccl/sqlproxyccl/BUILD.bazel @@ -70,6 +70,7 @@ go_test( "//pkg/ccl/kvccl/kvtenantccl", "//pkg/ccl/sqlproxyccl/denylist", "//pkg/ccl/sqlproxyccl/interceptor", + "//pkg/ccl/sqlproxyccl/tenant", "//pkg/ccl/sqlproxyccl/tenantdirsvr", "//pkg/ccl/sqlproxyccl/throttler", "//pkg/ccl/utilccl", diff --git a/pkg/ccl/sqlproxyccl/connector.go b/pkg/ccl/sqlproxyccl/connector.go index 2022022391cc..bdb642c2a2a7 100644 --- a/pkg/ccl/sqlproxyccl/connector.go +++ b/pkg/ccl/sqlproxyccl/connector.go @@ -16,6 +16,7 @@ import ( "strings" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -34,39 +35,6 @@ const sessionRevivalTokenStartupParam = "crdb:session_revival_token_base64" // remoteAddrStartupParam contains the remote address of the original client. const remoteAddrStartupParam = "crdb:remote_addr" -// TenantResolver is an interface for the tenant directory. Currently only -// tenant.Directory implements it. -// -// TODO(jaylim-crl): Rename this to Directory, and the current tenant.Directory -// to tenant.directory. This needs to be moved into the tenant package as well. -// This is added here to aid testing. -type TenantResolver interface { - // EnsureTenantAddr returns an IP address of one of the given tenant's SQL - // processes based on the tenantID and clusterName fields. This should block - // until the process associated with the IP is ready. - // - // If no matching pods are found (e.g. cluster name mismatch, or tenant was - // deleted), this will return a GRPC NotFound error. - EnsureTenantAddr( - ctx context.Context, - tenantID roachpb.TenantID, - clusterName string, - ) (string, error) - - // LookupTenantAddrs returns the IP addresses for all available SQL - // processes for the given tenant. It returns a GRPC NotFound error if the - // tenant does not exist. - // - // Unlike EnsureTenantAddr which blocks until there is an associated - // process, LookupTenantAddrs will just return an empty set if no processes - // are available for the tenant. - LookupTenantAddrs(ctx context.Context, tenantID roachpb.TenantID) ([]string, error) - - // ReportFailure is used to indicate to the resolver that a connection - // attempt to connect to a particular SQL tenant pod have failed. - ReportFailure(ctx context.Context, tenantID roachpb.TenantID, addr string) error -} - // connector is a per-session tenant-associated component that can be used to // obtain a connection to the tenant cluster. This will also handle the // authentication phase. All connections returned by the connector should @@ -87,7 +55,7 @@ type connector struct { // the RoutingRule field. RoutingRule should not be in here. // // NOTE: This field is optional. - Directory TenantResolver + Directory tenant.Resolver // RoutingRule refers to the static rule that will be used when resolving // tenants. This will be used directly whenever the Directory field isn't @@ -410,7 +378,7 @@ func isRetriableConnectorError(err error) bool { // reportFailureToDirectory is a hookable function that calls the given tenant // directory's ReportFailure method. var reportFailureToDirectory = func( - ctx context.Context, tenantID roachpb.TenantID, addr string, directory TenantResolver, + ctx context.Context, tenantID roachpb.TenantID, addr string, directory tenant.Resolver, ) error { return directory.ReportFailure(ctx, tenantID, addr) } diff --git a/pkg/ccl/sqlproxyccl/connector_test.go b/pkg/ccl/sqlproxyccl/connector_test.go index 54e9768f5640..5040c792e389 100644 --- a/pkg/ccl/sqlproxyccl/connector_test.go +++ b/pkg/ccl/sqlproxyccl/connector_test.go @@ -16,6 +16,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -731,7 +732,7 @@ func TestRetriableConnectorError(t *testing.T) { require.True(t, errors.Is(err, errRetryConnectorSentinel)) } -var _ TenantResolver = &testTenantResolver{} +var _ tenant.Resolver = &testTenantResolver{} // testTenantResolver is a test implementation of the tenant resolver. type testTenantResolver struct { diff --git a/pkg/ccl/sqlproxyccl/proxy_handler_test.go b/pkg/ccl/sqlproxyccl/proxy_handler_test.go index 746bf7fc78c1..a73975c3b264 100644 --- a/pkg/ccl/sqlproxyccl/proxy_handler_test.go +++ b/pkg/ccl/sqlproxyccl/proxy_handler_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/denylist" + "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenantdirsvr" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -677,7 +678,7 @@ func TestDirectoryConnect(t *testing.T) { // Ensure that Directory.ReportFailure is being called correctly. countReports := 0 defer testutils.TestingHook(&reportFailureToDirectory, func( - ctx context.Context, tenantID roachpb.TenantID, addr string, directory TenantResolver, + ctx context.Context, tenantID roachpb.TenantID, addr string, directory tenant.Resolver, ) error { require.Equal(t, roachpb.MakeTenantID(28), tenantID) addrs, err := directory.LookupTenantAddrs(ctx, tenantID) diff --git a/pkg/ccl/sqlproxyccl/tenant/directory.go b/pkg/ccl/sqlproxyccl/tenant/directory.go index 61a4dd37b127..c45d5d4b47a7 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory.go @@ -23,6 +23,38 @@ import ( "google.golang.org/grpc/status" ) +// Resolver is an interface for the tenant directory. Currently only +// tenant.Directory implements it. +// +// TODO(jaylim-crl): Rename this to Directory, and the current tenant.Directory +// to tenant.directory. +type Resolver interface { + // EnsureTenantAddr returns an IP address of one of the given tenant's SQL + // processes based on the tenantID and clusterName fields. This should block + // until the process associated with the IP is ready. + // + // If no matching pods are found (e.g. cluster name mismatch, or tenant was + // deleted), this will return a GRPC NotFound error. + EnsureTenantAddr( + ctx context.Context, + tenantID roachpb.TenantID, + clusterName string, + ) (string, error) + + // LookupTenantAddrs returns the IP addresses for all available SQL + // processes for the given tenant. It returns a GRPC NotFound error if the + // tenant does not exist. + // + // Unlike EnsureTenantAddr which blocks until there is an associated + // process, LookupTenantAddrs will just return an empty set if no processes + // are available for the tenant. + LookupTenantAddrs(ctx context.Context, tenantID roachpb.TenantID) ([]string, error) + + // ReportFailure is used to indicate to the resolver that a connection + // attempt to connect to a particular SQL tenant pod have failed. + ReportFailure(ctx context.Context, tenantID roachpb.TenantID, addr string) error +} + // dirOptions control the behavior of tenant.Directory. type dirOptions struct { deterministic bool @@ -98,6 +130,8 @@ type Directory struct { } } +var _ Resolver = &Directory{} + // NewDirectory constructs a new Directory instance that tracks SQL tenant // processes managed by a given Directory server. The given context is used for // tracing pod watcher activity. @@ -139,6 +173,8 @@ func NewDirectory( // such as the name of the cluster, before being allowed to connect. Similarly, // if the tenant does not exist (e.g. because it was deleted), EnsureTenantAddr // returns a GRPC NotFound error. +// +// EnsureTenantAddr implements the Resolver interface. func (d *Directory) EnsureTenantAddr( ctx context.Context, tenantID roachpb.TenantID, clusterName string, ) (string, error) { @@ -172,6 +208,8 @@ func (d *Directory) EnsureTenantAddr( // into the directory's cache (LookupTenantAddrs will never attempt to fetch it). // If no processes are available for the tenant, LookupTenantAddrs will return the // empty set (unlike EnsureTenantAddr). +// +// LookupTenantAddrs implements the Resolver interface. func (d *Directory) LookupTenantAddrs( ctx context.Context, tenantID roachpb.TenantID, ) ([]string, error) { @@ -202,6 +240,8 @@ func (d *Directory) LookupTenantAddrs( // particular pod as "unhealthy" so that it's less likely to be chosen. // However, today there can be at most one pod for a given tenant, so it // must always be chosen. Keep the parameter as a placeholder for the future. +// +// ReportFailure implements the Resolver interface. func (d *Directory) ReportFailure( ctx context.Context, tenantID roachpb.TenantID, addr string, ) error { From a11234b2414405f8a272a9a1e29e7acfa29fae6c Mon Sep 17 00:00:00 2001 From: Jay Date: Mon, 21 Mar 2022 20:29:37 +0000 Subject: [PATCH 2/2] ccl/sqlproxyccl: rename tenant.Resolver to tenant.DirectoryCache This commit renames the tenant.Resolver interface to tenant.DirectoryCache. The existing Directory struct was also renamed to directoryCache to prevent confusions since there were previously two definitions of a "directory": one pointing to the cache of entries, whereas another referring to the actual directory server. Release note: None --- pkg/ccl/sqlproxyccl/connector.go | 50 ++++++++-------- pkg/ccl/sqlproxyccl/connector_test.go | 29 +++++----- pkg/ccl/sqlproxyccl/proxy_handler.go | 12 ++-- pkg/ccl/sqlproxyccl/proxy_handler_test.go | 8 +-- pkg/ccl/sqlproxyccl/tenant/BUILD.bazel | 4 +- pkg/ccl/sqlproxyccl/tenant/directory.proto | 2 +- .../{directory.go => directory_cache.go} | 58 +++++++++---------- ...ectory_test.go => directory_cache_test.go} | 25 ++++---- pkg/ccl/sqlproxyccl/tenant/entry.go | 2 +- pkg/ccl/sqlproxyccl/tenant/main_test.go | 2 +- pkg/ccl/sqlproxyccl/tenant/pod_test.go | 2 +- pkg/cli/cliflags/flags_mt.go | 2 +- 12 files changed, 99 insertions(+), 97 deletions(-) rename pkg/ccl/sqlproxyccl/tenant/{directory.go => directory_cache.go} (90%) rename pkg/ccl/sqlproxyccl/tenant/{directory_test.go => directory_cache_test.go} (96%) diff --git a/pkg/ccl/sqlproxyccl/connector.go b/pkg/ccl/sqlproxyccl/connector.go index bdb642c2a2a7..fbe5dbe5f9e7 100644 --- a/pkg/ccl/sqlproxyccl/connector.go +++ b/pkg/ccl/sqlproxyccl/connector.go @@ -47,24 +47,24 @@ type connector struct { ClusterName string TenantID roachpb.TenantID - // Directory corresponds to the tenant directory, which will be used to - // resolve tenants to their corresponding IP addresses. If this isn't set, - // we will fallback to use RoutingRule. + // DirectoryCache corresponds to the tenant directory cache, which will be + // used to resolve tenants to their corresponding IP addresses. If this + // isn't set, we will fallback to use RoutingRule. // - // TODO(jaylim-crl): Replace this with a Directory interface, and remove - // the RoutingRule field. RoutingRule should not be in here. + // TODO(jaylim-crl): Remove the RoutingRule field. RoutingRule should not + // be in here. // // NOTE: This field is optional. - Directory tenant.Resolver + DirectoryCache tenant.DirectoryCache // RoutingRule refers to the static rule that will be used when resolving - // tenants. This will be used directly whenever the Directory field isn't - // specified, or as a fallback if one was specified. + // tenants. This will be used directly whenever the DirectoryCache field + // isn't specified, or as a fallback if one was specified. // // The literal "{{clusterName}}" will be replaced with ClusterName within // the RoutingRule string. // - // NOTE: This field is optional, if Directory isn't set. + // NOTE: This field is optional, if DirectoryCache isn't set. RoutingRule string // StartupMsg represents the startup message associated with the client. @@ -231,11 +231,12 @@ func (c *connector) dialTenantCluster(ctx context.Context) (net.Conn, error) { dialSQLServerErrs = 0 } - // Report the failure to the directory so that it can refresh - // any stale information that may have caused the problem. - if c.Directory != nil { - if err = reportFailureToDirectory( - ctx, c.TenantID, serverAddr, c.Directory, + // Report the failure to the directory cache so that it can + // refresh any stale information that may have caused the + // problem. + if c.DirectoryCache != nil { + if err = reportFailureToDirectoryCache( + ctx, c.TenantID, serverAddr, c.DirectoryCache, ); err != nil { reportFailureErrs++ if reportFailureErr.ShouldLog() { @@ -283,9 +284,9 @@ func (c *connector) lookupAddr(ctx context.Context) (string, error) { return c.testingKnobs.lookupAddr(ctx) } - // First try to lookup tenant in the directory (if available). - if c.Directory != nil { - addr, err := c.Directory.EnsureTenantAddr(ctx, c.TenantID, c.ClusterName) + // First try to lookup tenant in the directory cache (if available). + if c.DirectoryCache != nil { + addr, err := c.DirectoryCache.EnsureTenantAddr(ctx, c.TenantID, c.ClusterName) switch { case err == nil: return addr, nil @@ -305,7 +306,7 @@ func (c *connector) lookupAddr(ctx context.Context) (string, error) { // map to a GRPC NotFound error. // // TODO(jaylim-crl): This code is temporary. Remove this once we have fully - // replaced this with a Directory interface. This fallback does not need + // replaced this with a Directory GRPC server. This fallback does not need // to exist. addr := strings.ReplaceAll( c.RoutingRule, "{{clusterName}}", @@ -375,10 +376,13 @@ func isRetriableConnectorError(err error) bool { return errors.Is(err, errRetryConnectorSentinel) } -// reportFailureToDirectory is a hookable function that calls the given tenant -// directory's ReportFailure method. -var reportFailureToDirectory = func( - ctx context.Context, tenantID roachpb.TenantID, addr string, directory tenant.Resolver, +// reportFailureToDirectoryCache is a hookable function that calls the given +// tenant directory cache's ReportFailure method. +var reportFailureToDirectoryCache = func( + ctx context.Context, + tenantID roachpb.TenantID, + addr string, + directoryCache tenant.DirectoryCache, ) error { - return directory.ReportFailure(ctx, tenantID, addr) + return directoryCache.ReportFailure(ctx, tenantID, addr) } diff --git a/pkg/ccl/sqlproxyccl/connector_test.go b/pkg/ccl/sqlproxyccl/connector_test.go index 5040c792e389..f87bb1dd0a10 100644 --- a/pkg/ccl/sqlproxyccl/connector_test.go +++ b/pkg/ccl/sqlproxyccl/connector_test.go @@ -435,7 +435,7 @@ func TestConnector_dialTenantCluster(t *testing.T) { c := &connector{ TenantID: roachpb.MakeTenantID(42), } - c.Directory = &testTenantResolver{ + c.DirectoryCache = &testTenantDirectoryCache{ reportFailureFn: func(fnCtx context.Context, tenantID roachpb.TenantID, addr string) error { reportFailureFnCount++ require.Equal(t, ctx, fnCtx) @@ -533,7 +533,7 @@ func TestConnector_lookupAddr(t *testing.T) { ClusterName: "my-foo", TenantID: roachpb.MakeTenantID(10), } - c.Directory = &testTenantResolver{ + c.DirectoryCache = &testTenantDirectoryCache{ ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) { ensureTenantAddrFnCount++ require.Equal(t, ctx, fnCtx) @@ -556,7 +556,7 @@ func TestConnector_lookupAddr(t *testing.T) { TenantID: roachpb.MakeTenantID(10), RoutingRule: "foo.bar", } - c.Directory = &testTenantResolver{ + c.DirectoryCache = &testTenantDirectoryCache{ ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) { ensureTenantAddrFnCount++ require.Equal(t, ctx, fnCtx) @@ -591,7 +591,7 @@ func TestConnector_lookupAddr(t *testing.T) { TenantID: roachpb.MakeTenantID(10), RoutingRule: "foo.bar", } - c.Directory = &testTenantResolver{ + c.DirectoryCache = &testTenantDirectoryCache{ ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) { ensureTenantAddrFnCount++ require.Equal(t, ctx, fnCtx) @@ -614,7 +614,7 @@ func TestConnector_lookupAddr(t *testing.T) { TenantID: roachpb.MakeTenantID(10), RoutingRule: "foo.bar", } - c.Directory = &testTenantResolver{ + c.DirectoryCache = &testTenantDirectoryCache{ ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) { ensureTenantAddrFnCount++ require.Equal(t, ctx, fnCtx) @@ -732,31 +732,32 @@ func TestRetriableConnectorError(t *testing.T) { require.True(t, errors.Is(err, errRetryConnectorSentinel)) } -var _ tenant.Resolver = &testTenantResolver{} +var _ tenant.DirectoryCache = &testTenantDirectoryCache{} -// testTenantResolver is a test implementation of the tenant resolver. -type testTenantResolver struct { +// testTenantDirectoryCache is a test implementation of the tenant directory +// cache. +type testTenantDirectoryCache struct { ensureTenantAddrFn func(ctx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) lookupTenantAddrsFn func(ctx context.Context, tenantID roachpb.TenantID) ([]string, error) reportFailureFn func(ctx context.Context, tenantID roachpb.TenantID, addr string) error } -// EnsureTenantAddr implements the TenantResolver interface. -func (r *testTenantResolver) EnsureTenantAddr( +// EnsureTenantAddr implements the DirectoryCache interface. +func (r *testTenantDirectoryCache) EnsureTenantAddr( ctx context.Context, tenantID roachpb.TenantID, clusterName string, ) (string, error) { return r.ensureTenantAddrFn(ctx, tenantID, clusterName) } -// LookupTenantAddrs implements the TenantResolver interface. -func (r *testTenantResolver) LookupTenantAddrs( +// LookupTenantAddrs implements the DirectoryCache interface. +func (r *testTenantDirectoryCache) LookupTenantAddrs( ctx context.Context, tenantID roachpb.TenantID, ) ([]string, error) { return r.lookupTenantAddrsFn(ctx, tenantID) } -// ReportFailure implements the TenantResolver interface. -func (r *testTenantResolver) ReportFailure( +// ReportFailure implements the DirectoryCache interface. +func (r *testTenantDirectoryCache) ReportFailure( ctx context.Context, tenantID roachpb.TenantID, addr string, ) error { return r.reportFailureFn(ctx, tenantID, addr) diff --git a/pkg/ccl/sqlproxyccl/proxy_handler.go b/pkg/ccl/sqlproxyccl/proxy_handler.go index cfa6ef0fe335..22b3c5db891b 100644 --- a/pkg/ccl/sqlproxyccl/proxy_handler.go +++ b/pkg/ccl/sqlproxyccl/proxy_handler.go @@ -126,9 +126,9 @@ type proxyHandler struct { // idleMonitor will detect idle connections to DRAINING pods. idleMonitor *idle.Monitor - // directory is optional and if set, will be used to resolve - // backend id to IP addresses. - directory *tenant.Directory + // directoryCache is optional and if set, will be used to resolve tenants + // to their IP addresses. + directoryCache tenant.DirectoryCache // CertManger keeps up to date the certificates used. certManager *certmgr.CertManager @@ -196,7 +196,7 @@ func newProxyHandler( } client := tenant.NewDirectoryClient(conn) - handler.directory, err = tenant.NewDirectory(ctx, stopper, client, dirOpts...) + handler.directoryCache, err = tenant.NewDirectoryCache(ctx, stopper, client, dirOpts...) if err != nil { return nil, err } @@ -280,8 +280,8 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn *proxyConn RoutingRule: handler.RoutingRule, StartupMsg: backendStartupMsg, } - if handler.directory != nil { - connector.Directory = handler.directory + if handler.directoryCache != nil { + connector.DirectoryCache = handler.directoryCache } // TLS options for the proxy are split into Insecure and SkipVerify. diff --git a/pkg/ccl/sqlproxyccl/proxy_handler_test.go b/pkg/ccl/sqlproxyccl/proxy_handler_test.go index a73975c3b264..d94ff2930a9f 100644 --- a/pkg/ccl/sqlproxyccl/proxy_handler_test.go +++ b/pkg/ccl/sqlproxyccl/proxy_handler_test.go @@ -677,17 +677,17 @@ func TestDirectoryConnect(t *testing.T) { // Ensure that Directory.ReportFailure is being called correctly. countReports := 0 - defer testutils.TestingHook(&reportFailureToDirectory, func( - ctx context.Context, tenantID roachpb.TenantID, addr string, directory tenant.Resolver, + defer testutils.TestingHook(&reportFailureToDirectoryCache, func( + ctx context.Context, tenantID roachpb.TenantID, addr string, directoryCache tenant.DirectoryCache, ) error { require.Equal(t, roachpb.MakeTenantID(28), tenantID) - addrs, err := directory.LookupTenantAddrs(ctx, tenantID) + addrs, err := directoryCache.LookupTenantAddrs(ctx, tenantID) require.NoError(t, err) require.Len(t, addrs, 1) require.Equal(t, addrs[0], addr) countReports++ - err = directory.ReportFailure(ctx, tenantID, addr) + err = directoryCache.ReportFailure(ctx, tenantID, addr) require.NoError(t, err) return err })() diff --git a/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel b/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel index 5d9b0dfd9c3d..27c8db4bc144 100644 --- a/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel +++ b/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel @@ -23,7 +23,7 @@ go_proto_library( go_library( name = "tenant", srcs = [ - "directory.go", + "directory_cache.go", "entry.go", "pod.go", ], @@ -47,7 +47,7 @@ go_test( name = "tenant_test", size = "large", srcs = [ - "directory_test.go", + "directory_cache_test.go", "main_test.go", "pod_test.go", ], diff --git a/pkg/ccl/sqlproxyccl/tenant/directory.proto b/pkg/ccl/sqlproxyccl/tenant/directory.proto index 3b177c7d17e6..d22edd18ffe3 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory.proto +++ b/pkg/ccl/sqlproxyccl/tenant/directory.proto @@ -1,4 +1,4 @@ -// Copyright 2021 The Cockroach Authors. +// Copyright 2022 The Cockroach Authors. // // Licensed as a CockroachDB Enterprise file under the Cockroach Community // License (the "License"); you may not use this file except in compliance with diff --git a/pkg/ccl/sqlproxyccl/tenant/directory.go b/pkg/ccl/sqlproxyccl/tenant/directory_cache.go similarity index 90% rename from pkg/ccl/sqlproxyccl/tenant/directory.go rename to pkg/ccl/sqlproxyccl/tenant/directory_cache.go index c45d5d4b47a7..5e09c5533d02 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory_cache.go @@ -1,4 +1,4 @@ -// Copyright 2021 The Cockroach Authors. +// Copyright 2022 The Cockroach Authors. // // Licensed as a CockroachDB Enterprise file under the Cockroach Community // License (the "License"); you may not use this file except in compliance with @@ -23,12 +23,10 @@ import ( "google.golang.org/grpc/status" ) -// Resolver is an interface for the tenant directory. Currently only -// tenant.Directory implements it. +// DirectoryCache is the external interface for the tenant directory cache. // -// TODO(jaylim-crl): Rename this to Directory, and the current tenant.Directory -// to tenant.directory. -type Resolver interface { +// See directoryCache for more information. +type DirectoryCache interface { // EnsureTenantAddr returns an IP address of one of the given tenant's SQL // processes based on the tenantID and clusterName fields. This should block // until the process associated with the IP is ready. @@ -50,19 +48,19 @@ type Resolver interface { // are available for the tenant. LookupTenantAddrs(ctx context.Context, tenantID roachpb.TenantID) ([]string, error) - // ReportFailure is used to indicate to the resolver that a connection - // attempt to connect to a particular SQL tenant pod have failed. + // ReportFailure is used to indicate to the directory cache that a + // connection attempt to connect to a particular SQL tenant pod have failed. ReportFailure(ctx context.Context, tenantID roachpb.TenantID, addr string) error } -// dirOptions control the behavior of tenant.Directory. +// dirOptions control the behavior of directoryCache. type dirOptions struct { deterministic bool refreshDelay time.Duration podWatcher chan *Pod } -// DirOption defines an option that can be passed to tenant.Directory in order +// DirOption defines an option that can be passed to directoryCache in order // to control its behavior. type DirOption func(opts *dirOptions) @@ -90,8 +88,8 @@ func PodWatcher(podWatcher chan *Pod) func(opts *dirOptions) { } } -// Directory tracks the network locations of SQL tenant processes. It is used by -// the sqlproxy to route incoming traffic to the correct backend process. +// directoryCache tracks the network locations of SQL tenant processes. It is +// used by the sqlproxy to route incoming traffic to the correct backend process. // Process information is populated and kept relatively up-to-date using a // streaming watcher. However, since watchers deliver slightly stale // information, the directory will also make direct server calls to fetch the @@ -106,7 +104,7 @@ func PodWatcher(podWatcher chan *Pod) func(opts *dirOptions) { // to synchronize access to shared in-memory data structures, each tenant also // has its own locks that are used to synchronize per-tenant operations such as // making directory server calls to fetch updated tenant information. -type Directory struct { +type directoryCache struct { // client is the directory client instance used to make directory server // calls. client DirectoryClient @@ -130,18 +128,18 @@ type Directory struct { } } -var _ Resolver = &Directory{} +var _ DirectoryCache = &directoryCache{} -// NewDirectory constructs a new Directory instance that tracks SQL tenant -// processes managed by a given Directory server. The given context is used for -// tracing pod watcher activity. +// NewDirectoryCache constructs a new directoryCache instance that tracks SQL +// tenant processes managed by a given directory server. The given context is +// used for tracing pod watcher activity. // // NOTE: stopper.Stop must be called on the directory when it is no longer // needed. -func NewDirectory( +func NewDirectoryCache( ctx context.Context, stopper *stop.Stopper, client DirectoryClient, opts ...DirOption, -) (*Directory, error) { - dir := &Directory{client: client, stopper: stopper} +) (DirectoryCache, error) { + dir := &directoryCache{client: client, stopper: stopper} dir.mut.tenants = make(map[roachpb.TenantID]*tenantEntry) for _, opt := range opts { @@ -174,8 +172,8 @@ func NewDirectory( // if the tenant does not exist (e.g. because it was deleted), EnsureTenantAddr // returns a GRPC NotFound error. // -// EnsureTenantAddr implements the Resolver interface. -func (d *Directory) EnsureTenantAddr( +// EnsureTenantAddr implements the DirectoryCache interface. +func (d *directoryCache) EnsureTenantAddr( ctx context.Context, tenantID roachpb.TenantID, clusterName string, ) (string, error) { // Ensure that a directory entry has been created for this tenant. @@ -209,8 +207,8 @@ func (d *Directory) EnsureTenantAddr( // If no processes are available for the tenant, LookupTenantAddrs will return the // empty set (unlike EnsureTenantAddr). // -// LookupTenantAddrs implements the Resolver interface. -func (d *Directory) LookupTenantAddrs( +// LookupTenantAddrs implements the DirectoryCache interface. +func (d *directoryCache) LookupTenantAddrs( ctx context.Context, tenantID roachpb.TenantID, ) ([]string, error) { // Ensure that a directory entry has been created for this tenant. @@ -241,8 +239,8 @@ func (d *Directory) LookupTenantAddrs( // However, today there can be at most one pod for a given tenant, so it // must always be chosen. Keep the parameter as a placeholder for the future. // -// ReportFailure implements the Resolver interface. -func (d *Directory) ReportFailure( +// ReportFailure implements the DirectoryCache interface. +func (d *directoryCache) ReportFailure( ctx context.Context, tenantID roachpb.TenantID, addr string, ) error { entry, err := d.getEntry(ctx, tenantID, false /* allowCreate */) @@ -263,7 +261,7 @@ func (d *Directory) ReportFailure( // ensures that it is fully initialized with tenant metadata. Obtaining this // metadata requires making a separate directory server call; // getEntry will block until that's complete. -func (d *Directory) getEntry( +func (d *directoryCache) getEntry( ctx context.Context, tenantID roachpb.TenantID, allowCreate bool, ) (*tenantEntry, error) { entry := func() *tenantEntry { @@ -310,7 +308,7 @@ func (d *Directory) getEntry( // deleteEntry removes the given directory entry for the given tenant, if it // exists. It returns true if an entry was actually deleted. -func (d *Directory) deleteEntry(entry *tenantEntry) bool { +func (d *directoryCache) deleteEntry(entry *tenantEntry) bool { // Remove the entry from the tenants map, since initialization failed. d.mut.Lock() defer d.mut.Unlock() @@ -329,7 +327,7 @@ func (d *Directory) deleteEntry(entry *tenantEntry) bool { // watchPods establishes a watcher that looks for changes to tenant pods. // Whenever tenant pods start or terminate, the watcher will get a notification // and update the directory to reflect that change. -func (d *Directory) watchPods(ctx context.Context, stopper *stop.Stopper) error { +func (d *directoryCache) watchPods(ctx context.Context, stopper *stop.Stopper) error { req := WatchPodsRequest{} // The loop that processes the event stream is running in a separate go @@ -421,7 +419,7 @@ func (d *Directory) watchPods(ctx context.Context, stopper *stop.Stopper) error // updateTenantEntry keeps tenant directory entries up-to-date by handling pod // watcher events. When a pod is created, destroyed, or modified, it updates the // tenant's entry to reflect that change. -func (d *Directory) updateTenantEntry(ctx context.Context, pod *Pod) { +func (d *directoryCache) updateTenantEntry(ctx context.Context, pod *Pod) { if pod.Addr == "" { // Nothing needs to be done if there is no IP address specified. return diff --git a/pkg/ccl/sqlproxyccl/tenant/directory_test.go b/pkg/ccl/sqlproxyccl/tenant/directory_cache_test.go similarity index 96% rename from pkg/ccl/sqlproxyccl/tenant/directory_test.go rename to pkg/ccl/sqlproxyccl/tenant/directory_cache_test.go index 44771f41994b..70c7c1d732da 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory_test.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory_cache_test.go @@ -1,4 +1,4 @@ -// Copyright 2021 The Cockroach Authors. +// Copyright 2022 The Cockroach Authors. // // Licensed as a CockroachDB Enterprise file under the Cockroach Community // License (the "License"); you may not use this file except in compliance with @@ -43,7 +43,7 @@ func TestDirectoryErrors(t *testing.T) { ctx := context.Background() - tc, dir, _ := newTestDirectory(t) + tc, dir, _ := newTestDirectoryCache(t) defer tc.Stopper().Stop(ctx) _, err := dir.LookupTenantAddrs(ctx, roachpb.MakeTenantID(1000)) @@ -75,7 +75,7 @@ func TestWatchPods(t *testing.T) { // Create the directory. ctx := context.Background() - tc, dir, tds := newTestDirectory(t, tenant.PodWatcher(podWatcher)) + tc, dir, tds := newTestDirectoryCache(t, tenant.PodWatcher(podWatcher)) defer tc.Stopper().Stop(ctx) tenantID := roachpb.MakeTenantID(20) @@ -197,7 +197,7 @@ func TestCancelLookups(t *testing.T) { // Create the directory. ctx, cancel := context.WithCancel(context.Background()) - tc, dir, _ := newTestDirectory(t) + tc, dir, _ := newTestDirectoryCache(t) defer tc.Stopper().Stop(ctx) require.NoError(t, createTenant(tc, tenantID)) @@ -232,7 +232,7 @@ func TestResume(t *testing.T) { // Create the directory. ctx := context.Background() - tc, dir, tds := newTestDirectory(t) + tc, dir, tds := newTestDirectoryCache(t) defer tc.Stopper().Stop(ctx) require.NoError(t, createTenant(tc, tenantID)) @@ -275,7 +275,7 @@ func TestDeleteTenant(t *testing.T) { // Create the directory. ctx := context.Background() // Disable throttling for this test - tc, dir, tds := newTestDirectory(t, tenant.RefreshDelay(-1)) + tc, dir, tds := newTestDirectoryCache(t, tenant.RefreshDelay(-1)) defer tc.Stopper().Stop(ctx) tenantID := roachpb.MakeTenantID(50) @@ -330,7 +330,7 @@ func TestRefreshThrottling(t *testing.T) { // Create the directory, but with extreme rate limiting so that directory // will never refresh. ctx := context.Background() - tc, dir, _ := newTestDirectory(t, tenant.RefreshDelay(60*time.Minute)) + tc, dir, _ := newTestDirectoryCache(t, tenant.RefreshDelay(60*time.Minute)) defer tc.Stopper().Stop(ctx) // Create test tenant. @@ -366,7 +366,7 @@ func TestLoadBalancing(t *testing.T) { // Create the directory. ctx := context.Background() - tc, dir, tds := newTestDirectory(t, tenant.PodWatcher(podWatcher)) + tc, dir, tds := newTestDirectoryCache(t, tenant.PodWatcher(podWatcher)) defer tc.Stopper().Stop(ctx) tenantID := roachpb.MakeTenantID(30) @@ -512,13 +512,13 @@ func startTenant( return &tenantdirsvr.Process{SQL: sqlAddr, Stopper: tenantStopper}, nil } -// Setup directory that uses a client connected to a test directory server +// Setup directory cache that uses a client connected to a test directory server // that manages tenants connected to a backing KV server. -func newTestDirectory( +func newTestDirectoryCache( t *testing.T, opts ...tenant.DirOption, ) ( tc serverutils.TestClusterInterface, - directory *tenant.Directory, + directoryCache tenant.DirectoryCache, tds *tenantdirsvr.TestDirectoryServer, ) { tc = serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{ @@ -554,8 +554,7 @@ func newTestDirectory( // nolint:grpcconnclose clusterStopper.AddCloser(stop.CloserFn(func() { require.NoError(t, conn.Close() /* nolint:grpcconnclose */) })) client := tenant.NewDirectoryClient(conn) - directory, err = tenant.NewDirectory(context.Background(), clusterStopper, client, opts...) + directoryCache, err = tenant.NewDirectoryCache(context.Background(), clusterStopper, client, opts...) require.NoError(t, err) - return } diff --git a/pkg/ccl/sqlproxyccl/tenant/entry.go b/pkg/ccl/sqlproxyccl/tenant/entry.go index 02be6ce0060c..cd60c3801748 100644 --- a/pkg/ccl/sqlproxyccl/tenant/entry.go +++ b/pkg/ccl/sqlproxyccl/tenant/entry.go @@ -1,4 +1,4 @@ -// Copyright 2021 The Cockroach Authors. +// Copyright 2022 The Cockroach Authors. // // Licensed as a CockroachDB Enterprise file under the Cockroach Community // License (the "License"); you may not use this file except in compliance with diff --git a/pkg/ccl/sqlproxyccl/tenant/main_test.go b/pkg/ccl/sqlproxyccl/tenant/main_test.go index 78839929b3c2..7f414da469d1 100644 --- a/pkg/ccl/sqlproxyccl/tenant/main_test.go +++ b/pkg/ccl/sqlproxyccl/tenant/main_test.go @@ -1,4 +1,4 @@ -// Copyright 2018 The Cockroach Authors. +// Copyright 2022 The Cockroach Authors. // // Licensed as a CockroachDB Enterprise file under the Cockroach Community // License (the "License"); you may not use this file except in compliance with diff --git a/pkg/ccl/sqlproxyccl/tenant/pod_test.go b/pkg/ccl/sqlproxyccl/tenant/pod_test.go index 929290c33ce9..2f7d28b87bd4 100644 --- a/pkg/ccl/sqlproxyccl/tenant/pod_test.go +++ b/pkg/ccl/sqlproxyccl/tenant/pod_test.go @@ -1,4 +1,4 @@ -// Copyright 2021 The Cockroach Authors. +// Copyright 2022 The Cockroach Authors. // // Licensed as a CockroachDB Enterprise file under the Cockroach Community // License (the "License"); you may not use this file except in compliance with diff --git a/pkg/cli/cliflags/flags_mt.go b/pkg/cli/cliflags/flags_mt.go index f38d1ca55c1d..a3c4dbb308d8 100644 --- a/pkg/cli/cliflags/flags_mt.go +++ b/pkg/cli/cliflags/flags_mt.go @@ -63,7 +63,7 @@ This rule must include the port of the SQL pod.`, DirectoryAddr = FlagInfo{ Name: "directory", - Description: "Directory address of the service doing resolution from backend id to IP.", + Description: "Directory address of the service doing resolution of tenants to their IP addresses.", } // TODO(chrisseto): Remove skip-verify as a CLI option. It should only be