74005: server: hash tenantID into ClusterID used in SQL exec r=knz a=dt

Fixes #74480.

Prior to this patch, in a multi-tenant deployment the ClusterID value
visible to tenants was the same as the ClusterID for the storage
cluster.

This original design was defective because it prevented the
backup/restore subsystem from distinguishing backups made for
different tenants.

In this commit, we change the ClusterID for tenants to become a new
UUID that is the storage cluster's ClusterID with a hash of the
tenant's ID added to the lower 64 bits. This makes it so that two
tenants on the same storage cluster will observe different values when
calling ClusterID().
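
For illustration, a minimal sketch of the derivation (not the exact
helper added by this patch; the hash function and uuid package below
are stand-ins):

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"

	"github.com/google/uuid" // illustrative stand-in for the internal uuid package
)

// tenantLogicalClusterID mixes a hash of the tenant ID into the lower 64 bits
// of the storage cluster's ClusterID, so each tenant sees a distinct value.
func tenantLogicalClusterID(storageClusterID uuid.UUID, tenantID uint64) uuid.UUID {
	h := fnv.New64a() // illustrative hash choice, not the one used in the patch
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], tenantID)
	h.Write(buf[:])

	out := storageClusterID
	// Fold the tenant hash into the lower 64 bits (bytes 8..15) of the UUID.
	lo := binary.BigEndian.Uint64(out[8:16]) ^ h.Sum64()
	binary.BigEndian.PutUint64(out[8:16], lo)
	return out
}

func main() {
	storage := uuid.MustParse("5d2b3a33-b3e6-43c1-9b43-2b14e1b5ff61")
	fmt.Println(tenantLogicalClusterID(storage, 2)) // tenant 2
	fmt.Println(tenantLogicalClusterID(storage, 3)) // tenant 3 observes a different value
}
```

Keeping the upper 64 bits intact preserves a visible link to the
storage cluster's ID while still making the value unique per tenant.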

Release note (bug fix): Separate tenants now use unique values for the
internal field ClusterID. Previously, separate tenants hosted on the
same storage cluster would be assigned the same ClusterID.


Jira issue: CRDB-14750

77794: sql: implement SHOW CLUSTER SETTING FOR TENANT r=rafiss a=knz

All commits but the last 2 are from #77742.
(Reviewers: only the last 2 commits belong to this PR.)

Fixes #77471

Release justification: fixes to high-priority bugs

79503: backupccl: avoid over-shrinking memory monitor r=adityamaru a=stevendanna

This change updates DecryptFile to optionally use a memory monitor and
adjusts the relevant calling functions.

Previously, the readManifest function that used DecryptFile could
result in shrinking too many bytes from the bound account:

```
ERROR: a panic has occurred!
root: no bytes in account to release, current 29571, free 30851
(1) attached stack trace
  -- stack trace:
  | runtime.gopanic
  | 	GOROOT/src/runtime/panic.go:1038
  | [...repeated from below...]
Wraps: (2) attached stack trace
  -- stack trace:
  | github.com/cockroachdb/cockroach/pkg/util/log/logcrash.ReportOrPanic
  | 	github.com/cockroachdb/cockroach/pkg/util/log/logcrash/crash_reporting.go:374
  | github.com/cockroachdb/cockroach/pkg/util/mon.(*BoundAccount).Shrink
  | 	github.com/cockroachdb/cockroach/pkg/util/mon/bytes_usage.go:709
  | github.com/cockroachdb/cockroach/pkg/ccl/backupccl.(*restoreResumer).doResume.func1
  | 	github.com/cockroachdb/cockroach/pkg/ccl/backupccl/pkg/ccl/backupccl/restore_job.go:1244
```

This was the result of freeing `cap(descBytes)` without accounting for
the fact that `descBytes` had been re-assigned after decryption, so the
capacity of the old buffer could differ from the capacity of the new
one. That is, we called mem.Grow with the capacity of the old
(encrypted) buffer, but mem.Shrink with the capacity of the new
(plaintext) buffer.
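
In outline, the broken pattern looks like this (a toy account standing
in for mon.BoundAccount, using the capacities from the example further
below; not the actual backupccl code):

```go
package main

import "fmt"

// account is a toy stand-in for mon.BoundAccount, just to show the mismatch.
type account struct{ used int64 }

func (a *account) Grow(n int64) { a.used += n }
func (a *account) Shrink(n int64) {
	if n > a.used {
		panic(fmt.Sprintf("no bytes in account to release, current %d, free %d", a.used, n))
	}
	a.used -= n
}

func main() {
	var mem account

	encrypted := make([]byte, 27898, 32768) // buffer sized by mon.ReadAll
	mem.Grow(int64(cap(encrypted)))         // account charged for the encrypted buffer

	// DecryptFile returns a *new* buffer; descBytes is re-assigned to it.
	descBytes := make([]byte, 27862, 40960) // plaintext buffer with a larger capacity

	// Releasing cap(descBytes) frees more than was ever grown -> panic.
	mem.Shrink(int64(cap(descBytes)))
}
```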

We generally expect the encrypted data to be larger than the plaintext
data, so in most cases the existing code worked without error because
it was freeing an amount smaller than the original allocation.

However, while the plaintext data is smaller than the encrypted data,
that doesn't mean that the _capacity of the slice holding the
plaintext data_ is smaller than the _capacity of the slice holding the
encrypted data_.

The slice holding the encrypted data was created with mon.ReadAll,
which grows its buffer by doubling until it reaches 8MB. The slice
holding the plaintext data was created by ioutil.ReadAll, which defers
to append's slice growth strategy, and that strategy differs from the
one used in mon.ReadAll.

In the current implementations, for byte sizes around 30k, the
capacity of the buffer created by append's strategy is larger than that
created by mon.ReadAll, despite the len of the buffer being smaller:

    before decrypt: len(descBytes) = 27898, cap(descBytes) = 32768
     after decrypt: len(descBytes) = 27862, cap(descBytes) = 40960
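
The gap between len and cap under append-style growth is easy to see in
isolation; the exact capacity printed below depends on the Go version,
so treat the numbers as illustrative only:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	src := make([]byte, 27862) // roughly the plaintext size from the report
	out, err := io.ReadAll(bytes.NewReader(src))
	if err != nil {
		panic(err)
	}
	// cap(out) is what the account must track; it is generally larger than
	// len(out), and how much larger depends on the growth strategy used.
	fmt.Printf("len=%d cap=%d\n", len(out), cap(out))
}
```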

We could have fixed this by simply adjusting the memory account by the
difference. Instead, I've opted to thread the memory monitor into
DecryptFile to better account for the fact that we technically do hold
2 copies of the data during this decryption.
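
A sketch of the shape the monitored DecryptFile takes (simplified; the
decryption body and the decryptAES helper name are placeholders, not
the real storageccl code):

```go
package storageccl // sketch only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// DecryptFile decrypts ciphertext and, if mem is non-nil, grows the bound
// account by the capacity of the plaintext buffer, since the caller briefly
// holds both the ciphertext and the plaintext.
func DecryptFile(
	ctx context.Context, ciphertext, key []byte, mem *mon.BoundAccount,
) ([]byte, error) {
	plaintext, err := decryptAES(ciphertext, key)
	if err != nil {
		return nil, err
	}
	if mem != nil {
		if err := mem.Grow(ctx, int64(cap(plaintext))); err != nil {
			return nil, err
		}
	}
	return plaintext, nil
}

// decryptAES is a placeholder for the existing decryption routine.
func decryptAES(ciphertext, key []byte) ([]byte, error) {
	// ... real cipher code elided ...
	return append([]byte(nil), ciphertext...), nil
}
```

The caller then shrinks the account by the capacity of the old
ciphertext buffer once it re-assigns descBytes, as in the readManifest
hunk below.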

Fixes #79488

Release note: None

79532: ci: fix some janky ci scripts that prevent posting issues to github r=mari-crl a=rickystewart

In a couple places in `82e0b121c715c59cebbb8d53e29edf7952d6913f` I
accidentally did `$exit_status=$?` instead of `exit_status=$?`, which is
not a proper variable assignment. This was causing these jobs to fail
before they could post issues to GitHub.

Closes #79403.

Release note: None

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
Co-authored-by: Raphael 'kena' Poss <knz@thaumogen.net>
Co-authored-by: Steven Danna <danna@cockroachlabs.com>
Co-authored-by: Ricky Stewart <ricky@cockroachlabs.com>
5 people committed Apr 6, 2022
5 parents f46a354 + 0dc2929 + 156d754 + 30419a8 + 6a88d40 commit d4daa99
Showing 86 changed files with 697 additions and 310 deletions.
4 changes: 2 additions & 2 deletions build/teamcity/cockroach/nightlies/optimizer_tests_impl.sh
@@ -17,7 +17,7 @@ exit_status_large=0
$BAZEL_BIN/pkg/cmd/bazci/bazci_/bazci --config=ci --artifacts $ARTIFACTS_DIR \
test //pkg/sql/opt:opt_test -- \
--define gotags=bazel,crdb_test,fast_int_set_large \
--test_env=GO_TEST_JSON_OUTPUT_FILE=$GO_TEST_JSON_OUTPUT_FILE || $exit_status_large=$?
--test_env=GO_TEST_JSON_OUTPUT_FILE=$GO_TEST_JSON_OUTPUT_FILE || exit_status_large=$?
process_test_json \
$BAZEL_BIN/pkg/cmd/testfilter/testfilter_/testfilter \
$BAZEL_BIN/pkg/cmd/github-post/github-post_/github-post \
@@ -37,7 +37,7 @@ exit_status_small=0
$BAZEL_BIN/pkg/cmd/bazci/bazci_/bazci --config=ci \
test //pkg/sql/opt:opt_test -- \
--define gotags=bazel,crdb_test,fast_int_set_small \
--test_env=GO_TEST_JSON_OUTPUT_FILE=$GO_TEST_JSON_OUTPUT_FILE || $exit_status_small=$?
--test_env=GO_TEST_JSON_OUTPUT_FILE=$GO_TEST_JSON_OUTPUT_FILE || exit_status_small=$?
process_test_json \
$BAZEL_BIN/pkg/cmd/testfilter/testfilter_/testfilter \
$BAZEL_BIN/pkg/cmd/github-post/github-post_/github-post \
@@ -14,7 +14,7 @@ $BAZEL_BIN/pkg/cmd/bazci/bazci_/bazci --config=ci \
--test_arg -bigtest --test_arg -flex-types --test_arg -parallel=4 \
--define gotags=bazel,crdb_test_off --test_timeout 86400 \
--test_filter '^TestSqlLiteLogic$|^TestTenantSQLLiteLogic$' \
--test_env=GO_TEST_JSON_OUTPUT_FILE=$GO_TEST_JSON_OUTPUT_FILE || $exit_status=$?
--test_env=GO_TEST_JSON_OUTPUT_FILE=$GO_TEST_JSON_OUTPUT_FILE || exit_status=$?
process_test_json \
$BAZEL_BIN/pkg/cmd/testfilter/testfilter_/testfilter \
$BAZEL_BIN/pkg/cmd/github-post/github-post_/github-post \
2 changes: 1 addition & 1 deletion docs/generated/sql/functions.md
@@ -2953,7 +2953,7 @@ SELECT * FROM crdb_internal.check_consistency(true, '\x02', '\x04')</p>
</span></td></tr>
<tr><td><a name="crdb_internal.check_password_hash_format"></a><code>crdb_internal.check_password_hash_format(password: <a href="bytes.html">bytes</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>This function checks whether a string is a precomputed password hash. Returns the hash algorithm.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.cluster_id"></a><code>crdb_internal.cluster_id() &rarr; <a href="uuid.html">uuid</a></code></td><td><span class="funcdesc"><p>Returns the cluster ID.</p>
<tr><td><a name="crdb_internal.cluster_id"></a><code>crdb_internal.cluster_id() &rarr; <a href="uuid.html">uuid</a></code></td><td><span class="funcdesc"><p>Returns the logical cluster ID for this tenant.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.cluster_name"></a><code>crdb_internal.cluster_name() &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>Returns the cluster name.</p>
</span></td></tr>
5 changes: 5 additions & 0 deletions pkg/base/cluster_id.go
@@ -28,6 +28,8 @@ import (
// - Otherwise, it is determined via gossip with other nodes.
type ClusterIDContainer struct {
clusterID atomic.Value // uuid.UUID
// OnSet, if non-nil, is called after the ID is set with the new value.
OnSet func(uuid.UUID)
}

// String returns the cluster ID, or "?" if it is unset.
@@ -63,6 +65,9 @@ func (c *ClusterIDContainer) Set(ctx context.Context, val uuid.UUID) {
} else if cur != val {
log.Fatalf(ctx, "different ClusterIDs set: %s, then %s", cur, val)
}
if c.OnSet != nil {
c.OnSet(val)
}
}

// Reset changes the ClusterID regardless of the old value.
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/backup_job.go
@@ -470,7 +470,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {

// Collect telemetry, once per backup after resolving its destination.
lic := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "",
p.ExecCfg().Settings, p.ExecCfg().LogicalClusterID(), p.ExecCfg().Organization(), "",
) != nil
collectTelemetry(m, details, details, lic)
}
@@ -731,10 +731,10 @@ func (b *backupResumer) readManifestOnResume(
}
}

if !desc.ClusterID.Equal(cfg.ClusterID()) {
if !desc.ClusterID.Equal(cfg.LogicalClusterID()) {
mem.Shrink(ctx, memSize)
return nil, 0, errors.Newf("cannot resume backup started on another cluster (%s != %s)",
desc.ClusterID, cfg.ClusterID())
desc.ClusterID, cfg.LogicalClusterID())
}
return &desc, memSize, nil
}
8 changes: 4 additions & 4 deletions pkg/ccl/backupccl/backup_planning.go
@@ -475,7 +475,7 @@ func checkPrivilegesForBackup(

func requireEnterprise(execCfg *sql.ExecutorConfig, feature string) error {
if err := utilccl.CheckEnterpriseEnabled(
execCfg.Settings, execCfg.ClusterID(), execCfg.Organization(),
execCfg.Settings, execCfg.LogicalClusterID(), execCfg.Organization(),
fmt.Sprintf("BACKUP with %s", feature),
); err != nil {
return err
@@ -829,7 +829,7 @@ func backupPlanHook(
}

lic := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "",
p.ExecCfg().Settings, p.ExecCfg().LogicalClusterID(), p.ExecCfg().Organization(), "",
) != nil

if err := protectTimestampForBackup(
@@ -1455,7 +1455,7 @@ func getBackupDetailAndManifest(
// IDs are how we identify tables, and those are only meaningful in the
// context of their own cluster, so we need to ensure we only allow
// incremental previous backups that we created.
if fromCluster := prevBackups[i].ClusterID; !fromCluster.Equal(execCfg.ClusterID()) {
if fromCluster := prevBackups[i].ClusterID; !fromCluster.Equal(execCfg.LogicalClusterID()) {
return jobspb.BackupDetails{}, BackupManifest{}, errors.Newf("previous BACKUP belongs to cluster %s", fromCluster.String())
}
}
@@ -1663,7 +1663,7 @@ func createBackupManifest(
FormatVersion: BackupFormatDescriptorTrackingVersion,
BuildInfo: build.GetInfo(),
ClusterVersion: execCfg.Settings.Version.ActiveVersion(ctx).Version,
ClusterID: execCfg.ClusterID(),
ClusterID: execCfg.LogicalClusterID(),
StatisticsFilenames: statsFiles,
DescriptorCoverage: coverage,
}
41 changes: 39 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
@@ -1568,7 +1568,7 @@ func TestBackupRestoreResume(t *testing.T) {
}
backupCompletedSpan := roachpb.Span{Key: backupStartKey, EndKey: backupEndKey}
mockManifest, err := protoutil.Marshal(&BackupManifest{
ClusterID: tc.Servers[0].ClusterID(),
ClusterID: tc.Servers[0].RPCContext().LogicalClusterID.Get(),
Files: []BackupManifest_File{
{Path: "garbage-checkpoint", Span: backupCompletedSpan},
},
@@ -3210,7 +3210,7 @@ func TestBackupRestoreIncremental(t *testing.T) {
// generated by the same cluster.

sqlDBRestore.ExpectErr(
t, fmt.Sprintf("belongs to cluster %s", tc.Servers[0].ClusterID()),
t, fmt.Sprintf("belongs to cluster %s", tc.Servers[0].RPCContext().LogicalClusterID.Get()),
`BACKUP TABLE data.bank TO $1 INCREMENTAL FROM $2`,
"nodelocal://0/some-other-table", "nodelocal://0/0",
)
@@ -7992,6 +7992,43 @@ func TestCleanupDoesNotDeleteParentsWithChildObjects(t *testing.T) {
})
}

func TestReadBackupManifestMemoryMonitoring(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

dir, dirCleanupFn := testutils.TempDir(t)
defer dirCleanupFn()

st := cluster.MakeTestingClusterSettings()
storage, err := cloud.ExternalStorageFromURI(ctx,
"nodelocal://0/test",
base.ExternalIODirConfig{},
st,
blobs.TestBlobServiceClient(dir),
security.RootUserName(), nil, nil)
require.NoError(t, err)

m := mon.NewMonitor("test-monitor", mon.MemoryResource, nil, nil, 0, 0, st)
m.Start(ctx, nil, mon.MakeStandaloneBudget(128<<20))
mem := m.MakeBoundAccount()
encOpts := &jobspb.BackupEncryptionOptions{
Mode: jobspb.EncryptionMode_Passphrase,
Key: storageccl.GenerateKey([]byte("passphrase"), []byte("sodium")),
}
desc := &BackupManifest{}
magic := 5500
for i := 0; i < magic; i++ {
desc.Files = append(desc.Files, BackupManifest_File{Path: fmt.Sprintf("%d-file-%d", i, i)})
}
require.NoError(t, writeBackupManifest(ctx, st, storage, "testmanifest", encOpts, desc))
_, sz, err := readBackupManifest(ctx, &mem, storage, "testmanifest", encOpts)
require.NoError(t, err)
mem.Shrink(ctx, sz)
mem.Close(ctx)
}

func TestManifestTooNew(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/create_scheduled_backup.go
@@ -38,7 +38,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/jsonpb"
pbtypes "github.com/gogo/protobuf/types"
"github.com/robfig/cron/v3"
cron "github.com/robfig/cron/v3"
)

const (
@@ -817,7 +817,7 @@ func makeScheduledBackupEval(
}

enterpriseCheckErr := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(),
p.ExecCfg().Settings, p.ExecCfg().LogicalClusterID(), p.ExecCfg().Organization(),
"BACKUP INTO LATEST")
eval.isEnterpriseUser = enterpriseCheckErr == nil

11 changes: 7 additions & 4 deletions pkg/ccl/backupccl/manifest_handling.go
@@ -324,10 +324,12 @@ func readManifest(
if err != nil {
return BackupManifest{}, 0, err
}
descBytes, err = storageccl.DecryptFile(descBytes, encryptionKey)
plaintextBytes, err := storageccl.DecryptFile(ctx, descBytes, encryptionKey, mem)
if err != nil {
return BackupManifest{}, 0, err
}
mem.Shrink(ctx, int64(cap(descBytes)))
descBytes = plaintextBytes
}

if isGZipped(descBytes) {
@@ -377,7 +379,6 @@ func readManifest(
t.ModificationTime = hlc.Timestamp{WallTime: 1}
}
}

return backupManifest, approxMemSize, nil
}

@@ -407,10 +408,12 @@ func readBackupPartitionDescriptor(
if err != nil {
return BackupPartitionDescriptor{}, 0, err
}
descBytes, err = storageccl.DecryptFile(descBytes, encryptionKey)
plaintextData, err := storageccl.DecryptFile(ctx, descBytes, encryptionKey, mem)
if err != nil {
return BackupPartitionDescriptor{}, 0, err
}
mem.Shrink(ctx, int64(cap(descBytes)))
descBytes = plaintextData
}

if isGZipped(descBytes) {
@@ -461,7 +464,7 @@ func readTableStatistics(
if err != nil {
return nil, err
}
statsBytes, err = storageccl.DecryptFile(statsBytes, encryptionKey)
statsBytes, err = storageccl.DecryptFile(ctx, statsBytes, encryptionKey, nil /* mm */)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -416,7 +416,7 @@ func createChangefeedJobRecord(

if scope, ok := opts[changefeedbase.OptMetricsScope]; ok {
if err := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "CHANGEFEED",
p.ExecCfg().Settings, p.ExecCfg().LogicalClusterID(), p.ExecCfg().Organization(), "CHANGEFEED",
); err != nil {
return nil, errors.Wrapf(err,
"use of %q option requires enterprise license.", changefeedbase.OptMetricsScope)
@@ -447,7 +447,7 @@
}

if err := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "CHANGEFEED",
p.ExecCfg().Settings, p.ExecCfg().LogicalClusterID(), p.ExecCfg().Organization(), "CHANGEFEED",
); err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
@@ -55,7 +55,7 @@ func (t *testServerShim) Stopper() *stop.Stopper { panic(unsuppor
func (t *testServerShim) Start(context.Context) error { panic(unsupportedShimMethod) }
func (t *testServerShim) Node() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) NodeID() roachpb.NodeID { panic(unsupportedShimMethod) }
func (t *testServerShim) ClusterID() uuid.UUID { panic(unsupportedShimMethod) }
func (t *testServerShim) StorageClusterID() uuid.UUID { panic(unsupportedShimMethod) }
func (t *testServerShim) ServingRPCAddr() string { panic(unsupportedShimMethod) }
func (t *testServerShim) RPCAddr() string { panic(unsupportedShimMethod) }
func (t *testServerShim) DB() *kv.DB { panic(unsupportedShimMethod) }
4 changes: 2 additions & 2 deletions pkg/ccl/cliccl/debug_backup_test.go
@@ -127,7 +127,7 @@ FROM
}
`
checkJSONOutputEqual(t, makeExpander(t, [][2]string{
{"cluster_id", srv.ClusterID().String()},
{"cluster_id", srv.RPCContext().LogicalClusterID.Get().String()},
{"end_time", ts1.GoTime().Format(time.RFC3339)},
{"build_info", build.GetInfo().Short()},
})(expectedOutput), out)
@@ -189,7 +189,7 @@ FROM
}
`
checkJSONOutputEqual(t, makeExpander(t, [][2]string{
{"cluster_id", srv.ClusterID().String()},
{"cluster_id", srv.RPCContext().LogicalClusterID.Get().String()},
{"end_time", ts2.GoTime().Format(time.RFC3339)},
{"build_info", build.GetInfo().Short()},
{"sst_file", sstFile},
2 changes: 1 addition & 1 deletion pkg/ccl/gssapiccl/gssapi.go
@@ -107,7 +107,7 @@ func authGSS(
// their GSS configuration is correct. That is, the presence of this error
// message means they have a correctly functioning GSS/Kerberos setup,
// but now need to enable enterprise features.
return utilccl.CheckEnterpriseEnabled(execCfg.Settings, execCfg.ClusterID(), execCfg.Organization(), "GSS authentication")
return utilccl.CheckEnterpriseEnabled(execCfg.Settings, execCfg.LogicalClusterID(), execCfg.Organization(), "GSS authentication")
})
return behaviors, nil
}
40 changes: 21 additions & 19 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go
@@ -73,27 +73,29 @@ func getGlobalReadsLead(clock *hlc.Clock) time.Duration {
// checkEnterpriseEnabled checks whether the enterprise feature for follower
// reads is enabled, returning a detailed error if not. It is not suitable for
// use in hot paths since a new error may be instantiated on each call.
func checkEnterpriseEnabled(clusterID uuid.UUID, st *cluster.Settings) error {
func checkEnterpriseEnabled(logicalClusterID uuid.UUID, st *cluster.Settings) error {
org := sql.ClusterOrganization.Get(&st.SV)
return utilccl.CheckEnterpriseEnabled(st, clusterID, org, "follower reads")
return utilccl.CheckEnterpriseEnabled(st, logicalClusterID, org, "follower reads")
}

// isEnterpriseEnabled is faster than checkEnterpriseEnabled, and suitable
// for hot paths.
func isEnterpriseEnabled(clusterID uuid.UUID, st *cluster.Settings) bool {
func isEnterpriseEnabled(logicalClusterID uuid.UUID, st *cluster.Settings) bool {
org := sql.ClusterOrganization.Get(&st.SV)
return utilccl.IsEnterpriseEnabled(st, clusterID, org, "follower reads")
return utilccl.IsEnterpriseEnabled(st, logicalClusterID, org, "follower reads")
}

func checkFollowerReadsEnabled(clusterID uuid.UUID, st *cluster.Settings) bool {
func checkFollowerReadsEnabled(logicalClusterID uuid.UUID, st *cluster.Settings) bool {
if !kvserver.FollowerReadsEnabled.Get(&st.SV) {
return false
}
return isEnterpriseEnabled(clusterID, st)
return isEnterpriseEnabled(logicalClusterID, st)
}

func evalFollowerReadOffset(clusterID uuid.UUID, st *cluster.Settings) (time.Duration, error) {
if err := checkEnterpriseEnabled(clusterID, st); err != nil {
func evalFollowerReadOffset(
logicalClusterID uuid.UUID, st *cluster.Settings,
) (time.Duration, error) {
if err := checkEnterpriseEnabled(logicalClusterID, st); err != nil {
return 0, err
}
// NOTE: we assume that at least some of the ranges being queried use a
Expand Down Expand Up @@ -128,7 +130,7 @@ func closedTimestampLikelySufficient(
// canSendToFollower implements the logic for checking whether a batch request
// may be sent to a follower.
func canSendToFollower(
clusterID uuid.UUID,
logicalClusterID uuid.UUID,
st *cluster.Settings,
clock *hlc.Clock,
ctPolicy roachpb.RangeClosedTimestampPolicy,
@@ -137,25 +139,25 @@
return kvserver.BatchCanBeEvaluatedOnFollower(ba) &&
closedTimestampLikelySufficient(st, clock, ctPolicy, ba.RequiredFrontier()) &&
// NOTE: this call can be expensive, so perform it last. See #62447.
checkFollowerReadsEnabled(clusterID, st)
checkFollowerReadsEnabled(logicalClusterID, st)
}

type followerReadOracle struct {
clusterID *base.ClusterIDContainer
st *cluster.Settings
clock *hlc.Clock
logicalClusterID *base.ClusterIDContainer
st *cluster.Settings
clock *hlc.Clock

closest replicaoracle.Oracle
binPacking replicaoracle.Oracle
}

func newFollowerReadOracle(cfg replicaoracle.Config) replicaoracle.Oracle {
return &followerReadOracle{
clusterID: cfg.RPCContext.ClusterID,
st: cfg.Settings,
clock: cfg.RPCContext.Clock,
closest: replicaoracle.NewOracle(replicaoracle.ClosestChoice, cfg),
binPacking: replicaoracle.NewOracle(replicaoracle.BinPackingChoice, cfg),
logicalClusterID: cfg.RPCContext.LogicalClusterID,
st: cfg.Settings,
clock: cfg.RPCContext.Clock,
closest: replicaoracle.NewOracle(replicaoracle.ClosestChoice, cfg),
binPacking: replicaoracle.NewOracle(replicaoracle.BinPackingChoice, cfg),
}
}

@@ -195,7 +197,7 @@ func (o *followerReadOracle) useClosestOracle(
return txn != nil &&
closedTimestampLikelySufficient(o.st, o.clock, ctPolicy, txn.RequiredFrontier()) &&
// NOTE: this call can be expensive, so perform it last. See #62447.
checkFollowerReadsEnabled(o.clusterID.Get(), o.st)
checkFollowerReadsEnabled(o.logicalClusterID.Get(), o.st)
}

// followerReadOraclePolicy is a leaseholder choosing policy that detects
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvtenantccl/connector.go
@@ -284,7 +284,7 @@ func (c *Connector) updateClusterID(ctx context.Context, key string, content roa
log.Errorf(ctx, "invalid ClusterID value: %v", content.RawBytes)
return
}
c.rpcContext.ClusterID.Set(ctx, clusterID)
c.rpcContext.StorageClusterID.Set(ctx, clusterID)
}

// updateNodeAddress handles updates to "node" gossip keys, performing the