Skip to content

Commit

Permalink
Merge pull request #78394 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…22.1-78276

release-22.1: ccl/sqlproxyccl: rename tenant.Resolver to tenant.DirectoryCache
  • Loading branch information
jaylim-crl authored Mar 24, 2022
2 parents 71e0de4 + a11234b commit 6e31389
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 119 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ go_test(
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/sqlproxyccl/denylist",
"//pkg/ccl/sqlproxyccl/interceptor",
"//pkg/ccl/sqlproxyccl/tenant",
"//pkg/ccl/sqlproxyccl/tenantdirsvr",
"//pkg/ccl/sqlproxyccl/throttler",
"//pkg/ccl/utilccl",
Expand Down
84 changes: 28 additions & 56 deletions pkg/ccl/sqlproxyccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -34,39 +35,6 @@ const sessionRevivalTokenStartupParam = "crdb:session_revival_token_base64"
// remoteAddrStartupParam contains the remote address of the original client.
const remoteAddrStartupParam = "crdb:remote_addr"

// TenantResolver is an interface for the tenant directory. Currently only
// tenant.Directory implements it.
//
// TODO(jaylim-crl): Rename this to Directory, and the current tenant.Directory
// to tenant.directory. This needs to be moved into the tenant package as well.
// This is added here to aid testing.
type TenantResolver interface {
// EnsureTenantAddr returns an IP address of one of the given tenant's SQL
// processes based on the tenantID and clusterName fields. This should block
// until the process associated with the IP is ready.
//
// If no matching pods are found (e.g. cluster name mismatch, or tenant was
// deleted), this will return a GRPC NotFound error.
EnsureTenantAddr(
ctx context.Context,
tenantID roachpb.TenantID,
clusterName string,
) (string, error)

// LookupTenantAddrs returns the IP addresses for all available SQL
// processes for the given tenant. It returns a GRPC NotFound error if the
// tenant does not exist.
//
// Unlike EnsureTenantAddr which blocks until there is an associated
// process, LookupTenantAddrs will just return an empty set if no processes
// are available for the tenant.
LookupTenantAddrs(ctx context.Context, tenantID roachpb.TenantID) ([]string, error)

// ReportFailure is used to indicate to the resolver that a connection
// attempt to connect to a particular SQL tenant pod have failed.
ReportFailure(ctx context.Context, tenantID roachpb.TenantID, addr string) error
}

// connector is a per-session tenant-associated component that can be used to
// obtain a connection to the tenant cluster. This will also handle the
// authentication phase. All connections returned by the connector should
Expand All @@ -79,24 +47,24 @@ type connector struct {
ClusterName string
TenantID roachpb.TenantID

// Directory corresponds to the tenant directory, which will be used to
// resolve tenants to their corresponding IP addresses. If this isn't set,
// we will fallback to use RoutingRule.
// DirectoryCache corresponds to the tenant directory cache, which will be
// used to resolve tenants to their corresponding IP addresses. If this
// isn't set, we will fallback to use RoutingRule.
//
// TODO(jaylim-crl): Replace this with a Directory interface, and remove
// the RoutingRule field. RoutingRule should not be in here.
// TODO(jaylim-crl): Remove the RoutingRule field. RoutingRule should not
// be in here.
//
// NOTE: This field is optional.
Directory TenantResolver
DirectoryCache tenant.DirectoryCache

// RoutingRule refers to the static rule that will be used when resolving
// tenants. This will be used directly whenever the Directory field isn't
// specified, or as a fallback if one was specified.
// tenants. This will be used directly whenever the DirectoryCache field
// isn't specified, or as a fallback if one was specified.
//
// The literal "{{clusterName}}" will be replaced with ClusterName within
// the RoutingRule string.
//
// NOTE: This field is optional, if Directory isn't set.
// NOTE: This field is optional, if DirectoryCache isn't set.
RoutingRule string

// StartupMsg represents the startup message associated with the client.
Expand Down Expand Up @@ -263,11 +231,12 @@ func (c *connector) dialTenantCluster(ctx context.Context) (net.Conn, error) {
dialSQLServerErrs = 0
}

// Report the failure to the directory so that it can refresh
// any stale information that may have caused the problem.
if c.Directory != nil {
if err = reportFailureToDirectory(
ctx, c.TenantID, serverAddr, c.Directory,
// Report the failure to the directory cache so that it can
// refresh any stale information that may have caused the
// problem.
if c.DirectoryCache != nil {
if err = reportFailureToDirectoryCache(
ctx, c.TenantID, serverAddr, c.DirectoryCache,
); err != nil {
reportFailureErrs++
if reportFailureErr.ShouldLog() {
Expand Down Expand Up @@ -315,9 +284,9 @@ func (c *connector) lookupAddr(ctx context.Context) (string, error) {
return c.testingKnobs.lookupAddr(ctx)
}

// First try to lookup tenant in the directory (if available).
if c.Directory != nil {
addr, err := c.Directory.EnsureTenantAddr(ctx, c.TenantID, c.ClusterName)
// First try to lookup tenant in the directory cache (if available).
if c.DirectoryCache != nil {
addr, err := c.DirectoryCache.EnsureTenantAddr(ctx, c.TenantID, c.ClusterName)
switch {
case err == nil:
return addr, nil
Expand All @@ -337,7 +306,7 @@ func (c *connector) lookupAddr(ctx context.Context) (string, error) {
// map to a GRPC NotFound error.
//
// TODO(jaylim-crl): This code is temporary. Remove this once we have fully
// replaced this with a Directory interface. This fallback does not need
// replaced this with a Directory GRPC server. This fallback does not need
// to exist.
addr := strings.ReplaceAll(
c.RoutingRule, "{{clusterName}}",
Expand Down Expand Up @@ -407,10 +376,13 @@ func isRetriableConnectorError(err error) bool {
return errors.Is(err, errRetryConnectorSentinel)
}

// reportFailureToDirectory is a hookable function that calls the given tenant
// directory's ReportFailure method.
var reportFailureToDirectory = func(
ctx context.Context, tenantID roachpb.TenantID, addr string, directory TenantResolver,
// reportFailureToDirectoryCache is a hookable function that calls the given
// tenant directory cache's ReportFailure method.
var reportFailureToDirectoryCache = func(
ctx context.Context,
tenantID roachpb.TenantID,
addr string,
directoryCache tenant.DirectoryCache,
) error {
return directory.ReportFailure(ctx, tenantID, addr)
return directoryCache.ReportFailure(ctx, tenantID, addr)
}
30 changes: 16 additions & 14 deletions pkg/ccl/sqlproxyccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -434,7 +435,7 @@ func TestConnector_dialTenantCluster(t *testing.T) {
c := &connector{
TenantID: roachpb.MakeTenantID(42),
}
c.Directory = &testTenantResolver{
c.DirectoryCache = &testTenantDirectoryCache{
reportFailureFn: func(fnCtx context.Context, tenantID roachpb.TenantID, addr string) error {
reportFailureFnCount++
require.Equal(t, ctx, fnCtx)
Expand Down Expand Up @@ -532,7 +533,7 @@ func TestConnector_lookupAddr(t *testing.T) {
ClusterName: "my-foo",
TenantID: roachpb.MakeTenantID(10),
}
c.Directory = &testTenantResolver{
c.DirectoryCache = &testTenantDirectoryCache{
ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) {
ensureTenantAddrFnCount++
require.Equal(t, ctx, fnCtx)
Expand All @@ -555,7 +556,7 @@ func TestConnector_lookupAddr(t *testing.T) {
TenantID: roachpb.MakeTenantID(10),
RoutingRule: "foo.bar",
}
c.Directory = &testTenantResolver{
c.DirectoryCache = &testTenantDirectoryCache{
ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) {
ensureTenantAddrFnCount++
require.Equal(t, ctx, fnCtx)
Expand Down Expand Up @@ -590,7 +591,7 @@ func TestConnector_lookupAddr(t *testing.T) {
TenantID: roachpb.MakeTenantID(10),
RoutingRule: "foo.bar",
}
c.Directory = &testTenantResolver{
c.DirectoryCache = &testTenantDirectoryCache{
ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) {
ensureTenantAddrFnCount++
require.Equal(t, ctx, fnCtx)
Expand All @@ -613,7 +614,7 @@ func TestConnector_lookupAddr(t *testing.T) {
TenantID: roachpb.MakeTenantID(10),
RoutingRule: "foo.bar",
}
c.Directory = &testTenantResolver{
c.DirectoryCache = &testTenantDirectoryCache{
ensureTenantAddrFn: func(fnCtx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error) {
ensureTenantAddrFnCount++
require.Equal(t, ctx, fnCtx)
Expand Down Expand Up @@ -731,31 +732,32 @@ func TestRetriableConnectorError(t *testing.T) {
require.True(t, errors.Is(err, errRetryConnectorSentinel))
}

var _ TenantResolver = &testTenantResolver{}
var _ tenant.DirectoryCache = &testTenantDirectoryCache{}

// testTenantResolver is a test implementation of the tenant resolver.
type testTenantResolver struct {
// testTenantDirectoryCache is a test implementation of the tenant directory
// cache.
type testTenantDirectoryCache struct {
ensureTenantAddrFn func(ctx context.Context, tenantID roachpb.TenantID, clusterName string) (string, error)
lookupTenantAddrsFn func(ctx context.Context, tenantID roachpb.TenantID) ([]string, error)
reportFailureFn func(ctx context.Context, tenantID roachpb.TenantID, addr string) error
}

// EnsureTenantAddr implements the TenantResolver interface.
func (r *testTenantResolver) EnsureTenantAddr(
// EnsureTenantAddr implements the DirectoryCache interface.
func (r *testTenantDirectoryCache) EnsureTenantAddr(
ctx context.Context, tenantID roachpb.TenantID, clusterName string,
) (string, error) {
return r.ensureTenantAddrFn(ctx, tenantID, clusterName)
}

// LookupTenantAddrs implements the TenantResolver interface.
func (r *testTenantResolver) LookupTenantAddrs(
// LookupTenantAddrs implements the DirectoryCache interface.
func (r *testTenantDirectoryCache) LookupTenantAddrs(
ctx context.Context, tenantID roachpb.TenantID,
) ([]string, error) {
return r.lookupTenantAddrsFn(ctx, tenantID)
}

// ReportFailure implements the TenantResolver interface.
func (r *testTenantResolver) ReportFailure(
// ReportFailure implements the DirectoryCache interface.
func (r *testTenantDirectoryCache) ReportFailure(
ctx context.Context, tenantID roachpb.TenantID, addr string,
) error {
return r.reportFailureFn(ctx, tenantID, addr)
Expand Down
12 changes: 6 additions & 6 deletions pkg/ccl/sqlproxyccl/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ type proxyHandler struct {
// idleMonitor will detect idle connections to DRAINING pods.
idleMonitor *idle.Monitor

// directory is optional and if set, will be used to resolve
// backend id to IP addresses.
directory *tenant.Directory
// directoryCache is optional and if set, will be used to resolve tenants
// to their IP addresses.
directoryCache tenant.DirectoryCache

// CertManger keeps up to date the certificates used.
certManager *certmgr.CertManager
Expand Down Expand Up @@ -196,7 +196,7 @@ func newProxyHandler(
}

client := tenant.NewDirectoryClient(conn)
handler.directory, err = tenant.NewDirectory(ctx, stopper, client, dirOpts...)
handler.directoryCache, err = tenant.NewDirectoryCache(ctx, stopper, client, dirOpts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -280,8 +280,8 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn *proxyConn
RoutingRule: handler.RoutingRule,
StartupMsg: backendStartupMsg,
}
if handler.directory != nil {
connector.Directory = handler.directory
if handler.directoryCache != nil {
connector.DirectoryCache = handler.directoryCache
}

// TLS options for the proxy are split into Insecure and SkipVerify.
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/denylist"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenantdirsvr"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/throttler"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -676,17 +677,17 @@ func TestDirectoryConnect(t *testing.T) {

// Ensure that Directory.ReportFailure is being called correctly.
countReports := 0
defer testutils.TestingHook(&reportFailureToDirectory, func(
ctx context.Context, tenantID roachpb.TenantID, addr string, directory TenantResolver,
defer testutils.TestingHook(&reportFailureToDirectoryCache, func(
ctx context.Context, tenantID roachpb.TenantID, addr string, directoryCache tenant.DirectoryCache,
) error {
require.Equal(t, roachpb.MakeTenantID(28), tenantID)
addrs, err := directory.LookupTenantAddrs(ctx, tenantID)
addrs, err := directoryCache.LookupTenantAddrs(ctx, tenantID)
require.NoError(t, err)
require.Len(t, addrs, 1)
require.Equal(t, addrs[0], addr)

countReports++
err = directory.ReportFailure(ctx, tenantID, addr)
err = directoryCache.ReportFailure(ctx, tenantID, addr)
require.NoError(t, err)
return err
})()
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/tenant/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ go_proto_library(
go_library(
name = "tenant",
srcs = [
"directory.go",
"directory_cache.go",
"entry.go",
"pod.go",
],
Expand All @@ -47,7 +47,7 @@ go_test(
name = "tenant_test",
size = "large",
srcs = [
"directory_test.go",
"directory_cache_test.go",
"main_test.go",
"pod_test.go",
],
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/sqlproxyccl/tenant/directory.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 The Cockroach Authors.
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
Expand Down
Loading

0 comments on commit 6e31389

Please sign in to comment.