diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel b/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel index fd72ef6ebeab..156bb45eb82b 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel @@ -45,6 +45,7 @@ go_test( embed = [":kvfollowerreadsccl"], deps = [ "//pkg/base", + "//pkg/ccl/kvccl/kvtenantccl", "//pkg/ccl/utilccl", "//pkg/keys", "//pkg/kv", @@ -57,6 +58,7 @@ go_test( "//pkg/rpc", "//pkg/security/securityassets", "//pkg/security/securitytest", + "//pkg/security/username", "//pkg/server", "//pkg/settings/cluster", "//pkg/sql", diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go index d208aada388d..3c9088528f08 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go @@ -10,11 +10,16 @@ package kvfollowerreadsccl import ( "context" + gosql "database/sql" + "fmt" "math" + "net/url" "testing" "time" "github.com/cockroachdb/cockroach/pkg/base" + // Blank import kvtenantccl so that we can create a tenant. + _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" @@ -24,10 +29,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -690,3 +697,203 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) { require.NoError(t, err) require.Greater(t, followerReadsCountAfter, followerReadsCountBefore) } + +// TestSecondaryTenantFollowerReadsRouting ensures that secondary tenants route +// their requests to the nearest replica. The test runs two versions -- one +// where accurate latency information between nodes is available and another +// where it needs to be estimated using node localities. +func TestSecondaryTenantFollowerReadsRouting(t *testing.T) { + defer utilccl.TestingEnableEnterprise()() + defer leaktest.AfterTest(t)() + defer log.TestingSetRedactable(false)() + + testutils.RunTrueAndFalse(t, "valid-latency-func", func(t *testing.T, validLatencyFunc bool) { + const numNodes = 4 + + serverArgs := make(map[int]base.TestServerArgs) + localities := make(map[int]roachpb.Locality) + for i := 0; i < numNodes; i++ { + regionName := fmt.Sprintf("region_%d", i) + if i == 3 { + // Make it such that n4 and n2 are in the same region. Below, we'll + // expect a follower read from n4 to be served by n2 because they're + // in the same locality (when validLatencyFunc is false). + regionName = fmt.Sprintf("region_%d", 1) + } + locality := roachpb.Locality{ + Tiers: []roachpb.Tier{{Key: "region", Value: regionName}}, + } + localities[i] = locality + serverArgs[i] = base.TestServerArgs{ + Locality: localities[i], + DisableDefaultTestTenant: true, // we'll create one ourselves below. + } + } + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: serverArgs, + }) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + + historicalQuery := `SELECT * FROM t.test AS OF SYSTEM TIME follower_read_timestamp() WHERE k=2` + recCh := make(chan tracingpb.Recording, 1) + + var tenants [numNodes]serverutils.TestTenantInterface + for i := 0; i < numNodes; i++ { + knobs := base.TestingKnobs{} + if i == 3 { // n4 + knobs = base.TestingKnobs{ + KVClient: &kvcoord.ClientTestingKnobs{ + DontConsiderConnHealth: true, + // For the validLatencyFunc=true version of the test, the client + // pretends to have a low latency connection to n2. As a result, we + // expect n2 to be used for follower reads originating from n4. + // + // For the variant where no latency information is available, we + // expect n2 to serve follower reads as well, but because it + // is in the same locality as the client. + LatencyFunc: func(addr string) (time.Duration, bool) { + if !validLatencyFunc { + return 0, false + } + if addr == tc.Server(1).RPCAddr() { + return time.Millisecond, true + } + return 100 * time.Millisecond, true + }, + }, + SQLExecutor: &sql.ExecutorTestingKnobs{ + WithStatementTrace: func(trace tracingpb.Recording, stmt string) { + if stmt == historicalQuery { + recCh <- trace + } + }, + }, + } + } + tt, err := tc.Server(i).StartTenant(ctx, base.TestTenantArgs{ + TenantID: serverutils.TestTenantID(), + Locality: localities[i], + TestingKnobs: knobs, + }) + require.NoError(t, err) + tenants[i] = tt + } + + // Speed up closing of timestamps in order to sleep less below before we can + // use follower_read_timestamp(). Note that we need to override the setting + // for the tenant as well, because the builtin is run in the tenant's sql pod. + systemSQL := sqlutils.MakeSQLRunner(tc.Conns[0]) + systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.1s'`) + systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s'`) + systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.1s'`) + systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.1s'`) + systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s'`) + systemSQL.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.1s'`) + // We're making assertions on traces collected by the tenant using log lines + // in KV so we must ensure they're not redacted. + systemSQL.Exec(t, `SET CLUSTER SETTING server.secondary_tenants.redact_trace = 'false'`) + + // Wait until all tenant servers are aware of the setting override. + testutils.SucceedsSoon(t, func() error { + settingNames := []string{ + "kv.closed_timestamp.target_duration", "kv.closed_timestamp.side_transport_interval", "kv.closed_timestamp.propagation_slack", + } + for _, settingName := range settingNames { + for i := 0; i < numNodes; i++ { + pgURL, cleanup := sqlutils.PGUrl(t, tenants[i].SQLAddr(), "Tenant", url.User(username.RootUser)) + defer cleanup() + db, err := gosql.Open("postgres", pgURL.String()) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + var val string + err = db.QueryRow( + fmt.Sprintf("SHOW CLUSTER SETTING %s", settingName), + ).Scan(&val) + require.NoError(t, err) + if val != "00:00:00.1" { + return errors.Errorf("tenant server %d is still waiting for %s update: currently %s", + i, + settingName, + val, + ) + } + } + } + return nil + }) + + pgURL, cleanupPGUrl := sqlutils.PGUrl( + t, tenants[3].SQLAddr(), "Tenant", url.User(username.RootUser), + ) + defer cleanupPGUrl() + tenantSQLDB, err := gosql.Open("postgres", pgURL.String()) + require.NoError(t, err) + defer tenantSQLDB.Close() + tenantSQL := sqlutils.MakeSQLRunner(tenantSQLDB) + + tenantSQL.Exec(t, `CREATE DATABASE t`) + tenantSQL.Exec(t, `CREATE TABLE t.test (k INT PRIMARY KEY)`) + + startKey := keys.MakeSQLCodec(serverutils.TestTenantID()).TenantPrefix() + tc.AddVotersOrFatal(t, startKey, tc.Target(1), tc.Target(2)) + desc := tc.LookupRangeOrFatal(t, startKey) + require.Equal(t, []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + }, desc.Replicas().Descriptors()) + + // Sleep so that we can perform follower reads. The read timestamp needs to be + // above the timestamp when the table was created. + log.Infof(ctx, "test sleeping for the follower read timestamps to pass the table creation timestamp...") + time.Sleep(300 * time.Millisecond) + log.Infof(ctx, "test sleeping... done") + + getFollowerReadCounts := func() [numNodes]int64 { + var counts [numNodes]int64 + for i := range tc.Servers { + err := tc.Servers[i].Stores().VisitStores(func(s *kvserver.Store) error { + counts[i] = s.Metrics().FollowerReadsCount.Count() + return nil + }) + require.NoError(t, err) + } + return counts + } + + // Check that the cache was indeed populated. + tenantSQL.Exec(t, `SELECT * FROM t.test WHERE k = 1`) + tablePrefix := keys.MustAddr(keys.MakeSQLCodec(serverutils.TestTenantID()).TenantPrefix()) + cache := tenants[3].DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache() + entry := cache.GetCached(ctx, tablePrefix, false /* inverted */) + require.NotNil(t, entry) + require.False(t, entry.Lease().Empty()) + require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID) + require.Equal(t, []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + }, entry.Desc().Replicas().Descriptors()) + + followerReadCountsBefore := getFollowerReadCounts() + tenantSQL.Exec(t, historicalQuery) + followerReadsCountsAfter := getFollowerReadCounts() + + rec := <-recCh + // Look at the trace and check that we've served a follower read. + require.True(t, kv.OnlyFollowerReads(rec), "query was served through follower reads: %s", rec) + + for i := 0; i < numNodes; i++ { + if i == 1 { // n2 + require.Greater(t, followerReadsCountsAfter[i], followerReadCountsBefore[i]) + continue + } + require.Equal(t, followerReadsCountsAfter[i], followerReadCountsBefore[i]) + } + }) +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index bd2dbd7efbd2..921afe0844f9 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -336,6 +336,11 @@ type DistSender struct { // LatencyFunc is used to estimate the latency to other nodes. latencyFunc LatencyFunc + // locality is the description of the topography of the server on which the + // DistSender is running. It is used to estimate the latency to other nodes + // in the absence of a latency function. + locality roachpb.Locality + // If set, the DistSender will try the replicas in the order they appear in // the descriptor, instead of trying to reorder them by latency. The knob // only applies to requests sent with the LEASEHOLDER routing policy. @@ -386,6 +391,10 @@ type DistSenderConfig struct { FirstRangeProvider FirstRangeProvider RangeDescriptorDB rangecache.RangeDescriptorDB + // Locality is the description of the topography of the server on which the + // DistSender is running. + Locality roachpb.Locality + // KVInterceptor is set for tenants; when set, information about all // BatchRequests and BatchResponses are passed through this interceptor, which // can potentially throttle requests. @@ -405,6 +414,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { nodeDescs: cfg.NodeDescs, metrics: makeDistSenderMetrics(), kvInterceptor: cfg.KVInterceptor, + locality: cfg.Locality, } if ds.st == nil { ds.st = cluster.MakeTestingClusterSettings() @@ -545,7 +555,7 @@ func (ds *DistSender) FirstRange() (*roachpb.RangeDescriptor, error) { // getNodeID attempts to return the local node ID. It returns 0 if the DistSender // does not have access to the Gossip network. func (ds *DistSender) getNodeID() roachpb.NodeID { - // TODO(nvanbenschoten): open an issue about the effect of this. + // TODO(arul): Open a new issue about the new effect of this. g, ok := ds.nodeDescs.(*gossip.Gossip) if !ok { return 0 @@ -1959,7 +1969,7 @@ func (ds *DistSender) sendToReplicas( // First order by latency, then move the leaseholder to the front of the // list, if it is known. if !ds.dontReorderReplicas { - replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc) + replicas.OptimizeReplicaOrder(ds.getNodeID(), ds.latencyFunc, ds.locality) } idx := -1 @@ -1978,7 +1988,7 @@ func (ds *DistSender) sendToReplicas( case roachpb.RoutingPolicy_NEAREST: // Order by latency. log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required") - replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc) + replicas.OptimizeReplicaOrder(ds.getNodeID(), ds.latencyFunc, ds.locality) default: log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 5106f5e209aa..3f09a3334424 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -431,7 +431,7 @@ func (ds *DistSender) singleRangeFeed( if err != nil { return args.Timestamp, err } - replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn) + replicas.OptimizeReplicaOrder(ds.getNodeID(), latencyFn, ds.locality) // The RangeFeed is not used for system critical traffic so use a DefaultClass // connection regardless of the range. opts := SendOptions{class: connectionClass(&ds.st.SV)} diff --git a/pkg/kv/kvclient/kvcoord/replica_slice.go b/pkg/kv/kvclient/kvcoord/replica_slice.go index fc3e010ffdb1..2213babcb132 100644 --- a/pkg/kv/kvclient/kvcoord/replica_slice.go +++ b/pkg/kv/kvclient/kvcoord/replica_slice.go @@ -188,21 +188,23 @@ type LatencyFunc func(string) (time.Duration, bool) // they're to be used for sending RPCs (meaning in the order in which // they'll be probed for the lease). Lower latency and "closer" // (matching in more attributes) replicas are ordered first. If the -// current node is a replica, then it'll be the first one. +// current node has a replica (and the current node's ID is supplied) +// then it'll be the first one. // -// nodeDesc is the descriptor of the current node. It can be nil, in -// which case information about the current descriptor is not used in -// optimizing the order. +// nodeID is the ID of the current node the current node. It can be 0, in which +// case information about the current node is not used in optimizing the order. +// Similarly, latencyFn can be nil, in which case it will not be used. // // Note that this method is not concerned with any information the // node might have about who the lease holder might be. If the // leaseholder is known by the caller, the caller will move it to the // front if appropriate. func (rs ReplicaSlice) OptimizeReplicaOrder( - nodeDesc *roachpb.NodeDescriptor, latencyFn LatencyFunc, + nodeID roachpb.NodeID, latencyFn LatencyFunc, locality roachpb.Locality, ) { - // If we don't know which node we're on, send the RPCs randomly. - if nodeDesc == nil { + // If we don't know which node we're on or its locality, and we don't have + // latency information to other nodes, send the RPCs randomly. + if nodeID == 0 && latencyFn == nil && len(locality.Tiers) == 0 { shuffle.Shuffle(rs) return } @@ -214,10 +216,10 @@ func (rs ReplicaSlice) OptimizeReplicaOrder( return false // i == j } // Replicas on the local node sort first. - if rs[i].NodeID == nodeDesc.NodeID { + if rs[i].NodeID == nodeID { return true // i < j } - if rs[j].NodeID == nodeDesc.NodeID { + if rs[j].NodeID == nodeID { return false // j < i } @@ -228,8 +230,8 @@ func (rs ReplicaSlice) OptimizeReplicaOrder( return latencyI < latencyJ } } - attrMatchI := localityMatch(nodeDesc.Locality.Tiers, rs[i].locality()) - attrMatchJ := localityMatch(nodeDesc.Locality.Tiers, rs[j].locality()) + attrMatchI := localityMatch(locality.Tiers, rs[i].locality()) + attrMatchJ := localityMatch(locality.Tiers, rs[j].locality()) // Longer locality matches sort first (the assumption is that // they'll have better latencies). return attrMatchI > attrMatchJ diff --git a/pkg/kv/kvclient/kvcoord/replica_slice_test.go b/pkg/kv/kvclient/kvcoord/replica_slice_test.go index c234edc69cc5..fc385b245a1d 100644 --- a/pkg/kv/kvclient/kvcoord/replica_slice_test.go +++ b/pkg/kv/kvclient/kvcoord/replica_slice_test.go @@ -175,7 +175,10 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) { defer log.Scope(t).Close(t) testCases := []struct { name string - node *roachpb.NodeDescriptor + // nodeID of the DistSender. + nodeID roachpb.NodeID + // locality of the DistSender. + locality roachpb.Locality // map from node address (see nodeDesc()) to latency to that node. latencies map[string]time.Duration slice ReplicaSlice @@ -186,8 +189,9 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) { expOrdered []roachpb.NodeID }{ { - name: "order by locality matching", - node: nodeDesc(t, 1, []string{"country=us", "region=west", "city=la"}), + name: "order by locality matching", + nodeID: 1, + locality: locality(t, []string{"country=us", "region=west", "city=la"}), slice: ReplicaSlice{ info(t, 1, 1, []string{"country=us", "region=west", "city=la"}), info(t, 2, 2, []string{"country=us", "region=west", "city=sf"}), @@ -198,8 +202,9 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) { expOrdered: []roachpb.NodeID{1, 2, 4, 3}, }, { - name: "order by latency", - node: nodeDesc(t, 1, []string{"country=us", "region=west", "city=la"}), + name: "order by latency", + nodeID: 1, + locality: locality(t, []string{"country=us", "region=west", "city=la"}), latencies: map[string]time.Duration{ "2:26257": time.Hour, "3:26257": time.Minute, @@ -217,8 +222,9 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) { // Test that replicas on the local node sort first, regardless of factors // like their latency measurement (in production they won't have any // latency measurement). - name: "local node comes first", - node: nodeDesc(t, 1, nil), + name: "local node comes first", + nodeID: 1, + locality: locality(t, nil), latencies: map[string]time.Duration{ "1:26257": 10 * time.Hour, "2:26257": time.Hour, @@ -246,7 +252,7 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) { } // Randomize the input order, as it's not supposed to matter. shuffle.Shuffle(test.slice) - test.slice.OptimizeReplicaOrder(test.node, latencyFn) + test.slice.OptimizeReplicaOrder(test.nodeID, latencyFn, test.locality) var sortedNodes []roachpb.NodeID sortedNodes = append(sortedNodes, test.slice[0].NodeID) for i := 1; i < len(test.slice); i++ { diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index e44dfc348f6a..1b2367fe6c6d 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -202,6 +202,7 @@ go_library( "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_errors//oserror", + "@com_github_cockroachdb_errors//safedetails", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//proto", diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index b5a952c7bd18..6174c23566ab 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors/safedetails" ) // FollowerReadsEnabled controls whether replicas attempt to serve follower @@ -101,7 +102,7 @@ func (r *Replica) canServeFollowerReadRLocked(ctx context.Context, ba *roachpb.B // // TODO(tschottdorf): once a read for a timestamp T has been served, the replica may // serve reads for that and smaller timestamps forever. - log.Eventf(ctx, "%s; query timestamp below closed timestamp by %s", kvbase.FollowerReadServingMsg, -tsDiff) + log.Eventf(ctx, "%s; query timestamp below closed timestamp by %s", safedetails.Safe(kvbase.FollowerReadServingMsg), -tsDiff) r.store.metrics.FollowerReadsCount.Inc(1) return true } diff --git a/pkg/server/server.go b/pkg/server/server.go index 3151b5496bbf..32d3114ddf22 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -365,6 +365,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { NodeDialer: nodeDialer, FirstRangeProvider: g, TestingKnobs: clientTestingKnobs, + Locality: cfg.Locality, } distSender := kvcoord.NewDistSender(distSenderCfg) registry.AddMetricStruct(distSender.Metrics()) diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 896ac56b7a17..5c6492ad2a68 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -400,7 +400,7 @@ func makeTenantSQLServerArgs( }) var dsKnobs kvcoord.ClientTestingKnobs - if dsKnobsP, ok := baseCfg.TestingKnobs.DistSQL.(*kvcoord.ClientTestingKnobs); ok { + if dsKnobsP, ok := baseCfg.TestingKnobs.KVClient.(*kvcoord.ClientTestingKnobs); ok { dsKnobs = *dsKnobsP } rpcRetryOptions := base.DefaultRetryOptions() @@ -440,6 +440,7 @@ func makeTenantSQLServerArgs( RangeDescriptorDB: tenantConnect, KVInterceptor: costController, TestingKnobs: dsKnobs, + Locality: baseCfg.Locality, } ds := kvcoord.NewDistSender(dsCfg) diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index acfefbb3bdf3..3b4b39958c30 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -663,6 +663,11 @@ func (t *TestTenant) DistSQLServer() interface{} { return t.SQLServer.distSQLServer } +// DistSenderI is part of the TestTenantInterface. +func (t *TestTenant) DistSenderI() interface{} { + return t.SQLServer.execCfg.DistSender +} + // RPCContext is part of TestTenantInterface. func (t *TestTenant) RPCContext() *rpc.Context { return t.execCfg.RPCContext diff --git a/pkg/sql/physicalplan/replicaoracle/oracle.go b/pkg/sql/physicalplan/replicaoracle/oracle.go index 0523ce1402ec..b2817c87c778 100644 --- a/pkg/sql/physicalplan/replicaoracle/oracle.go +++ b/pkg/sql/physicalplan/replicaoracle/oracle.go @@ -175,7 +175,7 @@ func (o *closestOracle) ChoosePreferredReplica( if err != nil { return roachpb.ReplicaDescriptor{}, err } - replicas.OptimizeReplicaOrder(&o.nodeDesc, o.latencyFunc) + replicas.OptimizeReplicaOrder(o.nodeDesc.NodeID, o.latencyFunc, o.nodeDesc.Locality) return replicas[0].ReplicaDescriptor, nil } @@ -229,7 +229,7 @@ func (o *binPackingOracle) ChoosePreferredReplica( if err != nil { return roachpb.ReplicaDescriptor{}, err } - replicas.OptimizeReplicaOrder(&o.nodeDesc, o.latencyFunc) + replicas.OptimizeReplicaOrder(o.nodeDesc.NodeID, o.latencyFunc, o.nodeDesc.Locality) // Look for a replica that has been assigned some ranges, but it's not yet full. minLoad := int(math.MaxInt32) diff --git a/pkg/testutils/serverutils/test_tenant_shim.go b/pkg/testutils/serverutils/test_tenant_shim.go index 9691994ec067..dfb88bfb9473 100644 --- a/pkg/testutils/serverutils/test_tenant_shim.go +++ b/pkg/testutils/serverutils/test_tenant_shim.go @@ -64,6 +64,9 @@ type TestTenantInterface interface { // DistSQLServer returns the *distsql.ServerImpl as an interface{}. DistSQLServer() interface{} + // DistSenderI returns the *kvcoord.DistSender as an interface{}. + DistSenderI() interface{} + // JobRegistry returns the *jobs.Registry as an interface{}. JobRegistry() interface{}